more work on the BullMQ workers
This commit is contained in:
@@ -1,438 +1,32 @@
|
||||
// src/services/queueService.server.ts
|
||||
import { Queue, Worker, Job, UnrecoverableError } from 'bullmq';
|
||||
import IORedis from 'ioredis'; // Correctly imported
|
||||
import fsPromises from 'node:fs/promises';
|
||||
import { exec } from 'child_process';
|
||||
import { promisify } from 'util';
|
||||
|
||||
import { logger } from './logger.server';
|
||||
import { aiService } from './aiService.server';
|
||||
import * as emailService from './emailService.server';
|
||||
import * as db from './db/index.db';
|
||||
import { connection } from './redis.server';
|
||||
import {
|
||||
FlyerProcessingService,
|
||||
type FlyerJobData,
|
||||
type IFileSystem,
|
||||
} from './flyerProcessingService.server';
|
||||
import { FlyerDataTransformer } from './flyerDataTransformer';
|
||||
flyerQueue,
|
||||
emailQueue,
|
||||
analyticsQueue,
|
||||
weeklyAnalyticsQueue,
|
||||
cleanupQueue,
|
||||
tokenCleanupQueue,
|
||||
} from './queues.server';
|
||||
|
||||
/**
 * Shared Redis connection handed to the queues and workers in this module.
 * The URL and password are read from the environment.
 */
export const connection = new IORedis(process.env.REDIS_URL!, {
  // Add the password from environment variables
  password: process.env.REDIS_PASSWORD,
  // Important for BullMQ
  maxRetriesPerRequest: null,
});
|
||||
// Re-export everything for backward compatibility where possible
|
||||
export { connection } from './redis.server';
|
||||
export * from './queues.server';
|
||||
|
||||
// --- Redis Connection Event Listeners ---
// Log connection lifecycle events; the error log is the main aid when
// diagnosing Redis connectivity problems.
connection
  .on('connect', () => {
    logger.info('[Redis] Connection established successfully.');
  })
  .on('error', (err) => {
    logger.error({ err }, '[Redis] Connection error.');
  });
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
// --- Queues ---

/**
 * Queue for 'flyer-processing' jobs.
 * A job is attempted up to 3 times with exponential backoff that starts at 5 seconds.
 */
export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5_000 },
  },
});
|
||||
|
||||
/**
 * Queue for 'email-sending' jobs.
 * Emails are retried more aggressively: up to 5 attempts with exponential
 * backoff that starts at 10 seconds.
 */
export const emailQueue = new Queue<EmailJobData>('email-sending', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: 'exponential', delay: 10_000 },
  },
});
|
||||
|
||||
/**
 * Queue for 'analytics-reporting' jobs.
 * Analytics can be intensive, so a job gets only 2 attempts, with exponential
 * backoff that starts at one minute.
 */
export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 60_000 },
    // Completed jobs are dropped to save space, as results are in the DB.
    removeOnComplete: true,
    // Keep the last 50 failed jobs for inspection.
    removeOnFail: 50,
  },
});
|
||||
|
||||
/**
 * Queue for 'weekly-analytics-reporting' jobs.
 * A job gets 2 attempts with exponential backoff that starts at one hour.
 * Completed jobs are removed; the last 50 failed jobs are kept.
 */
export const weeklyAnalyticsQueue = new Queue<WeeklyAnalyticsJobData>('weekly-analytics-reporting', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 3_600_000 }, // 1 hour
    removeOnComplete: true,
    removeOnFail: 50,
  },
});
|
||||
|
||||
/**
 * Queue for 'file-cleanup' jobs.
 * A job gets 3 attempts with exponential backoff that starts at 30 seconds.
 */
export const cleanupQueue = new Queue<CleanupJobData>('file-cleanup', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 30_000 },
    // No need to keep successful cleanup jobs.
    removeOnComplete: true,
  },
});
|
||||
|
||||
/**
 * Queue for 'token-cleanup' jobs.
 * A job gets 2 attempts with exponential backoff that starts at one hour.
 * Completed jobs are removed; the last 10 failed jobs are kept.
 */
export const tokenCleanupQueue = new Queue<TokenCleanupJobData>('token-cleanup', {
  connection,
  defaultJobOptions: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 3_600_000 }, // 1 hour
    removeOnComplete: true,
    removeOnFail: 10,
  },
});
|
||||
// --- Job Data Interfaces ---
|
||||
|
||||
/**
 * Defines the data for an email-sending job.
 */
