// src/services/queueService.workers.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Job } from 'bullmq';

// --- Hoisted Mocks ---
// `vi.hoisted` runs before the hoisted `vi.mock` factories below, so everything
// those factories reference has to be created here.
const mocks = vi.hoisted(() => {
  // Stores the processor function each Worker was constructed with, keyed by
  // the queue name passed to the constructor.
  const capturedProcessors: Record<string, (job: Job) => Promise<unknown>> = {};

  return {
    // Service method mocks
    processFlyerJob: vi.fn(),
    processCleanupJob: vi.fn(),
    processEmailJob: vi.fn(),
    processDailyReportJob: vi.fn(),
    processWeeklyReportJob: vi.fn(),
    processTokenCleanupJob: vi.fn(),

    // Test utilities
    capturedProcessors,

    // Mock the Worker constructor to capture the processor function. It must be a
    // `function` and not an arrow function so it can be called with `new`.
    MockWorker: vi.fn(function (name: string, processor: (job: Job) => Promise<unknown>) {
      if (processor) {
        capturedProcessors[name] = processor;
      }
      // Return a mock worker instance, though it's not used in this test file.
      return { on: vi.fn(), close: vi.fn() };
    }),
  };
});

// --- Mock Modules ---
vi.mock('./emailService.server', () => ({
  processEmailJob: mocks.processEmailJob,
}));

vi.mock('./analyticsService.server', () => ({
  analyticsService: {
    processDailyReportJob: mocks.processDailyReportJob,
    processWeeklyReportJob: mocks.processWeeklyReportJob,
  },
}));

vi.mock('./userService', () => ({
  userService: {
    processTokenCleanupJob: mocks.processTokenCleanupJob,
  },
}));

vi.mock('./logger.server', () => ({
  logger: {
    info: vi.fn(),
    error: vi.fn(),
    warn: vi.fn(),
    debug: vi.fn(),
    // `child()` returns the same mock logger so chained calls keep working.
    child: vi.fn().mockReturnThis(),
  },
}));

// Mock bullmq to capture the processor functions passed to the Worker constructor
vi.mock('bullmq', () => ({
  Worker: mocks.MockWorker,
  Queue: vi.fn(function () {
    return { add: vi.fn(), close: vi.fn().mockResolvedValue(undefined) };
  }),
  // Add UnrecoverableError to the mock so it can be used in tests
  UnrecoverableError: class UnrecoverableError extends Error {},
}));

// Mock redis.server to prevent real Redis connection attempts
vi.mock('./redis.server', () => {
  // Stand-in connection exposing only `on` and `quit`.
  const connection = {
    on: vi.fn(),
    quit: vi.fn().mockResolvedValue(undefined),
  };
  return { connection };
});

// Mock queues.server to provide mock queue instances
vi.mock('./queues.server', () => {
  // Built inside the factory (which is hoisted) so each queue gets its own
  // independent `add` / `close` spies.
  const makeQueueStub = () => ({
    add: vi.fn(),
    close: vi.fn().mockResolvedValue(undefined),
  });

  return {
    flyerQueue: makeQueueStub(),
    emailQueue: makeQueueStub(),
    analyticsQueue: makeQueueStub(),
    cleanupQueue: makeQueueStub(),
    weeklyAnalyticsQueue: makeQueueStub(),
    tokenCleanupQueue: makeQueueStub(),
    receiptQueue: makeQueueStub(),
    expiryAlertQueue: makeQueueStub(),
    barcodeQueue: makeQueueStub(),
  };
});

// Mock flyerProcessingService.server as flyerWorker and cleanupWorker depend on it
vi.mock('./flyerProcessingService.server', () => ({
  // A plain `function` implementation so the mock can be invoked with `new`;
  // the constructed object exposes the hoisted service mocks.
  FlyerProcessingService: vi.fn().mockImplementation(function () {
    return {
      processJob: mocks.processFlyerJob,
      processCleanupJob: mocks.processCleanupJob,
    };
  }),
}));

// Mock flyerDataTransformer as it's a dependency of FlyerProcessingService
vi.mock('./flyerDataTransformer', () => {
  class FlyerDataTransformer {
    transform = vi.fn(); // Mock transform method
  }
  return { FlyerDataTransformer };
});

// Mock aiService.server to prevent initialization issues
vi.mock('./aiService.server', () => {
  const aiService = { extractAndValidateData: vi.fn() };
  return { aiService };
});

// Mock db/index.db to prevent database connections
vi.mock('./db/index.db', () => {
  const personalizationRepo = {};
  return { personalizationRepo };
});

// Mock flyerAiProcessor.server
vi.mock('./flyerAiProcessor.server', () => {
  const FlyerAiProcessor = vi.fn().mockImplementation(function () {
    return { processFlyer: vi.fn() };
  });
  return { FlyerAiProcessor };
});

