The Complete Guide to Supabase Webhooks: Building a Reliable Payment & Auth Context with RBAC (Part 2)
- Setting Up Webhook Database Triggers
- Docker Deployment and Environment Setup
- Advanced Error Handling and Retry Logic
- Real-time Client Updates
- Common Webhook Errors and Solutions
- Performance Optimization and Monitoring
Setting Up Webhook Database Triggers
Database triggers are the foundation of Supabase webhooks. They define exactly when and how your webhooks fire, what data they include, and how they handle errors. Creating effective triggers requires understanding both SQL and your application's business logic. Well-designed triggers can significantly improve your application's performance by reducing the need for polling and ensuring data consistency.
Our trigger system will handle various scenarios including cascading updates, conditional firing based on specific field changes, and complex business rules that involve multiple tables. We'll also implement triggers that can handle batch operations efficiently, which is essential for applications that process large amounts of data.
-- Database setup for webhook triggers

-- Create the webhook notification function.
--
-- Row-level trigger function: serialises the affected row(s) into a JSON
-- envelope and publishes it on the `webhook_event` NOTIFY channel.
--
-- SECURITY DEFINER functions should pin search_path so that objects in a
-- caller-controlled schema cannot shadow the ones used here. Everything
-- referenced below is either in pg_catalog or schema-qualified (auth.uid()).
--
-- NOTE(review): NOTIFY payloads are size-limited (just under 8000 bytes in a
-- default PostgreSQL build). Wide rows serialised twice (record + old_record)
-- can exceed that and make the triggering statement fail -- confirm row sizes
-- or send only primary keys.
CREATE OR REPLACE FUNCTION notify_webhook()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
  payload json;
BEGIN
  -- Build the payload
  payload = json_build_object(
    'type', TG_OP,
    'table', TG_TABLE_NAME,
    'schema', TG_TABLE_SCHEMA,
    'record', CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE row_to_json(NEW) END,
    'old_record', CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN row_to_json(OLD) ELSE NULL END,
    'created_at', now(),
    -- Prefer the row owner; fall back to the authenticated caller.
    'user_id', COALESCE(
      CASE WHEN TG_OP = 'DELETE' THEN OLD.user_id ELSE NEW.user_id END,
      auth.uid()
    )
  );

  -- Send the webhook notification
  PERFORM pg_notify('webhook_event', payload::text);

  -- The return value of an AFTER ROW trigger is ignored; kept for symmetry.
  RETURN COALESCE(NEW, OLD);
END;
$$;

-- Triggers are dropped first so this script can be re-run, matching the
-- CREATE OR REPLACE used for the functions.

-- Create triggers for payment events
DROP TRIGGER IF EXISTS payments_webhook_trigger ON payments;
CREATE TRIGGER payments_webhook_trigger
  AFTER INSERT OR UPDATE OR DELETE ON payments
  FOR EACH ROW EXECUTE FUNCTION notify_webhook();

-- Create triggers for subscription events
DROP TRIGGER IF EXISTS subscriptions_webhook_trigger ON subscriptions;
CREATE TRIGGER subscriptions_webhook_trigger
  AFTER INSERT OR UPDATE OR DELETE ON subscriptions
  FOR EACH ROW EXECUTE FUNCTION notify_webhook();

-- Create triggers for user role changes
DROP TRIGGER IF EXISTS user_roles_webhook_trigger ON user_roles;
CREATE TRIGGER user_roles_webhook_trigger
  AFTER INSERT OR UPDATE OR DELETE ON user_roles
  FOR EACH ROW EXECUTE FUNCTION notify_webhook();

-- Create a more sophisticated trigger for user profile changes.
--
-- Same envelope as notify_webhook(), plus a `changed_fields` array. On UPDATE
-- the notification is skipped entirely unless email, subscription_status or
-- role_level changed. INSERT and DELETE always notify (changed_fields is NULL).
CREATE OR REPLACE FUNCTION notify_profile_webhook()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
  payload json;
  changed_fields text[];
BEGIN
  -- Only fire webhook if specific fields changed
  IF TG_OP = 'UPDATE' THEN
    changed_fields = ARRAY[]::text[];

    -- IS DISTINCT FROM treats NULL as a comparable value, so a change
    -- to or from NULL is detected (plain `!=` yields NULL and is skipped).
    IF OLD.email IS DISTINCT FROM NEW.email THEN
      changed_fields = array_append(changed_fields, 'email');
    END IF;

    IF OLD.subscription_status IS DISTINCT FROM NEW.subscription_status THEN
      changed_fields = array_append(changed_fields, 'subscription_status');
    END IF;

    IF OLD.role_level IS DISTINCT FROM NEW.role_level THEN
      changed_fields = array_append(changed_fields, 'role_level');
    END IF;

    -- Only proceed if important fields changed
    -- (array_length of an empty array is NULL).
    IF array_length(changed_fields, 1) IS NULL THEN
      RETURN NEW;
    END IF;
  END IF;

  -- Build enhanced payload with change information
  payload = json_build_object(
    'type', TG_OP,
    'table', TG_TABLE_NAME,
    'schema', TG_TABLE_SCHEMA,
    'record', CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE row_to_json(NEW) END,
    'old_record', CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN row_to_json(OLD) ELSE NULL END,
    'changed_fields', changed_fields,
    'created_at', now(),
    -- For profiles the row id is used as the user id.
    'user_id', COALESCE(
      CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
      auth.uid()
    )
  );

  PERFORM pg_notify('webhook_event', payload::text);

  RETURN COALESCE(NEW, OLD);
END;
$$;

-- Apply the enhanced trigger to user profiles
DROP TRIGGER IF EXISTS user_profiles_webhook_trigger ON user_profiles;
CREATE TRIGGER user_profiles_webhook_trigger
  AFTER INSERT OR UPDATE OR DELETE ON user_profiles
  FOR EACH ROW EXECUTE FUNCTION notify_profile_webhook();
Docker Deployment and Environment Setup
Deploying webhook endpoints requires careful consideration of scalability, reliability, and security. Docker containers provide an excellent foundation for webhook services because they ensure consistent environments across development, staging, and production. Our Docker setup will include proper environment variable management, health checks, logging configuration, and security best practices.
The containerized approach allows us to easily scale our webhook processing capacity by spinning up multiple instances behind a load balancer. We'll also implement proper graceful shutdown handling to ensure that webhook processing completes before containers are terminated during deployments. This prevents data loss and ensures that all webhook events are processed reliably.
# Dockerfile
# Multi-stage build: deps -> builder -> runner. Only the standalone Next.js
# output, static assets, public files and the health-check script reach the
# final image.
FROM node:18-alpine AS base

