345 lines
11 KiB
TypeScript
345 lines
11 KiB
TypeScript
import { Worker, Job, UnrecoverableError } from 'bullmq';
|
|
import fsPromises from 'node:fs/promises';
|
|
import { exec } from 'child_process';
|
|
import { promisify } from 'util';
|
|
|
|
import { logger } from './logger.server';
|
|
import { connection } from './redis.server';
|
|
import { aiService } from './aiService.server';
|
|
import * as emailService from './emailService.server';
|
|
import * as db from './db/index.db';
|
|
import {
|
|
FlyerProcessingService,
|
|
type FlyerJobData,
|
|
type IFileSystem,
|
|
} from './flyerProcessingService.server';
|
|
import { FlyerDataTransformer } from './flyerDataTransformer';
|
|
import {
|
|
flyerQueue,
|
|
emailQueue,
|
|
analyticsQueue,
|
|
weeklyAnalyticsQueue,
|
|
cleanupQueue,
|
|
tokenCleanupQueue,
|
|
type EmailJobData,
|
|
type AnalyticsJobData,
|
|
type CleanupJobData,
|
|
type WeeklyAnalyticsJobData,
|
|
type TokenCleanupJobData,
|
|
} from './queues.server';
|
|
|
|
// Promisified child_process.exec; injected into FlyerProcessingService so the
// service can shell out without importing child_process itself (and so tests
// can substitute a stub).
const execAsync = promisify(exec);

// --- Worker Instantiation ---

// Thin adapter over node:fs/promises satisfying the IFileSystem port declared
// by flyerProcessingService.server — lets tests inject an in-memory fs.
const fsAdapter: IFileSystem = {
  readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
  unlink: (path: string) => fsPromises.unlink(path),
};