// Mock flyerPersistenceService.server
vi.mock('./flyerPersistenceService.server', () => ({
  FlyerPersistenceService: vi.fn().mockImplementation(function () {
    return { persistFlyerData: vi.fn() };
  }),
}));

// Mock db/connection.db to prevent database connections
vi.mock('./db/connection.db', () => ({
  withTransaction: vi.fn(),
}));

// Mock receiptService.server
vi.mock('./receiptService.server', () => ({
  processReceiptJob: vi.fn().mockResolvedValue(undefined),
}));

// Mock expiryService.server
vi.mock('./expiryService.server', () => ({
  processExpiryAlertJob: vi.fn().mockResolvedValue(undefined),
}));

// Mock barcodeService.server
vi.mock('./barcodeService.server', () => ({
  processBarcodeDetectionJob: vi.fn().mockResolvedValue(undefined),
}));

// Mock flyerFileHandler.server
vi.mock('./flyerFileHandler.server', () => ({
  FlyerFileHandler: vi.fn().mockImplementation(function () {
    return { handleFile: vi.fn() };
  }),
}));

// Mock workerOptions config
vi.mock('../config/workerOptions', () => ({
  defaultWorkerOptions: {
    lockDuration: 30000,
    stalledInterval: 30000,
  },
}));

/**
 * Helper to create a mock BullMQ Job object.
 *
 * Only the members listed below are stubbed; the object is cast to `Job<T>`
 * because a real Job cannot be constructed without a queue.
 *
 * @param data - Payload exposed as `job.data`.
 * @returns A partial Job stand-in with id `'job-1'` and resolved-promise stubs.
 */
const createMockJob = <T>(data: T): Job<T> => {
  return {
    id: 'job-1',
    data,
    updateProgress: vi.fn().mockResolvedValue(undefined),
    log: vi.fn().mockResolvedValue(undefined),
    opts: { attempts: 3 },
    attemptsMade: 1,
    trace: vi.fn().mockResolvedValue(undefined),
    moveToCompleted: vi.fn().mockResolvedValue(undefined),
    moveToFailed: vi.fn().mockResolvedValue(undefined),
  } as unknown as Job<T>;
};

/** Signature of the processor callbacks captured from the mocked Worker constructor. */
type JobProcessor = (job: Job) => Promise<unknown>;