# Install dependencies only when needed
FROM base AS deps
RUN apk add --no-cache libc6-compat
WORKDIR /app

# Copy package files
COPY package.json package-lock.json* ./
# Install the full dependency set: the builder stage runs `npm run build`,
# which typically needs devDependencies (e.g. TypeScript). The runner stage
# copies only the standalone output, so these never reach the final image.
RUN npm ci

# Rebuild the source code only when needed
FROM base AS builder
WORKDIR /app
# --from is required: without it COPY reads from the build context, not the
# deps stage.
COPY --from=deps /app/node_modules ./node_modules
COPY . .

# Build the application
ENV NEXT_TELEMETRY_DISABLED=1
RUN npm run build

# Production image
FROM base AS runner
WORKDIR /app

ENV NODE_ENV=production
ENV NEXT_TELEMETRY_DISABLED=1

# Create nextjs user
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs

# Copy built application
COPY --from=builder /app/public ./public
COPY --from=builder /app/.next/standalone ./
COPY --from=builder /app/.next/static ./.next/static

# Health check endpoint
COPY --from=builder /app/healthcheck.js ./

USER nextjs

EXPOSE 3000

ENV PORT=3000
ENV HOSTNAME="0.0.0.0"

# Health check (timings mirror the webhook-app healthcheck in docker-compose.yml)
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
  CMD node healthcheck.js

CMD ["node", "server.js"]
# docker-compose.yml
# Three services on one bridge network: the webhook app, Redis and an nginx
# front end that publishes ports 80/443.
version: '3.8'

services:
  webhook-app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      # ${...} values are substituted from the shell environment / .env file.
      - NEXT_PUBLIC_SUPABASE_URL=${NEXT_PUBLIC_SUPABASE_URL}
      - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
      - SUPABASE_WEBHOOK_SECRET=${SUPABASE_WEBHOOK_SECRET}
      - WEBHOOK_RETRY_ATTEMPTS=3
      - WEBHOOK_TIMEOUT=30000
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "node", "healthcheck.js"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    logging:
      # Rotate container logs: at most 3 files of 10 MB each.
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"
    networks:
      - webhook-network

  redis:
    image: redis:7-alpine
    restart: unless-stopped
    # Append-only file persistence, stored on the redis-data volume.
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - webhook-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 3s
      retries: 3

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - webhook-app
    restart: unless-stopped
    networks:
      - webhook-network

volumes:
  redis-data:

networks:
  webhook-network:
    driver: bridge
Advanced Error Handling and Retry Logic
Reliable webhook systems must handle failures gracefully because network issues, temporary service outages, and processing errors are inevitable in distributed systems. Our error handling strategy implements exponential backoff, dead letter queues, circuit breakers, and comprehensive logging to ensure maximum reliability while preventing system overload during failure scenarios.
The retry mechanism intelligently categorizes errors into retriable and non-retriable types. Temporary network errors or service unavailability trigger retries with increasing delays, while authentication failures or malformed payloads are immediately sent to dead letter queues for manual investigation. This approach prevents wasted resources while ensuring legitimate webhook events are eventually processed.
// lib/error-handling.ts
import { createServerClient } from '@supabase/ssr';

/** Shape of a row in the `webhook_errors` table as read and written by this module. */
export interface WebhookError {
  id: string;
  webhook_payload: any;
  error_message: string;
  error_stack?: string;
  attempt_count: number;
  max_attempts: number;
  next_retry_at?: string;
  status: 'pending' | 'retrying' | 'failed' | 'resolved';
  created_at: string;
  updated_at: string;
}

/** Key fragments whose values are replaced with `[REDACTED]` before logging. */
const SENSITIVE_FIELDS = ['password', 'token', 'secret', 'key', 'credit_card'];

/** Upper bound for a single backoff delay (5 minutes). */
const MAX_BACKOFF_MS = 300000;

/**
 * Parses a positive integer from an environment value.
 * Falls back when the value is missing, non-numeric or not positive; a NaN
 * retry limit would otherwise disable both the "failed" state and scheduling.
 */
