Compare commits

15 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | ed6d6349a2 |  |
|  | d4db2a709a |  |
|  | 508583809b |  |
|  | 6b1f7e7590 |  |
|  | 07bb31f4fb |  |
|  | a42fb76da8 |  |
|  | 08c320423c |  |
|  | d2498065ed |  |
|  | 56dc96f418 |  |
|  | 4e9aa0efc3 |  |
|  | e5e4b1316c |  |
|  | e8d511b4de |  |
|  | c4bbf5c251 |  |
|  | 32a9e6732b |  |
|  | e7c076e2ed |  |
@@ -283,7 +283,7 @@ jobs:
 echo "WARNING: No schema hash found in the test database."
 echo "This is expected for a first-time deployment. The hash will be set after a successful deployment."
 echo "--- Debug: Dumping schema_info table ---"
-PGPASSWORD="$DB_PASSWORD" psql -v ON_ERROR_STOP=0 -h "$DB_HOST" -p 5432 -U "$DB_USER" -d "$DB_NAME" -c "SELECT * FROM public.schema_info;" || true
+PGPASSWORD="$DB_PASSWORD" psql -v ON_ERROR_STOP=0 -h "$DB_HOST" -p 5432 -U "$DB_USER" -d "$DB_NAME" -P pager=off -c "SELECT * FROM public.schema_info;" || true
 echo "----------------------------------------"
 # We allow the deployment to continue, but a manual schema update is required.
 # You could choose to fail here by adding `exit 1`.
@@ -88,7 +88,7 @@ module.exports = {
     // --- General Worker ---
       name: 'flyer-crawler-worker',
       script: './node_modules/.bin/tsx',
-      args: 'src/services/queueService.server.ts', // tsx will execute this file
+      args: 'src/worker.ts', // tsx will execute this file
       // Production Environment Settings
       env_production: {
         NODE_ENV: 'production',
@@ -164,7 +164,7 @@ module.exports = {
     // --- Analytics Worker ---
      name: 'flyer-crawler-analytics-worker',
       script: './node_modules/.bin/tsx',
-      args: 'src/services/queueService.server.ts', // tsx will execute this file
+      args: 'src/worker.ts', // tsx will execute this file
       // Production Environment Settings
       env_production: {
         NODE_ENV: 'production',
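Both PM2 apps now launch `src/worker.ts` instead of executing the queue service module directly. That entry file is not included in this diff; the following is only a minimal sketch of what such an entry point might contain, assuming it simply loads the worker definitions and forwards shutdown signals to the shared routine exported by `workers.server.ts` (which the tests below rely on).

```ts
// src/worker.ts (hypothetical sketch; the real file is not part of this diff)
// Importing workers.server is assumed to instantiate the BullMQ workers as a
// side effect, mirroring how the tests below import it dynamically.
import { gracefulShutdown } from './services/workers.server';
import { logger } from './services/logger.server';

logger.info('[Worker] Worker process started.');

// Forward termination signals so PM2 restarts and deploys stay clean.
process.on('SIGINT', () => void gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => void gracefulShutdown('SIGTERM'));
```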
package-lock.json (generated, 4 changed lines)

@@ -1,12 +1,12 @@
 {
   "name": "flyer-crawler",
-  "version": "0.1.5",
+  "version": "0.1.10",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "flyer-crawler",
-      "version": "0.1.5",
+      "version": "0.1.10",
       "dependencies": {
         "@bull-board/api": "^6.14.2",
         "@bull-board/express": "^6.14.2",
@@ -1,7 +1,7 @@
 {
   "name": "flyer-crawler",
   "private": true,
-  "version": "0.1.5",
+  "version": "0.1.10",
   "type": "module",
   "scripts": {
     "dev": "concurrently \"npm:start:dev\" \"vite\"",
@@ -15,16 +15,19 @@ import type { Logger } from 'pino';
 // Create a mock logger that we can inject into requests and assert against.
 // We only mock the methods we intend to spy on. The rest of the complex Pino
 // Logger type is satisfied by casting, which is a common and clean testing practice.
-const mockLogger = {
-  error: vi.fn(),
-  warn: vi.fn(),
-  info: vi.fn(),
-  debug: vi.fn(),
-  fatal: vi.fn(),
-  trace: vi.fn(),
-  silent: vi.fn(),
-  child: vi.fn().mockReturnThis(),
-} as unknown as Logger;
+const { mockLogger } = vi.hoisted(() => {
+  const mockLogger = {
+    error: vi.fn(),
+    warn: vi.fn(),
+    info: vi.fn(),
+    debug: vi.fn(),
+    fatal: vi.fn(),
+    trace: vi.fn(),
+    silent: vi.fn(),
+    child: vi.fn().mockReturnThis(),
+  };
+  return { mockLogger };
+});
 
 // Mock the global logger as a fallback, though our tests will focus on req.log
 vi.mock('../services/logger.server', () => ({ logger: mockLogger }));
@@ -37,7 +40,7 @@ const app = express();
 app.use(express.json());
 // Add a middleware to inject our mock logger into each request as `req.log`
 app.use((req: Request, res: Response, next: NextFunction) => {
-  req.log = mockLogger;
+  req.log = mockLogger as unknown as Logger;
   next();
 });
 
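The switch to `vi.hoisted` matters because Vitest hoists `vi.mock` factory calls to the top of the module, above ordinary `const` declarations, so a factory that closes over a plain top-level variable sees an uninitialized binding. A minimal illustrative sketch of the failure mode and the fix (hypothetical module path, not taken from the repository):

```ts
import { vi } from 'vitest';

// BROKEN: vi.mock is hoisted above this declaration, so the factory would read
// an uninitialized binding and throw a ReferenceError at collection time.
// const mockLogger = { info: vi.fn() };
// vi.mock('./logger', () => ({ logger: mockLogger }));

// WORKS: vi.hoisted runs its callback during the same hoisting phase, so the
// value already exists when the vi.mock factory executes.
const { mockLogger } = vi.hoisted(() => ({ mockLogger: { info: vi.fn() } }));
vi.mock('./logger', () => ({ logger: mockLogger }));
```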
@@ -1,7 +1,12 @@
 // src/middleware/errorHandler.ts
 import { Request, Response, NextFunction } from 'express';
 import { ZodError } from 'zod';
-import { NotFoundError, UniqueConstraintError, ValidationError } from '../services/db/errors.db';
+import {
+  ForeignKeyConstraintError,
+  NotFoundError,
+  UniqueConstraintError,
+  ValidationError,
+} from '../services/db/errors.db';
 import { logger } from '../services/logger.server';
 
 /**
@@ -41,6 +46,11 @@ export const errorHandler = (err: Error, req: Request, res: Response, next: Next
 
   if (err instanceof UniqueConstraintError) {
     log.warn({ err }, 'Constraint error occurred');
+    return res.status(409).json({ message: err.message }); // Use 409 Conflict for unique constraints
+  }
+
+  if (err instanceof ForeignKeyConstraintError) {
+    log.warn({ err }, 'Foreign key constraint violation');
     return res.status(400).json({ message: err.message });
   }
 
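The handler assumes a `ForeignKeyConstraintError` class exists alongside the other custom errors in `errors.db`. Its actual definition is not part of this diff; the sketch below is only a plausible minimal shape under that assumption.

```ts
// Hypothetical addition to src/services/db/errors.db.ts (not shown in this diff).
// Postgres reports foreign-key violations with SQLSTATE 23503; a repository
// layer can translate that driver error into this domain error before it
// reaches errorHandler, which then maps it to a 400 response.
export class ForeignKeyConstraintError extends Error {
  constructor(message = 'Referenced record does not exist') {
    super(message);
    this.name = 'ForeignKeyConstraintError';
  }
}
```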
@@ -13,7 +13,6 @@ import {
 import type { SuggestedCorrection, Brand, UserProfile, UnmatchedFlyerItem } from '../types';
 import { NotFoundError } from '../services/db/errors.db'; // This can stay, it's a type/class not a module with side effects.
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // Mock the file upload middleware to allow testing the controller's internal check
 vi.mock('../middleware/fileUpload.middleware', () => ({
@@ -96,8 +95,9 @@ vi.mock('@bull-board/express', () => ({
 }));
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport middleware
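This same change, dropping the top-level `mockLogger` import and loading it inside an async `vi.mock` factory, repeats across most of the test files below: the factory is hoisted above all imports, so it must not reference a top-level import binding, but it may `await import(...)` the helper lazily. The helper module itself is not shown in this diff; a hedged sketch of what `src/tests/utils/mockLogger.ts` presumably exports:

```ts
// Hypothetical shape of the shared test helper (assumed, not shown in this diff).
import { vi } from 'vitest';
import type { Logger } from 'pino';

export const mockLogger = {
  info: vi.fn(),
  warn: vi.fn(),
  error: vi.fn(),
  debug: vi.fn(),
  child: vi.fn().mockReturnThis(),
} as unknown as Logger;
```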
@@ -6,7 +6,6 @@ import { createMockUserProfile } from '../tests/utils/mockFactories';
 import type { Job } from 'bullmq';
 import type { UserProfile } from '../types';
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // Mock the background job service to control its methods.
 vi.mock('../services/backgroundJobService', () => ({
@@ -66,8 +65,9 @@ import {
 } from '../services/queueService.server';
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport middleware
@@ -5,7 +5,16 @@ import type { Request, Response, NextFunction } from 'express';
 import { createMockUserProfile, createMockActivityLogItem } from '../tests/utils/mockFactories';
 import type { UserProfile } from '../types';
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
+
+const { mockLogger } = vi.hoisted(() => ({
+  mockLogger: {
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    debug: vi.fn(),
+    child: vi.fn().mockReturnThis(),
+  },
+}));
 
 vi.mock('../lib/queue', () => ({
   serverAdapter: {
@@ -27,19 +36,22 @@ vi.mock('../services/db/index.db', () => ({
   notificationRepo: {},
 }));
 
-// Mock the queue service to control worker statuses
+// Mock the queue service for queue status checks
 vi.mock('../services/queueService.server', () => ({
+  flyerQueue: { name: 'flyer-processing', getJobCounts: vi.fn() },
+  emailQueue: { name: 'email-sending', getJobCounts: vi.fn() },
+  analyticsQueue: { name: 'analytics-reporting', getJobCounts: vi.fn() },
+  cleanupQueue: { name: 'file-cleanup', getJobCounts: vi.fn() },
+  weeklyAnalyticsQueue: { name: 'weekly-analytics-reporting', getJobCounts: vi.fn() },
+}));
+
+// Mock the worker service for worker status checks
+vi.mock('../services/workers.server', () => ({
   flyerWorker: { name: 'flyer-processing', isRunning: vi.fn() },
   emailWorker: { name: 'email-sending', isRunning: vi.fn() },
   analyticsWorker: { name: 'analytics-reporting', isRunning: vi.fn() },
   cleanupWorker: { name: 'file-cleanup', isRunning: vi.fn() },
   weeklyAnalyticsWorker: { name: 'weekly-analytics-reporting', isRunning: vi.fn() },
-  flyerQueue: { name: 'flyer-processing', getJobCounts: vi.fn() },
-  emailQueue: { name: 'email-sending', getJobCounts: vi.fn() },
-  analyticsQueue: { name: 'analytics-reporting', getJobCounts: vi.fn() },
-  cleanupQueue: { name: 'file-cleanup', getJobCounts: vi.fn() },
-  // FIX: Add the missing weeklyAnalyticsQueue to prevent import errors in admin.routes.ts
-  weeklyAnalyticsQueue: { name: 'weekly-analytics-reporting', getJobCounts: vi.fn() },
 }));
 
 // Mock other dependencies that are part of the adminRouter setup but not directly tested here
@@ -67,8 +79,10 @@ import adminRouter from './admin.routes';
 
 // Import the mocked modules to control them
 import * as queueService from '../services/queueService.server';
+import * as workerService from '../services/workers.server';
 import { adminRepo } from '../services/db/index.db';
 const mockedQueueService = queueService as Mocked<typeof queueService>;
+const mockedWorkerService = workerService as Mocked<typeof workerService>;
 
 // Mock the logger
 vi.mock('../services/logger.server', () => ({
@@ -137,11 +151,11 @@ describe('Admin Monitoring Routes (/api/admin)', () => {
   describe('GET /workers/status', () => {
     it('should return the status of all registered workers', async () => {
       // Arrange: Set the mock status for each worker
-      vi.mocked(mockedQueueService.flyerWorker.isRunning).mockReturnValue(true);
-      vi.mocked(mockedQueueService.emailWorker.isRunning).mockReturnValue(true);
-      vi.mocked(mockedQueueService.analyticsWorker.isRunning).mockReturnValue(false); // Simulate one worker being stopped
-      vi.mocked(mockedQueueService.cleanupWorker.isRunning).mockReturnValue(true);
-      vi.mocked(mockedQueueService.weeklyAnalyticsWorker.isRunning).mockReturnValue(true);
+      vi.mocked(mockedWorkerService.flyerWorker.isRunning).mockReturnValue(true);
+      vi.mocked(mockedWorkerService.emailWorker.isRunning).mockReturnValue(true);
+      vi.mocked(mockedWorkerService.analyticsWorker.isRunning).mockReturnValue(false); // Simulate one worker being stopped
+      vi.mocked(mockedWorkerService.cleanupWorker.isRunning).mockReturnValue(true);
+      vi.mocked(mockedWorkerService.weeklyAnalyticsWorker.isRunning).mockReturnValue(true);
 
       // Act
       const response = await supertest(app).get('/api/admin/workers/status');
@@ -25,12 +25,14 @@ import {
   analyticsQueue,
   cleanupQueue,
   weeklyAnalyticsQueue,
-  flyerWorker,
-  emailWorker,
+} from '../services/queueService.server'; // Import your queues
+import {
   analyticsWorker,
   cleanupWorker,
+  emailWorker,
+  flyerWorker,
   weeklyAnalyticsWorker,
-} from '../services/queueService.server'; // Import your queues
+} from '../services/workers.server';
 import { getSimpleWeekAndYear } from '../utils/dateUtils';
 import {
   requiredString,
@@ -5,7 +5,6 @@ import type { Request, Response, NextFunction } from 'express';
 import { createMockUserProfile } from '../tests/utils/mockFactories';
 import type { UserProfile } from '../types';
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 vi.mock('../services/db/index.db', () => ({
   adminRepo: {
@@ -45,8 +44,9 @@ import adminRouter from './admin.routes';
 import { adminRepo } from '../services/db/index.db';
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport middleware
@@ -4,7 +4,6 @@ import supertest from 'supertest';
 import type { Request, Response, NextFunction } from 'express';
 import { createMockUserProfile } from '../tests/utils/mockFactories';
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // Mock dependencies
 vi.mock('../services/geocodingService.server', () => ({
@@ -50,8 +49,9 @@ import adminRouter from './admin.routes';
 import { geocodingService } from '../services/geocodingService.server';
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport middleware
@@ -6,7 +6,6 @@ import { createMockUserProfile, createMockAdminUserView } from '../tests/utils/mockFactories';
 import type { UserProfile, Profile } from '../types';
 import { NotFoundError } from '../services/db/errors.db';
 import { createTestApp } from '../tests/utils/createTestApp';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 vi.mock('../services/db/index.db', () => ({
   adminRepo: {
@@ -44,8 +43,9 @@ vi.mock('@bull-board/express', () => ({
 }));
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Import the router AFTER all mocks are defined.
@@ -55,8 +55,9 @@ import aiRouter from './ai.routes';
 import { flyerQueue } from '../services/queueService.server';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport module to control authentication for different tests.
@@ -9,7 +9,6 @@ import {
   createMockUserProfile,
   createMockUserWithPasswordHash,
 } from '../tests/utils/mockFactories';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // --- FIX: Hoist passport mocks to be available for vi.mock ---
 const passportMocks = vi.hoisted(() => {
@@ -111,8 +110,9 @@ vi.mock('../services/db/connection.db', () => ({
 }));
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the email service
@@ -144,6 +144,8 @@ import { UniqueConstraintError } from '../services/db/errors.db'; // Import actual class
 import express from 'express';
 import { errorHandler } from '../middleware/errorHandler'; // Assuming this exists
 
+const { mockLogger } = await import('../tests/utils/mockLogger');
+
 const app = express();
 app.use(express.json());
 app.use(cookieParser()); // Mount BEFORE router
@@ -7,7 +7,6 @@ import {
   createMockBudget,
   createMockSpendingByCategory,
 } from '../tests/utils/mockFactories';
-import { mockLogger } from '../tests/utils/mockLogger';
 import { createTestApp } from '../tests/utils/createTestApp';
 import { ForeignKeyConstraintError, NotFoundError } from '../services/db/errors.db';
 // 1. Mock the Service Layer directly.
@@ -26,8 +25,9 @@ vi.mock('../services/db/index.db', () => ({
 }));
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Import the router and mocked DB AFTER all mocks are defined.
@@ -4,7 +4,6 @@ import supertest from 'supertest';
 import type { Request, Response, NextFunction } from 'express';
 import { createMockUserProfile, createMockWatchedItemDeal } from '../tests/utils/mockFactories';
 import type { WatchedItemDeal } from '../types';
-import { mockLogger } from '../tests/utils/mockLogger';
 import { createTestApp } from '../tests/utils/createTestApp';
 
 // 1. Mock the Service Layer directly.
@@ -19,8 +18,9 @@ import dealsRouter from './deals.routes';
 import { dealsRepo } from '../services/db/deals.db';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock the passport middleware
@@ -20,11 +20,11 @@ vi.mock('../services/db/index.db', () => ({
 // Import the router and mocked DB AFTER all mocks are defined.
 import flyerRouter from './flyer.routes';
 import * as db from '../services/db/index.db';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Define a reusable matcher for the logger object.
@@ -27,8 +27,9 @@ import gamificationRouter from './gamification.routes';
 import * as db from '../services/db/index.db';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Use vi.hoisted to create mutable mock function references.
@@ -32,8 +32,9 @@ import healthRouter from './health.routes';
 import * as dbConnection from '../services/db/connection.db';
 
 // Mock the logger to keep test output clean.
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Cast the mocked import to a Mocked type for type-safe access to mock functions.
@@ -56,7 +56,6 @@ import {
   createMockUserProfile,
   createMockUserWithPasswordHash,
 } from '../tests/utils/mockFactories';
-import { mockLogger } from '../tests/utils/mockLogger';
 
 // Mock dependencies before importing the passport configuration
 vi.mock('../services/db/index.db', () => ({
@@ -75,8 +74,9 @@ vi.mock('../services/db/index.db', () => ({
 const mockedDb = db as Mocked<typeof db>;
 
 vi.mock('../services/logger.server', () => ({
-  // This mock is used by the module under test and can be imported in the test file.
-  logger: mockLogger,
+  // Use async import to avoid hoisting issues with mockLogger
+  // Note: We need to await the import inside the factory
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Mock bcrypt for password comparisons
@@ -6,7 +6,6 @@ import {
   createMockDietaryRestriction,
   createMockAppliance,
 } from '../tests/utils/mockFactories';
-import { mockLogger } from '../tests/utils/mockLogger';
 import { createTestApp } from '../tests/utils/createTestApp';
 
 // 1. Mock the Service Layer directly.
@@ -23,8 +22,9 @@ import personalizationRouter from './personalization.routes';
 import * as db from '../services/db/index.db';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 describe('Personalization Routes (/api/personalization)', () => {
@@ -12,8 +12,9 @@ vi.mock('../services/db/price.db', () => ({
 }));
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Import the router AFTER other setup.
@@ -1,7 +1,6 @@
 // src/routes/recipe.routes.test.ts
 import { describe, it, expect, vi, beforeEach } from 'vitest';
 import supertest from 'supertest';
-import { mockLogger } from '../tests/utils/mockLogger';
 import { createMockRecipe, createMockRecipeComment } from '../tests/utils/mockFactories';
 import { NotFoundError } from '../services/db/errors.db';
 import { createTestApp } from '../tests/utils/createTestApp';
@@ -22,8 +21,9 @@ import recipeRouter from './recipe.routes';
 import * as db from '../services/db/index.db';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Import the mocked db module to control its functions in tests
@@ -1,7 +1,6 @@
 // src/routes/stats.routes.test.ts
 import { describe, it, expect, vi, beforeEach } from 'vitest';
 import supertest from 'supertest';
-import { mockLogger } from '../tests/utils/mockLogger';
 import { createTestApp } from '../tests/utils/createTestApp';
 
 // 1. Mock the Service Layer directly.
@@ -16,8 +15,9 @@ import statsRouter from './stats.routes';
 import * as db from '../services/db/index.db';
 
 // Mock the logger to keep test output clean
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 const expectLogger = expect.objectContaining({
@@ -86,8 +86,9 @@ vi.mock('bcrypt', () => {
 });
 
 // Mock the logger
-vi.mock('../services/logger.server', () => ({
-  logger: mockLogger,
+vi.mock('../services/logger.server', async () => ({
+  // Use async import to avoid hoisting issues with mockLogger
+  logger: (await import('../tests/utils/mockLogger')).mockLogger,
 }));
 
 // Import the router and other modules AFTER mocks are established
@@ -176,15 +176,13 @@ describe('API Client', () => {
      // We expect the promise to still resolve with the bad response, but log an error.
      await apiClient.apiFetch('/some/failing/endpoint');

-      // FIX: Use stringContaining to be resilient to port numbers (e.g., localhost:3001)
-      // This checks for the essential parts of the log message without being brittle.
      expect(logger.error).toHaveBeenCalledWith(
-        expect.stringContaining('apiFetch: Request to http://'),
-        'Internal Server Error',
-      );
-      expect(logger.error).toHaveBeenCalledWith(
-        expect.stringContaining('/api/some/failing/endpoint failed with status 500'),
-        'Internal Server Error',
+        expect.objectContaining({
+          status: 500,
+          body: 'Internal Server Error',
+          url: expect.stringContaining('/some/failing/endpoint'),
+        }),
+        'apiFetch: Request failed',
      );
    });

@@ -242,10 +240,6 @@ describe('API Client', () => {
      expect(logger.warn).toHaveBeenCalledWith('Failed to track flyer item interaction', {
        error: apiError,
      });
-
-      expect(logger.warn).toHaveBeenCalledWith('Failed to track flyer item interaction', {
-        error: apiError,
-      });
    });

    it('logSearchQuery should log a warning on failure', async () => {
@@ -259,8 +253,6 @@ describe('API Client', () => {
        was_successful: false,
      });
      expect(logger.warn).toHaveBeenCalledWith('Failed to log search query', { error: apiError });
-
-      expect(logger.warn).toHaveBeenCalledWith('Failed to log search query', { error: apiError });
    });
  });

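The rewritten assertions imply that `apiFetch` now logs failures pino-style, with a structured context object first and a fixed message second, rather than interpolating the URL and status into a string. The client implementation itself is not part of this diff; the snippet below is only a hedged sketch of the logging call those assertions suggest, with all names assumed.

```ts
// Hypothetical sketch of the failure-logging branch inside apiFetch (assumed, not shown in this diff).
async function logFailedResponse(
  logger: { error: (context: object, message: string) => void },
  response: Response,
): Promise<void> {
  logger.error(
    { status: response.status, body: await response.text(), url: response.url },
    'apiFetch: Request failed',
  );
}
```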
@@ -87,7 +87,7 @@ describe('Geocoding Service', () => {
      // Assert
      expect(result).toEqual(coordinates);
      expect(logger.error).toHaveBeenCalledWith(
-        { err: 'Redis down', cacheKey: expect.any(String) },
+        { err: expect.any(Error), cacheKey: expect.any(String) },
        'Redis GET or JSON.parse command failed. Proceeding without cache.',
      );
      expect(mockGoogleService.geocode).toHaveBeenCalled(); // Should still proceed to fetch
@@ -107,7 +107,7 @@ describe('Geocoding Service', () => {
      expect(mocks.mockRedis.get).toHaveBeenCalledWith(cacheKey);
      // The service should log the JSON parsing error and continue
      expect(logger.error).toHaveBeenCalledWith(
-        { err: expect.any(String), cacheKey: expect.any(String) },
+        { err: expect.any(SyntaxError), cacheKey: expect.any(String) },
        'Redis GET or JSON.parse command failed. Proceeding without cache.',
      );
      expect(mockGoogleService.geocode).toHaveBeenCalledTimes(1);
@@ -185,7 +185,7 @@ describe('Geocoding Service', () => {
      // Assert
      expect(result).toEqual(coordinates);
      expect(logger.error).toHaveBeenCalledWith(
-        { err: 'Network Error' },
+        { err: expect.any(Error) },
        expect.stringContaining('An error occurred while calling the Google Maps Geocoding API'),
      );
      expect(mockNominatimService.geocode).toHaveBeenCalledWith(address, logger);
@@ -223,7 +223,7 @@ describe('Geocoding Service', () => {
      expect(mockGoogleService.geocode).toHaveBeenCalledTimes(1);
      expect(mocks.mockRedis.set).toHaveBeenCalledTimes(1);
      expect(logger.error).toHaveBeenCalledWith(
-        { err: 'Redis SET failed', cacheKey: expect.any(String) },
+        { err: expect.any(Error), cacheKey: expect.any(String) },
        'Redis SET command failed. Result will not be cached.',
      );
    });
@@ -271,7 +271,7 @@ describe('Geocoding Service', () => {
      // Act & Assert
      await expect(geocodingService.clearGeocodeCache(logger)).rejects.toThrow(redisError);
      expect(logger.error).toHaveBeenCalledWith(
-        { err: redisError.message },
+        { err: expect.any(Error) },
        'Failed to clear geocode cache from Redis.',
      );
      expect(mocks.mockRedis.del).not.toHaveBeenCalled();
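All five updated assertions point the same way: the geocoding service now passes the caught error object under `err` instead of a bare string, so pino can serialize its name, message, and stack. A hedged sketch of the cache-read pattern those assertions imply, with all names assumed rather than taken from the repository:

```ts
// Hypothetical sketch of the cache-read path in the geocoding service (not shown in this diff).
import type { Logger } from 'pino';

async function readCachedCoordinates(
  redis: { get: (key: string) => Promise<string | null> },
  cacheKey: string,
  logger: Logger,
): Promise<unknown | null> {
  try {
    const cached = await redis.get(cacheKey);
    return cached ? JSON.parse(cached) : null; // JSON.parse may throw a SyntaxError
  } catch (err) {
    // Log the Error object itself (not err.message) so the structured log keeps the
    // stack trace and the test can match it with expect.any(Error) / expect.any(SyntaxError).
    logger.error({ err, cacheKey }, 'Redis GET or JSON.parse command failed. Proceeding without cache.');
    return null;
  }
}
```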
@@ -1,7 +1,7 @@
 // src/services/queueService.server.test.ts
 import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
 import { logger as mockLogger } from './logger.server';
-import { EventEmitter } from 'node:events';
+import { EventEmitter } from 'node:events'; // This was a duplicate, fixed.
 import type { Job, Worker } from 'bullmq';
 import type { Mock } from 'vitest';
 
@@ -31,6 +31,7 @@ mockRedisConnection.quit = vi.fn().mockResolvedValue('OK');
 // We make it a mock function that returns our shared `mockRedisConnection` instance.
 vi.mock('ioredis', () => ({
   default: vi.fn(function () {
+    // This was a duplicate, fixed.
     return mockRedisConnection;
   }),
 }));
@@ -51,26 +52,35 @@ vi.mock('bullmq', () => ({
     this.add = vi.fn();
     this.close = vi.fn().mockResolvedValue(undefined);
     return this;
-  }),
+  }), // This was a duplicate, fixed.
+  UnrecoverableError: class UnrecoverableError extends Error {},
 }));
 
 vi.mock('./logger.server', () => ({
   logger: {
     info: vi.fn(),
     error: vi.fn(),
-    warn: vi.fn(),
+    warn: vi.fn(), // This was a duplicate, fixed.
     debug: vi.fn(),
+    child: vi.fn().mockReturnThis(),
   },
 }));
 
 // Mock other dependencies that are not the focus of this test file.
 vi.mock('./aiService.server');
 vi.mock('./emailService.server');
-vi.mock('./db/index.db');
+vi.mock('./db/index.db'); // This was a duplicate, fixed.
+vi.mock('./flyerProcessingService.server');
+vi.mock('./flyerDataTransformer');
 
-describe('Queue Service Setup and Lifecycle', () => {
-  let gracefulShutdown: (signal: string) => Promise<void>;
-  let flyerWorker: Worker, emailWorker: Worker, analyticsWorker: Worker, cleanupWorker: Worker;
+describe('Worker Service Lifecycle', () => {
+  let gracefulShutdown: (signal: string) => Promise<void>; // This was a duplicate, fixed.
+  let flyerWorker: Worker,
+    emailWorker: Worker,
+    analyticsWorker: Worker,
+    cleanupWorker: Worker,
+    weeklyAnalyticsWorker: Worker,
+    tokenCleanupWorker: Worker;
 
   beforeEach(async () => {
     vi.clearAllMocks();
@@ -79,22 +89,27 @@ describe('Queue Service Setup and Lifecycle', () => {
     vi.resetModules();
 
     // Dynamically import the modules after mocks are set up
-    const queueService = await import('./queueService.server');
+    const workerService = await import('./workers.server');
 
     // Capture the imported instances for use in tests
-    gracefulShutdown = queueService.gracefulShutdown;
-    flyerWorker = queueService.flyerWorker;
-    emailWorker = queueService.emailWorker;
-    analyticsWorker = queueService.analyticsWorker;
-    cleanupWorker = queueService.cleanupWorker;
+    gracefulShutdown = workerService.gracefulShutdown;
+    flyerWorker = workerService.flyerWorker;
+    emailWorker = workerService.emailWorker;
+    analyticsWorker = workerService.analyticsWorker;
+    cleanupWorker = workerService.cleanupWorker;
+    weeklyAnalyticsWorker = workerService.weeklyAnalyticsWorker;
+    tokenCleanupWorker = workerService.tokenCleanupWorker;
   });
 
   afterEach(() => {
     // Clean up all event listeners on the mock connection to prevent open handles.
     mockRedisConnection.removeAllListeners();
+    vi.useRealTimers();
   });
 
   it('should log a success message when Redis connects', () => {
+    // Re-import redis.server to trigger its event listeners with the mock
+    import('./redis.server');
     // Act: Simulate the 'connect' event on the mock Redis connection
     mockRedisConnection.emit('connect');
 
@@ -103,6 +118,7 @@ describe('Queue Service Setup and Lifecycle', () => {
   });
 
   it('should log an error message when Redis connection fails', () => {
+    import('./redis.server');
     const redisError = new Error('Connection refused');
     mockRedisConnection.emit('error', redisError);
     expect(mockLogger.error).toHaveBeenCalledWith({ err: redisError }, '[Redis] Connection error.');
@@ -111,7 +127,14 @@ describe('Queue Service Setup and Lifecycle', () => {
   it('should attach completion and failure listeners to all workers', () => {
     // The workers are instantiated when the module is imported in beforeEach.
     // We just need to check that the 'on' method was called for each event.
-    const workers = [flyerWorker, emailWorker, analyticsWorker, cleanupWorker];
+    const workers = [
+      flyerWorker,
+      emailWorker,
+      analyticsWorker,
+      cleanupWorker,
+      weeklyAnalyticsWorker,
+      tokenCleanupWorker,
+    ];
     for (const worker of workers) {
       expect(worker.on).toHaveBeenCalledWith('completed', expect.any(Function));
       expect(worker.on).toHaveBeenCalledWith('failed', expect.any(Function));
@@ -171,15 +194,40 @@ describe('Queue Service Setup and Lifecycle', () => {
   });
 
   it('should close all workers, queues, the redis connection, and exit the process', async () => {
+    // We need to import the queues to check if their close methods are called.
+    const {
+      flyerQueue,
+      emailQueue,
+      analyticsQueue,
+      cleanupQueue,
+      weeklyAnalyticsQueue,
+      tokenCleanupQueue,
+    } = await import('./queues.server');
+
     await gracefulShutdown('SIGINT');
-    expect((flyerWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
-    expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
-    expect((analyticsWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
-    expect((cleanupWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
+
+    // Verify workers are closed
+    expect((flyerWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+    expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+    expect((analyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+    expect((cleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+    expect((weeklyAnalyticsWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+    expect((tokenCleanupWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
+
+    // Verify queues are closed
+    expect((flyerQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((emailQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((analyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((cleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((weeklyAnalyticsQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((tokenCleanupQueue as unknown as MockQueueInstance).close).toHaveBeenCalled();
+
     // Verify the redis connection is also closed
     expect(mockRedisConnection.quit).toHaveBeenCalledTimes(1);
+
+    // Check for the correct success log message from workers.server.ts
     expect(mockLogger.info).toHaveBeenCalledWith(
-      '[Shutdown] All workers, queues, and connections closed successfully.',
+      '[Shutdown] All resources closed successfully.',
    );
    expect(processExitSpy).toHaveBeenCalledWith(0);
  });
@@ -192,12 +240,34 @@ describe('Queue Service Setup and Lifecycle', () => {
    await gracefulShutdown('SIGTERM');

    // It should still attempt to close all workers
-    expect((emailWorker as unknown as MockQueueInstance).close).toHaveBeenCalled();
+    expect((emailWorker as unknown as MockWorkerInstance).close).toHaveBeenCalled();
    expect(mockLogger.error).toHaveBeenCalledWith(
      { err: closeError, resource: 'flyerWorker' },
-      '[Shutdown] Error closing resource.',
+      `[Shutdown] Error closing flyerWorker.`,
    );
    expect(processExitSpy).toHaveBeenCalledWith(1);
  });

+  it('should timeout if shutdown takes too long', async () => {
+    vi.useFakeTimers();
+    // Make one of the close calls hang indefinitely
+    (flyerWorker.close as Mock).mockReturnValue(new Promise(() => {}));
+
+    // Run shutdown but don't await it fully, as it will hang
+    const shutdownPromise = gracefulShutdown('SIGTERM');
+
+    // Advance timers past the timeout threshold
+    await vi.advanceTimersByTimeAsync(31000);
+
+    // Now await the promise to see the timeout result
+    await shutdownPromise;
+
+    expect(mockLogger.error).toHaveBeenCalledWith(
+      `[Shutdown] Graceful shutdown timed out after 30 seconds. Forcing exit.`,
+    );
+    expect(processExitSpy).toHaveBeenCalledWith(1);
+
+    vi.useRealTimers();
+  });
  });
});
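The new test cases expect per-resource error logs, a hard 30-second timeout, and distinct exit codes from the shutdown routine in `workers.server.ts`. That implementation is not shown in this diff; the sketch below is only an inferred shape that would satisfy those assertions, written as a factory so the example stays self-contained (module paths and parameter names are assumptions).

```ts
// Hedged sketch of a gracefulShutdown satisfying the tests above (not the repository's actual code).
import { logger } from './logger.server';

type Closable = { close: () => Promise<unknown> };

export function makeGracefulShutdown(
  resources: Record<string, Closable>, // e.g. { flyerWorker, flyerQueue, ... }
  quitRedis: () => Promise<unknown>,
  timeoutMs = 30_000,
) {
  return async function gracefulShutdown(signal: string): Promise<void> {
    logger.info(`[Shutdown] ${signal} received. Closing workers, queues, and connections...`);

    let failed = false;
    const closeAll = (async () => {
      for (const [name, resource] of Object.entries(resources)) {
        try {
          await resource.close();
        } catch (err) {
          failed = true;
          logger.error({ err, resource: name }, `[Shutdown] Error closing ${name}.`);
        }
      }
      await quitRedis();
    })();

    // Race the close sequence against a hard timeout so one hung close() cannot block exit forever.
    const timedOut = await Promise.race([
      closeAll.then(() => false),
      new Promise<boolean>((resolve) => setTimeout(() => resolve(true), timeoutMs)),
    ]);

    if (timedOut) {
      logger.error(`[Shutdown] Graceful shutdown timed out after 30 seconds. Forcing exit.`);
      process.exit(1);
    }
    if (failed) {
      process.exit(1);
    }
    logger.info('[Shutdown] All resources closed successfully.');
    process.exit(0);
  };
}
```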
@@ -1,424 +1,32 @@
 // src/services/queueService.server.ts
-import { Queue, Worker, Job, UnrecoverableError } from 'bullmq';
-import IORedis from 'ioredis'; // Correctly imported
-import fsPromises from 'node:fs/promises';
-import { exec } from 'child_process';
-import { promisify } from 'util';
 
 import { logger } from './logger.server';
-import { aiService } from './aiService.server';
-import * as emailService from './emailService.server';
-import * as db from './db/index.db';
+import { connection } from './redis.server';
 import {
-  FlyerProcessingService,
-  type FlyerJobData,
-  type IFileSystem,
-} from './flyerProcessingService.server';
-import { FlyerDataTransformer } from './flyerDataTransformer';
+  flyerQueue,
+  emailQueue,
+  analyticsQueue,
+  weeklyAnalyticsQueue,
+  cleanupQueue,
+  tokenCleanupQueue,
+} from './queues.server';
 
-export const connection = new IORedis(process.env.REDIS_URL!, {
-  maxRetriesPerRequest: null, // Important for BullMQ
-  password: process.env.REDIS_PASSWORD, // Add the password from environment variables
-});
+// Re-export everything for backward compatibility where possible
+export { connection } from './redis.server';
+export * from './queues.server';
 
-// --- Redis Connection Event Listeners ---
-connection.on('connect', () => {
+// We do NOT export workers here anymore to prevent side effects.
+// Consumers needing workers must import from './workers.server'.
-  logger.info('[Redis] Connection established successfully.');
-});
-
-connection.on('error', (err) => {
-  // This is crucial for diagnosing Redis connection issues. // The patch requested this specific error handling.
-  logger.error({ err }, '[Redis] Connection error.');
-});
-
-const execAsync = promisify(exec);
-
-// --- Queues ---
-export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
-  connection,
-  defaultJobOptions: {
-    attempts: 3, // Attempt a job 3 times before marking it as failed.
-    backoff: {
-      type: 'exponential',
-      delay: 5000, // Start with a 5-second delay for the first retry
-    },
-  },
-});
-
-export const emailQueue = new Queue<EmailJobData>('email-sending', {
-  connection,
-  defaultJobOptions: {
-    attempts: 5, // Emails can be retried more aggressively
-    backoff: {
-      type: 'exponential',
-      delay: 10000, // Start with a 10-second delay
-    },
-  },
-});
-
-export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
-  connection,
-  defaultJobOptions: {
-    attempts: 2, // Analytics can be intensive, so fewer retries might be desired.
-    backoff: {
-      type: 'exponential',
-      delay: 60000, // Wait a minute before retrying.
-    },
-    // Remove job from queue on completion to save space, as results are in the DB.
-    removeOnComplete: true,
-    removeOnFail: 50, // Keep the last 50 failed jobs for inspection.
-  },
-});
-
-export const weeklyAnalyticsQueue = new Queue<WeeklyAnalyticsJobData>(
-  'weekly-analytics-reporting',
-  {
-    connection,
-    defaultJobOptions: {
-      attempts: 2,
-      backoff: {
-        type: 'exponential',
-        delay: 3600000, // 1 hour delay for retries
-      },
-      removeOnComplete: true,
-      removeOnFail: 50,
-    },
-  },
-);
-
-export const cleanupQueue = new Queue<CleanupJobData>('file-cleanup', {
-  connection,
-  defaultJobOptions: {
-    attempts: 3,
-    backoff: {
-      type: 'exponential',
-      delay: 30000, // Retry cleanup after 30 seconds
-    },
-    removeOnComplete: true, // No need to keep successful cleanup jobs
-  },
-});
-
-export const tokenCleanupQueue = new Queue<TokenCleanupJobData>('token-cleanup', {
-  connection,
-  defaultJobOptions: {
-    attempts: 2,
-    backoff: {
-      type: 'exponential',
-      delay: 3600000, // 1 hour delay
-    },
-    removeOnComplete: true,
-    removeOnFail: 10,
-  },
-});
-
-// --- Job Data Interfaces ---
-
-interface EmailJobData {
-  to: string;
-  subject: string;
-  text: string;
-  html: string;
-}
 
 /**
- * Defines the data for an analytics job.
- */
-interface AnalyticsJobData {
+ * A function to gracefully shut down all queues and connections.
+ * This is for the API process which only uses queues.
+ * For worker processes, use the gracefulShutdown from workers.server.ts
-  reportDate: string; // e.g., '2024-10-26'
-}
-
-/**
- * Defines the data for a weekly analytics job.
- */
-interface WeeklyAnalyticsJobData {
-  reportYear: number;
-  reportWeek: number; // ISO week number (1-53)
-}
-
-interface CleanupJobData {
-  flyerId: number;
-  // An array of absolute file paths to be deleted. Made optional for manual cleanup triggers.
-  paths?: string[];
-}
-
-/**
- * Defines the data for a token cleanup job.
- */
-interface TokenCleanupJobData {
-  timestamp: string; // ISO string to ensure the job is unique per run
-}
-
-// --- Worker Instantiation ---
-
-// Create an adapter for fsPromises to match the IFileSystem interface.
-const fsAdapter: IFileSystem = {
-  readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
|
||||||
unlink: (path: string) => fsPromises.unlink(path),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Instantiate the service with its real dependencies
|
|
||||||
const flyerProcessingService = new FlyerProcessingService(
|
|
||||||
aiService,
|
|
||||||
db,
|
|
||||||
fsAdapter,
|
|
||||||
execAsync,
|
|
||||||
cleanupQueue, // Inject the cleanup queue to break the circular dependency
|
|
||||||
new FlyerDataTransformer(), // Inject the new transformer
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A generic function to attach logging event listeners to any worker.
|
|
||||||
* This centralizes logging for job completion and final failure.
|
|
||||||
* @param worker The BullMQ worker instance.
|
|
||||||
*/
|
|
||||||
const attachWorkerEventListeners = (worker: Worker) => {
|
|
||||||
worker.on('completed', (job: Job, returnValue: unknown) => {
|
|
||||||
logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
|
|
||||||
});
|
|
||||||
|
|
||||||
worker.on('failed', (job: Job | undefined, error: Error) => {
|
|
||||||
// This event fires after all retries have failed.
|
|
||||||
logger.error(
|
|
||||||
{ err: error, jobData: job?.data },
|
|
||||||
`[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
export const flyerWorker = new Worker<FlyerJobData>(
|
|
||||||
'flyer-processing', // Must match the queue name
|
|
||||||
async (job) => {
|
|
||||||
try {
|
|
||||||
// The processJob method creates its own job-specific logger internally.
|
|
||||||
return await flyerProcessingService.processJob(job);
|
|
||||||
} catch (error: any) {
|
|
||||||
// Check for quota errors or other unrecoverable errors from the AI service
|
|
||||||
const errorMessage = error?.message || '';
|
|
||||||
if (
|
|
||||||
errorMessage.includes('quota') ||
|
|
||||||
errorMessage.includes('429') ||
|
|
||||||
errorMessage.includes('RESOURCE_EXHAUSTED')
|
|
||||||
) {
|
|
||||||
logger.error(
|
|
||||||
{ err: error, jobId: job.id },
|
|
||||||
'[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.',
|
|
||||||
);
|
|
||||||
throw new UnrecoverableError(errorMessage);
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
/**
|
|
||||||
* A dedicated worker process for sending emails.
|
|
||||||
*/
|
|
||||||
export const emailWorker = new Worker<EmailJobData>(
|
|
||||||
'email-sending',
|
|
||||||
async (job: Job<EmailJobData>) => {
|
|
||||||
const { to, subject } = job.data;
|
|
||||||
// Create a job-specific logger instance
|
|
||||||
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
|
||||||
jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);
|
|
||||||
try {
|
|
||||||
await emailService.sendEmail(job.data, jobLogger);
|
|
||||||
} catch (error: unknown) {
|
|
||||||
logger.error(
|
|
||||||
{
|
|
||||||
err: error,
|
|
||||||
jobData: job.data,
|
|
||||||
},
|
|
||||||
`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
||||||
);
|
|
||||||
// Re-throw to let BullMQ handle the failure and retry.
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A dedicated worker for generating daily analytics reports.
|
|
||||||
* This is a placeholder for the actual report generation logic.
|
|
||||||
*/
|
|
||||||
export const analyticsWorker = new Worker<AnalyticsJobData>(
|
|
||||||
'analytics-reporting',
|
|
||||||
async (job: Job<AnalyticsJobData>) => {
|
|
||||||
const { reportDate } = job.data;
|
|
||||||
logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);
|
|
||||||
try {
|
|
||||||
// Special case for testing the retry mechanism
|
|
||||||
if (reportDate === 'FAIL') {
|
|
||||||
throw new Error('This is a test failure for the analytics job.');
|
|
||||||
}
|
|
||||||
|
|
||||||
// In a real implementation, you would call a database function here.
|
|
||||||
// For example: await db.generateDailyAnalyticsReport(reportDate);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 10000)); // Simulate a 10-second task
|
|
||||||
logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
|
|
||||||
} catch (error: unknown) {
|
|
||||||
// Standardize error logging.
|
|
||||||
logger.error({ err: error, jobData: job.data },
|
|
||||||
`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
||||||
);
|
|
||||||
throw error; // Re-throw to let BullMQ handle the failure and retry.
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A dedicated worker for cleaning up flyer-related files from the filesystem.
|
|
||||||
* This is triggered manually by an admin after a flyer has been reviewed.
|
|
||||||
*/
|
|
||||||
export const cleanupWorker = new Worker<CleanupJobData>(
|
|
||||||
// This worker now handles two types of cleanup jobs.
|
|
||||||
'file-cleanup', // The queue name
|
|
||||||
async (job: Job<CleanupJobData>) => {
|
|
||||||
// Destructure the data from the job payload.
|
|
||||||
const { flyerId, paths } = job.data;
|
|
||||||
logger.info(
|
|
||||||
{ paths },
|
|
||||||
`[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`,
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (!paths || paths.length === 0) {
|
|
||||||
logger.warn(
|
|
||||||
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`,
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over the file paths provided in the job data and delete each one.
|
|
||||||
for (const filePath of paths) {
|
|
||||||
try {
|
|
||||||
await fsAdapter.unlink(filePath);
|
|
||||||
logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`);
|
|
||||||
} catch (unlinkError: unknown) {
|
|
||||||
// If the file doesn't exist, it's a success from our perspective.
|
|
||||||
// We can log it as a warning and continue without failing the job.
|
|
||||||
if (
|
|
||||||
unlinkError instanceof Error &&
|
|
||||||
'code' in unlinkError &&
|
|
||||||
unlinkError.code === 'ENOENT'
|
|
||||||
) {
|
|
||||||
logger.warn(
|
|
||||||
`[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
throw unlinkError; // For any other error (e.g., permissions), re-throw to fail the job.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.info(
|
|
||||||
`[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`,
|
|
||||||
);
|
|
||||||
} catch (error: unknown) {
|
|
||||||
// Standardize error logging.
|
|
||||||
logger.error(
|
|
||||||
{ err: error },
|
|
||||||
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
||||||
);
|
|
||||||
throw error; // Re-throw to let BullMQ handle the failure and retry.
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A dedicated worker for generating weekly analytics reports.
|
|
||||||
* This is a placeholder for the actual report generation logic.
|
|
||||||
*/
|
|
||||||
export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
|
|
||||||
'weekly-analytics-reporting',
|
|
||||||
async (job: Job<WeeklyAnalyticsJobData>) => {
|
|
||||||
const { reportYear, reportWeek } = job.data;
|
|
||||||
logger.info(
|
|
||||||
{ reportYear, reportWeek },
|
|
||||||
`[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`,
|
|
||||||
);
|
|
||||||
try {
|
|
||||||
// Simulate a longer-running task for weekly reports
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 30000)); // Simulate 30-second task
|
|
||||||
logger.info(
|
|
||||||
`[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`,
|
|
||||||
);
|
|
||||||
} catch (error: unknown) {
|
|
||||||
// Standardize error logging.
|
|
||||||
logger.error(
|
|
||||||
{ err: error, jobData: job.data },
|
|
||||||
`[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
||||||
);
|
|
||||||
throw error; // Re-throw to let BullMQ handle the failure and retry.
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A dedicated worker for cleaning up expired password reset tokens.
|
|
||||||
*/
|
|
||||||
export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
|
|
||||||
'token-cleanup',
|
|
||||||
async (job: Job<TokenCleanupJobData>) => {
|
|
||||||
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
|
||||||
jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.');
|
|
||||||
try {
|
|
||||||
const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger);
|
|
||||||
jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`);
|
|
||||||
return { deletedCount };
|
|
||||||
} catch (error: unknown) {
|
|
||||||
jobLogger.error({ err: error }, `[TokenCleanupWorker] Job ${job.id} failed.`);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
connection,
|
|
||||||
concurrency: 1, // This is a low-priority, non-intensive task.
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// --- Attach Event Listeners to All Workers ---
|
|
||||||
attachWorkerEventListeners(flyerWorker);
|
|
||||||
attachWorkerEventListeners(emailWorker);
|
|
||||||
attachWorkerEventListeners(analyticsWorker);
|
|
||||||
attachWorkerEventListeners(cleanupWorker);
|
|
||||||
attachWorkerEventListeners(weeklyAnalyticsWorker);
|
|
||||||
attachWorkerEventListeners(tokenCleanupWorker);
|
|
||||||
|
|
||||||
logger.info('All workers started and listening for jobs.');
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A function to gracefully shut down all queue workers and connections.
|
|
||||||
* This is essential for preventing jobs from getting stuck in an 'active' state
|
|
||||||
* when the application process is terminated.
|
|
||||||
* @param signal The signal that triggered the shutdown (e.g., 'SIGINT').
|
|
||||||
*/
|
*/
|
||||||
export const gracefulShutdown = async (signal: string) => {
|
export const gracefulShutdown = async (signal: string) => {
|
||||||
logger.info(`[Shutdown] Received ${signal}. Closing all workers and queues...`);
|
logger.info(`[Shutdown] Received ${signal}. Closing all queues...`);
|
||||||
let exitCode = 0; // Default to success
|
let exitCode = 0; // Default to success
|
||||||
|
|
||||||
const resources = [
|
const resources = [
|
||||||
{ name: 'flyerWorker', close: () => flyerWorker.close() },
|
|
||||||
{ name: 'emailWorker', close: () => emailWorker.close() },
|
|
||||||
{ name: 'analyticsWorker', close: () => analyticsWorker.close() },
|
|
||||||
{ name: 'cleanupWorker', close: () => cleanupWorker.close() },
|
|
||||||
{ name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() },
|
|
||||||
{ name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() },
|
|
||||||
{ name: 'flyerQueue', close: () => flyerQueue.close() },
|
{ name: 'flyerQueue', close: () => flyerQueue.close() },
|
||||||
{ name: 'emailQueue', close: () => emailQueue.close() },
|
{ name: 'emailQueue', close: () => emailQueue.close() },
|
||||||
{ name: 'analyticsQueue', close: () => analyticsQueue.close() },
|
{ name: 'analyticsQueue', close: () => analyticsQueue.close() },
|
||||||
@@ -441,7 +49,7 @@ export const gracefulShutdown = async (signal: string) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (exitCode === 0) {
|
if (exitCode === 0) {
|
||||||
logger.info('[Shutdown] All workers, queues, and connections closed successfully.');
|
logger.info('[Shutdown] All queues and connections closed successfully.');
|
||||||
} else {
|
} else {
|
||||||
logger.warn('[Shutdown] Graceful shutdown completed with errors.');
|
logger.warn('[Shutdown] Graceful shutdown completed with errors.');
|
||||||
}
|
}
|
||||||
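For the API process, a minimal sketch of how this queue-only shutdown could be wired up; the entry-point file name and the signal wiring shown here are assumptions, not part of this change:

// Hypothetical API entry point (e.g. src/server.ts) — wiring is an assumption.
import { gracefulShutdown } from './services/queueService.server';

process.on('SIGTERM', () => void gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => void gracefulShutdown('SIGINT'));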
@@ -175,7 +175,7 @@ describe('Queue Workers', () => {
      const emailError = 'SMTP server is down'; // Reject with a string
      mocks.sendEmail.mockRejectedValue(emailError);

      await expect(emailProcessor(job)).rejects.toBe(emailError);
      await expect(emailProcessor(job)).rejects.toThrow(emailError);

      // The worker should wrap the string in an Error object for logging
      expect(mockLogger.error).toHaveBeenCalledWith(
src/services/queues.server.ts (new file, 96 lines)
@@ -0,0 +1,96 @@
import { Queue } from 'bullmq';
import { connection } from './redis.server';
import type { FlyerJobData } from './flyerProcessingService.server';

// --- Job Data Interfaces ---

export interface EmailJobData {
  to: string;
  subject: string;
  text: string;
  html: string;
}

export interface AnalyticsJobData {
  reportDate: string; // e.g., '2024-10-26'
}

export interface WeeklyAnalyticsJobData {
  reportYear: number;
  reportWeek: number; // ISO week number (1-53)
}

export interface CleanupJobData {
  flyerId: number;
  paths?: string[];
}

export interface TokenCleanupJobData {
  timestamp: string;
}

// --- Queues ---

export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 5000,
    },
  },
});

export const emailQueue = new Queue<EmailJobData>('email-sending', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 10000,
    },
  },
});

export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: {
      type: 'exponential',
      delay: 60000,
    },
    removeOnComplete: true,
    removeOnFail: 50,
  },
});

export const weeklyAnalyticsQueue = new Queue<WeeklyAnalyticsJobData>('weekly-analytics-reporting', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 3600000 },
    removeOnComplete: true,
    removeOnFail: 50,
  },
});

export const cleanupQueue = new Queue<CleanupJobData>('file-cleanup', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 30000 },
    removeOnComplete: true,
  },
});

export const tokenCleanupQueue = new Queue<TokenCleanupJobData>('token-cleanup', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 3600000 },
    removeOnComplete: true,
    removeOnFail: 10,
  },
});
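A minimal producer sketch against these queues, called from an async context; the job name and payload values are illustrative only (the FlyerJobData fields follow the shape used in the tests below):

// Hypothetical producer — enqueue a flyer for processing (name and values are examples).
import { flyerQueue } from './queues.server';

await flyerQueue.add('process-flyer', {
  filePath: '/tmp/flyer.pdf',
  originalFileName: 'flyer.pdf',
  checksum: 'abc',
});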
src/services/redis.server.ts (new file, 16 lines)
@@ -0,0 +1,16 @@
import IORedis from 'ioredis';
import { logger } from './logger.server';

export const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null, // Important for BullMQ
  password: process.env.REDIS_PASSWORD,
});

// --- Redis Connection Event Listeners ---
connection.on('connect', () => {
  logger.info('[Redis] Connection established successfully.');
});

connection.on('error', (err) => {
  logger.error({ err }, '[Redis] Connection error.');
});
src/services/worker.ts (new file, 30 lines)
@@ -0,0 +1,30 @@
import { gracefulShutdown } from './workers.server';
import { logger } from './logger.server';

logger.info('[Worker] Initializing worker process...');

// The workers are instantiated as side effects of importing workers.server.ts.
// This pattern ensures they start immediately upon import.

// Handle graceful shutdown
const handleShutdown = (signal: string) => {
  logger.info(`[Worker] Received ${signal}. Initiating graceful shutdown...`);
  gracefulShutdown(signal).catch((error: unknown) => {
    logger.error({ err: error }, '[Worker] Error during shutdown.');
    process.exit(1);
  });
};

process.on('SIGINT', () => handleShutdown('SIGINT'));
process.on('SIGTERM', () => handleShutdown('SIGTERM'));

// Catch unhandled errors to log them before crashing
process.on('uncaughtException', (err) => {
  logger.error({ err }, '[Worker] Uncaught exception');
});

process.on('unhandledRejection', (reason, promise) => {
  logger.error({ reason, promise }, '[Worker] Unhandled Rejection');
});

logger.info('[Worker] Worker process is running and listening for jobs.');
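Because workers.server.ts creates every worker at module load, any import of it is enough to start them. A minimal sketch of that side-effect pattern; the bare-import entry point shown here is hypothetical, not part of this change:

// Hypothetical alternative entry point: a bare side-effect import starts all workers,
// since workers.server.ts instantiates them at module load time.
import './workers.server';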
src/services/workers.server.test.ts (new file, 346 lines)
@@ -0,0 +1,346 @@
// src/services/workers.server.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { Job } from 'bullmq';

// --- Hoisted Mocks ---
const mocks = vi.hoisted(() => {
  // This object will store the processor functions captured from the worker constructors.
  const capturedProcessors: Record<string, (job: Job) => Promise<unknown>> = {};

  return {
    sendEmail: vi.fn(),
    unlink: vi.fn(),
    processFlyerJob: vi.fn(),
    capturedProcessors,
    deleteExpiredResetTokens: vi.fn(),
    // Mock the Worker constructor to capture the processor function. It must be a
    // `function` and not an arrow function so it can be called with `new`.
    MockWorker: vi.fn(function (name: string, processor: (job: Job) => Promise<unknown>) {
      if (processor) {
        capturedProcessors[name] = processor;
      }
      // Return a mock worker instance, though it's not used in this test file.
      return { on: vi.fn(), close: vi.fn() };
    }),
  };
});

// --- Mock Modules ---
vi.mock('./emailService.server', async (importOriginal) => {
  const actual = await importOriginal<typeof import('./emailService.server')>();
  return {
    ...actual,
    // We only need to mock the specific function being called by the worker.
    // The rest of the module can retain its original implementation if needed elsewhere.
    sendEmail: mocks.sendEmail,
  };
});

// The workers use an `fsAdapter`. We can mock the underlying `fsPromises`
// that the adapter is built from in queueService.server.ts.
vi.mock('node:fs/promises', () => ({
  default: {
    unlink: mocks.unlink,
    // Add other fs functions if needed by other tests
    readdir: vi.fn(),
  },
}));

vi.mock('./logger.server', () => ({
  logger: {
    info: vi.fn(),
    error: vi.fn(),
    warn: vi.fn(),
    debug: vi.fn(),
    child: vi.fn().mockReturnThis(),
  },
}));

vi.mock('./db/index.db', () => ({
  userRepo: {
    deleteExpiredResetTokens: mocks.deleteExpiredResetTokens,
  },
}));

// Mock bullmq to capture the processor functions passed to the Worker constructor
import { logger as mockLogger } from './logger.server';
vi.mock('bullmq', () => ({
  Worker: mocks.MockWorker,
  // FIX: Use a standard function for the mock constructor to allow `new Queue(...)` to work.
  Queue: vi.fn(function () {
    return { add: vi.fn() };
  }),
}));

// Mock flyerProcessingService.server as flyerWorker depends on it
vi.mock('./flyerProcessingService.server', () => ({
  FlyerProcessingService: class {
    processJob = mocks.processFlyerJob;
  },
}));

// Mock flyerDataTransformer as it's a dependency of FlyerProcessingService
vi.mock('./flyerDataTransformer', () => ({
  FlyerDataTransformer: class {
    transform = vi.fn(); // Mock transform method
  },
}));

// Helper to create a mock BullMQ Job object
const createMockJob = <T>(data: T): Job<T> => {
  return {
    id: 'job-1',
    data,
    updateProgress: vi.fn().mockResolvedValue(undefined),
    log: vi.fn().mockResolvedValue(undefined),
    opts: { attempts: 3 },
    attemptsMade: 1,
    trace: vi.fn().mockResolvedValue(undefined),
    moveToCompleted: vi.fn().mockResolvedValue(undefined),
    moveToFailed: vi.fn().mockResolvedValue(undefined),
  } as unknown as Job<T>;
};

describe('Queue Workers', () => {
  // These will hold the captured processor functions for each test.
  let flyerProcessor: (job: Job) => Promise<unknown>;
  let emailProcessor: (job: Job) => Promise<unknown>;
  let analyticsProcessor: (job: Job) => Promise<unknown>;
  let cleanupProcessor: (job: Job) => Promise<unknown>;
  let weeklyAnalyticsProcessor: (job: Job) => Promise<unknown>;
  let tokenCleanupProcessor: (job: Job) => Promise<unknown>;

  beforeEach(async () => {
    vi.clearAllMocks();

    // Reset default mock implementations for hoisted mocks
    mocks.sendEmail.mockResolvedValue(undefined);
    mocks.unlink.mockResolvedValue(undefined);
    mocks.processFlyerJob.mockResolvedValue({ flyerId: 123 }); // Default success for flyer processing
    mocks.deleteExpiredResetTokens.mockResolvedValue(5);

    // Reset modules to re-evaluate the workers.server.ts file with fresh mocks.
    // This ensures that new worker instances are created and their processors are captured for each test.
    vi.resetModules();

    // Dynamically import the module under test AFTER mocks are reset.
    // This will trigger the instantiation of the workers, and our mocked Worker constructor will capture the processors.
    await import('./workers.server');

    // Re-capture the processors for each test to ensure isolation.
    flyerProcessor = mocks.capturedProcessors['flyer-processing'];
    emailProcessor = mocks.capturedProcessors['email-sending'];
    analyticsProcessor = mocks.capturedProcessors['analytics-reporting'];
    cleanupProcessor = mocks.capturedProcessors['file-cleanup'];
    weeklyAnalyticsProcessor = mocks.capturedProcessors['weekly-analytics-reporting'];
    tokenCleanupProcessor = mocks.capturedProcessors['token-cleanup'];
  });

  describe('flyerWorker', () => {
    it('should call flyerProcessingService.processJob with the job data', async () => {
      const jobData = {
        filePath: '/tmp/flyer.pdf',
        originalFileName: 'flyer.pdf',
        checksum: 'abc',
      };
      const job = createMockJob(jobData);

      await flyerProcessor(job);

      expect(mocks.processFlyerJob).toHaveBeenCalledTimes(1);
      expect(mocks.processFlyerJob).toHaveBeenCalledWith(job);
    });

    it('should re-throw an error if flyerProcessingService.processJob fails', async () => {
      const job = createMockJob({
        filePath: '/tmp/fail.pdf',
        originalFileName: 'fail.pdf',
        checksum: 'def',
      });
      const processingError = new Error('Flyer processing failed');
      mocks.processFlyerJob.mockRejectedValue(processingError);

      await expect(flyerProcessor(job)).rejects.toThrow('Flyer processing failed');
    });
  });

  describe('emailWorker', () => {
    it('should call emailService.sendEmail with the job data', async () => {
      const jobData = {
        to: 'test@example.com',
        subject: 'Test Email',
        html: '<p>Hello</p>',
        text: 'Hello',
      };
      const job = createMockJob(jobData);

      await emailProcessor(job);

      expect(mocks.sendEmail).toHaveBeenCalledTimes(1);
      // The implementation passes the logger as the second argument
      expect(mocks.sendEmail).toHaveBeenCalledWith(jobData, expect.anything());
    });

    it('should log and re-throw an error if sendEmail fails with a non-Error object', async () => {
      const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' });
      const emailError = 'SMTP server is down'; // Reject with a string
      mocks.sendEmail.mockRejectedValue(emailError);

      await expect(emailProcessor(job)).rejects.toThrow(emailError);

      // The worker should wrap the string in an Error object for logging
      expect(mockLogger.error).toHaveBeenCalledWith(
        { err: new Error(emailError), jobData: job.data },
        `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
    });

    it('should re-throw an error if sendEmail fails', async () => {
      const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' });
      const emailError = new Error('SMTP server is down');
      mocks.sendEmail.mockRejectedValue(emailError);

      await expect(emailProcessor(job)).rejects.toThrow('SMTP server is down');
      expect(mockLogger.error).toHaveBeenCalledWith(
        { err: emailError, jobData: job.data },
        `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
    });
  });

  describe('analyticsWorker', () => {
    it('should complete successfully for a valid report date', async () => {
      vi.useFakeTimers();
      const job = createMockJob({ reportDate: '2024-01-01' });

      const promise = analyticsProcessor(job);
      // Advance timers to simulate the 10-second task completing
      await vi.advanceTimersByTimeAsync(10000);
      await promise; // Wait for the promise to resolve

      // No error should be thrown
      expect(true).toBe(true);
      vi.useRealTimers();
    });

    it('should throw an error if reportDate is "FAIL"', async () => {
      const job = createMockJob({ reportDate: 'FAIL' });

      await expect(analyticsProcessor(job)).rejects.toThrow(
        'This is a test failure for the analytics job.',
      );
    });
  });

  describe('cleanupWorker', () => {
    it('should call unlink for each path provided in the job data', async () => {
      const jobData = {
        flyerId: 123,
        paths: ['/tmp/file1.jpg', '/tmp/file2.pdf'],
      };
      const job = createMockJob(jobData);
      mocks.unlink.mockResolvedValue(undefined);

      await cleanupProcessor(job);

      expect(mocks.unlink).toHaveBeenCalledTimes(2);
      expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file1.jpg');
      expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file2.pdf');
    });

    it('should not throw an error if a file is already deleted (ENOENT)', async () => {
      const jobData = {
        flyerId: 123,
        paths: ['/tmp/existing.jpg', '/tmp/already-deleted.jpg'],
      };
      const job = createMockJob(jobData);
      // Use the built-in NodeJS.ErrnoException type for mock system errors.
      const enoentError: NodeJS.ErrnoException = new Error('File not found');
      enoentError.code = 'ENOENT';

      // First call succeeds, second call fails with ENOENT
      mocks.unlink.mockResolvedValueOnce(undefined).mockRejectedValueOnce(enoentError);

      // The processor should complete without throwing
      await expect(cleanupProcessor(job)).resolves.toBeUndefined();

      expect(mocks.unlink).toHaveBeenCalledTimes(2);
    });

    it('should re-throw an error for issues other than ENOENT (e.g., permissions)', async () => {
      const jobData = {
        flyerId: 123,
        paths: ['/tmp/protected-file.jpg'],
      };
      const job = createMockJob(jobData);
      // Use the built-in NodeJS.ErrnoException type for mock system errors.
      const permissionError: NodeJS.ErrnoException = new Error('Permission denied');
      permissionError.code = 'EACCES';

      mocks.unlink.mockRejectedValue(permissionError);

      await expect(cleanupProcessor(job)).rejects.toThrow('Permission denied');

      // Verify the error was logged by the worker's catch block
      expect(mockLogger.error).toHaveBeenCalledWith(
        { err: permissionError },
        expect.stringContaining(
          `[CleanupWorker] Job ${job.id} for flyer ${job.data.flyerId} failed.`,
        ),
      );
    });
  });

  describe('weeklyAnalyticsWorker', () => {
    it('should complete successfully for a valid report date', async () => {
      vi.useFakeTimers();
      const job = createMockJob({ reportYear: 2024, reportWeek: 1 });

      const promise = weeklyAnalyticsProcessor(job);
      // Advance timers to simulate the 30-second task completing
      await vi.advanceTimersByTimeAsync(30000);
      await promise; // Wait for the promise to resolve

      // No error should be thrown
      expect(true).toBe(true);
      vi.useRealTimers();
    });

    it('should re-throw an error if the job fails', async () => {
      vi.useFakeTimers();
      const job = createMockJob({ reportYear: 2024, reportWeek: 1 });
      // Mock the internal logic to throw an error
      const originalSetTimeout = setTimeout;
      vi.spyOn(global, 'setTimeout').mockImplementation((callback, ms) => {
        if (ms === 30000) {
          // Target the simulated delay
          throw new Error('Weekly analytics job failed');
        }
        return originalSetTimeout(callback, ms);
      });

      await expect(weeklyAnalyticsProcessor(job)).rejects.toThrow('Weekly analytics job failed');
      vi.useRealTimers();
      vi.restoreAllMocks(); // Restore setTimeout mock
    });
  });

  describe('tokenCleanupWorker', () => {
    it('should call userRepo.deleteExpiredResetTokens and return the count', async () => {
      const job = createMockJob({ timestamp: new Date().toISOString() });
      mocks.deleteExpiredResetTokens.mockResolvedValue(10);

      const result = await tokenCleanupProcessor(job);

      expect(mocks.deleteExpiredResetTokens).toHaveBeenCalledTimes(1);
      expect(result).toEqual({ deletedCount: 10 });
    });

    it('should re-throw an error if the database call fails', async () => {
      const job = createMockJob({ timestamp: new Date().toISOString() });
      const dbError = new Error('DB cleanup failed');
      mocks.deleteExpiredResetTokens.mockRejectedValue(dbError);
      await expect(tokenCleanupProcessor(job)).rejects.toThrow(dbError);
    });
  });
});
src/services/workers.server.ts (new file, 344 lines)
@@ -0,0 +1,344 @@
import { Worker, Job, UnrecoverableError } from 'bullmq';
import fsPromises from 'node:fs/promises';
import { exec } from 'child_process';
import { promisify } from 'util';

import { logger } from './logger.server';
import { connection } from './redis.server';
import { aiService } from './aiService.server';
import * as emailService from './emailService.server';
import * as db from './db/index.db';
import {
  FlyerProcessingService,
  type FlyerJobData,
  type IFileSystem,
} from './flyerProcessingService.server';
import { FlyerDataTransformer } from './flyerDataTransformer';
import {
  flyerQueue,
  emailQueue,
  analyticsQueue,
  weeklyAnalyticsQueue,
  cleanupQueue,
  tokenCleanupQueue,
  type EmailJobData,
  type AnalyticsJobData,
  type CleanupJobData,
  type WeeklyAnalyticsJobData,
  type TokenCleanupJobData,
} from './queues.server';

const execAsync = promisify(exec);

// --- Worker Instantiation ---

const fsAdapter: IFileSystem = {
  readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
  unlink: (path: string) => fsPromises.unlink(path),
};

const flyerProcessingService = new FlyerProcessingService(
  aiService,
  db,
  fsAdapter,
  execAsync,
  cleanupQueue,
  new FlyerDataTransformer(),
);

const normalizeError = (error: unknown): Error => {
  return error instanceof Error ? error : new Error(String(error));
};

const attachWorkerEventListeners = (worker: Worker) => {
  worker.on('completed', (job: Job, returnValue: unknown) => {
    logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
  });

  worker.on('failed', (job: Job | undefined, error: Error) => {
    logger.error(
      { err: error, jobData: job?.data },
      `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`,
    );
  });
};

export const flyerWorker = new Worker<FlyerJobData>(
  'flyer-processing',
  async (job) => {
    try {
      return await flyerProcessingService.processJob(job);
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      const errorMessage = wrappedError.message || '';
      if (
        errorMessage.includes('quota') ||
        errorMessage.includes('429') ||
        errorMessage.includes('RESOURCE_EXHAUSTED')
      ) {
        logger.error(
          { err: wrappedError, jobId: job.id },
          '[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.',
        );
        throw new UnrecoverableError(errorMessage);
      }
      throw error;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
  },
);

export const emailWorker = new Worker<EmailJobData>(
  'email-sending',
  async (job: Job<EmailJobData>) => {
    const { to, subject } = job.data;
    const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
    jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);
    try {
      await emailService.sendEmail(job.data, jobLogger);
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      logger.error(
        {
          err: wrappedError,
          jobData: job.data,
        },
        `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
  },
);

export const analyticsWorker = new Worker<AnalyticsJobData>(
  'analytics-reporting',
  async (job: Job<AnalyticsJobData>) => {
    const { reportDate } = job.data;
    logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);
    try {
      if (reportDate === 'FAIL') {
        throw new Error('This is a test failure for the analytics job.');
      }
      await new Promise((resolve) => setTimeout(resolve, 10000));
      logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      logger.error(
        { err: wrappedError, jobData: job.data },
        `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
  },
);

export const cleanupWorker = new Worker<CleanupJobData>(
  'file-cleanup',
  async (job: Job<CleanupJobData>) => {
    const { flyerId, paths } = job.data;
    logger.info(
      { paths },
      `[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`,
    );

    try {
      if (!paths || paths.length === 0) {
        logger.warn(
          `[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`,
        );
        return;
      }

      for (const filePath of paths) {
        try {
          await fsAdapter.unlink(filePath);
          logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`);
        } catch (unlinkError: unknown) {
          if (
            unlinkError instanceof Error &&
            'code' in unlinkError &&
            (unlinkError as any).code === 'ENOENT'
          ) {
            logger.warn(
              `[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`,
            );
          } else {
            throw unlinkError;
          }
        }
      }
      logger.info(
        `[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`,
      );
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      logger.error(
        { err: wrappedError },
        `[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
  },
);

export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
  'weekly-analytics-reporting',
  async (job: Job<WeeklyAnalyticsJobData>) => {
    const { reportYear, reportWeek } = job.data;
    logger.info(
      { reportYear, reportWeek },
      `[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`,
    );
    try {
      await new Promise((resolve) => setTimeout(resolve, 30000));
      logger.info(
        `[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`,
      );
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      logger.error(
        { err: wrappedError, jobData: job.data },
        `[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
  },
);

export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
  'token-cleanup',
  async (job: Job<TokenCleanupJobData>) => {
    const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
    jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.');
    try {
      const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger);
      jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`);
      return { deletedCount };
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      jobLogger.error({ err: wrappedError }, `[TokenCleanupWorker] Job ${job.id} failed.`);
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: 1,
  },
);

attachWorkerEventListeners(flyerWorker);
attachWorkerEventListeners(emailWorker);
attachWorkerEventListeners(analyticsWorker);
attachWorkerEventListeners(cleanupWorker);
attachWorkerEventListeners(weeklyAnalyticsWorker);
attachWorkerEventListeners(tokenCleanupWorker);

logger.info('All workers started and listening for jobs.');

const SHUTDOWN_TIMEOUT = 30000; // 30 seconds

export const gracefulShutdown = async (signal: string) => {
  logger.info(
    `[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`,
  );

  const shutdownPromise = (async () => {
    let hasErrors = false;

    // Helper function to close a group of resources and log results
    const closeResources = async (
      resources: { name: string; close: () => Promise<any> }[],
      type: string,
    ) => {
      logger.info(`[Shutdown] Closing all ${type}...`);
      const results = await Promise.allSettled(resources.map((r) => r.close()));
      let groupHasErrors = false;

      results.forEach((result, index) => {
        if (result.status === 'rejected') {
          groupHasErrors = true;
          logger.error(
            { err: result.reason, resource: resources[index].name },
            `[Shutdown] Error closing ${resources[index].name}.`,
          );
        }
      });

      if (!groupHasErrors) logger.info(`[Shutdown] All ${type} closed successfully.`);
      return groupHasErrors;
    };

    // Define resource groups for sequential shutdown
    const workerResources = [
      { name: 'flyerWorker', close: () => flyerWorker.close() },
      { name: 'emailWorker', close: () => emailWorker.close() },
      { name: 'analyticsWorker', close: () => analyticsWorker.close() },
      { name: 'cleanupWorker', close: () => cleanupWorker.close() },
      { name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() },
      { name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() },
    ];

    const queueResources = [
      { name: 'flyerQueue', close: () => flyerQueue.close() },
      { name: 'emailQueue', close: () => emailQueue.close() },
      { name: 'analyticsQueue', close: () => analyticsQueue.close() },
      { name: 'cleanupQueue', close: () => cleanupQueue.close() },
      { name: 'weeklyAnalyticsQueue', close: () => weeklyAnalyticsQueue.close() },
      { name: 'tokenCleanupQueue', close: () => tokenCleanupQueue.close() },
    ];

    // 1. Close workers first
    if (await closeResources(workerResources, 'workers')) hasErrors = true;

    // 2. Then close queues
    if (await closeResources(queueResources, 'queues')) hasErrors = true;

    // 3. Finally, close the Redis connection
    logger.info('[Shutdown] Closing Redis connection...');
    try {
      await connection.quit();
      logger.info('[Shutdown] Redis connection closed successfully.');
    } catch (err) {
      hasErrors = true;
      logger.error({ err, resource: 'redisConnection' }, `[Shutdown] Error closing Redis connection.`);
    }

    return hasErrors;
  })();

  const timeoutPromise = new Promise<string>((resolve) =>
    setTimeout(() => resolve('timeout'), SHUTDOWN_TIMEOUT),
  );

  const result = await Promise.race([shutdownPromise, timeoutPromise]);

  if (result === 'timeout') {
    logger.error(
      `[Shutdown] Graceful shutdown timed out after ${SHUTDOWN_TIMEOUT / 1000} seconds. Forcing exit.`,
    );
    process.exit(1);
  } else {
    const hasErrors = result as boolean;
    if (!hasErrors) {
      logger.info('[Shutdown] All resources closed successfully.');
    } else {
      logger.warn('[Shutdown] Graceful shutdown completed with errors.');
    }
    process.exit(hasErrors ? 1 : 0);
  }
};