Files
flyer-crawler.projectium.com/src/services/websocketService.server.ts
Torben Sorensen 99f5d52d17
All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 18m34s
more test fixes
2026-01-19 12:13:04 -08:00

371 lines
9.9 KiB
TypeScript

// src/services/websocketService.server.ts
/**
* WebSocket service for real-time notifications
* Manages WebSocket connections and broadcasts messages to connected clients
*/
import { WebSocketServer, WebSocket } from 'ws';
import type { Server as HTTPServer } from 'http';
import jwt from 'jsonwebtoken';
import type { Logger } from 'pino';
import { logger as globalLogger } from './logger.server';
import {
createWebSocketMessage,
type WebSocketMessage,
type DealNotificationData,
type SystemMessageData,
} from '../types/websocket';
import type { IncomingMessage } from 'http';
const JWT_SECRET = process.env.JWT_SECRET || 'test-secret';
if (!process.env.JWT_SECRET) {
console.warn('[WebSocket] JWT_SECRET not set in environment, using fallback');
}
/**
* Extended WebSocket with user context
*/
interface AuthenticatedWebSocket extends WebSocket {
userId?: string;
isAlive?: boolean;
}
/**
* JWT payload structure
*/
interface JWTPayload {
user_id: string;
email: string;
role: string;
}
export class WebSocketService {
private wss: WebSocketServer | null = null;
private clients: Map<string, Set<AuthenticatedWebSocket>> = new Map();
private pingInterval: NodeJS.Timeout | null = null;
constructor(private logger: Logger) {}
/**
* Initialize the WebSocket server and attach it to an HTTP server
*/
initialize(server: HTTPServer): void {
this.wss = new WebSocketServer({
server,
path: '/ws',
});
this.logger.info('WebSocket server initialized on path /ws');
this.wss.on('connection', (ws: AuthenticatedWebSocket, request: IncomingMessage) => {
this.handleConnection(ws, request);
});
// Start heartbeat ping/pong to detect dead connections
this.startHeartbeat();
}
/**
* Handle new WebSocket connection
*/
private handleConnection(ws: AuthenticatedWebSocket, request: IncomingMessage): void {
const connectionLogger = this.logger.child({ context: 'ws-connection' });
// Extract JWT token from query string or cookie
const token = this.extractToken(request);
if (!token) {
connectionLogger.warn('WebSocket connection rejected: No token provided');
ws.close(1008, 'Authentication required');
return;
}
// Verify JWT token
let payload: JWTPayload;
try {
const verified = jwt.verify(token, JWT_SECRET);
connectionLogger.debug({ verified, type: typeof verified }, 'JWT verification result');
if (!verified || typeof verified === 'string') {
connectionLogger.warn(
'WebSocket connection rejected: JWT verification returned invalid payload',
);
ws.close(1008, 'Invalid token');
return;
}
payload = verified as JWTPayload;
} catch (error) {
connectionLogger.warn({ error }, 'WebSocket connection rejected: Invalid token');
ws.close(1008, 'Invalid token');
return;
}
// Attach user ID to the WebSocket connection
ws.userId = payload.user_id;
ws.isAlive = true;
// Register the client
this.registerClient(ws);
connectionLogger.info(
{ userId: ws.userId },
`WebSocket client connected for user ${ws.userId}`,
);
// Send connection confirmation
const confirmationMessage = createWebSocketMessage.connectionEstablished({
user_id: ws.userId,
message: 'Connected to real-time notification service',
});
this.sendToClient(ws, confirmationMessage);
// Handle incoming messages
ws.on('message', (data: Buffer) => {
this.handleMessage(ws, data);
});
// Handle pong responses (heartbeat)
ws.on('pong', () => {
ws.isAlive = true;
});
// Handle disconnection
ws.on('close', () => {
this.unregisterClient(ws);
connectionLogger.info({ userId: ws.userId }, 'WebSocket client disconnected');
});
// Handle errors
ws.on('error', (error: Error) => {
connectionLogger.error({ error, userId: ws.userId }, 'WebSocket error');
});
}
/**
* Extract JWT token from request (query string or cookie)
*/
private extractToken(request: IncomingMessage): string | null {
// Try to extract from query string (?token=xxx)
const url = new URL(request.url || '', `http://${request.headers.host}`);
const tokenFromQuery = url.searchParams.get('token');
if (tokenFromQuery) {
return tokenFromQuery;
}
// Try to extract from cookie
const cookieHeader = request.headers.cookie;
if (cookieHeader) {
const cookies = cookieHeader.split(';').reduce(
(acc, cookie) => {
const [key, value] = cookie.trim().split('=');
acc[key] = value;
return acc;
},
{} as Record<string, string>,
);
return cookies['accessToken'] || null;
}
return null;
}
/**
* Register a WebSocket client
*/
private registerClient(ws: AuthenticatedWebSocket): void {
if (!ws.userId) return;
if (!this.clients.has(ws.userId)) {
this.clients.set(ws.userId, new Set());
}
this.clients.get(ws.userId)!.add(ws);
this.logger.info(
{ userId: ws.userId, totalConnections: this.clients.get(ws.userId)!.size },
'Client registered',
);
}
/**
* Unregister a WebSocket client
*/
private unregisterClient(ws: AuthenticatedWebSocket): void {
if (!ws.userId) return;
const userClients = this.clients.get(ws.userId);
if (userClients) {
userClients.delete(ws);
if (userClients.size === 0) {
this.clients.delete(ws.userId);
}
}
}
/**
* Handle incoming messages from clients
*/
private handleMessage(ws: AuthenticatedWebSocket, data: Buffer): void {
try {
const message = JSON.parse(data.toString()) as WebSocketMessage;
// Handle ping messages
if (message.type === 'ping') {
const pongMessage = createWebSocketMessage.pong();
this.sendToClient(ws, pongMessage);
}
// Log other message types for debugging
this.logger.debug(
{ userId: ws.userId, messageType: message.type },
'Received WebSocket message',
);
} catch (error) {
this.logger.error({ error }, 'Failed to parse WebSocket message');
}
}
/**
* Send a message to a specific WebSocket client
*/
private sendToClient(ws: AuthenticatedWebSocket, message: WebSocketMessage): void {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
/**
* Broadcast a deal notification to a specific user
*/
broadcastDealNotification(userId: string, data: DealNotificationData): void {
const message = createWebSocketMessage.dealNotification(data);
this.broadcastToUser(userId, message);
}
/**
* Broadcast a system message to a specific user
*/
broadcastSystemMessage(userId: string, data: SystemMessageData): void {
const message = createWebSocketMessage.systemMessage(data);
this.broadcastToUser(userId, message);
}
/**
* Broadcast a message to all connections of a specific user
*/
private broadcastToUser(userId: string, message: WebSocketMessage): void {
const userClients = this.clients.get(userId);
if (!userClients || userClients.size === 0) {
this.logger.debug({ userId }, 'No active WebSocket connections for user');
return;
}
let sentCount = 0;
userClients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
this.sendToClient(client, message);
sentCount++;
}
});
this.logger.info(
{ userId, messageType: message.type, sentCount, totalConnections: userClients.size },
'Broadcast message to user',
);
}
/**
* Broadcast a system message to all connected clients
*/
broadcastToAll(data: SystemMessageData): void {
const message = createWebSocketMessage.systemMessage(data);
let sentCount = 0;
this.clients.forEach((userClients) => {
userClients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
this.sendToClient(client, message);
sentCount++;
}
});
});
this.logger.info(
{ messageType: message.type, sentCount, totalUsers: this.clients.size },
'Broadcast message to all users',
);
}
/**
* Start heartbeat ping/pong to detect dead connections
*/
private startHeartbeat(): void {
this.pingInterval = setInterval(() => {
if (!this.wss) return;
this.wss.clients.forEach((ws) => {
const authWs = ws as AuthenticatedWebSocket;
if (authWs.isAlive === false) {
this.logger.debug({ userId: authWs.userId }, 'Terminating dead connection');
return authWs.terminate();
}
authWs.isAlive = false;
authWs.ping();
});
}, 30000); // Ping every 30 seconds
this.logger.info('WebSocket heartbeat started (30s interval)');
}
/**
* Get count of active connections
*/
getConnectionStats(): { totalUsers: number; totalConnections: number } {
let totalConnections = 0;
this.clients.forEach((userClients) => {
totalConnections += userClients.size;
});
return {
totalUsers: this.clients.size,
totalConnections,
};
}
/**
* Shutdown the WebSocket server gracefully
*/
shutdown(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
if (this.wss) {
this.logger.info('Shutting down WebSocket server');
// Notify all clients about shutdown
this.broadcastToAll({
message: 'Server is shutting down. Please reconnect.',
severity: 'warning',
});
// Close all connections
this.wss.clients.forEach((client) => {
client.close(1001, 'Server shutting down');
});
this.wss.close(() => {
this.logger.info('WebSocket server closed');
});
this.clients.clear();
}
}
}
// Singleton instance
export const websocketService = new WebSocketService(globalLogger);