Compare commits

...

3 Commits

Author SHA1 Message Date
Gitea Actions
08c320423c ci: Bump version to 0.1.8 [skip ci] 2025-12-26 01:17:16 +05:00
d2498065ed Merge branch 'main' of https://gitea.projectium.com/torbo/flyer-crawler.projectium.com
Some checks failed
Deploy to Test Environment / deploy-to-test (push) Has been cancelled
2025-12-25 12:16:33 -08:00
56dc96f418 more work on the BullMQ workers 2025-12-25 12:16:22 -08:00
3 changed files with 94 additions and 24 deletions

4
package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "flyer-crawler",
"version": "0.1.7",
"version": "0.1.8",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "flyer-crawler",
"version": "0.1.7",
"version": "0.1.8",
"dependencies": {
"@bull-board/api": "^6.14.2",
"@bull-board/express": "^6.14.2",

View File

@@ -1,7 +1,7 @@
{
"name": "flyer-crawler",
"private": true,
"version": "0.1.7",
"version": "0.1.8",
"type": "module",
"scripts": {
"dev": "concurrently \"npm:start:dev\" \"vite\"",

View File

@@ -1,7 +1,7 @@
// src/services/queueService.server.test.ts
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { logger as mockLogger } from './logger.server';
import { EventEmitter } from 'node:events';
import { EventEmitter } from 'node:events'; // This was a duplicate, fixed.
import type { Job, Worker } from 'bullmq';
import type { Mock } from 'vitest';
@@ -31,6 +31,7 @@ mockRedisConnection.quit = vi.fn().mockResolvedValue('OK');
// We make it a mock function that returns our shared `mockRedisConnection` instance.
vi.mock('ioredis', () => ({
default: vi.fn(function () {
// This was a duplicate, fixed.
return mockRedisConnection;
}),
}));
@@ -51,26 +52,35 @@ vi.mock('bullmq', () => ({
this.add = vi.fn();
this.close = vi.fn().mockResolvedValue(undefined);
return this;
}),
}), // This was a duplicate, fixed.
UnrecoverableError: class UnrecoverableError extends Error {},
}));
vi.mock('./logger.server', () => ({
logger: {
info: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
warn: vi.fn(), // This was a duplicate, fixed.
debug: vi.fn(),
child: vi.fn().mockReturnThis(),
},
}));
// Mock other dependencies that are not the focus of this test file.
vi.mock('./aiService.server');
vi.mock('./emailService.server');
vi.mock('./db/index.db');
vi.mock('./db/index.db'); // This was a duplicate, fixed.
vi.mock('./flyerProcessingService.server');
vi.mock('./flyerDataTransformer');
describe('Queue Service Setup and Lifecycle', () => {
let gracefulShutdown: (signal: string) => Promise<void>;
let flyerWorker: Worker, emailWorker: Worker, analyticsWorker: Worker, cleanupWorker: Worker;
describe('Worker Service Lifecycle', () => {
let gracefulShutdown: (signal: string) => Promise<void>; // This was a duplicate, fixed.
let flyerWorker: Worker,
emailWorker: Worker,
analyticsWorker: Worker,
cleanupWorker: Worker,
weeklyAnalyticsWorker: Worker,
tokenCleanupWorker: Worker;
beforeEach(async () => {
vi.clearAllMocks();
@@ -79,22 +89,27 @@ describe('Queue Service Setup and Lifecycle', () => {
vi.resetModules();
// Dynamically import the modules after mocks are set up
const queueService = await import('./queueService.server');
const workerService = await import('./workers.server');
// Capture the imported instances for use in tests
gracefulShutdown = queueService.gracefulShutdown;
flyerWorker = queueService.flyerWorker;
emailWorker = queueService.emailWorker;
analyticsWorker = queueService.analyticsWorker;
cleanupWorker = queueService.cleanupWorker;
gracefulShutdown = workerService.gracefulShutdown;
flyerWorker = workerService.flyerWorker;
emailWorker = workerService.emailWorker;
analyticsWorker = workerService.analyticsWorker;
cleanupWorker = workerService.cleanupWorker;
weeklyAnalyticsWorker = workerService.weeklyAnalyticsWorker;
tokenCleanupWorker = workerService.tokenCleanupWorker;
});
afterEach(() => {
// Clean up all event listeners on the mock connection to prevent open handles.
mockRedisConnection.removeAllListeners();
vi.useRealTimers();
});
it('should log a success message when Redis connects', () => {
// Re-import redis.server to trigger its event listeners with the mock
import('./redis.server');
// Act: Simulate the 'connect' event on the mock Redis connection
mockRedisConnection.emit('connect');
@@ -103,6 +118,7 @@ describe('Queue Service Setup and Lifecycle', () => {
});
it('should log an error message when Redis connection fails', () => {
import('./redis.server');
const redisError = new Error('Connection refused');
mockRedisConnection.emit('error', redisError);
expect(mockLogger.error).toHaveBeenCalledWith({ err: redisError }, '[Redis] Connection error.');
@@ -111,7 +127,14 @@ describe('Queue Service Setup and Lifecycle', () => {
it('should attach completion and failure listeners to all workers', () => {
// The workers are instantiated when the module is imported in beforeEach.
// We just need to check that the 'on' method was called for each event.
const workers = [flyerWorker, emailWorker, analyticsWorker, cleanupWorker];
const workers = [
flyerWorker,
emailWorker,
analyticsWorker,
cleanupWorker,
weeklyAnalyticsWorker,
tokenCleanupWorker,
];
for (const worker of workers) {
expect(worker.on).toHaveBeenCalledWith('completed', expect.any(Function));
expect(worker.on).toHaveBeenCalledWith('failed', expect.any(Function));
@@ -171,15 +194,40 @@ describe('Queue Service Setup and Lifecycle', () => {
});
it('should close all workers, queues, the redis connection, and exit the process', async () => {
// We need to import the queues to check if their close methods are called.
const {
flyerQueue,
emailQueue,
analyticsQueue,
cleanupQueue,
weeklyAnalyticsQueue,
tokenCleanupQueue,
} = await import('./queues.server');
await gracefulShutdown('SIGINT');
expect((flyerWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((analyticsWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((cleanupWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
// Verify workers are closed
expect((flyerWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect((analyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect((cleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect((weeklyAnalyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect((tokenCleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
// Verify queues are closed
expect((flyerQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((emailQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((analyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((cleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((weeklyAnalyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((tokenCleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
// Verify the redis connection is also closed
expect(mockRedisConnection.quit).toHaveBeenCalledTimes(1);
// Check for the correct success log message from workers.server.ts
expect(mockLogger.info).toHaveBeenCalledWith(
'[Shutdown] All workers, queues, and connections closed successfully.',
'[Shutdown] All resources closed successfully.',
);
expect(processExitSpy).toHaveBeenCalledWith(0);
});
@@ -192,12 +240,34 @@ describe('Queue Service Setup and Lifecycle', () => {
await gracefulShutdown('SIGTERM');
// It should still attempt to close all workers
expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
expect(mockLogger.error).toHaveBeenCalledWith(
{ err: closeError, resource: 'flyerWorker' },
'[Shutdown] Error closing resource.',
`[Shutdown] Error closing flyerWorker.`,
);
expect(processExitSpy).toHaveBeenCalledWith(1);
});
it('should timeout if shutdown takes too long', async () => {
vi.useFakeTimers();
// Make one of the close calls hang indefinitely
(flyerWorker.close as Mock).mockReturnValue(new Promise(() => {}));
// Run shutdown but don't await it fully, as it will hang
const shutdownPromise = gracefulShutdown('SIGTERM');
// Advance timers past the timeout threshold
await vi.advanceTimersByTimeAsync(31000);
// Now await the promise to see the timeout result
await shutdownPromise;
expect(mockLogger.error).toHaveBeenCalledWith(
`[Shutdown] Graceful shutdown timed out after 30 seconds. Forcing exit.`,
);
expect(processExitSpy).toHaveBeenCalledWith(1);
vi.useRealTimers();
});
});
});