flyer-crawler.projectium.com/src/services/workers.server.ts

import { Worker, Job, UnrecoverableError } from 'bullmq';
import fsPromises from 'node:fs/promises';
import { exec } from 'node:child_process';
import { promisify } from 'node:util';
import { logger } from './logger.server';
import { connection } from './redis.server';
import { aiService } from './aiService.server';
import { analyticsService } from './analyticsService.server';
import { userService } from './userService';
import * as emailService from './emailService.server';
import * as db from './db/index.db';
import {
  FlyerProcessingService,
  type FlyerJobData,
} from './flyerProcessingService.server';
import { FlyerFileHandler, type IFileSystem } from './flyerFileHandler.server';
import { FlyerAiProcessor } from './flyerAiProcessor.server';
import { FlyerDataTransformer } from './flyerDataTransformer';
import {
  flyerQueue,
  emailQueue,
  analyticsQueue,
  weeklyAnalyticsQueue,
  cleanupQueue,
  tokenCleanupQueue,
  type EmailJobData,
  type AnalyticsJobData,
  type CleanupJobData,
  type WeeklyAnalyticsJobData,
  type TokenCleanupJobData,
} from './queues.server';

const execAsync = promisify(exec);

// --- Worker Instantiation ---
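// A thin adapter narrows node:fs/promises to the IFileSystem interface, so the
// file handler and processing service depend on an injectable seam rather than
// the fs module directly (e.g. a fake filesystem can be substituted in tests).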
const fsAdapter: IFileSystem = {
  readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
  unlink: (path: string) => fsPromises.unlink(path),
};
const flyerProcessingService = new FlyerProcessingService(
  aiService,
  new FlyerFileHandler(fsAdapter, execAsync),
  new FlyerAiProcessor(aiService, db.personalizationRepo),
  db,
  fsAdapter,
  execAsync,
  cleanupQueue,
  new FlyerDataTransformer(),
);
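
/**
 * Coerces any thrown value into an Error instance so BullMQ can reliably
 * serialize the failure (message and stack trace) when recording the attempt.
 */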
const normalizeError = (error: unknown): Error => {
  return error instanceof Error ? error : new Error(String(error));
};

/**
 * Creates a higher-order function to wrap worker processors with common logic.
 * This includes error normalization to ensure that any thrown value is an Error
 * instance, which is a best practice for BullMQ workers.
 * @param processor The core logic for the worker.
 * @returns An async function that takes a job and executes the processor.
 */
const createWorkerProcessor = <T>(processor: (job: Job<T>) => Promise<any>) => {
  return async (job: Job<T>) => {
    try {
      return await processor(job);
    } catch (error: unknown) {
      // The service layer handles detailed logging. This block just ensures
      // any unexpected errors are normalized before BullMQ handles them.
      throw normalizeError(error);
    }
  };
};
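
// Note: UnrecoverableError (imported above) passes through normalizeError
// unchanged, since it is already an Error. A processor can throw it to make
// BullMQ fail the job immediately, skipping any remaining retry attempts.
// Illustrative sketch only (`isMalformed` and `handle` are hypothetical
// helpers, not part of this codebase):
//
//   createWorkerProcessor(async (job) => {
//     if (isMalformed(job.data)) {
//       throw new UnrecoverableError('Malformed payload; do not retry.');
//     }
//     return handle(job);
//   });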
const attachWorkerEventListeners = (worker: Worker) => {
  worker.on('completed', (job: Job, returnValue: unknown) => {
    logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
  });
  worker.on('failed', (job: Job | undefined, error: Error) => {
    logger.error(
      { err: error, jobData: job?.data },
      `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`,
    );
  });
};
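
// --- Workers ---
// Each worker's concurrency is tunable via an environment variable. The
// defaults lean conservative (1) for the heavier AI and reporting jobs and
// higher (10) for lightweight I/O such as email sending and file cleanup.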
export const flyerWorker = new Worker<FlyerJobData>(
  'flyer-processing',
  createWorkerProcessor((job) => flyerProcessingService.processJob(job)),
  {
    connection,
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
  },
);

export const emailWorker = new Worker<EmailJobData>(
  'email-sending',
  createWorkerProcessor((job) => emailService.processEmailJob(job)),
  {
    connection,
    concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
  },
);

export const analyticsWorker = new Worker<AnalyticsJobData>(
  'analytics-reporting',
  createWorkerProcessor((job) => analyticsService.processDailyReportJob(job)),
  {
    connection,
    concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
  },
);

export const cleanupWorker = new Worker<CleanupJobData>(
  'file-cleanup',
  createWorkerProcessor((job) => flyerProcessingService.processCleanupJob(job)),
  {
    connection,
    concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
  },
);

export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
  'weekly-analytics-reporting',
  createWorkerProcessor((job) => analyticsService.processWeeklyReportJob(job)),
  {
    connection,
    concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
  },
);

export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
  'token-cleanup',
  createWorkerProcessor((job) => userService.processTokenCleanupJob(job)),
  {
    connection,
    concurrency: 1,
  },
);
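
// For context, producers elsewhere in the app enqueue work onto the queues
// imported above via BullMQ's standard API, along these lines (the job name
// and payload are illustrative, not taken from this codebase):
//
//   await emailQueue.add('send-email', payload); // payload: EmailJobData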
attachWorkerEventListeners(flyerWorker);
attachWorkerEventListeners(emailWorker);
attachWorkerEventListeners(analyticsWorker);
attachWorkerEventListeners(cleanupWorker);
attachWorkerEventListeners(weeklyAnalyticsWorker);
attachWorkerEventListeners(tokenCleanupWorker);
logger.info('All workers started and listening for jobs.');

const SHUTDOWN_TIMEOUT = 30000; // 30 seconds

export const gracefulShutdown = async (signal: string) => {
  logger.info(
    `[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`,
  );

  const shutdownPromise = (async () => {
    let hasErrors = false;

    // Helper to close a group of resources and log the results.
    const closeResources = async (
      resources: { name: string; close: () => Promise<any> }[],
      type: string,
    ) => {
      logger.info(`[Shutdown] Closing all ${type}...`);
      const results = await Promise.allSettled(resources.map((r) => r.close()));
      let groupHasErrors = false;
      results.forEach((result, index) => {
        if (result.status === 'rejected') {
          groupHasErrors = true;
          logger.error(
            { err: result.reason, resource: resources[index].name },
            `[Shutdown] Error closing ${resources[index].name}.`,
          );
        }
      });
      if (!groupHasErrors) logger.info(`[Shutdown] All ${type} closed successfully.`);
      return groupHasErrors;
    };

    // Define resource groups for sequential shutdown.
    const workerResources = [
      { name: 'flyerWorker', close: () => flyerWorker.close() },
      { name: 'emailWorker', close: () => emailWorker.close() },
      { name: 'analyticsWorker', close: () => analyticsWorker.close() },
      { name: 'cleanupWorker', close: () => cleanupWorker.close() },
      { name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() },
      { name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() },
    ];
    const queueResources = [
      { name: 'flyerQueue', close: () => flyerQueue.close() },
      { name: 'emailQueue', close: () => emailQueue.close() },
      { name: 'analyticsQueue', close: () => analyticsQueue.close() },
      { name: 'cleanupQueue', close: () => cleanupQueue.close() },
      { name: 'weeklyAnalyticsQueue', close: () => weeklyAnalyticsQueue.close() },
      { name: 'tokenCleanupQueue', close: () => tokenCleanupQueue.close() },
    ];

    // 1. Close workers first so no new jobs are picked up.
    if (await closeResources(workerResources, 'workers')) hasErrors = true;

    // 2. Then close the queues.
    if (await closeResources(queueResources, 'queues')) hasErrors = true;

    // 3. Finally, close the Redis connection.
    logger.info('[Shutdown] Closing Redis connection...');
    try {
      await connection.quit();
      logger.info('[Shutdown] Redis connection closed successfully.');
    } catch (err) {
      hasErrors = true;
      logger.error({ err, resource: 'redisConnection' }, '[Shutdown] Error closing Redis connection.');
    }
    return hasErrors;
  })();

  const timeoutPromise = new Promise<'timeout'>((resolve) =>
    setTimeout(() => resolve('timeout'), SHUTDOWN_TIMEOUT),
  );

  const result = await Promise.race([shutdownPromise, timeoutPromise]);
  if (result === 'timeout') {
    logger.error(
      `[Shutdown] Graceful shutdown timed out after ${SHUTDOWN_TIMEOUT / 1000} seconds. Forcing exit.`,
    );
    process.exit(1);
  } else {
    // `result` is narrowed to the boolean hasErrors flag here.
    const hasErrors = result;
    if (hasErrors) {
      logger.warn('[Shutdown] Graceful shutdown completed with errors.');
    } else {
      logger.info('[Shutdown] All resources closed successfully.');
    }
    process.exit(hasErrors ? 1 : 0);
  }
};
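
// Typical wiring (illustrative; the actual signal registration may live in the
// app's entry point rather than this module):
//
//   process.on('SIGTERM', () => void gracefulShutdown('SIGTERM'));
//   process.on('SIGINT', () => void gracefulShutdown('SIGINT'));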