describe('Queue Workers', () => {
  let flyerProcessor: JobProcessor;
  let emailProcessor: JobProcessor;
  let analyticsProcessor: JobProcessor;
  let cleanupProcessor: JobProcessor;
  let weeklyAnalyticsProcessor: JobProcessor;
  let tokenCleanupProcessor: JobProcessor;

  beforeEach(async () => {
    // Wipe recorded calls first, then install the defaults, so the defaults
    // are always the last thing applied regardless of how mocks get cleared.
    vi.clearAllMocks();

    // Reset default mock implementations for hoisted mocks. This also undoes
    // any `mockRejectedValue` left behind by the previous test.
    mocks.processFlyerJob.mockResolvedValue({ flyerId: 123 });
    mocks.processCleanupJob.mockResolvedValue({ status: 'success' });
    mocks.processEmailJob.mockResolvedValue(undefined);
    mocks.processDailyReportJob.mockResolvedValue({ status: 'success' });
    mocks.processWeeklyReportJob.mockResolvedValue({ status: 'success' });
    mocks.processTokenCleanupJob.mockResolvedValue({ deletedCount: 5 });

    // Re-evaluate workers.server so each test constructs the workers afresh
    // and MockWorker captures their processors again.
    vi.resetModules();
    await import('./workers.server');

    flyerProcessor = mocks.capturedProcessors['flyer-processing'];
    emailProcessor = mocks.capturedProcessors['email-sending'];
    analyticsProcessor = mocks.capturedProcessors['analytics-reporting'];
    cleanupProcessor = mocks.capturedProcessors['file-cleanup'];
    weeklyAnalyticsProcessor = mocks.capturedProcessors['weekly-analytics-reporting'];
    tokenCleanupProcessor = mocks.capturedProcessors['token-cleanup'];
  });

  describe('flyerWorker', () => {
    it('should call flyerProcessingService.processJob with the job data', async () => {
      const jobData = {
        filePath: '/tmp/flyer.pdf',
        originalFileName: 'flyer.pdf',
        checksum: 'abc',
      };
      const job = createMockJob(jobData);

      await flyerProcessor(job);

      expect(mocks.processFlyerJob).toHaveBeenCalledTimes(1);
      expect(mocks.processFlyerJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if flyerProcessingService.processJob fails', async () => {
      const job = createMockJob({
        filePath: '/tmp/fail.pdf',
        originalFileName: 'fail.pdf',
        checksum: 'def',
      });
      const processingError = new Error('Flyer processing failed');
      mocks.processFlyerJob.mockRejectedValue(processingError);

      await expect(flyerProcessor(job)).rejects.toThrow('Flyer processing failed');
    });

    it('should re-throw UnrecoverableError from the service layer', async () => {
      // Resolves to the mocked class declared in the bullmq mock factory.
      const { UnrecoverableError } = await import('bullmq');
      const job = createMockJob({
        filePath: '/tmp/fail.pdf',
        originalFileName: 'fail.pdf',
        checksum: 'def',
      });
      const unrecoverableError = new UnrecoverableError('Quota exceeded');
      mocks.processFlyerJob.mockRejectedValue(unrecoverableError);

      // The worker should just let this specific error type pass through.
      await expect(flyerProcessor(job)).rejects.toThrow(unrecoverableError);
    });
  });

  describe('emailWorker', () => {
    it('should call emailService.processEmailJob with the job', async () => {
      const jobData = {
        to: 'test@example.com',
        subject: 'Test Email',
        html: '<p>Hello</p>',
        text: 'Hello',
      };
      const job = createMockJob(jobData);

      await emailProcessor(job);

      expect(mocks.processEmailJob).toHaveBeenCalledTimes(1);
      expect(mocks.processEmailJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if processEmailJob fails', async () => {
      const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' });
      const emailError = new Error('SMTP server is down');
      mocks.processEmailJob.mockRejectedValue(emailError);

      await expect(emailProcessor(job)).rejects.toThrow('SMTP server is down');
    });
  });

  describe('analyticsWorker', () => {
    it('should call analyticsService.processDailyReportJob with the job', async () => {
      const job = createMockJob({ reportDate: '2024-01-01' });

      await analyticsProcessor(job);

      expect(mocks.processDailyReportJob).toHaveBeenCalledTimes(1);
      expect(mocks.processDailyReportJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if processDailyReportJob fails', async () => {
      const job = createMockJob({ reportDate: 'FAIL' });
      const analyticsError = new Error('Analytics processing failed');
      mocks.processDailyReportJob.mockRejectedValue(analyticsError);

      await expect(analyticsProcessor(job)).rejects.toThrow('Analytics processing failed');
    });
  });

  describe('cleanupWorker', () => {
    it('should call flyerProcessingService.processCleanupJob with the job', async () => {
      const jobData = {
        flyerId: 123,
        paths: ['/tmp/file1.jpg', '/tmp/file2.pdf'],
      };
      const job = createMockJob(jobData);

      await cleanupProcessor(job);

      expect(mocks.processCleanupJob).toHaveBeenCalledTimes(1);
      expect(mocks.processCleanupJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if processCleanupJob fails', async () => {
      const jobData = { flyerId: 123, paths: ['/tmp/protected-file.jpg'] };
      const job = createMockJob(jobData);
      const cleanupError = new Error('Permission denied');
      mocks.processCleanupJob.mockRejectedValue(cleanupError);

      await expect(cleanupProcessor(job)).rejects.toThrow('Permission denied');
    });
  });

  describe('weeklyAnalyticsWorker', () => {
    it('should call analyticsService.processWeeklyReportJob with the job', async () => {
      const job = createMockJob({ reportYear: 2024, reportWeek: 1 });

      await weeklyAnalyticsProcessor(job);

      expect(mocks.processWeeklyReportJob).toHaveBeenCalledTimes(1);
      expect(mocks.processWeeklyReportJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if processWeeklyReportJob fails', async () => {
      const job = createMockJob({ reportYear: 2024, reportWeek: 1 });
      const weeklyError = new Error('Weekly analytics job failed');
      mocks.processWeeklyReportJob.mockRejectedValue(weeklyError);

      await expect(weeklyAnalyticsProcessor(job)).rejects.toThrow('Weekly analytics job failed');
    });
  });

  describe('tokenCleanupWorker', () => {
    it('should call userService.processTokenCleanupJob with the job', async () => {
      const job = createMockJob({ timestamp: new Date().toISOString() });

      await tokenCleanupProcessor(job);

      expect(mocks.processTokenCleanupJob).toHaveBeenCalledTimes(1);
      expect(mocks.processTokenCleanupJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if processTokenCleanupJob fails', async () => {
      const job = createMockJob({ timestamp: new Date().toISOString() });
      const dbError = new Error('DB cleanup failed');
      mocks.processTokenCleanupJob.mockRejectedValue(dbError);

      await expect(tokenCleanupProcessor(job)).rejects.toThrow(dbError);
    });
  });
});