From e8d511b4de8eff3d532a3cd8a1430740a02a2650 Mon Sep 17 00:00:00 2001 From: Torben Sorensen Date: Thu, 25 Dec 2025 10:59:35 -0800 Subject: [PATCH] more work on the BullMQ workers --- ecosystem.config.cjs | 4 +- src/routes/admin.monitoring.routes.test.ts | 40 +- src/routes/admin.routes.ts | 8 +- src/services/queueService.server.ts | 442 +-------------------- src/services/queues.server.ts | 96 +++++ src/services/redis.server.ts | 16 + src/services/worker.ts | 30 ++ src/services/workers.server.test.ts | 346 ++++++++++++++++ src/services/workers.server.ts | 344 ++++++++++++++++ 9 files changed, 884 insertions(+), 442 deletions(-) create mode 100644 src/services/queues.server.ts create mode 100644 src/services/redis.server.ts create mode 100644 src/services/worker.ts create mode 100644 src/services/workers.server.test.ts create mode 100644 src/services/workers.server.ts diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs index 7f08220..e64f9c6 100644 --- a/ecosystem.config.cjs +++ b/ecosystem.config.cjs @@ -88,7 +88,7 @@ module.exports = { // --- General Worker --- name: 'flyer-crawler-worker', script: './node_modules/.bin/tsx', - args: 'src/services/queueService.server.ts', // tsx will execute this file + args: 'src/worker.ts', // tsx will execute this file // Production Environment Settings env_production: { NODE_ENV: 'production', @@ -164,7 +164,7 @@ module.exports = { // --- Analytics Worker --- name: 'flyer-crawler-analytics-worker', script: './node_modules/.bin/tsx', - args: 'src/services/queueService.server.ts', // tsx will execute this file + args: 'src/worker.ts', // tsx will execute this file // Production Environment Settings env_production: { NODE_ENV: 'production', diff --git a/src/routes/admin.monitoring.routes.test.ts b/src/routes/admin.monitoring.routes.test.ts index 9c526fa..90ef1bd 100644 --- a/src/routes/admin.monitoring.routes.test.ts +++ b/src/routes/admin.monitoring.routes.test.ts @@ -5,7 +5,16 @@ import type { Request, Response, NextFunction } from 'express'; import { createMockUserProfile, createMockActivityLogItem } from '../tests/utils/mockFactories'; import type { UserProfile } from '../types'; import { createTestApp } from '../tests/utils/createTestApp'; -import { mockLogger } from '../tests/utils/mockLogger'; + +const { mockLogger } = vi.hoisted(() => ({ + mockLogger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + child: vi.fn().mockReturnThis(), + }, +})); vi.mock('../lib/queue', () => ({ serverAdapter: { @@ -27,19 +36,22 @@ vi.mock('../services/db/index.db', () => ({ notificationRepo: {}, })); -// Mock the queue service to control worker statuses +// Mock the queue service for queue status checks vi.mock('../services/queueService.server', () => ({ + flyerQueue: { name: 'flyer-processing', getJobCounts: vi.fn() }, + emailQueue: { name: 'email-sending', getJobCounts: vi.fn() }, + analyticsQueue: { name: 'analytics-reporting', getJobCounts: vi.fn() }, + cleanupQueue: { name: 'file-cleanup', getJobCounts: vi.fn() }, + weeklyAnalyticsQueue: { name: 'weekly-analytics-reporting', getJobCounts: vi.fn() }, +})); + +// Mock the worker service for worker status checks +vi.mock('../services/workers.server', () => ({ flyerWorker: { name: 'flyer-processing', isRunning: vi.fn() }, emailWorker: { name: 'email-sending', isRunning: vi.fn() }, analyticsWorker: { name: 'analytics-reporting', isRunning: vi.fn() }, cleanupWorker: { name: 'file-cleanup', isRunning: vi.fn() }, weeklyAnalyticsWorker: { name: 'weekly-analytics-reporting', isRunning: vi.fn() }, - flyerQueue: { name: 'flyer-processing', getJobCounts: vi.fn() }, - emailQueue: { name: 'email-sending', getJobCounts: vi.fn() }, - analyticsQueue: { name: 'analytics-reporting', getJobCounts: vi.fn() }, - cleanupQueue: { name: 'file-cleanup', getJobCounts: vi.fn() }, - // FIX: Add the missing weeklyAnalyticsQueue to prevent import errors in admin.routes.ts - weeklyAnalyticsQueue: { name: 'weekly-analytics-reporting', getJobCounts: vi.fn() }, })); // Mock other dependencies that are part of the adminRouter setup but not directly tested here @@ -67,8 +79,10 @@ import adminRouter from './admin.routes'; // Import the mocked modules to control them import * as queueService from '../services/queueService.server'; +import * as workerService from '../services/workers.server'; import { adminRepo } from '../services/db/index.db'; const mockedQueueService = queueService as Mocked; +const mockedWorkerService = workerService as Mocked; // Mock the logger vi.mock('../services/logger.server', () => ({ @@ -137,11 +151,11 @@ describe('Admin Monitoring Routes (/api/admin)', () => { describe('GET /workers/status', () => { it('should return the status of all registered workers', async () => { // Arrange: Set the mock status for each worker - vi.mocked(mockedQueueService.flyerWorker.isRunning).mockReturnValue(true); - vi.mocked(mockedQueueService.emailWorker.isRunning).mockReturnValue(true); - vi.mocked(mockedQueueService.analyticsWorker.isRunning).mockReturnValue(false); // Simulate one worker being stopped - vi.mocked(mockedQueueService.cleanupWorker.isRunning).mockReturnValue(true); - vi.mocked(mockedQueueService.weeklyAnalyticsWorker.isRunning).mockReturnValue(true); + vi.mocked(mockedWorkerService.flyerWorker.isRunning).mockReturnValue(true); + vi.mocked(mockedWorkerService.emailWorker.isRunning).mockReturnValue(true); + vi.mocked(mockedWorkerService.analyticsWorker.isRunning).mockReturnValue(false); // Simulate one worker being stopped + vi.mocked(mockedWorkerService.cleanupWorker.isRunning).mockReturnValue(true); + vi.mocked(mockedWorkerService.weeklyAnalyticsWorker.isRunning).mockReturnValue(true); // Act const response = await supertest(app).get('/api/admin/workers/status'); diff --git a/src/routes/admin.routes.ts b/src/routes/admin.routes.ts index d15abd6..b250cdf 100644 --- a/src/routes/admin.routes.ts +++ b/src/routes/admin.routes.ts @@ -25,12 +25,14 @@ import { analyticsQueue, cleanupQueue, weeklyAnalyticsQueue, - flyerWorker, - emailWorker, +} from '../services/queueService.server'; // Import your queues +import { analyticsWorker, cleanupWorker, + emailWorker, + flyerWorker, weeklyAnalyticsWorker, -} from '../services/queueService.server'; // Import your queues +} from '../services/workers.server'; import { getSimpleWeekAndYear } from '../utils/dateUtils'; import { requiredString, diff --git a/src/services/queueService.server.ts b/src/services/queueService.server.ts index d74d3ec..95372e0 100644 --- a/src/services/queueService.server.ts +++ b/src/services/queueService.server.ts @@ -1,438 +1,32 @@ // src/services/queueService.server.ts -import { Queue, Worker, Job, UnrecoverableError } from 'bullmq'; -import IORedis from 'ioredis'; // Correctly imported -import fsPromises from 'node:fs/promises'; -import { exec } from 'child_process'; -import { promisify } from 'util'; - import { logger } from './logger.server'; -import { aiService } from './aiService.server'; -import * as emailService from './emailService.server'; -import * as db from './db/index.db'; +import { connection } from './redis.server'; import { - FlyerProcessingService, - type FlyerJobData, - type IFileSystem, -} from './flyerProcessingService.server'; -import { FlyerDataTransformer } from './flyerDataTransformer'; + flyerQueue, + emailQueue, + analyticsQueue, + weeklyAnalyticsQueue, + cleanupQueue, + tokenCleanupQueue, +} from './queues.server'; -export const connection = new IORedis(process.env.REDIS_URL!, { - maxRetriesPerRequest: null, // Important for BullMQ - password: process.env.REDIS_PASSWORD, // Add the password from environment variables -}); +// Re-export everything for backward compatibility where possible +export { connection } from './redis.server'; +export * from './queues.server'; -// --- Redis Connection Event Listeners --- -connection.on('connect', () => { - logger.info('[Redis] Connection established successfully.'); -}); - -connection.on('error', (err) => { - // This is crucial for diagnosing Redis connection issues. // The patch requested this specific error handling. - logger.error({ err }, '[Redis] Connection error.'); -}); - -const execAsync = promisify(exec); -// --- Queues --- -export const flyerQueue = new Queue('flyer-processing', { - connection, - defaultJobOptions: { - attempts: 3, // Attempt a job 3 times before marking it as failed. - backoff: { - type: 'exponential', - delay: 5000, // Start with a 5-second delay for the first retry - }, - }, -}); - -export const emailQueue = new Queue('email-sending', { - connection, - defaultJobOptions: { - attempts: 5, // Emails can be retried more aggressively - backoff: { - type: 'exponential', - delay: 10000, // Start with a 10-second delay - }, - }, -}); - -export const analyticsQueue = new Queue('analytics-reporting', { - connection, - defaultJobOptions: { - attempts: 2, // Analytics can be intensive, so fewer retries might be desired. - backoff: { - type: 'exponential', - delay: 60000, // Wait a minute before retrying. - }, - // Remove job from queue on completion to save space, as results are in the DB. - removeOnComplete: true, - removeOnFail: 50, // Keep the last 50 failed jobs for inspection. - }, -}); - -export const weeklyAnalyticsQueue = new Queue( - 'weekly-analytics-reporting', - { - connection, - defaultJobOptions: { - attempts: 2, - backoff: { - type: 'exponential', - delay: 3600000, // 1 hour delay for retries - }, - removeOnComplete: true, - removeOnFail: 50, - }, - }, -); - -export const cleanupQueue = new Queue('file-cleanup', { - connection, - defaultJobOptions: { - attempts: 3, - backoff: { - type: 'exponential', - delay: 30000, // Retry cleanup after 30 seconds - }, - removeOnComplete: true, // No need to keep successful cleanup jobs - }, -}); - -export const tokenCleanupQueue = new Queue('token-cleanup', { - connection, - defaultJobOptions: { - attempts: 2, - backoff: { - type: 'exponential', - delay: 3600000, // 1 hour delay - }, - removeOnComplete: true, - removeOnFail: 10, - }, -}); -// --- Job Data Interfaces --- - -interface EmailJobData { - to: string; - subject: string; - text: string; - html: string; -} +// We do NOT export workers here anymore to prevent side effects. +// Consumers needing workers must import from './workers.server'. /** - * Defines the data for an analytics job. - */ -interface AnalyticsJobData { - reportDate: string; // e.g., '2024-10-26' -} - -/** - * Defines the data for a weekly analytics job. - */ -interface WeeklyAnalyticsJobData { - reportYear: number; - reportWeek: number; // ISO week number (1-53) -} - -interface CleanupJobData { - flyerId: number; - // An array of absolute file paths to be deleted. Made optional for manual cleanup triggers. - paths?: string[]; -} - -/** - * Defines the data for a token cleanup job. - */ -interface TokenCleanupJobData { - timestamp: string; // ISO string to ensure the job is unique per run -} - -// --- Worker Instantiation --- - -// Create an adapter for fsPromises to match the IFileSystem interface. -const fsAdapter: IFileSystem = { - readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options), - unlink: (path: string) => fsPromises.unlink(path), -}; - -// Instantiate the service with its real dependencies -const flyerProcessingService = new FlyerProcessingService( - aiService, - db, - fsAdapter, - execAsync, - cleanupQueue, // Inject the cleanup queue to break the circular dependency - new FlyerDataTransformer(), // Inject the new transformer -); - -/** - * Helper to ensure that an unknown error is normalized to an Error object. - * This ensures consistent logging structure and stack traces. - */ -const normalizeError = (error: unknown): Error => { - return error instanceof Error ? error : new Error(String(error)); -}; - -/** - * A generic function to attach logging event listeners to any worker. - * This centralizes logging for job completion and final failure. - * @param worker The BullMQ worker instance. - */ -const attachWorkerEventListeners = (worker: Worker) => { - worker.on('completed', (job: Job, returnValue: unknown) => { - logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`); - }); - - worker.on('failed', (job: Job | undefined, error: Error) => { - // This event fires after all retries have failed. - logger.error( - { err: error, jobData: job?.data }, - `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`, - ); - }); -}; - -export const flyerWorker = new Worker( - 'flyer-processing', // Must match the queue name - async (job) => { - try { - // The processJob method creates its own job-specific logger internally. - return await flyerProcessingService.processJob(job); - } catch (error: unknown) { - const wrappedError = normalizeError(error); - // Check for quota errors or other unrecoverable errors from the AI service - const errorMessage = wrappedError.message || ''; - if ( - errorMessage.includes('quota') || - errorMessage.includes('429') || - errorMessage.includes('RESOURCE_EXHAUSTED') - ) { - logger.error( - { err: wrappedError, jobId: job.id }, - '[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.', - ); - throw new UnrecoverableError(errorMessage); - } - throw error; - } - }, - { - connection, - concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10), - }, -); -/** - * A dedicated worker process for sending emails. - */ -export const emailWorker = new Worker( - 'email-sending', - async (job: Job) => { - const { to, subject } = job.data; - // Create a job-specific logger instance - const jobLogger = logger.child({ jobId: job.id, jobName: job.name }); - jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`); - try { - await emailService.sendEmail(job.data, jobLogger); - } catch (error: unknown) { - const wrappedError = normalizeError(error); - logger.error( - { - err: wrappedError, - jobData: job.data, - }, - `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, - ); - // Re-throw to let BullMQ handle the failure and retry. - throw wrappedError; - } - }, - { - connection, - concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10), - }, -); - -/** - * A dedicated worker for generating daily analytics reports. - * This is a placeholder for the actual report generation logic. - */ -export const analyticsWorker = new Worker( - 'analytics-reporting', - async (job: Job) => { - const { reportDate } = job.data; - logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`); - try { - // Special case for testing the retry mechanism - if (reportDate === 'FAIL') { - throw new Error('This is a test failure for the analytics job.'); - } - - // In a real implementation, you would call a database function here. - // For example: await db.generateDailyAnalyticsReport(reportDate); - await new Promise((resolve) => setTimeout(resolve, 10000)); // Simulate a 10-second task - logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`); - } catch (error: unknown) { - const wrappedError = normalizeError(error); - // Standardize error logging. - logger.error({ err: wrappedError, jobData: job.data }, - `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, - ); - throw wrappedError; // Re-throw to let BullMQ handle the failure and retry. - } - }, - { - connection, - concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10), - }, -); - -/** - * A dedicated worker for cleaning up flyer-related files from the filesystem. - * This is triggered manually by an admin after a flyer has been reviewed. - */ -export const cleanupWorker = new Worker( - // This worker now handles two types of cleanup jobs. - 'file-cleanup', // The queue name - async (job: Job) => { - // Destructure the data from the job payload. - const { flyerId, paths } = job.data; - logger.info( - { paths }, - `[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`, - ); - - try { - if (!paths || paths.length === 0) { - logger.warn( - `[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`, - ); - return; - } - - // Iterate over the file paths provided in the job data and delete each one. - for (const filePath of paths) { - try { - await fsAdapter.unlink(filePath); - logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`); - } catch (unlinkError: unknown) { - // If the file doesn't exist, it's a success from our perspective. - // We can log it as a warning and continue without failing the job. - if ( - unlinkError instanceof Error && - 'code' in unlinkError && - unlinkError.code === 'ENOENT' - ) { - logger.warn( - `[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`, - ); - } else { - throw unlinkError; // For any other error (e.g., permissions), re-throw to fail the job. - } - } - } - logger.info( - `[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`, - ); - } catch (error: unknown) { - const wrappedError = normalizeError(error); - // Standardize error logging. - logger.error( - { err: wrappedError }, - `[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, - ); - throw wrappedError; // Re-throw to let BullMQ handle the failure and retry. - } - }, - { - connection, - concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10), - }, -); - -/** - * A dedicated worker for generating weekly analytics reports. - * This is a placeholder for the actual report generation logic. - */ -export const weeklyAnalyticsWorker = new Worker( - 'weekly-analytics-reporting', - async (job: Job) => { - const { reportYear, reportWeek } = job.data; - logger.info( - { reportYear, reportWeek }, - `[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`, - ); - try { - // Simulate a longer-running task for weekly reports - await new Promise((resolve) => setTimeout(resolve, 30000)); // Simulate 30-second task - logger.info( - `[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`, - ); - } catch (error: unknown) { - const wrappedError = normalizeError(error); - // Standardize error logging. - logger.error( - { err: wrappedError, jobData: job.data }, - `[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, - ); - throw wrappedError; // Re-throw to let BullMQ handle the failure and retry. - } - }, - { - connection, - concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10), - }, -); - -/** - * A dedicated worker for cleaning up expired password reset tokens. - */ -export const tokenCleanupWorker = new Worker( - 'token-cleanup', - async (job: Job) => { - const jobLogger = logger.child({ jobId: job.id, jobName: job.name }); - jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.'); - try { - const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger); - jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`); - return { deletedCount }; - } catch (error: unknown) { - const wrappedError = normalizeError(error); - jobLogger.error({ err: wrappedError }, `[TokenCleanupWorker] Job ${job.id} failed.`); - throw wrappedError; - } - }, - { - connection, - concurrency: 1, // This is a low-priority, non-intensive task. - }, -); - -// --- Attach Event Listeners to All Workers --- -attachWorkerEventListeners(flyerWorker); -attachWorkerEventListeners(emailWorker); -attachWorkerEventListeners(analyticsWorker); -attachWorkerEventListeners(cleanupWorker); -attachWorkerEventListeners(weeklyAnalyticsWorker); -attachWorkerEventListeners(tokenCleanupWorker); - -logger.info('All workers started and listening for jobs.'); - -/** - * A function to gracefully shut down all queue workers and connections. - * This is essential for preventing jobs from getting stuck in an 'active' state - * when the application process is terminated. - * @param signal The signal that triggered the shutdown (e.g., 'SIGINT'). + * A function to gracefully shut down all queues and connections. + * This is for the API process which only uses queues. + * For worker processes, use the gracefulShutdown from workers.server.ts */ export const gracefulShutdown = async (signal: string) => { - logger.info(`[Shutdown] Received ${signal}. Closing all workers and queues...`); + logger.info(`[Shutdown] Received ${signal}. Closing all queues...`); let exitCode = 0; // Default to success const resources = [ - { name: 'flyerWorker', close: () => flyerWorker.close() }, - { name: 'emailWorker', close: () => emailWorker.close() }, - { name: 'analyticsWorker', close: () => analyticsWorker.close() }, - { name: 'cleanupWorker', close: () => cleanupWorker.close() }, - { name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() }, - { name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() }, { name: 'flyerQueue', close: () => flyerQueue.close() }, { name: 'emailQueue', close: () => emailQueue.close() }, { name: 'analyticsQueue', close: () => analyticsQueue.close() }, @@ -455,7 +49,7 @@ export const gracefulShutdown = async (signal: string) => { }); if (exitCode === 0) { - logger.info('[Shutdown] All workers, queues, and connections closed successfully.'); + logger.info('[Shutdown] All queues and connections closed successfully.'); } else { logger.warn('[Shutdown] Graceful shutdown completed with errors.'); } diff --git a/src/services/queues.server.ts b/src/services/queues.server.ts new file mode 100644 index 0000000..3609519 --- /dev/null +++ b/src/services/queues.server.ts @@ -0,0 +1,96 @@ +import { Queue } from 'bullmq'; +import { connection } from './redis.server'; +import type { FlyerJobData } from './flyerProcessingService.server'; + +// --- Job Data Interfaces --- + +export interface EmailJobData { + to: string; + subject: string; + text: string; + html: string; +} + +export interface AnalyticsJobData { + reportDate: string; // e.g., '2024-10-26' +} + +export interface WeeklyAnalyticsJobData { + reportYear: number; + reportWeek: number; // ISO week number (1-53) +} + +export interface CleanupJobData { + flyerId: number; + paths?: string[]; +} + +export interface TokenCleanupJobData { + timestamp: string; +} + +// --- Queues --- + +export const flyerQueue = new Queue('flyer-processing', { + connection, + defaultJobOptions: { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000, + }, + }, +}); + +export const emailQueue = new Queue('email-sending', { + connection, + defaultJobOptions: { + attempts: 5, + backoff: { + type: 'exponential', + delay: 10000, + }, + }, +}); + +export const analyticsQueue = new Queue('analytics-reporting', { + connection, + defaultJobOptions: { + attempts: 2, + backoff: { + type: 'exponential', + delay: 60000, + }, + removeOnComplete: true, + removeOnFail: 50, + }, +}); + +export const weeklyAnalyticsQueue = new Queue('weekly-analytics-reporting', { + connection, + defaultJobOptions: { + attempts: 2, + backoff: { type: 'exponential', delay: 3600000 }, + removeOnComplete: true, + removeOnFail: 50, + }, +}); + +export const cleanupQueue = new Queue('file-cleanup', { + connection, + defaultJobOptions: { + attempts: 3, + backoff: { type: 'exponential', delay: 30000 }, + removeOnComplete: true, + }, +}); + +export const tokenCleanupQueue = new Queue('token-cleanup', { + connection, + defaultJobOptions: { + attempts: 2, + backoff: { type: 'exponential', delay: 3600000 }, + removeOnComplete: true, + removeOnFail: 10, + }, +}); \ No newline at end of file diff --git a/src/services/redis.server.ts b/src/services/redis.server.ts new file mode 100644 index 0000000..909f319 --- /dev/null +++ b/src/services/redis.server.ts @@ -0,0 +1,16 @@ +import IORedis from 'ioredis'; +import { logger } from './logger.server'; + +export const connection = new IORedis(process.env.REDIS_URL!, { + maxRetriesPerRequest: null, // Important for BullMQ + password: process.env.REDIS_PASSWORD, +}); + +// --- Redis Connection Event Listeners --- +connection.on('connect', () => { + logger.info('[Redis] Connection established successfully.'); +}); + +connection.on('error', (err) => { + logger.error({ err }, '[Redis] Connection error.'); +}); \ No newline at end of file diff --git a/src/services/worker.ts b/src/services/worker.ts new file mode 100644 index 0000000..f876483 --- /dev/null +++ b/src/services/worker.ts @@ -0,0 +1,30 @@ +import { gracefulShutdown } from './workers.server'; +import { logger } from './logger.server'; + +logger.info('[Worker] Initializing worker process...'); + +// The workers are instantiated as side effects of importing workers.server.ts. +// This pattern ensures they start immediately upon import. + +// Handle graceful shutdown +const handleShutdown = (signal: string) => { + logger.info(`[Worker] Received ${signal}. Initiating graceful shutdown...`); + gracefulShutdown(signal).catch((error: unknown) => { + logger.error({ err: error }, '[Worker] Error during shutdown.'); + process.exit(1); + }); +}; + +process.on('SIGINT', () => handleShutdown('SIGINT')); +process.on('SIGTERM', () => handleShutdown('SIGTERM')); + +// Catch unhandled errors to log them before crashing +process.on('uncaughtException', (err) => { + logger.error({ err }, '[Worker] Uncaught exception'); +}); + +process.on('unhandledRejection', (reason, promise) => { + logger.error({ reason, promise }, '[Worker] Unhandled Rejection'); +}); + +logger.info('[Worker] Worker process is running and listening for jobs.'); \ No newline at end of file diff --git a/src/services/workers.server.test.ts b/src/services/workers.server.test.ts new file mode 100644 index 0000000..c3c3138 --- /dev/null +++ b/src/services/workers.server.test.ts @@ -0,0 +1,346 @@ +// src/services/workers.server.test.ts +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { Job } from 'bullmq'; + +// --- Hoisted Mocks --- +const mocks = vi.hoisted(() => { + // This object will store the processor functions captured from the worker constructors. + const capturedProcessors: Record Promise> = {}; + + return { + sendEmail: vi.fn(), + unlink: vi.fn(), + processFlyerJob: vi.fn(), + capturedProcessors, + deleteExpiredResetTokens: vi.fn(), + // Mock the Worker constructor to capture the processor function. It must be a + // `function` and not an arrow function so it can be called with `new`. + MockWorker: vi.fn(function (name: string, processor: (job: Job) => Promise) { + if (processor) { + capturedProcessors[name] = processor; + } + // Return a mock worker instance, though it's not used in this test file. + return { on: vi.fn(), close: vi.fn() }; + }), + }; +}); + +// --- Mock Modules --- +vi.mock('./emailService.server', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + // We only need to mock the specific function being called by the worker. + // The rest of the module can retain its original implementation if needed elsewhere. + sendEmail: mocks.sendEmail, + }; +}); + +// The workers use an `fsAdapter`. We can mock the underlying `fsPromises` +// that the adapter is built from in queueService.server.ts. +vi.mock('node:fs/promises', () => ({ + default: { + unlink: mocks.unlink, + // Add other fs functions if needed by other tests + readdir: vi.fn(), + }, +})); + +vi.mock('./logger.server', () => ({ + logger: { + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + child: vi.fn().mockReturnThis(), + }, +})); + +vi.mock('./db/index.db', () => ({ + userRepo: { + deleteExpiredResetTokens: mocks.deleteExpiredResetTokens, + }, +})); + +// Mock bullmq to capture the processor functions passed to the Worker constructor +import { logger as mockLogger } from './logger.server'; +vi.mock('bullmq', () => ({ + Worker: mocks.MockWorker, + // FIX: Use a standard function for the mock constructor to allow `new Queue(...)` to work. + Queue: vi.fn(function () { + return { add: vi.fn() }; + }), +})); + +// Mock flyerProcessingService.server as flyerWorker depends on it +vi.mock('./flyerProcessingService.server', () => ({ + FlyerProcessingService: class { + processJob = mocks.processFlyerJob; + }, +})); + +// Mock flyerDataTransformer as it's a dependency of FlyerProcessingService +vi.mock('./flyerDataTransformer', () => ({ + FlyerDataTransformer: class { + transform = vi.fn(); // Mock transform method + }, +})); + +// Helper to create a mock BullMQ Job object +const createMockJob = (data: T): Job => { + return { + id: 'job-1', + data, + updateProgress: vi.fn().mockResolvedValue(undefined), + log: vi.fn().mockResolvedValue(undefined), + opts: { attempts: 3 }, + attemptsMade: 1, + trace: vi.fn().mockResolvedValue(undefined), + moveToCompleted: vi.fn().mockResolvedValue(undefined), + moveToFailed: vi.fn().mockResolvedValue(undefined), + } as unknown as Job; +}; + +describe('Queue Workers', () => { + // These will hold the captured processor functions for each test. + let flyerProcessor: (job: Job) => Promise; + let emailProcessor: (job: Job) => Promise; + let analyticsProcessor: (job: Job) => Promise; + let cleanupProcessor: (job: Job) => Promise; + let weeklyAnalyticsProcessor: (job: Job) => Promise; + let tokenCleanupProcessor: (job: Job) => Promise; + + beforeEach(async () => { + vi.clearAllMocks(); + + // Reset default mock implementations for hoisted mocks + mocks.sendEmail.mockResolvedValue(undefined); + mocks.unlink.mockResolvedValue(undefined); + mocks.processFlyerJob.mockResolvedValue({ flyerId: 123 }); // Default success for flyer processing + mocks.deleteExpiredResetTokens.mockResolvedValue(5); + + // Reset modules to re-evaluate the workers.server.ts file with fresh mocks. + // This ensures that new worker instances are created and their processors are captured for each test. + vi.resetModules(); + + // Dynamically import the module under test AFTER mocks are reset. + // This will trigger the instantiation of the workers, and our mocked Worker constructor will capture the processors. + await import('./workers.server'); + + // Re-capture the processors for each test to ensure isolation. + flyerProcessor = mocks.capturedProcessors['flyer-processing']; + emailProcessor = mocks.capturedProcessors['email-sending']; + analyticsProcessor = mocks.capturedProcessors['analytics-reporting']; + cleanupProcessor = mocks.capturedProcessors['file-cleanup']; + weeklyAnalyticsProcessor = mocks.capturedProcessors['weekly-analytics-reporting']; + tokenCleanupProcessor = mocks.capturedProcessors['token-cleanup']; + }); + + describe('flyerWorker', () => { + it('should call flyerProcessingService.processJob with the job data', async () => { + const jobData = { + filePath: '/tmp/flyer.pdf', + originalFileName: 'flyer.pdf', + checksum: 'abc', + }; + const job = createMockJob(jobData); + + await flyerProcessor(job); + + expect(mocks.processFlyerJob).toHaveBeenCalledTimes(1); + expect(mocks.processFlyerJob).toHaveBeenCalledWith(job); + }); + + it('should re-throw an error if flyerProcessingService.processJob fails', async () => { + const job = createMockJob({ + filePath: '/tmp/fail.pdf', + originalFileName: 'fail.pdf', + checksum: 'def', + }); + const processingError = new Error('Flyer processing failed'); + mocks.processFlyerJob.mockRejectedValue(processingError); + + await expect(flyerProcessor(job)).rejects.toThrow('Flyer processing failed'); + }); + }); + + describe('emailWorker', () => { + it('should call emailService.sendEmail with the job data', async () => { + const jobData = { + to: 'test@example.com', + subject: 'Test Email', + html: '

Hello

', + text: 'Hello', + }; + const job = createMockJob(jobData); + + await emailProcessor(job); + + expect(mocks.sendEmail).toHaveBeenCalledTimes(1); + // The implementation passes the logger as the second argument + expect(mocks.sendEmail).toHaveBeenCalledWith(jobData, expect.anything()); + }); + + it('should log and re-throw an error if sendEmail fails with a non-Error object', async () => { + const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' }); + const emailError = 'SMTP server is down'; // Reject with a string + mocks.sendEmail.mockRejectedValue(emailError); + + await expect(emailProcessor(job)).rejects.toThrow(emailError); + + // The worker should wrap the string in an Error object for logging + expect(mockLogger.error).toHaveBeenCalledWith( + { err: new Error(emailError), jobData: job.data }, + `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + }); + + it('should re-throw an error if sendEmail fails', async () => { + const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' }); + const emailError = new Error('SMTP server is down'); + mocks.sendEmail.mockRejectedValue(emailError); + + await expect(emailProcessor(job)).rejects.toThrow('SMTP server is down'); + expect(mockLogger.error).toHaveBeenCalledWith( + { err: emailError, jobData: job.data }, + `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + }); + }); + + describe('analyticsWorker', () => { + it('should complete successfully for a valid report date', async () => { + vi.useFakeTimers(); + const job = createMockJob({ reportDate: '2024-01-01' }); + + const promise = analyticsProcessor(job); + // Advance timers to simulate the 10-second task completing + await vi.advanceTimersByTimeAsync(10000); + await promise; // Wait for the promise to resolve + + // No error should be thrown + expect(true).toBe(true); + vi.useRealTimers(); + }); + + it('should throw an error if reportDate is "FAIL"', async () => { + const job = createMockJob({ reportDate: 'FAIL' }); + + await expect(analyticsProcessor(job)).rejects.toThrow( + 'This is a test failure for the analytics job.', + ); + }); + }); + + describe('cleanupWorker', () => { + it('should call unlink for each path provided in the job data', async () => { + const jobData = { + flyerId: 123, + paths: ['/tmp/file1.jpg', '/tmp/file2.pdf'], + }; + const job = createMockJob(jobData); + mocks.unlink.mockResolvedValue(undefined); + + await cleanupProcessor(job); + + expect(mocks.unlink).toHaveBeenCalledTimes(2); + expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file1.jpg'); + expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file2.pdf'); + }); + + it('should not throw an error if a file is already deleted (ENOENT)', async () => { + const jobData = { + flyerId: 123, + paths: ['/tmp/existing.jpg', '/tmp/already-deleted.jpg'], + }; + const job = createMockJob(jobData); + // Use the built-in NodeJS.ErrnoException type for mock system errors. + const enoentError: NodeJS.ErrnoException = new Error('File not found'); + enoentError.code = 'ENOENT'; + + // First call succeeds, second call fails with ENOENT + mocks.unlink.mockResolvedValueOnce(undefined).mockRejectedValueOnce(enoentError); + + // The processor should complete without throwing + await expect(cleanupProcessor(job)).resolves.toBeUndefined(); + + expect(mocks.unlink).toHaveBeenCalledTimes(2); + }); + + it('should re-throw an error for issues other than ENOENT (e.g., permissions)', async () => { + const jobData = { + flyerId: 123, + paths: ['/tmp/protected-file.jpg'], + }; + const job = createMockJob(jobData); + // Use the built-in NodeJS.ErrnoException type for mock system errors. + const permissionError: NodeJS.ErrnoException = new Error('Permission denied'); + permissionError.code = 'EACCES'; + + mocks.unlink.mockRejectedValue(permissionError); + + await expect(cleanupProcessor(job)).rejects.toThrow('Permission denied'); + + // Verify the error was logged by the worker's catch block + expect(mockLogger.error).toHaveBeenCalledWith( + { err: permissionError }, + expect.stringContaining( + `[CleanupWorker] Job ${job.id} for flyer ${job.data.flyerId} failed.`, + ), + ); + }); + }); + + describe('weeklyAnalyticsWorker', () => { + it('should complete successfully for a valid report date', async () => { + vi.useFakeTimers(); + const job = createMockJob({ reportYear: 2024, reportWeek: 1 }); + + const promise = weeklyAnalyticsProcessor(job); + // Advance timers to simulate the 30-second task completing + await vi.advanceTimersByTimeAsync(30000); + await promise; // Wait for the promise to resolve + + // No error should be thrown + expect(true).toBe(true); + vi.useRealTimers(); + }); + + it('should re-throw an error if the job fails', async () => { + vi.useFakeTimers(); + const job = createMockJob({ reportYear: 2024, reportWeek: 1 }); + // Mock the internal logic to throw an error + const originalSetTimeout = setTimeout; + vi.spyOn(global, 'setTimeout').mockImplementation((callback, ms) => { + if (ms === 30000) { + // Target the simulated delay + throw new Error('Weekly analytics job failed'); + } + return originalSetTimeout(callback, ms); + }); + + await expect(weeklyAnalyticsProcessor(job)).rejects.toThrow('Weekly analytics job failed'); + vi.useRealTimers(); + vi.restoreAllMocks(); // Restore setTimeout mock + }); + }); + + describe('tokenCleanupWorker', () => { + it('should call userRepo.deleteExpiredResetTokens and return the count', async () => { + const job = createMockJob({ timestamp: new Date().toISOString() }); + mocks.deleteExpiredResetTokens.mockResolvedValue(10); + + const result = await tokenCleanupProcessor(job); + + expect(mocks.deleteExpiredResetTokens).toHaveBeenCalledTimes(1); + expect(result).toEqual({ deletedCount: 10 }); + }); + + it('should re-throw an error if the database call fails', async () => { + const job = createMockJob({ timestamp: new Date().toISOString() }); + const dbError = new Error('DB cleanup failed'); + mocks.deleteExpiredResetTokens.mockRejectedValue(dbError); + await expect(tokenCleanupProcessor(job)).rejects.toThrow(dbError); + }); + }); +}); diff --git a/src/services/workers.server.ts b/src/services/workers.server.ts new file mode 100644 index 0000000..5efa908 --- /dev/null +++ b/src/services/workers.server.ts @@ -0,0 +1,344 @@ +import { Worker, Job, UnrecoverableError } from 'bullmq'; +import fsPromises from 'node:fs/promises'; +import { exec } from 'child_process'; +import { promisify } from 'util'; + +import { logger } from './logger.server'; +import { connection } from './redis.server'; +import { aiService } from './aiService.server'; +import * as emailService from './emailService.server'; +import * as db from './db/index.db'; +import { + FlyerProcessingService, + type FlyerJobData, + type IFileSystem, +} from './flyerProcessingService.server'; +import { FlyerDataTransformer } from './flyerDataTransformer'; +import { + flyerQueue, + emailQueue, + analyticsQueue, + weeklyAnalyticsQueue, + cleanupQueue, + tokenCleanupQueue, + type EmailJobData, + type AnalyticsJobData, + type CleanupJobData, + type WeeklyAnalyticsJobData, + type TokenCleanupJobData, +} from './queues.server'; + +const execAsync = promisify(exec); + +// --- Worker Instantiation --- + +const fsAdapter: IFileSystem = { + readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options), + unlink: (path: string) => fsPromises.unlink(path), +}; + +const flyerProcessingService = new FlyerProcessingService( + aiService, + db, + fsAdapter, + execAsync, + cleanupQueue, + new FlyerDataTransformer(), +); + +const normalizeError = (error: unknown): Error => { + return error instanceof Error ? error : new Error(String(error)); +}; + +const attachWorkerEventListeners = (worker: Worker) => { + worker.on('completed', (job: Job, returnValue: unknown) => { + logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`); + }); + + worker.on('failed', (job: Job | undefined, error: Error) => { + logger.error( + { err: error, jobData: job?.data }, + `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`, + ); + }); +}; + +export const flyerWorker = new Worker( + 'flyer-processing', + async (job) => { + try { + return await flyerProcessingService.processJob(job); + } catch (error: unknown) { + const wrappedError = normalizeError(error); + const errorMessage = wrappedError.message || ''; + if ( + errorMessage.includes('quota') || + errorMessage.includes('429') || + errorMessage.includes('RESOURCE_EXHAUSTED') + ) { + logger.error( + { err: wrappedError, jobId: job.id }, + '[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.', + ); + throw new UnrecoverableError(errorMessage); + } + throw error; + } + }, + { + connection, + concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10), + }, +); + +export const emailWorker = new Worker( + 'email-sending', + async (job: Job) => { + const { to, subject } = job.data; + const jobLogger = logger.child({ jobId: job.id, jobName: job.name }); + jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`); + try { + await emailService.sendEmail(job.data, jobLogger); + } catch (error: unknown) { + const wrappedError = normalizeError(error); + logger.error( + { + err: wrappedError, + jobData: job.data, + }, + `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + throw wrappedError; + } + }, + { + connection, + concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10), + }, +); + +export const analyticsWorker = new Worker( + 'analytics-reporting', + async (job: Job) => { + const { reportDate } = job.data; + logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`); + try { + if (reportDate === 'FAIL') { + throw new Error('This is a test failure for the analytics job.'); + } + await new Promise((resolve) => setTimeout(resolve, 10000)); + logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`); + } catch (error: unknown) { + const wrappedError = normalizeError(error); + logger.error({ err: wrappedError, jobData: job.data }, + `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + throw wrappedError; + } + }, + { + connection, + concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10), + }, +); + +export const cleanupWorker = new Worker( + 'file-cleanup', + async (job: Job) => { + const { flyerId, paths } = job.data; + logger.info( + { paths }, + `[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`, + ); + + try { + if (!paths || paths.length === 0) { + logger.warn( + `[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`, + ); + return; + } + + for (const filePath of paths) { + try { + await fsAdapter.unlink(filePath); + logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`); + } catch (unlinkError: unknown) { + if ( + unlinkError instanceof Error && + 'code' in unlinkError && + (unlinkError as any).code === 'ENOENT' + ) { + logger.warn( + `[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`, + ); + } else { + throw unlinkError; + } + } + } + logger.info( + `[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`, + ); + } catch (error: unknown) { + const wrappedError = normalizeError(error); + logger.error( + { err: wrappedError }, + `[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + throw wrappedError; + } + }, + { + connection, + concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10), + }, +); + +export const weeklyAnalyticsWorker = new Worker( + 'weekly-analytics-reporting', + async (job: Job) => { + const { reportYear, reportWeek } = job.data; + logger.info( + { reportYear, reportWeek }, + `[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`, + ); + try { + await new Promise((resolve) => setTimeout(resolve, 30000)); + logger.info( + `[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`, + ); + } catch (error: unknown) { + const wrappedError = normalizeError(error); + logger.error( + { err: wrappedError, jobData: job.data }, + `[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, + ); + throw wrappedError; + } + }, + { + connection, + concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10), + }, +); + +export const tokenCleanupWorker = new Worker( + 'token-cleanup', + async (job: Job) => { + const jobLogger = logger.child({ jobId: job.id, jobName: job.name }); + jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.'); + try { + const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger); + jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`); + return { deletedCount }; + } catch (error: unknown) { + const wrappedError = normalizeError(error); + jobLogger.error({ err: wrappedError }, `[TokenCleanupWorker] Job ${job.id} failed.`); + throw wrappedError; + } + }, + { + connection, + concurrency: 1, + }, +); + +attachWorkerEventListeners(flyerWorker); +attachWorkerEventListeners(emailWorker); +attachWorkerEventListeners(analyticsWorker); +attachWorkerEventListeners(cleanupWorker); +attachWorkerEventListeners(weeklyAnalyticsWorker); +attachWorkerEventListeners(tokenCleanupWorker); + +logger.info('All workers started and listening for jobs.'); + +const SHUTDOWN_TIMEOUT = 30000; // 30 seconds + +export const gracefulShutdown = async (signal: string) => { + logger.info( + `[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`, + ); + + const shutdownPromise = (async () => { + let hasErrors = false; + + // Helper function to close a group of resources and log results + const closeResources = async (resources: { name: string; close: () => Promise }[], type: string) => { + logger.info(`[Shutdown] Closing all ${type}...`); + const results = await Promise.allSettled(resources.map((r) => r.close())); + let groupHasErrors = false; + + results.forEach((result, index) => { + if (result.status === 'rejected') { + groupHasErrors = true; + logger.error( + { err: result.reason, resource: resources[index].name }, + `[Shutdown] Error closing ${resources[index].name}.`, + ); + } + }); + + if (!groupHasErrors) logger.info(`[Shutdown] All ${type} closed successfully.`); + return groupHasErrors; + }; + + // Define resource groups for sequential shutdown + const workerResources = [ + { name: 'flyerWorker', close: () => flyerWorker.close() }, + { name: 'emailWorker', close: () => emailWorker.close() }, + { name: 'analyticsWorker', close: () => analyticsWorker.close() }, + { name: 'cleanupWorker', close: () => cleanupWorker.close() }, + { name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() }, + { name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() }, + ]; + + const queueResources = [ + { name: 'flyerQueue', close: () => flyerQueue.close() }, + { name: 'emailQueue', close: () => emailQueue.close() }, + { name: 'analyticsQueue', close: () => analyticsQueue.close() }, + { name: 'cleanupQueue', close: () => cleanupQueue.close() }, + { name: 'weeklyAnalyticsQueue', close: () => weeklyAnalyticsQueue.close() }, + { name: 'tokenCleanupQueue', close: () => tokenCleanupQueue.close() }, + ]; + + // 1. Close workers first + if (await closeResources(workerResources, 'workers')) hasErrors = true; + + // 2. Then close queues + if (await closeResources(queueResources, 'queues')) hasErrors = true; + + // 3. Finally, close the Redis connection + logger.info('[Shutdown] Closing Redis connection...'); + try { + await connection.quit(); + logger.info('[Shutdown] Redis connection closed successfully.'); + } catch (err) { + hasErrors = true; + logger.error({ err, resource: 'redisConnection' }, `[Shutdown] Error closing Redis connection.`); + } + + return hasErrors; + })(); + + const timeoutPromise = new Promise((resolve) => + setTimeout(() => resolve('timeout'), SHUTDOWN_TIMEOUT), + ); + + const result = await Promise.race([shutdownPromise, timeoutPromise]); + + if (result === 'timeout') { + logger.error( + `[Shutdown] Graceful shutdown timed out after ${SHUTDOWN_TIMEOUT / 1000} seconds. Forcing exit.`, + ); + process.exit(1); + } else { + const hasErrors = result as boolean; + if (!hasErrors) { + logger.info('[Shutdown] All resources closed successfully.'); + } else { + logger.warn('[Shutdown] Graceful shutdown completed with errors.'); + } + process.exit(hasErrors ? 1 : 0); + } +};