interface EmailJobData {
  /** Recipient address. */
  to: string;
  /** Subject line. */
  subject: string;
  /** Plain-text body. */
  text: string;
  /** HTML body. */
  html: string;
}
|
||||
// We do NOT export workers here anymore to prevent side effects.
|
||||
// Consumers needing workers must import from './workers.server'.
|
||||
|
||||
/**
 * Defines the data for an analytics job.
 */
interface AnalyticsJobData {
  /** Date the report covers, e.g. '2024-10-26'. */
  reportDate: string;
}
|
||||
|
||||
/**
 * Defines the data for a weekly analytics job.
 */
interface WeeklyAnalyticsJobData {
  /** Year the report covers. */
  reportYear: number;
  /** ISO week number (1-53). */
  reportWeek: number;
}
|
||||
|
||||
/**
 * Defines the data for a file cleanup job.
 */
interface CleanupJobData {
  /** ID of the flyer the files belong to. */
  flyerId: number;
  /**
   * An array of absolute file paths to be deleted.
   * Made optional for manual cleanup triggers.
   */
  paths?: string[];
}
|
||||
|
||||
/**
 * Defines the data for a token cleanup job.
 */
interface TokenCleanupJobData {
  /** ISO string to ensure the job is unique per run. */
  timestamp: string;
}
|
||||
|
||||
// --- Worker Instantiation ---
|
||||
|
||||
// Thin adapter exposing the fsPromises calls required by the IFileSystem interface.
const fsAdapter: IFileSystem = {
  readdir(path: string, options: { withFileTypes: true }) {
    return fsPromises.readdir(path, options);
  },
  unlink(path: string) {
    return fsPromises.unlink(path);
  },
};
|
||||
|
||||
// Wire the flyer processing service to its production dependencies.
const flyerProcessingService = new FlyerProcessingService(
  aiService,
  db,
  fsAdapter,
  execAsync,
  // The cleanup queue is injected to break the circular dependency.
  cleanupQueue,
  // The transformer is injected as well.
  new FlyerDataTransformer(),
);
|
||||
|
||||
/**
 * Helper to ensure that an unknown thrown value is normalized to an Error object.
 * This ensures consistent logging structure and stack traces.
 *
 * - `Error` instances are returned unchanged (same reference).
 * - Objects carrying a non-empty string `message` become `new Error(message)`.
 * - Other objects are serialized with `JSON.stringify` so their fields are
 *   kept, rather than collapsing to "[object Object]".
 * - Everything else (strings, numbers, null, undefined, ...) goes through `String()`.
 *
 * @param error The value caught in a `catch` clause.
 * @returns An `Error` describing the thrown value.
 */
const normalizeError = (error: unknown): Error => {
  if (error instanceof Error) {
    return error;
  }

  if (typeof error === 'object' && error !== null) {
    if ('message' in error && typeof error.message === 'string' && error.message.length > 0) {
      return new Error(error.message);
    }
    try {
      const serialized = JSON.stringify(error);
      if (typeof serialized === 'string') {
        return new Error(serialized);
      }
    } catch {
      // Not serializable (e.g. circular reference); fall through to String().
    }
  }

  return new Error(String(error));
};
|
||||
|
||||
/**
 * A generic function to attach logging event listeners to any worker.
 * This centralizes logging for job completion and failure.
 *
 * The worker's 'failed' event is emitted for every failed attempt, not only
 * once retries are exhausted. The handler therefore logs a warning while the
 * job can still be retried and an error only for the final failure.
 *
 * @param worker The BullMQ worker instance.
 */
const attachWorkerEventListeners = (worker: Worker) => {
  worker.on('completed', (job: Job, returnValue: unknown) => {
    logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
  });

  worker.on('failed', (job: Job | undefined, error: Error) => {
    const maxAttempts = job?.opts.attempts ?? 1;
    const attemptsMade = job?.attemptsMade ?? 0;
    // A missing job cannot be retried, and UnrecoverableError skips all remaining retries.
    const isFinalFailure =
      job === undefined || error instanceof UnrecoverableError || attemptsMade >= maxAttempts;

    if (!isFinalFailure) {
      logger.warn(
        { err: error, jobData: job?.data },
        `[${worker.name}] Job ${job?.id} failed on attempt ${attemptsMade}/${maxAttempts}; it will be retried.`,
      );
      return;
    }

    logger.error(
      { err: error, jobData: job?.data },
      `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`,
    );
  });
};
|
||||
|
||||
/**
 * Matches error messages that indicate an AI-service quota / rate-limit failure.
 * "429" must be a standalone token so unrelated numbers that merely contain
 * those digits (e.g. "14290 bytes") do not match; matching is case-insensitive
 * so "Quota exceeded" is recognised as well.
 */
