# ADR-022: Real-time Notification System **Date**: 2025-12-12 **Status**: Accepted **Implemented**: 2026-01-19 ## Context A core feature is providing "Active Deal Alerts" to users. The current HTTP-based architecture is not suitable for pushing real-time updates to clients efficiently. Relying on traditional polling would be inefficient and slow. Users need to be notified immediately when: 1. **New deals are found** on their watched items 2. **System announcements** need to be broadcast 3. **Background jobs complete** that affect their data Traditional approaches: - **HTTP Polling**: Inefficient, creates unnecessary load, delays up to polling interval - **Server-Sent Events (SSE)**: One-way only, no client-to-server messaging - **WebSockets**: Bi-directional, real-time, efficient ## Decision We will implement a real-time communication system using **WebSockets** with the `ws` library. This will involve: 1. **WebSocket Server**: Manages connections, authentication, and message routing 2. **React Hook**: Provides easy integration for React components 3. **Event Bus Integration**: Bridges WebSocket messages to in-app events 4. **Background Job Integration**: Emits WebSocket notifications when deals are found ### Design Principles - **JWT Authentication**: WebSocket connections authenticated via JWT tokens - **Type-Safe Messages**: Strongly-typed message formats prevent errors - **Auto-Reconnect**: Client automatically reconnects with exponential backoff - **Graceful Degradation**: Email + DB notifications remain for offline users - **Heartbeat Ping/Pong**: Detect and cleanup dead connections - **Singleton Service**: Single WebSocket service instance shared across app ## Implementation Details ### WebSocket Message Types Located in `src/types/websocket.ts`: ```typescript export interface WebSocketMessage { type: WebSocketMessageType; data: T; timestamp: string; } export type WebSocketMessageType = | 'deal-notification' | 'system-message' | 'ping' | 'pong' | 'error' | 'connection-established'; // Deal notification payload export interface DealNotificationData { notification_id?: string; deals: DealInfo[]; user_id: string; message: string; } // Type-safe message creators export const createWebSocketMessage = { dealNotification: (data: DealNotificationData) => ({ ... }), systemMessage: (data: SystemMessageData) => ({ ... }), error: (data: ErrorMessageData) => ({ ... }), // ... }; ``` ### WebSocket Server Service Located in `src/services/websocketService.server.ts`: ```typescript export class WebSocketService { private wss: WebSocketServer | null = null; private clients: Map> = new Map(); private pingInterval: NodeJS.Timeout | null = null; initialize(server: HTTPServer): void { this.wss = new WebSocketServer({ server, path: '/ws', }); this.wss.on('connection', (ws, request) => { this.handleConnection(ws, request); }); this.startHeartbeat(); // Ping every 30s } // Authentication via JWT from query string or cookie private extractToken(request: IncomingMessage): string | null { // Extract from ?token=xxx or Cookie: accessToken=xxx } // Broadcast to specific user broadcastDealNotification(userId: string, data: DealNotificationData): void { const message = createWebSocketMessage.dealNotification(data); this.broadcastToUser(userId, message); } // Broadcast to all users broadcastToAll(data: SystemMessageData): void { // Send to all connected clients } shutdown(): void { // Gracefully close all connections } } export const websocketService = new WebSocketService(globalLogger); ``` ### Server Integration Located in `server.ts`: ```typescript import { websocketService } from './src/services/websocketService.server'; if (process.env.NODE_ENV !== 'test') { const server = app.listen(PORT, () => { logger.info(`Authentication server started on port ${PORT}`); }); // Initialize WebSocket server (ADR-022) websocketService.initialize(server); logger.info('WebSocket server initialized for real-time notifications'); // Graceful shutdown const handleShutdown = (signal: string) => { websocketService.shutdown(); gracefulShutdown(signal); }; process.on('SIGINT', () => handleShutdown('SIGINT')); process.on('SIGTERM', () => handleShutdown('SIGTERM')); } ``` ### React Client Hook Located in `src/hooks/useWebSocket.ts`: ```typescript export function useWebSocket(options: UseWebSocketOptions = {}) { const [state, setState] = useState({ isConnected: false, isConnecting: false, error: null, }); const connect = useCallback(() => { const url = getWebSocketUrl(); // wss://host/ws?token=xxx const ws = new WebSocket(url); ws.onmessage = (event) => { const message = JSON.parse(event.data) as WebSocketMessage; // Emit to event bus for cross-component communication switch (message.type) { case 'deal-notification': eventBus.dispatch('notification:deal', message.data); break; case 'system-message': eventBus.dispatch('notification:system', message.data); break; // ... } }; ws.onclose = () => { // Auto-reconnect with exponential backoff if (reconnectAttempts < maxReconnectAttempts) { setTimeout(connect, reconnectDelay * Math.pow(2, reconnectAttempts)); reconnectAttempts++; } }; }, []); useEffect(() => { if (autoConnect) connect(); return () => disconnect(); }, [autoConnect, connect, disconnect]); return { ...state, connect, disconnect, send }; } ``` ### Background Job Integration Located in `src/services/backgroundJobService.ts`: ```typescript private async _processDealsForUser({ userProfile, deals }: UserDealGroup) { // ... existing email notification logic ... // Send real-time WebSocket notification (ADR-022) const { websocketService } = await import('./websocketService.server'); websocketService.broadcastDealNotification(userProfile.user_id, { user_id: userProfile.user_id, deals: deals.map((deal) => ({ item_name: deal.item_name, best_price_in_cents: deal.best_price_in_cents, store_name: deal.store.name, store_id: deal.store.store_id, })), message: `You have ${deals.length} new deal(s) on your watched items!`, }); } ``` ### Usage in React Components ```typescript import { useWebSocket } from '../hooks/useWebSocket'; import { useEventBus } from '../hooks/useEventBus'; import { useCallback } from 'react'; function NotificationComponent() { // Connect to WebSocket const { isConnected, error } = useWebSocket({ autoConnect: true }); // Listen for deal notifications via event bus const handleDealNotification = useCallback((data: DealNotificationData) => { toast.success(`${data.deals.length} new deals found!`); }, []); useEventBus('notification:deal', handleDealNotification); return (
{isConnected ? '🟒 Live' : 'πŸ”΄ Offline'}
); } ``` ## Architecture Diagram ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ WebSocket Architecture β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ Server Side: β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ Background Job │─────▢│ WebSocket │─────▢│ Connected β”‚ β”‚ (Deal Checker) β”‚ β”‚ Service β”‚ β”‚ Clients β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β–² β”‚ β”‚ β–Ό β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ Email Queue β”‚ β”‚ β”‚ (BullMQ) β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β”‚ β–Ό β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ DB Notification β”‚ β”‚ Express Server β”‚ β”‚ Storage β”‚ β”‚ + WS Upgrade β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ Client Side: β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ useWebSocket │◀────▢│ WebSocket │◀────▢│ Event Bus β”‚ β”‚ Hook β”‚ β”‚ Connection β”‚ β”‚ Integration β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β–Ό β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ UI Components β”‚ β”‚ (Notifications) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` ## Security Considerations 1. **Authentication**: JWT tokens required for WebSocket connections 2. **User Isolation**: Messages routed only to authenticated user's connections 3. **Rate Limiting**: Heartbeat ping/pong prevents connection flooding 4. **Graceful Shutdown**: Notifies clients before server shutdown 5. **Error Handling**: Failed WebSocket sends don't crash the server ## Consequences ### Positive - **Real-time Updates**: Users see deals immediately when found - **Better UX**: No page refresh needed, instant notifications - **Efficient**: Single persistent connection vs polling every N seconds - **Scalable**: Connection pooling per user, heartbeat cleanup - **Type-Safe**: TypeScript types prevent message format errors - **Resilient**: Auto-reconnect with exponential backoff - **Observable**: Connection stats available via `getConnectionStats()` - **Testable**: Comprehensive unit tests for message types and service ### Negative - **Complexity**: WebSocket server adds new infrastructure component - **Memory**: Each connection consumes server memory - **Scaling**: Single-server implementation (multi-server requires Redis pub/sub) - **Browser Support**: Requires WebSocket-capable browsers (all modern browsers) - **Network**: Persistent connections require stable network ### Mitigation - **Graceful Degradation**: Email + DB notifications remain for offline users - **Connection Limits**: Can add max connections per user if needed - **Monitoring**: Connection stats exposed for observability - **Future Scaling**: Can add Redis pub/sub for multi-instance deployments - **Heartbeat**: 30s ping/pong detects and cleans up dead connections ## Testing Strategy ### Unit Tests Located in `src/services/websocketService.server.test.ts`: ```typescript describe('WebSocketService', () => { it('should initialize without errors', () => { ... }); it('should handle broadcasting with no active connections', () => { ... }); it('should shutdown gracefully', () => { ... }); }); ``` Located in `src/types/websocket.test.ts`: ```typescript describe('WebSocket Message Creators', () => { it('should create valid deal notification messages', () => { ... }); it('should generate valid ISO timestamps', () => { ... }); }); ``` ### Integration Tests Future work: Add integration tests that: - Connect WebSocket clients to test server - Verify authentication and message routing - Test reconnection logic - Validate message delivery ## Key Files - `src/types/websocket.ts` - WebSocket message types and creators - `src/services/websocketService.server.ts` - WebSocket server service - `src/hooks/useWebSocket.ts` - React hook for WebSocket connections - `src/services/backgroundJobService.ts` - Integration point for deal notifications - `server.ts` - Express + WebSocket server initialization - `src/services/websocketService.server.test.ts` - Unit tests - `src/types/websocket.test.ts` - Message type tests ## Related ADRs - [ADR-036](./0036-event-bus-and-pub-sub-pattern.md) - Event Bus Pattern (used by client hook) - [ADR-042](./0042-email-and-notification-architecture.md) - Email Notifications (fallback mechanism) - [ADR-006](./0006-background-job-processing-and-task-queues.md) - Background Jobs (triggers WebSocket notifications)