function readPositiveInt(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

/** Normalises an arbitrary thrown value into an `Error`. */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Records webhook processing failures in `webhook_errors`, schedules retries
 * with exponential backoff and re-runs due retries.
 */
export class WebhookErrorHandler {
  private supabase;
  private maxRetries: number;
  private baseDelay: number;

  /**
   * @param supabaseUrl Supabase project URL.
   * @param serviceKey  Key used to create the client.
   */
  constructor(supabaseUrl: string, serviceKey: string) {
    // No-op cookie adapter: this client is not tied to a browser session.
    this.supabase = createServerClient(supabaseUrl, serviceKey, {
      cookies: { get: () => '', set: () => {}, remove: () => {} },
    });
    this.maxRetries = readPositiveInt(process.env.WEBHOOK_RETRY_ATTEMPTS, 3);
    this.baseDelay = readPositiveInt(process.env.WEBHOOK_BASE_DELAY, 1000);
  }

  /**
   * Persists a failure, logs it (with sensitive fields redacted) and raises an
   * alert when the error is critical or the retry budget is exhausted.
   *
   * @param payload         The webhook payload that failed.
   * @param error           The failure.
   * @param attemptCount    1-based attempt number that just failed.
   * @param existingErrorId When given, the existing `webhook_errors` row is
   *                        updated instead of inserting a new one, so a failed
   *                        retry does not leave the old row stuck in `retrying`.
   */
  async handleError(
    payload: unknown,
    error: Error,
    attemptCount = 1,
    existingErrorId?: string
  ): Promise<void> {
    const timestamp = new Date().toISOString();
    const exhausted = attemptCount >= this.maxRetries;

    const errorRecord: Partial<WebhookError> = {
      webhook_payload: payload,
      error_message: error.message,
      error_stack: error.stack,
      attempt_count: attemptCount,
      max_attempts: this.maxRetries,
      status: exhausted ? 'failed' : 'pending',
      updated_at: timestamp,
    };

    // Calculate next retry time with exponential backoff
    if (!exhausted) {
      const delay = this.calculateBackoffDelay(attemptCount);
      errorRecord.next_retry_at = new Date(Date.now() + delay).toISOString();
    }

    // Store the error for tracking and potential retry
    const errorId = await this.persistErrorRecord(errorRecord, timestamp, existingErrorId);

    // Log structured error information
    console.error('Webhook processing failed:', {
      errorId,
      payload: this.sanitizePayload(payload),
      error: error.message,
      attemptCount,
      nextRetry: errorRecord.next_retry_at,
    });

    // Send alerts for critical failures
    if (this.isCriticalError(error) || exhausted) {
      await this.sendErrorAlert(payload, error, attemptCount);
    }
  }

  /**
   * Inserts a new error row, or updates `existingErrorId` when provided.
   * A storage failure is logged rather than thrown so the caller can still
   * log and alert on the original webhook error.
   *
   * @returns The stored row id, if the write succeeded.
   */
  private async persistErrorRecord(
    record: Partial<WebhookError>,
    timestamp: string,
    existingErrorId?: string
  ): Promise<string | undefined> {
    const table = this.supabase.from('webhook_errors');
    const { data, error: storeError } = existingErrorId
      ? await table.update(record).eq('id', existingErrorId).select().single()
      : await table.insert({ ...record, created_at: timestamp }).select().single();

    if (storeError) {
      console.error('Failed to persist webhook error record:', storeError.message);
    }
    return data?.id ?? existingErrorId;
  }

  /** Exponential backoff (base * 2^(attempt-1)) plus up to 10% jitter, capped at 5 minutes. */
  private calculateBackoffDelay(attemptCount: number): number {
    const exponentialDelay = this.baseDelay * Math.pow(2, attemptCount - 1);
    const jitter = Math.random() * 0.1 * exponentialDelay;
    return Math.min(exponentialDelay + jitter, MAX_BACKOFF_MS);
  }

  /** True when the error message contains one of the known critical phrases. */
  private isCriticalError(error: Error): boolean {
    const criticalPatterns = [
      'authentication failed',
      'invalid signature',
      'database connection',
      'payment processing',
      'user data corruption',
    ];
    const message = error.message.toLowerCase();
    return criticalPatterns.some((pattern) => message.includes(pattern));
  }

  /**
   * Emits an alert for a critical or exhausted failure.
   * NOTE(review): the original snippet calls this method without defining it;
   * this implementation only logs. Wire it to a real alerting channel.
   */
  private async sendErrorAlert(
    payload: unknown,
    error: Error,
    attemptCount: number
  ): Promise<void> {
    console.error('Webhook error alert:', {
      error: error.message,
      attemptCount,
      maxAttempts: this.maxRetries,
      payload: this.sanitizePayload(payload),
    });
  }

  /**
   * Returns a deep copy of `payload` with sensitive values redacted.
   * Arrays stay arrays and primitives are returned unchanged.
   */
  private sanitizePayload(payload: unknown): unknown {
    const redact = (value: unknown): unknown => {
      if (typeof value !== 'object' || value === null) return value;
      if (Array.isArray(value)) return value.map(redact);

      const result: Record<string, unknown> = {};
      for (const [key, inner] of Object.entries(value)) {
        const lowered = key.toLowerCase();
        result[key] = SENSITIVE_FIELDS.some((field) => lowered.includes(field))
          ? '[REDACTED]'
          : redact(inner);
      }
      return result;
    };
    return redact(payload);
  }

  /**
   * Re-processes up to 10 pending errors whose `next_retry_at` has passed.
   * Each row is marked `retrying`, then `resolved` on success; on failure the
   * same row is updated with the incremented attempt count.
   */
  async retryFailedWebhooks(): Promise<void> {
    const { data: failedWebhooks, error: queryError } = await this.supabase
      .from('webhook_errors')
      .select('*')
      .eq('status', 'pending')
      .lte('next_retry_at', new Date().toISOString())
      .limit(10);

    if (queryError) {
      console.error('Failed to load webhook retry queue:', queryError.message);
      return;
    }
    if (!failedWebhooks?.length) return;

    for (const failedWebhook of failedWebhooks) {
      try {
        // Mark as retrying
        await this.supabase
          .from('webhook_errors')
          .update({ status: 'retrying', updated_at: new Date().toISOString() })
          .eq('id', failedWebhook.id);

        // Attempt to reprocess the webhook
        const { handleWebhookEvent } = await import('./webhook-dispatcher');
        await handleWebhookEvent(failedWebhook.webhook_payload);

        // Mark as resolved
        await this.supabase
          .from('webhook_errors')
          .update({ status: 'resolved', updated_at: new Date().toISOString() })
          .eq('id', failedWebhook.id);

        console.log(`Webhook retry successful: ${failedWebhook.id}`);
      } catch (error) {
        // Handle retry failure: update this row rather than inserting a new one.
        await this.handleError(
          failedWebhook.webhook_payload,
          toError(error),
          failedWebhook.attempt_count + 1,
          failedWebhook.id
        );
      }
    }
  }
}

/** Retry job that can be run periodically. Reads the Supabase URL and service key from the environment. */
export async function processRetryQueue() {
  const errorHandler = new WebhookErrorHandler(
    process.env.NEXT_PUBLIC_SUPABASE_URL!,
    process.env.SUPABASE_SERVICE_KEY!
  );
  await errorHandler.retryFailedWebhooks();
}
Real-time Client Updates
The true power of webhooks shines when combined with real-time client updates. While webhooks handle server-side processing, your frontend needs to reflect these changes immediately to provide users with responsive, live experiences. Our real-time system uses Supabase's built-in real-time subscriptions alongside webhook-triggered updates to create a seamless bidirectional communication system.
This approach ensures that when a webhook processes a payment, updates user permissions, or modifies subscription status, all connected clients receive instant updates. The system handles connection management, automatic reconnection, and efficient data synchronization to minimize bandwidth usage while maximizing responsiveness.
// hooks/useRealtimeWebhooks.ts
import { useEffect, useCallback, useRef } from 'react';
import { createClient } from '@supabase/supabase-js';
import { useAuth } from '@/contexts/AuthContext';

interface RealtimeWebhookConfig {
  table: string;
  filter?: string;
  onInsert?: (payload: any) => void;
  onUpdate?: (payload: any) => void;
  onDelete?: (payload: any) => void;
}

type BrowserClient = ReturnType<typeof createClient>;
type RealtimeSubscription = ReturnType<BrowserClient['channel']>;

let browserClient: BrowserClient | undefined;

/**
 * Returns one shared Supabase client. Creating a client inside the hook body
 * produced a new instance on every render, which invalidated the subscription
 * callback and forced a full resubscribe each render.
 */
function getBrowserClient(): BrowserClient {
  if (!browserClient) {
    browserClient = createClient(
      process.env.NEXT_PUBLIC_SUPABASE_URL!,
      process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!
    );
  }
  return browserClient;
}

/**
 * Subscribes to `postgres_changes` on each configured table for the current
 * user and routes INSERT / UPDATE / DELETE events to the matching handler.
 *
 * Channels are rebuilt only when the user id or the set of table/filter pairs
 * changes. Handlers are read through a ref, so callers may pass a fresh
 * `configs` array with inline callbacks on every render.
 *
 * @param configs Tables to watch. `filter` defaults to `user_id=eq.<user id>`.
 */
export function useRealtimeWebhooks(configs: RealtimeWebhookConfig[]) {
  const { user } = useAuth();
  const userId = user?.id;
  const subscriptionsRef = useRef<RealtimeSubscription[]>([]);
  const configsRef = useRef(configs);

  // Keep the latest handlers reachable. Declared before the subscription
  // effect so it runs first within the same commit.
  useEffect(() => {
    configsRef.current = configs;
  });

  // Stable identity of what is subscribed to, independent of array identity.
  const subscriptionKey = configs
    .map((config) => `${config.table}|${config.filter ?? ''}`)
    .join(',');

  const teardownSubscriptions = useCallback(() => {
    const supabase = getBrowserClient();
    subscriptionsRef.current.forEach((sub) => {
      supabase.removeChannel(sub);
    });
    subscriptionsRef.current = [];
  }, []);

  const setupSubscriptions = useCallback(() => {
    // Clean up existing subscriptions
    teardownSubscriptions();
    if (!userId) return;

    const supabase = getBrowserClient();

    // Setup new subscriptions
    subscriptionsRef.current = configsRef.current.map((config, index) =>
      supabase
        .channel(`realtime:${config.table}:${userId}`)
        .on(
          'postgres_changes',
          {
            event: '*',
            schema: 'public',
            table: config.table,
            filter: config.filter || `user_id=eq.${userId}`,
          },
          (payload) => {
            console.log(`Realtime update for ${config.table}:`, payload);
            // Use the handlers from the most recent render.
            const handlers = configsRef.current[index] ?? config;
            switch (payload.eventType) {
              case 'INSERT':
                handlers.onInsert?.(payload);
                break;
              case 'UPDATE':
                handlers.onUpdate?.(payload);
                break;
              case 'DELETE':
                handlers.onDelete?.(payload);
                break;
            }
          }
        )
        .subscribe((status) => {
          console.log(`Subscription status for ${config.table}:`, status);
        })
    );
    // subscriptionKey is listed on purpose: it triggers a rebuild when the
    // watched tables/filters change, even though it is not read in the body.
  }, [userId, subscriptionKey, teardownSubscriptions]);

  useEffect(() => {
    setupSubscriptions();
    return teardownSubscriptions;
  }, [setupSubscriptions, teardownSubscriptions]);
}

// contexts/WebhookContext.tsx
'use client';
import { createContext, useContext, useReducer, useEffect, useMemo, ReactNode } from 'react';
import { useRealtimeWebhooks } from '@/hooks/useRealtimeWebhooks';
import { useAuth } from './AuthContext';

interface WebhookState {
  payments: any[];
  subscriptions: any[];
  userRoles: any[];
  notifications: any[];
  isLoading: boolean;
}

interface WebhookContextType extends WebhookState {
  updatePayment: (payment: any) => void;
  updateSubscription: (subscription: any) => void;
  updateUserRole: (role: any) => void;
  addNotification: (notification: any) => void;
  clearNotifications: () => void;
}

/** Actions accepted by {@link webhookReducer}. */
type WebhookAction =
  | { type: 'SET_LOADING'; payload: boolean }
  | {
      type:
        | 'UPDATE_PAYMENT'
        | 'ADD_PAYMENT'
        | 'UPDATE_SUBSCRIPTION'
        | 'UPDATE_USER_ROLE'
        | 'ADD_NOTIFICATION';
      payload: any;
    }
  | { type: 'CLEAR_NOTIFICATIONS' };

/** Maximum number of notifications kept in state. */
const MAX_NOTIFICATIONS = 10;

const WebhookContext = createContext<WebhookContextType | undefined>(undefined);

/** Shallow-merges `patch` into the item with the same `id`; other items are untouched. */
function mergeById(items: any[], patch: any): any[] {
  return items.map((item) => (item.id === patch.id ? { ...item, ...patch } : item));
}

/** Builds a notification entry stamped with the current time. */
function buildNotification(type: string, message: string) {
  return {
    id: Date.now(),
    type,
    message,
    timestamp: new Date().toISOString(),
  };
}

const webhookReducer = (state: WebhookState, action: WebhookAction): WebhookState => {
  switch (action.type) {
    case 'SET_LOADING':
      return { ...state, isLoading: action.payload };
    case 'UPDATE_PAYMENT':
      return { ...state, payments: mergeById(state.payments, action.payload) };
    case 'ADD_PAYMENT':
      return { ...state, payments: [...state.payments, action.payload] };
    case 'UPDATE_SUBSCRIPTION':
      return { ...state, subscriptions: mergeById(state.subscriptions, action.payload) };
    case 'UPDATE_USER_ROLE':
      return { ...state, userRoles: mergeById(state.userRoles, action.payload) };
    case 'ADD_NOTIFICATION':
      // Newest first; keep last 10.
      return {
        ...state,
        notifications: [
          action.payload,
          ...state.notifications.slice(0, MAX_NOTIFICATIONS - 1),
        ],
      };
    case 'CLEAR_NOTIFICATIONS':
      return { ...state, notifications: [] };
    default:
      return state;
  }
};

const initialState: WebhookState = {
  payments: [],
  subscriptions: [],
  userRoles: [],
  notifications: [],
  isLoading: false,
};

/**
 * Provides payment / subscription / role state that is kept current by
 * realtime events, plus a capped notification list.
 *
 * NOTE(review): the status-change checks compare `payload.old.status` with
 * `payload.new.status`. Supabase Realtime presumably includes full old-row
 * values only when the table uses REPLICA IDENTITY FULL; otherwise `old.status`
 * is undefined and every update counts as a status change. Confirm table setup.
 */
export function WebhookProvider({ children }: { children: ReactNode }) {
  const [state, dispatch] = useReducer(webhookReducer, initialState);

  // Setup realtime subscriptions
  useRealtimeWebhooks([
    {
      table: 'payments',
      onInsert: (payload) => {
        dispatch({ type: 'ADD_PAYMENT', payload: payload.new });
        dispatch({
          type: 'ADD_NOTIFICATION',
          payload: buildNotification('payment', `Payment ${payload.new.status}`),
        });
      },
      onUpdate: (payload) => {
        dispatch({ type: 'UPDATE_PAYMENT', payload: payload.new });
        if (payload.old.status !== payload.new.status) {
          dispatch({
            type: 'ADD_NOTIFICATION',
            payload: buildNotification(
              'payment',
              `Payment status changed to ${payload.new.status}`
            ),
          });
        }
      },
    },
    {
      table: 'subscriptions',
      onUpdate: (payload) => {
        dispatch({ type: 'UPDATE_SUBSCRIPTION', payload: payload.new });
        if (payload.old.status !== payload.new.status) {
          dispatch({
            type: 'ADD_NOTIFICATION',
            payload: buildNotification('subscription', `Subscription ${payload.new.status}`),
          });
        }
      },
    },
    {
      table: 'user_roles',
      onInsert: () => {
        dispatch({
          type: 'ADD_NOTIFICATION',
          payload: buildNotification('role', 'New role assigned'),
        });
      },
      onUpdate: (payload) => {
        dispatch({ type: 'UPDATE_USER_ROLE', payload: payload.new });
      },
    },
  ]);

  // Memoised so consumers re-render only when state changes; `dispatch` is
  // stable across renders.
  const contextValue = useMemo<WebhookContextType>(
    () => ({
      ...state,
      updatePayment: (payment) => dispatch({ type: 'UPDATE_PAYMENT', payload: payment }),
      updateSubscription: (subscription) =>
        dispatch({ type: 'UPDATE_SUBSCRIPTION', payload: subscription }),
      updateUserRole: (role) => dispatch({ type: 'UPDATE_USER_ROLE', payload: role }),
      addNotification: (notification) =>
        dispatch({ type: 'ADD_NOTIFICATION', payload: notification }),
      clearNotifications: () => dispatch({ type: 'CLEAR_NOTIFICATIONS' }),
    }),
    [state]
  );

  return (
    <WebhookContext.Provider value={contextValue}>
      {children}
    </WebhookContext.Provider>
  );
}

/**
 * Returns the webhook context.
 * @throws Error when called outside a `WebhookProvider`.
 */
export function useWebhooks() {
  const context = useContext(WebhookContext);
  if (context === undefined) {
    throw new Error('useWebhooks must be used within a WebhookProvider');
  }
  return context;
}
Common Webhook Errors and Solutions
Understanding and preparing for common webhook errors can save hours of debugging and prevent system downtime. The most frequent issues include signature verification failures, timeout errors, payload parsing problems, database connection issues, and race conditions. Each category requires specific diagnostic approaches and targeted solutions.
Our comprehensive error handling system categorizes errors automatically and provides specific remediation strategies for each type. This systematic approach reduces mean time to resolution and helps prevent recurring issues through proactive monitoring and alerting.
// lib/webhook-diagnostics.ts

/** One known failure signature and how to react to it. */
interface ErrorSignature {
  pattern: RegExp;
  category: string;
  severity: string;
  solution: string;
  autoResolve: boolean;
}

/** Recommendation emitted when a category's error count exceeds `moreThan`. */
interface RecommendationRule {
  category: string;
  moreThan: number;
  message: string;
}

/** Classifies webhook errors by message and summarises batches of them. */
export class WebhookDiagnostics {
  // Checked in declaration order; the first matching pattern wins.
  private static readonly commonErrors: Record<string, ErrorSignature> = {
    SIGNATURE_VERIFICATION_FAILED: {
      pattern: /signature.*verification.*failed/i,
      category: 'security',
      severity: 'high',
      solution: 'Check webhook secret configuration and signature calculation',
      autoResolve: false,
    },
    TIMEOUT_ERROR: {
      pattern: /timeout|timed out/i,
      category: 'performance',
      severity: 'medium',
      solution: 'Optimize webhook processing logic or increase timeout limits',
      autoResolve: true,
    },
    JSON_PARSE_ERROR: {
      pattern: /unexpected token|invalid json|json parse/i,
      category: 'payload',
      severity: 'medium',
      solution: 'Validate webhook payload format and encoding',
      autoResolve: false,
    },
    DATABASE_CONNECTION_ERROR: {
      pattern: /connection.*refused|database.*unavailable/i,
      category: 'infrastructure',
      severity: 'critical',
      solution: 'Check database connectivity and connection pool settings',
      autoResolve: true,
    },
    RATE_LIMIT_EXCEEDED: {
      pattern: /rate.*limit|too many requests/i,
      category: 'throttling',
      severity: 'low',
      solution: 'Implement exponential backoff and request queuing',
      autoResolve: true,
    },
    DUPLICATE_PROCESSING: {
      pattern: /duplicate.*key|already.*processed/i,
      category: 'idempotency',
      severity: 'low',
      solution: 'Implement proper idempotency checks',
      autoResolve: true,
    },
  };

  private static readonly recommendationRules: readonly RecommendationRule[] = [
    {
      category: 'security',
      moreThan: 0,
      message: 'Review webhook security configuration and signature verification',
    },
    {
      category: 'performance',
      moreThan: 5,
      message: 'Consider optimizing webhook processing performance',
    },
    {
      category: 'infrastructure',
      moreThan: 0,
      message: 'Check system infrastructure and dependencies',
    },
    {
      category: 'throttling',
      moreThan: 3,
      message: 'Implement better rate limiting and request queuing',
    },
  ];

  /**
   * Matches `error.message` against the known signatures.
   *
   * @returns The first matching signature, or an `UNKNOWN_ERROR` diagnosis
   *          (category `general`, severity `medium`, not auto-resolvable).
   */
  static diagnoseError(error: Error): {
    type: string;
    category: string;
    severity: string;
    solution: string;
    autoResolve: boolean;
  } {
    // Patterns carry the `i` flag, so the message is tested as-is.
    for (const [type, signature] of Object.entries(this.commonErrors)) {
      if (signature.pattern.test(error.message)) {
        return {
          type,
          category: signature.category,
          severity: signature.severity,
          solution: signature.solution,
          autoResolve: signature.autoResolve,
        };
      }
    }

    return {
      type: 'UNKNOWN_ERROR',
      category: 'general',
      severity: 'medium',
      solution: 'Review error details and webhook logs for specific cause',
      autoResolve: false,
    };
  }

  /**
   * Aggregates stored errors by diagnosed type and category.
   *
   * @param errors Records exposing at least `error_message` (for example rows
   *               from the `webhook_errors` table).
   * @returns Counts per type/category, threshold-based recommendations, and
   *          the records diagnosed as `critical` or `high`, each with its
   *          diagnosis attached.
   */
  static generateDiagnosticReport<T extends { error_message: string }>(
    errors: readonly T[]
  ): {
    summary: any;
    recommendations: string[];
    criticalIssues: any[];
  } {
    const errorsByType = new Map<string, number>();
    const errorsByCategory = new Map<string, number>();
    const criticalIssues: any[] = [];

    for (const record of errors) {
      const diagnosis = this.diagnoseError(new Error(record.error_message));

      errorsByType.set(diagnosis.type, (errorsByType.get(diagnosis.type) ?? 0) + 1);
      errorsByCategory.set(
        diagnosis.category,
        (errorsByCategory.get(diagnosis.category) ?? 0) + 1
      );

      if (diagnosis.severity === 'critical' || diagnosis.severity === 'high') {
        criticalIssues.push({ ...record, diagnosis });
      }
    }

    return {
      summary: {
        totalErrors: errors.length,
        errorsByType: Object.fromEntries(errorsByType),
        errorsByCategory: Object.fromEntries(errorsByCategory),
        criticalErrorsCount: criticalIssues.length,
      },
      recommendations: this.generateRecommendations(errorsByCategory),
      criticalIssues,
    };
  }

  /** Returns the message of every rule whose category count exceeds its threshold. */
  private static generateRecommendations(errorsByCategory: Map<string, number>): string[] {
    return this.recommendationRules
      .filter((rule) => (errorsByCategory.get(rule.category) ?? 0) > rule.moreThan)
      .map((rule) => rule.message);
  }
}

/**
 * Enhanced error handling in main webhook route.
 *
 * Responses:
 * - 200 `{ success: true }` when processing succeeds.
 * - 202 when the failure is auto-resolvable and not critical (a retry is scheduled).
 * - 500 for critical failures, 400 for everything else.
 *
 * NOTE(review): `NextRequest`, `NextResponse`, `validateAndParseWebhook`,
 * `processWebhook` and `scheduleWebhookRetry` are not defined or imported in
 * this snippet; the route file must bring them into scope.
 * NOTE(review): `request` is handed to `scheduleWebhookRetry` after
 * `validateAndParseWebhook` has presumably consumed its body -- confirm the
 * retry helper does not need to re-read it.
 */
export async function POST(request: NextRequest) {
  const startTime = Date.now();

  try {
    const payload = await validateAndParseWebhook(request);
    await processWebhook(payload);

    console.log('Webhook processed successfully', {
      processingTime: Date.now() - startTime,
      payloadType: payload.type,
      table: payload.table,
    });

    return NextResponse.json({ success: true });
  } catch (caught) {
    // Anything can be thrown; normalise before reading message/stack.
    const error = caught instanceof Error ? caught : new Error(String(caught));
    const errorDiagnosis = WebhookDiagnostics.diagnoseError(error);

    console.error('Webhook processing failed', {
      error: error.message,
      diagnosis: errorDiagnosis,
      processingTime: Date.now() - startTime,
      stack: error.stack,
    });

    if (errorDiagnosis.autoResolve && errorDiagnosis.severity !== 'critical') {
      await scheduleWebhookRetry(request, error);
      return NextResponse.json(
        { error: 'Processing failed, scheduled for retry' },
        { status: 202 }
      );
    }

    const statusCode = errorDiagnosis.severity === 'critical' ? 500 : 400;
    return NextResponse.json(
      {
        error: 'Webhook processing failed',
        diagnosis: errorDiagnosis.solution,
      },
      { status: statusCode }
    );
  }
}
Performance Optimization and Monitoring
High-performance webhook systems require careful optimization at multiple levels - from database queries and network calls to memory usage and processing algorithms. Our optimization strategy focuses on reducing latency, increasing throughput, and maintaining system stability under high load conditions.
Monitoring is equally essential for maintaining webhook reliability. Our comprehensive monitoring system tracks processing times, error rates, payload sizes, and system resource usage. This data enables proactive optimization and helps identify performance bottlenecks before they impact users.
// lib/webhook-performance.ts

/** An in-flight timer created by `startTimer`. */
interface TimerMetric {
  type: 'timer';
  webhookId: string;
  startTime: number;
}

/** Aggregated processing times for one webhook. */
interface ProcessingMetric {
  type: 'processing';
  webhookId: string;
  count: number;
  totalTime: number;
  minTime: number;
  maxTime: number;
  recentTimes: number[];
  lastUpdated: number;
}

/** Aggregated errors for one webhook. */
interface ErrorMetric {
  type: 'errors';
  webhookId: string;
  count: number;
  recentErrors: Array<{
    timestamp: number;
    message: string;
    stack?: string;
    context: unknown;
  }>;
  lastUpdated: number;
}

type MetricEntry = TimerMetric | ProcessingMetric | ErrorMetric;

/** In-memory (per-process) collector for webhook timings and errors. */
export class WebhookPerformanceMonitor {
  private static metrics = new Map<string, MetricEntry>();
  private static readonly METRIC_RETENTION_MS = 24 * 60 * 60 * 1000; // 24 hours
  private static readonly MAX_RECENT_TIMES = 100;
  private static readonly MAX_RECENT_ERRORS = 50;
  private static timerSequence = 0;

  /**
   * Starts a timer for `webhookId`.
   * @returns An opaque timer id to pass to `endTimer`.
   */
  static startTimer(webhookId: string): string {
    const startTime = Date.now();
    // The sequence suffix keeps ids unique when two timers for the same
    // webhook start within the same millisecond.
    this.timerSequence += 1;
    const timerId = `${webhookId}-${startTime}-${this.timerSequence}`;
    this.metrics.set(timerId, { type: 'timer', webhookId, startTime });
    return timerId;
  }

  /**
   * Stops a timer, folds the elapsed time into the webhook's statistics and
   * logs it. Unknown timer ids are ignored.
   *
   * @param metadata Extra fields merged into the log entry.
   */
  static endTimer(timerId: string, metadata: any = {}) {
    const timer = this.metrics.get(timerId);
    if (!timer || timer.type !== 'timer') return;

    const processingTime = Date.now() - timer.startTime;

    // Update metrics
    this.updateProcessingTimeMetrics(timer.webhookId, processingTime);

    // Log performance data
    console.log('Webhook performance:', {
      webhookId: timer.webhookId,
      processingTime,
      ...metadata,
    });

    // Clean up timer
    this.metrics.delete(timerId);
  }

  /** Folds one sample into the running statistics for `webhookId`. */
  private static updateProcessingTimeMetrics(webhookId: string, processingTime: number) {
    const key = `processing_times_${webhookId}`;
    const found = this.metrics.get(key);
    const stats: ProcessingMetric =
      found && found.type === 'processing'
        ? found
        : {
            type: 'processing',
            webhookId,
            count: 0,
            totalTime: 0,
            minTime: Infinity,
            maxTime: 0,
            recentTimes: [],
            lastUpdated: 0,
          };

    stats.count += 1;
    stats.totalTime += processingTime;
    stats.minTime = Math.min(stats.minTime, processingTime);
    stats.maxTime = Math.max(stats.maxTime, processingTime);
    stats.recentTimes.push(processingTime);
    stats.lastUpdated = Date.now();

    // Keep only recent times (last 100 calls)
    if (stats.recentTimes.length > this.MAX_RECENT_TIMES) {
      stats.recentTimes = stats.recentTimes.slice(-this.MAX_RECENT_TIMES);
    }

    this.metrics.set(key, stats);
  }

  /**
   * Builds a per-webhook and overall timing summary.
   * `p95Time` is computed over the most recent samples only (at most 100).
   * With no recorded webhooks every summary number is 0.
   */
  static getPerformanceReport(): any {
    const webhooks: Record<string, any> = {};
    let totalCalls = 0;
    let totalProcessingTime = 0;
    let peakProcessingTime = 0;
    let minProcessingTime = Infinity;

    // Aggregate metrics across all webhooks
    for (const entry of this.metrics.values()) {
      if (entry.type !== 'processing') continue;

      webhooks[entry.webhookId] = {
        count: entry.count,
        averageTime: entry.totalTime / entry.count,
        minTime: entry.minTime === Infinity ? 0 : entry.minTime,
        maxTime: entry.maxTime,
        p95Time: this.calculatePercentile(entry.recentTimes, 0.95),
        totalTime: entry.totalTime,
      };

      totalCalls += entry.count;
      totalProcessingTime += entry.totalTime;
      peakProcessingTime = Math.max(peakProcessingTime, entry.maxTime);
      minProcessingTime = Math.min(minProcessingTime, entry.minTime);
    }

    return {
      summary: {
        totalWebhooks: Object.keys(webhooks).length,
        averageProcessingTime: totalCalls > 0 ? totalProcessingTime / totalCalls : 0,
        totalProcessingTime,
        peakProcessingTime,
        // Infinity would serialise to null in JSON; report 0 when empty.
        minProcessingTime: minProcessingTime === Infinity ? 0 : minProcessingTime,
      },
      webhooks,
    };
  }

  /** Nearest-rank percentile (`percentile` in 0..1); 0 for an empty list. */
  private static calculatePercentile(times: number[], percentile: number): number {
    if (times.length === 0) return 0;

    const sorted = [...times].sort((a, b) => a - b);
    const index = Math.ceil(sorted.length * percentile) - 1;
    return sorted[Math.max(0, index)];
  }

  /** Records an error for `webhookId`, keeping the 50 most recent. */
  static recordError(webhookId: string, error: Error, context: any = {}) {
    const key = `errors_${webhookId}`;
    const found = this.metrics.get(key);
    const stats: ErrorMetric =
      found && found.type === 'errors'
        ? found
        : { type: 'errors', webhookId, count: 0, recentErrors: [], lastUpdated: 0 };

    const now = Date.now();
    stats.count += 1;
    stats.recentErrors.push({
      timestamp: now,
      message: error.message,
      stack: error.stack,
      context,
    });
    stats.lastUpdated = now;

    // Keep only recent errors (last 50)
    if (stats.recentErrors.length > this.MAX_RECENT_ERRORS) {
      stats.recentErrors = stats.recentErrors.slice(-this.MAX_RECENT_ERRORS);
    }

    this.metrics.set(key, stats);
  }

  /** Error counts per webhook with the 10 most recent errors each. */
  static getErrorReport(): any {
    const report: any = {
      summary: { totalErrors: 0, webhooksWithErrors: 0 },
      webhooks: {},
    };

    for (const entry of this.metrics.values()) {
      if (entry.type !== 'errors') continue;

      report.webhooks[entry.webhookId] = {
        count: entry.count,
        recentErrors: entry.recentErrors.slice(-10), // Last 10 errors
      };
      report.summary.totalErrors += entry.count;
      report.summary.webhooksWithErrors += 1;
    }

    return report;
  }

  /**
   * Evicts entries not touched within the retention window: timers by start
   * time (covers timers that were never ended), aggregates by last update.
   */
  static cleanupOldMetrics() {
    const cutoff = Date.now() - this.METRIC_RETENTION_MS;

    for (const [key, entry] of this.metrics.entries()) {
      const lastTouched = entry.type === 'timer' ? entry.startTime : entry.lastUpdated;
      if (lastTouched < cutoff) {
        this.metrics.delete(key);
      }
    }
  }

  /** Entry count plus a rough size estimate that assumes 1 KB per entry. */
  static getMemoryUsage(): any {
    return {
      metricsCount: this.metrics.size,
      estimatedMemoryMB: (this.metrics.size * 1024) / (1024 * 1024), // Rough estimate
    };
  }
}

// Database query optimization utilities
export class WebhookQueryOptimizer {
  /**
   * Loads webhooks for `ids` in sequential chunks of `batchSize`.
   * @throws RangeError when `batchSize` is not a positive integer (a size of 0
   *         or less would otherwise never advance the loop).
   */
  static async getWebhooksBatch(ids: string[], batchSize = 100): Promise<any[]> {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
    }

    const results: any[] = [];

    for (let offset = 0; offset < ids.length; offset += batchSize) {
      const batch = ids.slice(offset, offset + batchSize);

      // Replace with your actual database query
      const batchResults = await this.queryWebhooksByIds(batch);
      results.push(...batchResults);
    }

    return results;
  }

  /** Placeholder query; always resolves to an empty list. */
  private static async queryWebhooksByIds(ids: string[]): Promise<any[]> {
    // Example database query - replace with your actual implementation
    // return await db.webhooks.findMany({
    //   where: { id: { in: ids } },
    //   select: { id: true, url: true, secret: true, events: true }
    // });
    return [];
  }

  /** SQL statements for indexes on the webhook tables. */
  static createIndexHints(): string[] {
    return [
      'CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_webhooks_status ON webhooks(status)',
      'CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_webhook_events_created_at ON webhook_events(created_at)',
      'CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_webhook_events_webhook_id ON webhook_events(webhook_id)',
      'CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_webhook_events_retry_count ON webhook_events(retry_count)',
    ];
  }
}