// Composition root for flyer processing: every side-effectful collaborator
// (AI service, database, file system, shell, cleanup queue, transformer) is
// passed in explicitly rather than imported inside the service.
const flyerProcessingService = new FlyerProcessingService(
  aiService,
  db,
  fsAdapter,
  execAsync,
  cleanupQueue,
  new FlyerDataTransformer(),
);
|
|
|
|
const normalizeError = (error: unknown): Error => {
|
|
return error instanceof Error ? error : new Error(String(error));
|
|
};
|
|
|
|
const attachWorkerEventListeners = (worker: Worker) => {
|
|
worker.on('completed', (job: Job, returnValue: unknown) => {
|
|
logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
|
|
});
|
|
|
|
worker.on('failed', (job: Job | undefined, error: Error) => {
|
|
logger.error(
|
|
{ err: error, jobData: job?.data },
|
|
`[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`,
|
|
);
|
|
});
|
|
};
|
|
|
|
export const flyerWorker = new Worker<FlyerJobData>(
|
|
'flyer-processing',
|
|
async (job) => {
|
|
try {
|
|
return await flyerProcessingService.processJob(job);
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
const errorMessage = wrappedError.message || '';
|
|
if (
|
|
errorMessage.includes('quota') ||
|
|
errorMessage.includes('429') ||
|
|
errorMessage.includes('RESOURCE_EXHAUSTED')
|
|
) {
|
|
logger.error(
|
|
{ err: wrappedError, jobId: job.id },
|
|
'[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.',
|
|
);
|
|
throw new UnrecoverableError(errorMessage);
|
|
}
|
|
throw error;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
|
|
},
|
|
);
|
|
|
|
export const emailWorker = new Worker<EmailJobData>(
|
|
'email-sending',
|
|
async (job: Job<EmailJobData>) => {
|
|
const { to, subject } = job.data;
|
|
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
|
jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);
|
|
try {
|
|
await emailService.sendEmail(job.data, jobLogger);
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
logger.error(
|
|
{
|
|
err: wrappedError,
|
|
jobData: job.data,
|
|
},
|
|
`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
);
|
|
throw wrappedError;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
|
|
},
|
|
);
|
|
|
|
export const analyticsWorker = new Worker<AnalyticsJobData>(
|
|
'analytics-reporting',
|
|
async (job: Job<AnalyticsJobData>) => {
|
|
const { reportDate } = job.data;
|
|
logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);
|
|
try {
|
|
if (reportDate === 'FAIL') {
|
|
throw new Error('This is a test failure for the analytics job.');
|
|
}
|
|
await new Promise((resolve) => setTimeout(resolve, 10000));
|
|
logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
logger.error({ err: wrappedError, jobData: job.data },
|
|
`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
);
|
|
throw wrappedError;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
|
|
},
|
|
);
|
|
|
|
export const cleanupWorker = new Worker<CleanupJobData>(
|
|
'file-cleanup',
|
|
async (job: Job<CleanupJobData>) => {
|
|
const { flyerId, paths } = job.data;
|
|
logger.info(
|
|
{ paths },
|
|
`[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`,
|
|
);
|
|
|
|
try {
|
|
if (!paths || paths.length === 0) {
|
|
logger.warn(
|
|
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
for (const filePath of paths) {
|
|
try {
|
|
await fsAdapter.unlink(filePath);
|
|
logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`);
|
|
} catch (unlinkError: unknown) {
|
|
if (
|
|
unlinkError instanceof Error &&
|
|
'code' in unlinkError &&
|
|
(unlinkError as any).code === 'ENOENT'
|
|
) {
|
|
logger.warn(
|
|
`[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`,
|
|
);
|
|
} else {
|
|
throw unlinkError;
|
|
}
|
|
}
|
|
}
|
|
logger.info(
|
|
`[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`,
|
|
);
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
logger.error(
|
|
{ err: wrappedError },
|
|
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
);
|
|
throw wrappedError;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
|
|
},
|
|
);
|
|
|
|
export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
|
|
'weekly-analytics-reporting',
|
|
async (job: Job<WeeklyAnalyticsJobData>) => {
|
|
const { reportYear, reportWeek } = job.data;
|
|
logger.info(
|
|
{ reportYear, reportWeek },
|
|
`[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`,
|
|
);
|
|
try {
|
|
await new Promise((resolve) => setTimeout(resolve, 30000));
|
|
logger.info(
|
|
`[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`,
|
|
);
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
logger.error(
|
|
{ err: wrappedError, jobData: job.data },
|
|
`[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
|
);
|
|
throw wrappedError;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
|
|
},
|
|
);
|
|
|
|
export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
|
|
'token-cleanup',
|
|
async (job: Job<TokenCleanupJobData>) => {
|
|
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
|
jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.');
|
|
try {
|
|
const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger);
|
|
jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`);
|
|
return { deletedCount };
|
|
} catch (error: unknown) {
|
|
const wrappedError = normalizeError(error);
|
|
jobLogger.error({ err: wrappedError }, `[TokenCleanupWorker] Job ${job.id} failed.`);
|
|
throw wrappedError;
|
|
}
|
|
},
|
|
{
|
|
connection,
|
|
concurrency: 1,
|
|
},
|
|
);
|
|
|
|
attachWorkerEventListeners(flyerWorker);
|
|
attachWorkerEventListeners(emailWorker);
|
|
attachWorkerEventListeners(analyticsWorker);
|
|
attachWorkerEventListeners(cleanupWorker);
|
|
attachWorkerEventListeners(weeklyAnalyticsWorker);
|
|
attachWorkerEventListeners(tokenCleanupWorker);
|
|
|
|
logger.info('All workers started and listening for jobs.');
|
|
|
|
const SHUTDOWN_TIMEOUT = 30000; // 30 seconds
|
|
|
|
export const gracefulShutdown = async (signal: string) => {
|
|
logger.info(
|
|
`[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`,
|
|
);
|
|
|
|
const shutdownPromise = (async () => {
|
|
let hasErrors = false;
|
|
|
|
// Helper function to close a group of resources and log results
|
|
const closeResources = async (resources: { name: string; close: () => Promise<any> }[], type: string) => {
|
|
logger.info(`[Shutdown] Closing all ${type}...`);
|
|
const results = await Promise.allSettled(resources.map((r) => r.close()));
|
|
let groupHasErrors = false;
|
|
|
|
results.forEach((result, index) => {
|
|
if (result.status === 'rejected') {
|
|
groupHasErrors = true;
|
|
logger.error(
|
|
{ err: result.reason, resource: resources[index].name },
|
|
`[Shutdown] Error closing ${resources[index].name}.`,
|
|
);
|
|
}
|
|
});
|
|
|
|
if (!groupHasErrors) logger.info(`[Shutdown] All ${type} closed successfully.`);
|
|
return groupHasErrors;
|
|
};
|
|
|
|
// Define resource groups for sequential shutdown
|
|
const workerResources = [
|
|
{ name: 'flyerWorker', close: () => flyerWorker.close() },
|
|
{ name: 'emailWorker', close: () => emailWorker.close() },
|
|
{ name: 'analyticsWorker', close: () => analyticsWorker.close() },
|
|
{ name: 'cleanupWorker', close: () => cleanupWorker.close() },
|
|
{ name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() },
|
|
{ name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() },
|
|
];
|
|
|
|
const queueResources = [
|
|
{ name: 'flyerQueue', close: () => flyerQueue.close() },
|
|
{ name: 'emailQueue', close: () => emailQueue.close() },
|
|
{ name: 'analyticsQueue', close: () => analyticsQueue.close() },
|
|
{ name: 'cleanupQueue', close: () => cleanupQueue.close() },
|
|
{ name: 'weeklyAnalyticsQueue', close: () => weeklyAnalyticsQueue.close() },
|
|
{ name: 'tokenCleanupQueue', close: () => tokenCleanupQueue.close() },
|
|
];
|
|
|
|
// 1. Close workers first
|
|
if (await closeResources(workerResources, 'workers')) hasErrors = true;
|
|
|
|
// 2. Then close queues
|
|
if (await closeResources(queueResources, 'queues')) hasErrors = true;
|
|
|
|
// 3. Finally, close the Redis connection
|
|
logger.info('[Shutdown] Closing Redis connection...');
|
|
try {
|
|
await connection.quit();
|
|
logger.info('[Shutdown] Redis connection closed successfully.');
|
|
} catch (err) {
|
|
hasErrors = true;
|
|
logger.error({ err, resource: 'redisConnection' }, `[Shutdown] Error closing Redis connection.`);
|
|
}
|
|
|
|
return hasErrors;
|
|
})();
|
|
|
|
const timeoutPromise = new Promise<string>((resolve) =>
|
|
setTimeout(() => resolve('timeout'), SHUTDOWN_TIMEOUT),
|
|
);
|
|
|
|
const result = await Promise.race([shutdownPromise, timeoutPromise]);
|
|
|
|
if (result === 'timeout') {
|
|
logger.error(
|
|
`[Shutdown] Graceful shutdown timed out after ${SHUTDOWN_TIMEOUT / 1000} seconds. Forcing exit.`,
|
|
);
|
|
process.exit(1);
|
|
} else {
|
|
const hasErrors = result as boolean;
|
|
if (!hasErrors) {
|
|
logger.info('[Shutdown] All resources closed successfully.');
|
|
} else {
|
|
logger.warn('[Shutdown] Graceful shutdown completed with errors.');
|
|
}
|
|
process.exit(hasErrors ? 1 : 0);
|
|
}
|
|
};
|