const QUOTA_ERROR_PATTERN = /quota|\b429\b|RESOURCE_EXHAUSTED/i;

/**
 * Worker that processes 'flyer-processing' jobs by delegating to
 * `flyerProcessingService.processJob`.
 *
 * Quota errors are rethrown as `UnrecoverableError` so BullMQ fails the job
 * immediately instead of retrying it. Any other failure is rethrown as a
 * normalized `Error`, like the other workers in this file.
 */
export const flyerWorker = new Worker<FlyerJobData>(
  'flyer-processing', // Must match the queue name
  async (job) => {
    try {
      // The processJob method creates its own job-specific logger internally.
      return await flyerProcessingService.processJob(job);
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      const errorMessage = wrappedError.message || '';

      // Check for quota errors or other unrecoverable errors from the AI service.
      if (QUOTA_ERROR_PATTERN.test(errorMessage)) {
        logger.error(
          { err: wrappedError, jobId: job.id },
          '[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.',
        );
        throw new UnrecoverableError(errorMessage);
      }

      // Same reference as `error` when it already was an Error instance.
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
  },
);
|
||||
/**
 * A dedicated worker process for sending emails.
 *
 * Each job is logged through a job-scoped child logger (jobId, jobName) for
 * both the start and the failure message. Only the recipient and subject are
 * logged; the `text`/`html` bodies are left out so message contents do not
 * end up in the logs.
 */
export const emailWorker = new Worker<EmailJobData>(
  'email-sending',
  async (job: Job<EmailJobData>) => {
    const { to, subject } = job.data;
    // Create a job-specific logger instance.
    const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
    jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);

    try {
      await emailService.sendEmail(job.data, jobLogger);
    } catch (error: unknown) {
      const wrappedError = normalizeError(error);
      jobLogger.error(
        {
          err: wrappedError,
          jobData: { to, subject },
        },
        `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      // Re-throw to let BullMQ handle the failure and retry.
      throw wrappedError;
    }
  },
  {
    connection,
    concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
  },
);
|
||||
|
||||
/**
 * Worker for 'analytics-reporting' jobs (daily reports).
 * The report generation itself is still a placeholder: the processor waits
 * 10 seconds to simulate the work. A `reportDate` of 'FAIL' deliberately
 * throws so the retry mechanism can be tested.
 */
export const analyticsWorker = new Worker<AnalyticsJobData>(
  'analytics-reporting',
  async (job: Job<AnalyticsJobData>) => {
    const reportDate = job.data.reportDate;
    logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);

    try {
      // Test hook for the retry mechanism.
      if (reportDate === 'FAIL') {
        throw new Error('This is a test failure for the analytics job.');
      }

      // A real implementation would call a database function here,
      // for example: await db.generateDailyAnalyticsReport(reportDate);
      await new Promise((done) => {
        setTimeout(done, 10_000); // Simulate a 10-second task
      });
      logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
    } catch (caught: unknown) {
      const failure = normalizeError(caught);
      logger.error(
        { err: failure, jobData: job.data },
        `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      // Propagate so BullMQ handles the failure and retry.
      throw failure;
    }
  },
  {
    concurrency: Number.parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
    connection,
  },
);
|
||||
|
||||
/**
 * Worker for 'file-cleanup' jobs: deletes the files listed in the job's
 * `paths` from the filesystem.
 *
 * A job with no paths is skipped with a warning. A file that no longer
 * exists (ENOENT) is logged as a warning and does not fail the job; any
 * other unlink error fails the job so BullMQ can retry it.
 */
export const cleanupWorker = new Worker<CleanupJobData>(
  'file-cleanup', // The queue name
  async (job: Job<CleanupJobData>) => {
    const flyerId = job.data.flyerId;
    const paths = job.data.paths;
    logger.info(
      { paths },
      `[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`,
    );

    try {
      if (!paths || paths.length === 0) {
        logger.warn(
          `[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`,
        );
        return;
      }

      // Delete the files one at a time, in the order given.
      for (const target of paths) {
        try {
          await fsAdapter.unlink(target);
          logger.info(`[CleanupWorker] Deleted temporary file: ${target}`);
        } catch (unlinkFailure: unknown) {
          const isMissingFile =
            unlinkFailure instanceof Error &&
            'code' in unlinkFailure &&
            unlinkFailure.code === 'ENOENT';

          // Anything other than "file not found" (e.g., permissions) fails the job.
          if (!isMissingFile) {
            throw unlinkFailure;
          }

          // A file that is already gone counts as success from our perspective.
          logger.warn(
            `[CleanupWorker] File not found during cleanup (already deleted?): ${target}`,
          );
        }
      }

      logger.info(
        `[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`,
      );
    } catch (caught: unknown) {
      const failure = normalizeError(caught);
      logger.error(
        { err: failure },
        `[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      // Propagate so BullMQ handles the failure and retry.
      throw failure;
    }
  },
  {
    concurrency: Number.parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
    connection,
  },
);
|
||||
|
||||
/**
 * Worker for 'weekly-analytics-reporting' jobs.
 * The report generation itself is still a placeholder: the processor waits
 * 30 seconds to simulate the work.
 */
export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
  'weekly-analytics-reporting',
  async (job: Job<WeeklyAnalyticsJobData>) => {
    const reportYear = job.data.reportYear;
    const reportWeek = job.data.reportWeek;
    logger.info(
      { reportYear, reportWeek },
      `[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`,
    );

    try {
      // Stand-in for the longer-running weekly report task.
      await new Promise((done) => {
        setTimeout(done, 30_000); // Simulate 30-second task
      });
      logger.info(
        `[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`,
      );
    } catch (caught: unknown) {
      const failure = normalizeError(caught);
      logger.error(
        { err: failure, jobData: job.data },
        `[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
      );
      // Propagate so BullMQ handles the failure and retry.
      throw failure;
    }
  },
  {
    concurrency: Number.parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
    connection,
  },
);
|
||||
|
||||
/**
 * Worker for 'token-cleanup' jobs: removes expired password reset tokens via
 * `db.userRepo.deleteExpiredResetTokens` and returns `{ deletedCount }` as
 * the job result.
 */
export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
  'token-cleanup',
  async (job: Job<TokenCleanupJobData>) => {
    // Child logger tagged with this job's id and name.
    const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
    jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.');

    let deletedCount;
    try {
      deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger);
      jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`);
    } catch (caught: unknown) {
      const failure = normalizeError(caught);
      jobLogger.error({ err: failure }, `[TokenCleanupWorker] Job ${job.id} failed.`);
      throw failure;
    }
    return { deletedCount };
  },
  {
    // This is a low-priority, non-intensive task.
    concurrency: 1,
    connection,
  },
);
|
||||
|
||||
// --- Attach Event Listeners to All Workers ---
for (const worker of [
  flyerWorker,
  emailWorker,
  analyticsWorker,
  cleanupWorker,
  weeklyAnalyticsWorker,
  tokenCleanupWorker,
]) {
  attachWorkerEventListeners(worker);
}

logger.info('All workers started and listening for jobs.');
|
||||
|
||||
/**
|
||||
* A function to gracefully shut down all queue workers and connections.
|
||||
* This is essential for preventing jobs from getting stuck in an 'active' state
|
||||
* when the application process is terminated.
|
||||
* @param signal The signal that triggered the shutdown (e.g., 'SIGINT').
|
||||
* A function to gracefully shut down all queues and connections.
|
||||
* This is for the API process which only uses queues.
|
||||
* For worker processes, use the gracefulShutdown from workers.server.ts
|
||||
*/
|
||||
export const gracefulShutdown = async (signal: string) => {
|
||||
logger.info(`[Shutdown] Received ${signal}. Closing all workers and queues...`);
|
||||
logger.info(`[Shutdown] Received ${signal}. Closing all queues...`);
|
||||
let exitCode = 0; // Default to success
|
||||
|
||||
const resources = [
|
||||
{ name: 'flyerWorker', close: () => flyerWorker.close() },
|
||||
{ name: 'emailWorker', close: () => emailWorker.close() },
|
||||
{ name: 'analyticsWorker', close: () => analyticsWorker.close() },
|
||||
{ name: 'cleanupWorker', close: () => cleanupWorker.close() },
|
||||
{ name: 'weeklyAnalyticsWorker', close: () => weeklyAnalyticsWorker.close() },
|
||||
{ name: 'tokenCleanupWorker', close: () => tokenCleanupWorker.close() },
|
||||
{ name: 'flyerQueue', close: () => flyerQueue.close() },
|
||||
{ name: 'emailQueue', close: () => emailQueue.close() },
|
||||
{ name: 'analyticsQueue', close: () => analyticsQueue.close() },
|
||||
@@ -455,7 +49,7 @@ export const gracefulShutdown = async (signal: string) => {
|
||||
});
|
||||
|
||||
if (exitCode === 0) {
|
||||
logger.info('[Shutdown] All workers, queues, and connections closed successfully.');
|
||||
logger.info('[Shutdown] All queues and connections closed successfully.');
|
||||
} else {
|
||||
logger.warn('[Shutdown] Graceful shutdown completed with errors.');
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user