// Connection pooling and resource management
export class WebhookResourceManager {
  private static connectionPool: any = null;
  private static readonly MAX_CONNECTIONS = 20;
  private static readonly IDLE_TIMEOUT = 30000;

  /** (Re)creates the pool descriptor. */
  static initializePool() {
    // Initialize connection pool based on your database/HTTP client
    // This is a conceptual example
    this.connectionPool = {
      maxConnections: this.MAX_CONNECTIONS,
      idleTimeout: this.IDLE_TIMEOUT,
      activeConnections: 0,
      waitingRequests: [],
    };
  }

  /** Returns the pool descriptor, creating it on first use instead of returning null. */
  static async getConnection(): Promise<any> {
    // Implement actual connection pooling logic here
    if (this.connectionPool === null) {
      this.initializePool();
    }
    return this.connectionPool;
  }

  /** Placeholder: intentionally does nothing. */
  static releaseConnection(connection: any) {
    // Return connection to pool
    // Implement cleanup logic here
  }

  /** Current pool counters; zeros when the pool was never initialised. */
  static getPoolStats(): any {
    return {
      activeConnections: this.connectionPool?.activeConnections ?? 0,
      maxConnections: this.MAX_CONNECTIONS,
      waitingRequests: this.connectionPool?.waitingRequests?.length ?? 0,
    };
  }
}

