// src/services/workers.server.ts import { Worker, Job } from 'bullmq'; import fsPromises from 'node:fs/promises'; import { exec } from 'child_process'; import { promisify } from 'util'; import { logger } from './logger.server'; import { connection } from './redis.server'; import { aiService } from './aiService.server'; import { analyticsService } from './analyticsService.server'; import { userService } from './userService'; import * as emailService from './emailService.server'; import * as db from './db/index.db'; import { FlyerProcessingService } from './flyerProcessingService.server'; import { FlyerAiProcessor } from './flyerAiProcessor.server'; import { FlyerDataTransformer } from './flyerDataTransformer'; import { FlyerPersistenceService } from './flyerPersistenceService.server'; import { withTransaction } from './db/connection.db'; import { cleanupQueue, flyerQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue, receiptQueue, expiryAlertQueue, barcodeQueue, } from './queues.server'; import type { FlyerJobData, EmailJobData, AnalyticsJobData, WeeklyAnalyticsJobData, CleanupJobData, TokenCleanupJobData, ReceiptJobData, ExpiryAlertJobData, BarcodeDetectionJobData, } from '../types/job-data'; import * as receiptService from './receiptService.server'; import * as expiryService from './expiryService.server'; import * as barcodeService from './barcodeService.server'; import { FlyerFileHandler, type IFileSystem } from './flyerFileHandler.server'; import { defaultWorkerOptions } from '../config/workerOptions'; const execAsync = promisify(exec); // --- Worker Instantiation --- export const fsAdapter: IFileSystem = { readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options), unlink: (path: string) => fsPromises.unlink(path), rename: (oldPath: string, newPath: string) => fsPromises.rename(oldPath, newPath), }; // Create a singleton instance of the FlyerProcessingService. // NOTE: In Vitest integration tests, globalSetup runs in a separate Node.js context from test files. // This means the singleton created here is NOT accessible from test files - tests get their own instance. // For tests that need to inject mocks into the worker's service, use an API-based mechanism or // mark them as .todo() until a cross-context mock injection mechanism is implemented. export const flyerProcessingService = new FlyerProcessingService( new FlyerFileHandler(fsAdapter, execAsync), new FlyerAiProcessor(aiService, db.personalizationRepo), fsAdapter, cleanupQueue, new FlyerDataTransformer(), new FlyerPersistenceService(withTransaction), ); const normalizeError = (error: unknown): Error => { return error instanceof Error ? error : new Error(String(error)); }; /** * Creates a higher-order function to wrap worker processors with common logic. * This includes error normalization to ensure that any thrown value is an Error instance, * which is a best practice for BullMQ workers. * @param processor The core logic for the worker. * @returns An async function that takes a job and executes the processor. */ const createWorkerProcessor = (processor: (job: Job) => Promise) => { return async (job: Job) => { try { return await processor(job); } catch (error: unknown) { // The service layer now handles detailed logging. This block just ensures // any unexpected errors are normalized before BullMQ handles them. throw normalizeError(error); } }; }; const attachWorkerEventListeners = (worker: Worker) => { worker.on('completed', (job: Job, returnValue: unknown) => { logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`); }); worker.on('failed', (job: Job | undefined, error: Error) => { logger.error( { err: error, jobData: job?.data }, `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`, ); }); }; export const flyerWorker = new Worker( 'flyer-processing', createWorkerProcessor((job) => flyerProcessingService.processJob(job)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10), // Increase lock duration to prevent jobs from being re-processed prematurely. // We use the env var if set, otherwise fallback to the defaultWorkerOptions value (30000) lockDuration: parseInt( process.env.WORKER_LOCK_DURATION || String(defaultWorkerOptions.lockDuration), 10, ), }, ); export const emailWorker = new Worker( 'email-sending', createWorkerProcessor((job) => emailService.processEmailJob(job)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10), }, ); export const analyticsWorker = new Worker( 'analytics-reporting', createWorkerProcessor((job) => analyticsService.processDailyReportJob(job)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10), }, ); export const cleanupWorker = new Worker( 'file-cleanup', createWorkerProcessor((job) => flyerProcessingService.processCleanupJob(job)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10), }, ); export const weeklyAnalyticsWorker = new Worker( 'weekly-analytics-reporting', createWorkerProcessor((job) => analyticsService.processWeeklyReportJob(job)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10), }, ); export const tokenCleanupWorker = new Worker( 'token-cleanup', createWorkerProcessor((job) => userService.processTokenCleanupJob(job)), { ...defaultWorkerOptions, connection, concurrency: 1, }, ); export const receiptWorker = new Worker( 'receipt-processing', createWorkerProcessor((job) => receiptService.processReceiptJob(job, logger)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.RECEIPT_WORKER_CONCURRENCY || '2', 10), }, ); export const expiryAlertWorker = new Worker( 'expiry-alerts', createWorkerProcessor((job) => expiryService.processExpiryAlertJob(job, logger)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.EXPIRY_ALERT_WORKER_CONCURRENCY || '1', 10), }, ); export const barcodeWorker = new Worker( 'barcode-detection', createWorkerProcessor((job) => barcodeService.processBarcodeDetectionJob(job, logger)), { ...defaultWorkerOptions, connection, concurrency: parseInt(process.env.BARCODE_WORKER_CONCURRENCY || '2', 10), }, ); attachWorkerEventListeners(flyerWorker); attachWorkerEventListeners(emailWorker); attachWorkerEventListeners(analyticsWorker); attachWorkerEventListeners(cleanupWorker); attachWorkerEventListeners(weeklyAnalyticsWorker); attachWorkerEventListeners(tokenCleanupWorker); attachWorkerEventListeners(receiptWorker); attachWorkerEventListeners(expiryAlertWorker); attachWorkerEventListeners(barcodeWorker); logger.info('All workers started and listening for jobs.'); const SHUTDOWN_TIMEOUT = 30000; // 30 seconds /** * Closes all workers. Used primarily for integration testing to ensure clean teardown * without exiting the process. */ export const closeWorkers = async () => { await Promise.all([ flyerWorker.close(), emailWorker.close(), analyticsWorker.close(), cleanupWorker.close(), weeklyAnalyticsWorker.close(), tokenCleanupWorker.close(), receiptWorker.close(), expiryAlertWorker.close(), barcodeWorker.close(), ]); }; export const gracefulShutdown = async (signal: string) => { logger.info( `[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`, ); const shutdownPromise = (async () => { let hasErrors = false; // Helper function to close a group of resources and log results const closeResources = async ( resources: { name: string; close: () => Promise }[], type: string, ) => { logger.info(`[Shutdown] Closing all ${type}...`); const results = await Promise.allSettled(resources.map((r) => r.close())); let groupHasErrors = false; results.forEach((result, index) => { if (result.status === 'rejected') { groupHasErrors = true; logger.error( { err: result.reason, resource: resources[index].name }, `[Shutdown] Error closing ${resources[index].name}.`, ); } }); if (!groupHasErrors) logger.info(`[Shutdown] All ${type} closed successfully.`); return groupHasErrors; }; // Define resource groups for sequential shutdown const workerResources = [ { name: 'flyerWorker', close: () => flyerWorker.close() }, { name: 'emailWorker', close: () => emailWorker.close() }, { name: 'analyticsWorker', close: () => analyticsWorker.close() }, { name: 'cleanupWorker', close: () => cleanupWorker.close() }, { name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() }, { name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() }, { name: 'receiptWorker', close: () => receiptWorker.close() }, { name: 'expiryAlertWorker', close: () => expiryAlertWorker.close() }, { name: 'barcodeWorker', close: () => barcodeWorker.close() }, ]; const queueResources = [ { name: 'flyerQueue', close: () => flyerQueue.close() }, { name: 'emailQueue', close: () => emailQueue.close() }, { name: 'analyticsQueue', close: () => analyticsQueue.close() }, { name: 'cleanupQueue', close: () => cleanupQueue.close() }, { name: 'weeklyAnalyticsQueue', close: () => weeklyAnalyticsQueue.close() }, { name: 'tokenCleanupQueue', close: () => tokenCleanupQueue.close() }, { name: 'receiptQueue', close: () => receiptQueue.close() }, { name: 'expiryAlertQueue', close: () => expiryAlertQueue.close() }, { name: 'barcodeQueue', close: () => barcodeQueue.close() }, ]; // 1. Close workers first if (await closeResources(workerResources, 'workers')) hasErrors = true; // 2. Then close queues if (await closeResources(queueResources, 'queues')) hasErrors = true; // 3. Finally, close the Redis connection logger.info('[Shutdown] Closing Redis connection...'); try { await connection.quit(); logger.info('[Shutdown] Redis connection closed successfully.'); } catch (err) { hasErrors = true; logger.error( { err, resource: 'redisConnection' }, `[Shutdown] Error closing Redis connection.`, ); } return hasErrors; })(); const timeoutPromise = new Promise((resolve) => setTimeout(() => resolve('timeout'), SHUTDOWN_TIMEOUT), ); const result = await Promise.race([shutdownPromise, timeoutPromise]); if (result === 'timeout') { logger.error( `[Shutdown] Graceful shutdown timed out after ${SHUTDOWN_TIMEOUT / 1000} seconds. Forcing exit.`, ); process.exit(1); } else { const hasErrors = result as boolean; if (!hasErrors) { logger.info('[Shutdown] All resources closed successfully.'); } else { logger.warn('[Shutdown] Graceful shutdown completed with errors.'); } process.exit(hasErrors ? 1 : 0); } };