Files
flyer-crawler.projectium.com/src/services/queueService.server.ts
Torben Sorensen 158778c2ec
Some checks failed
Deploy to Web Server flyer-crawler.projectium.com / deploy (push) Failing after 5m16s
DB refactor for easier testsing
App.ts refactor into hooks
unit tests
2025-12-08 20:46:12 -08:00

337 lines
12 KiB
TypeScript

// src/services/queueService.server.ts
import { Queue, Worker, Job } from 'bullmq';
import IORedis from 'ioredis'; // Correctly imported
import type { Dirent } from 'node:fs';
import fsPromises from 'node:fs/promises';
import { exec } from 'child_process';
import { promisify } from 'util';
import { logger } from './logger.server';
import { aiService } from './aiService.server';
import * as emailService from './emailService.server';
import * as db from './db/index.db';
import { FlyerProcessingService, type FlyerJobData, type IFileSystem } from './flyerProcessingService.server';
import { FlyerDataTransformer } from './flyerDataTransformer';
// --- Start: Interfaces for Dependency Injection ---
// --- End: Interfaces for Dependency Injection ---
export const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // Important for BullMQ
password: process.env.REDIS_PASSWORD, // Add the password from environment variables
});
// --- Redis Connection Event Listeners ---
connection.on('connect', () => {
logger.info('[Redis] Connection established successfully.');
});
connection.on('error', (err) => {
// This is crucial for diagnosing Redis connection issues.
logger.error('[Redis] Connection error.', { error: err });
});
const execAsync = promisify(exec);
// --- Queues ---
export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
connection,
defaultJobOptions: {
attempts: 3, // Attempt a job 3 times before marking it as failed.
backoff: {
type: 'exponential',
delay: 5000, // Start with a 5-second delay for the first retry
},
},
});
export const emailQueue = new Queue<EmailJobData>('email-sending', {
connection,
defaultJobOptions: {
attempts: 5, // Emails can be retried more aggressively
backoff: {
type: 'exponential',
delay: 10000, // Start with a 10-second delay
},
},
});
export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
connection,
defaultJobOptions: {
attempts: 2, // Analytics can be intensive, so fewer retries might be desired.
backoff: {
type: 'exponential',
delay: 60000, // Wait a minute before retrying.
},
// Remove job from queue on completion to save space, as results are in the DB.
removeOnComplete: true,
removeOnFail: 50, // Keep the last 50 failed jobs for inspection.
},
});
export const weeklyAnalyticsQueue = new Queue<WeeklyAnalyticsJobData>('weekly-analytics-reporting', {
connection,
defaultJobOptions: {
attempts: 2,
backoff: {
type: 'exponential',
delay: 3600000, // 1 hour delay for retries
},
removeOnComplete: true,
removeOnFail: 50,
},
});
export const cleanupQueue = new Queue<CleanupJobData>('file-cleanup', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 30000, // Retry cleanup after 30 seconds
},
removeOnComplete: true, // No need to keep successful cleanup jobs
},
});
// --- Job Data Interfaces ---
interface EmailJobData {
to: string;
subject: string;
text: string;
html: string;
}
/**
* Defines the data for an analytics job.
*/
interface AnalyticsJobData {
reportDate: string; // e.g., '2024-10-26'
}
/**
* Defines the data for a weekly analytics job.
*/
interface WeeklyAnalyticsJobData {
reportYear: number;
reportWeek: number; // ISO week number (1-53)
}
interface CleanupJobData {
flyerId: number;
// An array of absolute file paths to be deleted. Made optional for manual cleanup triggers.
paths?: string[];
}
// --- Worker Instantiation ---
// Create an adapter for fsPromises to match the IFileSystem interface.
const fsAdapter: IFileSystem = {
readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
unlink: (path: string) => fsPromises.unlink(path),
};
// Instantiate the service with its real dependencies
const flyerProcessingService = new FlyerProcessingService(
aiService,
db,
fsAdapter,
execAsync,
cleanupQueue, // Inject the cleanup queue to break the circular dependency
new FlyerDataTransformer() // Inject the new transformer
);
/**
* A generic function to attach logging event listeners to any worker.
* This centralizes logging for job completion and final failure.
* @param worker The BullMQ worker instance.
*/
const attachWorkerEventListeners = (worker: Worker) => {
worker.on('completed', (job: Job, returnValue: unknown) => {
logger.info(`[${worker.name}] Job ${job.id} completed successfully.`, { returnValue });
});
worker.on('failed', (job: Job | undefined, error: Error) => {
// This event fires after all retries have failed.
logger.error(`[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`, { error: error.message, stack: error.stack, jobData: job?.data });
});
};
export const flyerWorker = new Worker<FlyerJobData>(
'flyer-processing', // Must match the queue name
(job) => flyerProcessingService.processJob(job), // The processor function
{
connection,
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
}
);
/**
* A dedicated worker process for sending emails.
*/
export const emailWorker = new Worker<EmailJobData>(
'email-sending',
async (job: Job<EmailJobData>) => {
const { to, subject } = job.data;
logger.info(`[EmailWorker] Sending email for job ${job.id}`, { to, subject });
try {
await emailService.sendEmail(job.data);
} catch (error: unknown) {
// Standardize error logging to capture the full error object, including the stack trace.
// This provides more context for debugging than just logging the message.
logger.error(`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
// Log the full error object for better diagnostics.
error: error instanceof Error ? error : new Error(String(error)),
// Also include the job data for context.
jobData: job.data,
});
// Re-throw to let BullMQ handle the failure and retry.
throw error;
}
},
{
connection,
concurrency: parseInt(process.env.EMAIL_WORKER_CONCURRENCY || '10', 10),
}
);
/**
* A dedicated worker for generating daily analytics reports.
* This is a placeholder for the actual report generation logic.
*/
export const analyticsWorker = new Worker<AnalyticsJobData>(
'analytics-reporting',
async (job: Job<AnalyticsJobData>) => {
const { reportDate } = job.data;
logger.info(`[AnalyticsWorker] Starting report generation for job ${job.id}`, { reportDate });
try {
// Special case for testing the retry mechanism
if (reportDate === 'FAIL') {
throw new Error('This is a test failure for the analytics job.');
}
// In a real implementation, you would call a database function here.
// For example: await db.generateDailyAnalyticsReport(reportDate);
await new Promise(resolve => setTimeout(resolve, 10000)); // Simulate a 10-second task
logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
jobData: job.data,
});
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},
{
connection,
concurrency: parseInt(process.env.ANALYTICS_WORKER_CONCURRENCY || '1', 10),
}
);
/**
* A dedicated worker for cleaning up flyer-related files from the filesystem.
* This is triggered manually by an admin after a flyer has been reviewed.
*/
export const cleanupWorker = new Worker<CleanupJobData>(
// This worker now handles two types of cleanup jobs.
'file-cleanup', // The queue name
async (job: Job<CleanupJobData>) => {
// Destructure the data from the job payload.
const { flyerId, paths } = job.data;
logger.info(`[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`, { paths });
try {
if (!paths || paths.length === 0) {
logger.warn(`[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`);
return;
}
// Iterate over the file paths provided in the job data and delete each one.
for (const filePath of paths) {
try {
await fsAdapter.unlink(filePath);
logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`);
} catch (unlinkError: unknown) {
// If the file doesn't exist, it's a success from our perspective.
// We can log it as a warning and continue without failing the job.
if (unlinkError instanceof Error && 'code' in unlinkError && unlinkError.code === 'ENOENT') {
logger.warn(`[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`);
} else {
throw unlinkError; // For any other error (e.g., permissions), re-throw to fail the job.
}
}
}
logger.info(`[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
});
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},
{
connection,
concurrency: parseInt(process.env.CLEANUP_WORKER_CONCURRENCY || '10', 10),
}
);
/**
* A dedicated worker for generating weekly analytics reports.
* This is a placeholder for the actual report generation logic.
*/
export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
'weekly-analytics-reporting',
async (job: Job<WeeklyAnalyticsJobData>) => {
const { reportYear, reportWeek } = job.data;
logger.info(`[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`, { reportYear, reportWeek });
try {
// Simulate a longer-running task for weekly reports
await new Promise(resolve => setTimeout(resolve, 30000)); // Simulate 30-second task
logger.info(`[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
jobData: job.data,
});
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},
{
connection,
concurrency: parseInt(process.env.WEEKLY_ANALYTICS_WORKER_CONCURRENCY || '1', 10),
}
);
// --- Attach Event Listeners to All Workers ---
attachWorkerEventListeners(flyerWorker);
attachWorkerEventListeners(emailWorker);
attachWorkerEventListeners(analyticsWorker);
attachWorkerEventListeners(cleanupWorker);
attachWorkerEventListeners(weeklyAnalyticsWorker);
logger.info('All workers started and listening for jobs.');
/**
* A function to gracefully shut down all queue workers and connections.
* This is essential for preventing jobs from getting stuck in an 'active' state
* when the application process is terminated.
* @param signal The signal that triggered the shutdown (e.g., 'SIGINT').
*/
export const gracefulShutdown = async (signal: string) => {
logger.info(`[Shutdown] Received ${signal}. Closing all workers and queues...`);
// The order is important: close workers first, then queues.
await Promise.all([
flyerWorker.close(),
emailWorker.close(),
analyticsWorker.close(),
cleanupWorker.close(),
]);
logger.info('[Shutdown] All workers have been closed.');
logger.info('[Shutdown] Graceful shutdown complete.');
process.exit(0);
};