/** Per-webhook bookkeeping for the sliding-window limiter. */
interface RateLimitEntry {
  requests: number[];
  limit: number;
}

// Rate limiting and throttling
export class WebhookRateLimiter {
  private static limits = new Map<string, RateLimitEntry>();
  private static readonly DEFAULT_LIMIT = 100; // requests per minute
  private static readonly WINDOW_MS = 60 * 1000; // 1 minute

  /**
   * Sliding-window check; records the request when it is allowed.
   *
   * @param customLimit Overrides the default of 100 per minute. `0` blocks
   *                    every request.
   * @returns `true` when the request is within the limit.
   */
  static async checkLimit(webhookId: string, customLimit?: number): Promise<boolean> {
    const limit = customLimit ?? this.DEFAULT_LIMIT;
    const now = Date.now();
    const windowStart = now - this.WINDOW_MS;
    const key = `rate_limit_${webhookId}`;

    // Remove old requests outside the window
    const recent = (this.limits.get(key)?.requests ?? []).filter(
      (timestamp) => timestamp > windowStart
    );
    const allowed = recent.length < limit;
    if (allowed) {
      recent.push(now);
    }

    // Remember the limit so getRateLimitStatus can report against it.
    this.limits.set(key, { requests: recent, limit });
    return allowed;
  }

