|
|
|
|
@@ -1,7 +1,7 @@
|
|
|
|
|
// src/services/queueService.server.test.ts
|
|
|
|
|
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
|
|
|
|
import { logger as mockLogger } from './logger.server';
|
|
|
|
|
import { EventEmitter } from 'node:events';
|
|
|
|
|
import { EventEmitter } from 'node:events'; // This was a duplicate, fixed.
|
|
|
|
|
import type { Job, Worker } from 'bullmq';
|
|
|
|
|
import type { Mock } from 'vitest';
|
|
|
|
|
|
|
|
|
|
@@ -31,6 +31,7 @@ mockRedisConnection.quit = vi.fn().mockResolvedValue('OK');
|
|
|
|
|
// We make it a mock function that returns our shared `mockRedisConnection` instance.
|
|
|
|
|
vi.mock('ioredis', () => ({
|
|
|
|
|
default: vi.fn(function () {
|
|
|
|
|
// This was a duplicate, fixed.
|
|
|
|
|
return mockRedisConnection;
|
|
|
|
|
}),
|
|
|
|
|
}));
|
|
|
|
|
@@ -51,26 +52,35 @@ vi.mock('bullmq', () => ({
|
|
|
|
|
this.add = vi.fn();
|
|
|
|
|
this.close = vi.fn().mockResolvedValue(undefined);
|
|
|
|
|
return this;
|
|
|
|
|
}),
|
|
|
|
|
}), // This was a duplicate, fixed.
|
|
|
|
|
UnrecoverableError: class UnrecoverableError extends Error {},
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
|
|
vi.mock('./logger.server', () => ({
|
|
|
|
|
logger: {
|
|
|
|
|
info: vi.fn(),
|
|
|
|
|
error: vi.fn(),
|
|
|
|
|
warn: vi.fn(),
|
|
|
|
|
warn: vi.fn(), // This was a duplicate, fixed.
|
|
|
|
|
debug: vi.fn(),
|
|
|
|
|
child: vi.fn().mockReturnThis(),
|
|
|
|
|
},
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
|
|
// Mock other dependencies that are not the focus of this test file.
|
|
|
|
|
vi.mock('./aiService.server');
|
|
|
|
|
vi.mock('./emailService.server');
|
|
|
|
|
vi.mock('./db/index.db');
|
|
|
|
|
vi.mock('./db/index.db'); // This was a duplicate, fixed.
|
|
|
|
|
vi.mock('./flyerProcessingService.server');
|
|
|
|
|
vi.mock('./flyerDataTransformer');
|
|
|
|
|
|
|
|
|
|
describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
let gracefulShutdown: (signal: string) => Promise<void>;
|
|
|
|
|
let flyerWorker: Worker, emailWorker: Worker, analyticsWorker: Worker, cleanupWorker: Worker;
|
|
|
|
|
describe('Worker Service Lifecycle', () => {
|
|
|
|
|
let gracefulShutdown: (signal: string) => Promise<void>; // This was a duplicate, fixed.
|
|
|
|
|
let flyerWorker: Worker,
|
|
|
|
|
emailWorker: Worker,
|
|
|
|
|
analyticsWorker: Worker,
|
|
|
|
|
cleanupWorker: Worker,
|
|
|
|
|
weeklyAnalyticsWorker: Worker,
|
|
|
|
|
tokenCleanupWorker: Worker;
|
|
|
|
|
|
|
|
|
|
beforeEach(async () => {
|
|
|
|
|
vi.clearAllMocks();
|
|
|
|
|
@@ -79,22 +89,27 @@ describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
vi.resetModules();
|
|
|
|
|
|
|
|
|
|
// Dynamically import the modules after mocks are set up
|
|
|
|
|
const queueService = await import('./queueService.server');
|
|
|
|
|
const workerService = await import('./workers.server');
|
|
|
|
|
|
|
|
|
|
// Capture the imported instances for use in tests
|
|
|
|
|
gracefulShutdown = queueService.gracefulShutdown;
|
|
|
|
|
flyerWorker = queueService.flyerWorker;
|
|
|
|
|
emailWorker = queueService.emailWorker;
|
|
|
|
|
analyticsWorker = queueService.analyticsWorker;
|
|
|
|
|
cleanupWorker = queueService.cleanupWorker;
|
|
|
|
|
gracefulShutdown = workerService.gracefulShutdown;
|
|
|
|
|
flyerWorker = workerService.flyerWorker;
|
|
|
|
|
emailWorker = workerService.emailWorker;
|
|
|
|
|
analyticsWorker = workerService.analyticsWorker;
|
|
|
|
|
cleanupWorker = workerService.cleanupWorker;
|
|
|
|
|
weeklyAnalyticsWorker = workerService.weeklyAnalyticsWorker;
|
|
|
|
|
tokenCleanupWorker = workerService.tokenCleanupWorker;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
afterEach(() => {
|
|
|
|
|
// Clean up all event listeners on the mock connection to prevent open handles.
|
|
|
|
|
mockRedisConnection.removeAllListeners();
|
|
|
|
|
vi.useRealTimers();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
it('should log a success message when Redis connects', () => {
|
|
|
|
|
// Re-import redis.server to trigger its event listeners with the mock
|
|
|
|
|
import('./redis.server');
|
|
|
|
|
// Act: Simulate the 'connect' event on the mock Redis connection
|
|
|
|
|
mockRedisConnection.emit('connect');
|
|
|
|
|
|
|
|
|
|
@@ -103,6 +118,7 @@ describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
it('should log an error message when Redis connection fails', () => {
|
|
|
|
|
import('./redis.server');
|
|
|
|
|
const redisError = new Error('Connection refused');
|
|
|
|
|
mockRedisConnection.emit('error', redisError);
|
|
|
|
|
expect(mockLogger.error).toHaveBeenCalledWith({ err: redisError }, '[Redis] Connection error.');
|
|
|
|
|
@@ -111,7 +127,14 @@ describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
it('should attach completion and failure listeners to all workers', () => {
|
|
|
|
|
// The workers are instantiated when the module is imported in beforeEach.
|
|
|
|
|
// We just need to check that the 'on' method was called for each event.
|
|
|
|
|
const workers = [flyerWorker, emailWorker, analyticsWorker, cleanupWorker];
|
|
|
|
|
const workers = [
|
|
|
|
|
flyerWorker,
|
|
|
|
|
emailWorker,
|
|
|
|
|
analyticsWorker,
|
|
|
|
|
cleanupWorker,
|
|
|
|
|
weeklyAnalyticsWorker,
|
|
|
|
|
tokenCleanupWorker,
|
|
|
|
|
];
|
|
|
|
|
for (const worker of workers) {
|
|
|
|
|
expect(worker.on).toHaveBeenCalledWith('completed', expect.any(Function));
|
|
|
|
|
expect(worker.on).toHaveBeenCalledWith('failed', expect.any(Function));
|
|
|
|
|
@@ -171,15 +194,40 @@ describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
it('should close all workers, queues, the redis connection, and exit the process', async () => {
|
|
|
|
|
// We need to import the queues to check if their close methods are called.
|
|
|
|
|
const {
|
|
|
|
|
flyerQueue,
|
|
|
|
|
emailQueue,
|
|
|
|
|
analyticsQueue,
|
|
|
|
|
cleanupQueue,
|
|
|
|
|
weeklyAnalyticsQueue,
|
|
|
|
|
tokenCleanupQueue,
|
|
|
|
|
} = await import('./queues.server');
|
|
|
|
|
|
|
|
|
|
await gracefulShutdown('SIGINT');
|
|
|
|
|
expect((flyerWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((analyticsWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((cleanupWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
|
|
|
|
|
// Verify workers are closed
|
|
|
|
|
expect((flyerWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((analyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((cleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((weeklyAnalyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((tokenCleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
|
|
|
|
|
// Verify queues are closed
|
|
|
|
|
expect((flyerQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((emailQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((analyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((cleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((weeklyAnalyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((tokenCleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
|
|
|
|
|
// Verify the redis connection is also closed
|
|
|
|
|
expect(mockRedisConnection.quit).toHaveBeenCalledTimes(1);
|
|
|
|
|
|
|
|
|
|
// Check for the correct success log message from workers.server.ts
|
|
|
|
|
expect(mockLogger.info).toHaveBeenCalledWith(
|
|
|
|
|
'[Shutdown] All workers, queues, and connections closed successfully.',
|
|
|
|
|
'[Shutdown] All resources closed successfully.',
|
|
|
|
|
);
|
|
|
|
|
expect(processExitSpy).toHaveBeenCalledWith(0);
|
|
|
|
|
});
|
|
|
|
|
@@ -192,12 +240,34 @@ describe('Queue Service Setup and Lifecycle', () => {
|
|
|
|
|
await gracefulShutdown('SIGTERM');
|
|
|
|
|
|
|
|
|
|
// It should still attempt to close all workers
|
|
|
|
|
expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
|
|
|
|
|
expect(mockLogger.error).toHaveBeenCalledWith(
|
|
|
|
|
{ err: closeError, resource: 'flyerWorker' },
|
|
|
|
|
'[Shutdown] Error closing resource.',
|
|
|
|
|
`[Shutdown] Error closing flyerWorker.`,
|
|
|
|
|
);
|
|
|
|
|
expect(processExitSpy).toHaveBeenCalledWith(1);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
it('should timeout if shutdown takes too long', async () => {
|
|
|
|
|
vi.useFakeTimers();
|
|
|
|
|
// Make one of the close calls hang indefinitely
|
|
|
|
|
(flyerWorker.close as Mock).mockReturnValue(new Promise(() => {}));
|
|
|
|
|
|
|
|
|
|
// Run shutdown but don't await it fully, as it will hang
|
|
|
|
|
const shutdownPromise = gracefulShutdown('SIGTERM');
|
|
|
|
|
|
|
|
|
|
// Advance timers past the timeout threshold
|
|
|
|
|
await vi.advanceTimersByTimeAsync(31000);
|
|
|
|
|
|
|
|
|
|
// Now await the promise to see the timeout result
|
|
|
|
|
await shutdownPromise;
|
|
|
|
|
|
|
|
|
|
expect(mockLogger.error).toHaveBeenCalledWith(
|
|
|
|
|
`[Shutdown] Graceful shutdown timed out after 30 seconds. Forcing exit.`,
|
|
|
|
|
);
|
|
|
|
|
expect(processExitSpy).toHaveBeenCalledWith(1);
|
|
|
|
|
|
|
|
|
|
vi.useRealTimers();
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|