// src/services/queueService.server.test.ts
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { logger as mockLogger } from './logger.server';
import { EventEmitter } from 'node:events';
import type { Job, Worker } from 'bullmq';
import type { Mock } from 'vitest';

// Shapes of the objects built by the mocked bullmq constructors below.
// Declaring them lets the constructor functions type `this` without resorting to `any`.
interface MockWorkerInstance {
  name: string;
  on: Mock;
  close: Mock<() => Promise<void>>;
  isRunning: Mock<() => boolean>;
}

interface MockQueueInstance {
  name: string;
  add: Mock;
  close: Mock<() => Promise<void>>;
  quit?: Mock<() => Promise<'OK'>>; // Add quit for the Redis mock
}

// --- Inline Mock Implementations ---

// A single, shared mock Redis connection instance that the tests can drive by emitting events.
const mockRedisConnection = new EventEmitter() as EventEmitter & { ping: Mock; quit: Mock };
mockRedisConnection.ping = vi.fn().mockResolvedValue('PONG');
mockRedisConnection.quit = vi.fn().mockResolvedValue('OK');

// Mock the 'ioredis' library. The default export is a class constructor, so it is replaced
// with a mock function that always returns the shared `mockRedisConnection` instance.
vi.mock('ioredis', () => ({
  default: vi.fn(function () {
    return mockRedisConnection;
  }),
}));

// Mock the 'bullmq' library.
vi.mock('bullmq', () => ({
  // Mock the Worker class constructor.
  Worker: vi.fn(function (this: MockWorkerInstance, name: string) {
    this.name = name;
    this.on = vi.fn();
    this.close = vi.fn().mockResolvedValue(undefined);
    this.isRunning = vi.fn().mockReturnValue(true);
    return this;
  }),
  // Mock the Queue class constructor.
  Queue: vi.fn(function (this: MockQueueInstance, name: string) {
    this.name = name;
    this.add = vi.fn();
    this.close = vi.fn().mockResolvedValue(undefined);
    return this;
  }),
  UnrecoverableError: class UnrecoverableError extends Error {},
}));

vi.mock('./logger.server', () => ({
  logger: {
    info: vi.fn(),
    error: vi.fn(),
    warn: vi.fn(),
    debug: vi.fn(),
    child: vi.fn().mockReturnThis(),
  },
}));

// Mock other dependencies that are not the focus of this test file.
vi.mock('./aiService.server');
vi.mock('./emailService.server');
vi.mock('./db/index.db');
vi.mock('./flyerProcessingService.server');
vi.mock('./flyerDataTransformer');