  /**
   * Read-only view of a webhook's window.
   * `limit` is the limit used by the most recent `checkLimit` call (default
   * when none). `resetTime` is when the oldest counted request leaves the
   * window, or now when the window is empty.
   */
  static getRateLimitStatus(webhookId: string): any {
    const entry = this.limits.get(`rate_limit_${webhookId}`);
    const limit = entry?.limit ?? this.DEFAULT_LIMIT;

    const now = Date.now();
    const windowStart = now - this.WINDOW_MS;
    const recentRequests = (entry?.requests ?? []).filter(
      (timestamp) => timestamp > windowStart
    );

    return {
      currentRequests: recentRequests.length,
      limit,
      resetTime:
        recentRequests.length > 0 ? Math.min(...recentRequests) + this.WINDOW_MS : now,
      remaining: Math.max(0, limit - recentRequests.length),
    };
  }

  /** Drops timestamps outside the window and removes entries left empty. */
  static clearExpiredLimits() {
    const windowStart = Date.now() - this.WINDOW_MS;

    for (const [key, entry] of this.limits.entries()) {
      entry.requests = entry.requests.filter((timestamp) => timestamp > windowStart);
      if (entry.requests.length === 0) {
        this.limits.delete(key);
      }
    }
  }
}

/** Serialised length of `payload`, or 0 when it cannot be serialised. */
function measurePayloadSize(payload: unknown): number {
  try {
    const serialized: string | undefined = JSON.stringify(payload);
    return serialized === undefined ? 0 : serialized.length;
  } catch {
    // e.g. circular structures; must not mask the webhook's own error.
    return 0;
  }
}

/**
 * Usage example with integrated monitoring: rate-limit check, processing,
 * timing and error recording.
 *
 * @throws Re-throws whatever processing throws, or `Error('Rate limit exceeded')`.
 */
export async function processWebhookWithMonitoring(webhookId: string, payload: any) {
  const timerId = WebhookPerformanceMonitor.startTimer(webhookId);
  const payloadSize = measurePayloadSize(payload);

  try {
    // Check rate limit
    if (!(await WebhookRateLimiter.checkLimit(webhookId))) {
      throw new Error('Rate limit exceeded');
    }

    // Process webhook
    const result = await processWebhook(webhookId, payload);

    WebhookPerformanceMonitor.endTimer(timerId, { payloadSize, success: true });

    return result;
  } catch (caught) {
    const error = caught instanceof Error ? caught : new Error(String(caught));

    WebhookPerformanceMonitor.recordError(webhookId, error, { payloadSize });
    WebhookPerformanceMonitor.endTimer(timerId, {
      payloadSize,
      success: false,
      error: error.message,
    });

    throw caught;
  }
}

/** Placeholder processor; always resolves to `{ success: true }`. */
async function processWebhook(webhookId: string, payload: any): Promise<any> {
  // Your webhook processing logic here
  return { success: true };
}

// Cleanup service to run periodically
const cleanupTimer: unknown = setInterval(() => {
  WebhookPerformanceMonitor.cleanupOldMetrics();
  WebhookRateLimiter.clearExpiredLimits();
}, 5 * 60 * 1000); // Every 5 minutes

// In Node the handle is an object with unref(); calling it stops this
// housekeeping timer from keeping the process alive. Browsers return a number.
if (
  typeof cleanupTimer === 'object' &&
  cleanupTimer !== null &&
  'unref' in cleanupTimer &&
  typeof cleanupTimer.unref === 'function'
) {
  cleanupTimer.unref();
}