describe('Worker Service Lifecycle', () => {
  let gracefulShutdown: (signal: string) => Promise<void>;
  let flyerWorker: Worker,
    emailWorker: Worker,
    analyticsWorker: Worker,
    cleanupWorker: Worker,
    weeklyAnalyticsWorker: Worker,
    tokenCleanupWorker: Worker;

  beforeEach(async () => {
    vi.clearAllMocks();

    // Reset the module registry so `./workers.server` is re-evaluated on the import below.
    // This ensures that new worker and queue instances are created for each test.
    vi.resetModules();

    // Dynamically import the module under test after the mocks are set up.
    const workerService = await import('./workers.server');

    // Capture the imported instances for use in tests.
    gracefulShutdown = workerService.gracefulShutdown;
    flyerWorker = workerService.flyerWorker;
    emailWorker = workerService.emailWorker;
    analyticsWorker = workerService.analyticsWorker;
    cleanupWorker = workerService.cleanupWorker;
    weeklyAnalyticsWorker = workerService.weeklyAnalyticsWorker;
    tokenCleanupWorker = workerService.tokenCleanupWorker;
  });

  afterEach(() => {
    // Clean up all event listeners on the mock connection to prevent open handles.
    mockRedisConnection.removeAllListeners();
    vi.useRealTimers();
  });

  it('should log a success message when Redis connects', async () => {
    // Dynamic import is asynchronous: await it so that any listeners redis.server registers
    // on the mock connection are attached before the event is emitted.
    await import('./redis.server');

    // Act: Simulate the 'connect' event on the mock Redis connection.
    mockRedisConnection.emit('connect');

    // Assert: Check if the logger was called with the expected message.
    expect(mockLogger.info).toHaveBeenCalledWith('[Redis] Connection established successfully.');
  });

  it('should log an error message when Redis connection fails', async () => {
    // Awaited for the same reason as above. It matters even more here, because an
    // EventEmitter throws when 'error' is emitted while no listener is registered.
    await import('./redis.server');

    const redisError = new Error('Connection refused');
    mockRedisConnection.emit('error', redisError);

    expect(mockLogger.error).toHaveBeenCalledWith({ err: redisError }, '[Redis] Connection error.');
  });

  it('should attach completion and failure listeners to all workers', () => {
    // The workers are instantiated when the module is imported in beforeEach.
    // We just need to check that the 'on' method was called for each event.
    const workers = [
      flyerWorker,
      emailWorker,
      analyticsWorker,
      cleanupWorker,
      weeklyAnalyticsWorker,
      tokenCleanupWorker,
    ];

    for (const worker of workers) {
      expect(worker.on).toHaveBeenCalledWith('completed', expect.any(Function));
      expect(worker.on).toHaveBeenCalledWith('failed', expect.any(Function));
    }
  });

  describe('Worker Event Listeners', () => {
    it('should log a message when a job is completed', () => {
      // Find the 'completed' callback registered on our mock worker.
      const completedCallback = (flyerWorker.on as Mock).mock.calls.find(
        (call) => call[0] === 'completed',
      )?.[1];

      // Ensure the callback was found before trying to call it.
      expect(completedCallback).toBeDefined();

      const mockJob = { id: 'job-abc' };
      const mockReturnValue = { flyerId: 123 };

      // Call the captured callback.
      (completedCallback as (job: Job, result: unknown) => void)(mockJob as Job, mockReturnValue);

      expect(mockLogger.info).toHaveBeenCalledWith(
        { returnValue: mockReturnValue },
        `[flyer-processing] Job job-abc completed successfully.`,
      );
    });

    it('should log an error when a job has ultimately failed', () => {
      // Find the 'failed' callback registered on our mock worker.
      const failedCallback = (emailWorker.on as Mock).mock.calls.find(
        (call) => call[0] === 'failed',
      )?.[1];

      expect(failedCallback).toBeDefined();

      const mockJob = { id: 'job-xyz', data: { to: 'test@example.com' } };
      const mockError = new Error('SMTP Server Down');

      // Call the captured callback.
      (failedCallback as (job: Job | undefined, error: Error) => void)(mockJob as Job, mockError);

      expect(mockLogger.error).toHaveBeenCalledWith(
        { err: mockError, jobData: mockJob.data },
        `[email-sending] Job ${mockJob.id} has ultimately failed after all attempts.`,
      );
    });
  });

  describe('gracefulShutdown', () => {
    let processExitSpy: Mock;

    beforeEach(() => {
      // Stub process.exit so the code under test cannot terminate the test runner.
      processExitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
    });

    afterEach(() => {
      if (processExitSpy && typeof processExitSpy.mockRestore === 'function') {
        console.log('[DEBUG] queueService.server.test.ts: Restoring process.exit spy');
        processExitSpy.mockRestore();
      }
    });

    it('should close all workers, queues, the redis connection, and exit the process', async () => {
      // We need to import the queues to check if their close methods are called.
      const {
        flyerQueue,
        emailQueue,
        analyticsQueue,
        cleanupQueue,
        weeklyAnalyticsQueue,
        tokenCleanupQueue,
      } = await import('./queues.server');

      await gracefulShutdown('SIGINT');

      // Verify workers are closed.
      expect((flyerWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect((analyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect((cleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect((weeklyAnalyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect((tokenCleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();

      // Verify queues are closed.
      expect((flyerQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
      expect((emailQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
      expect((analyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
      expect((cleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
      expect((weeklyAnalyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
      expect((tokenCleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();

      // Verify the redis connection is also closed.
      expect(mockRedisConnection.quit).toHaveBeenCalledTimes(1);

      // Check for the correct success log message from workers.server.ts.
      expect(mockLogger.info).toHaveBeenCalledWith(
        '[Shutdown] All resources closed successfully.',
      );
      expect(processExitSpy).toHaveBeenCalledWith(0);
    });

    it('should log an error if a worker fails to close', async () => {
      const closeError = new Error('Worker failed to close');

      // Simulate one worker failing to close.
      (flyerWorker.close as Mock).mockRejectedValue(closeError);

      await gracefulShutdown('SIGTERM');

      // It should still attempt to close all workers.
      expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
      expect(mockLogger.error).toHaveBeenCalledWith(
        { err: closeError, resource: 'flyerWorker' },
        `[Shutdown] Error closing flyerWorker.`,
      );
      expect(processExitSpy).toHaveBeenCalledWith(1);
    });

    it('should timeout if shutdown takes too long', async () => {
      vi.useFakeTimers();

      // Make one of the close calls hang indefinitely.
      (flyerWorker.close as Mock).mockReturnValue(new Promise(() => {}));

      // Run shutdown but don't await it yet, as the hanging close would block the test.
      const shutdownPromise = gracefulShutdown('SIGTERM');

      // Advance timers past the timeout threshold.
      await vi.advanceTimersByTimeAsync(31000);

      // Now await the promise to see the timeout result.
      await shutdownPromise;

      expect(mockLogger.error).toHaveBeenCalledWith(
        `[Shutdown] Graceful shutdown timed out after 30 seconds. Forcing exit.`,
      );
      expect(processExitSpy).toHaveBeenCalledWith(1);

      vi.useRealTimers();
    });
  });
});