Files
flyer-crawler.projectium.com/src/services/queueService.server.ts
Torben Sorensen 409abbaf24
All checks were successful
Deploy to Web Server flyer-crawler.projectium.com / deploy (push) Successful in 3m40s
fix: improve image source handling in FlyerDisplay and adjust worker concurrency fallback
2025-12-04 18:42:50 -08:00

339 lines
13 KiB
TypeScript

// src/services/queueService.server.ts
import { Queue, Worker, Job } from 'bullmq';
import IORedis from 'ioredis'; // Correctly imported
import path from 'path';
import fs from 'fs/promises';
import { exec } from 'child_process';
import { promisify } from 'util';
import { logger } from './logger.server';
import * as aiService from './aiService.server';
import * as emailService from './emailService.server';
import * as db from './db/index.db';
import { generateFlyerIcon } from '../utils/imageProcessor';
// Promisified child_process.exec; used by the flyer worker to shell out to pdftocairo.
const execAsync = promisify(exec);

/**
 * Shared Redis connection for every queue and worker in this module.
 * BullMQ's blocking commands require maxRetriesPerRequest to be null.
 */
export const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null, // Important for BullMQ
  // Authenticate with the password supplied via environment variables.
  password: process.env.REDIS_PASSWORD,
});
/**
 * Queue for flyer-processing jobs (PDF conversion, AI extraction, DB persistence).
 */
export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
  connection,
  defaultJobOptions: {
    // A job is attempted up to 3 times before being marked failed.
    attempts: 3,
    // Exponential backoff, starting with a 5-second delay for the first retry.
    backoff: { type: 'exponential', delay: 5000 },
  },
});
/**
 * Queue for outbound email jobs.
 */
export const emailQueue = new Queue<EmailJobData>('email-sending', {
  connection,
  defaultJobOptions: {
    // Emails can be retried more aggressively.
    attempts: 5,
    // Exponential backoff, starting with a 10-second delay.
    backoff: { type: 'exponential', delay: 10000 },
  },
});
/**
 * Queue for analytics report-generation jobs.
 */
export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
  connection,
  defaultJobOptions: {
    // Analytics can be intensive, so fewer retries might be desired.
    attempts: 2,
    // Wait a minute before retrying.
    backoff: { type: 'exponential', delay: 60000 },
    // Results live in the DB, so drop completed jobs to save Redis space.
    removeOnComplete: true,
    // Keep the last 50 failed jobs for inspection.
    removeOnFail: 50,
  },
});
/**
 * Queue for admin-triggered flyer file-cleanup jobs.
 */
export const cleanupQueue = new Queue<CleanupJobData>('file-cleanup', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    // Retry cleanup after 30 seconds, backing off exponentially.
    backoff: { type: 'exponential', delay: 30000 },
    // No need to keep successful cleanup jobs.
    removeOnComplete: true,
  },
});
// --- Job Data Interfaces ---

/**
 * Payload for a flyer-processing job.
 */
interface FlyerJobData {
  // Path on disk to the uploaded file (PDF or image), as queued by the uploader.
  filePath: string;
  // Original client-side file name; stored on the flyer record.
  originalFileName: string;
  // Content checksum of the upload; persisted for de-duplication downstream.
  checksum: string;
  // ID of the uploading user, when authenticated; recorded as uploaded_by.
  userId?: string;
  // IP address of the submitter; forwarded to the AI service.
  submitterIp?: string;
  // Submitter's profile address; forwarded to the AI service.
  userProfileAddress?: string;
}

/**
 * Payload for an email-sending job; passed straight to emailService.sendEmail.
 */
interface EmailJobData {
  to: string;
  subject: string;
  text: string;
  html: string;
}

/**
 * Defines the data for an analytics job.
 */
interface AnalyticsJobData {
  reportDate: string; // e.g., '2024-10-26'
}

/**
 * Payload for a file-cleanup job: the flyer whose files should be removed.
 */
interface CleanupJobData {
  flyerId: number;
}
/**
 * The main worker process for handling flyer jobs.
 *
 * Pipeline: (1) convert a PDF upload into per-page JPEGs via pdftocairo
 * (image uploads are used directly), (2) run AI extraction over the page
 * images, (3) persist the flyer and its items plus a generated icon,
 * (4) log the activity. This should be run as a separate process.
 */
export const flyerWorker = new Worker<FlyerJobData>(
  'flyer-processing',
  async (job: Job<FlyerJobData>) => {
    const { filePath, originalFileName, checksum, userId, submitterIp, userProfileAddress } = job.data;
    // JPEGs generated from the PDF, tracked for (currently disabled) cleanup.
    const createdImagePaths: string[] = [];
    let jobSucceeded = false;
    logger.info(`[Worker] Processing job ${job.id} for file: ${originalFileName}`);
    try {
      await job.updateProgress({ message: 'Starting process...' });
      // 1. Convert PDF to images if necessary.
      const imagePaths: { path: string; mimetype: string }[] = [];
      const fileExt = path.extname(filePath).toLowerCase();
      if (fileExt === '.pdf') {
        await job.updateProgress({ message: 'Converting PDF to images...' });
        const outputDir = path.dirname(filePath);
        const outputFilePrefix = path.join(outputDir, path.basename(filePath, '.pdf'));
        // Use the pdftocairo command-line tool for robust, server-side PDF conversion.
        // -jpeg outputs JPEG files; -r 150 sets the resolution to 150 DPI.
        // The final argument is the output file prefix; pdftocairo appends page numbers.
        const command = `pdftocairo -jpeg -r 150 "${filePath}" "${outputFilePrefix}"`;
        await execAsync(command);
        // After conversion, find the generated image files, sorted to ensure page order.
        const filesInDir = await fs.readdir(outputDir);
        const generatedImages = filesInDir
          .filter(f => f.startsWith(path.basename(outputFilePrefix)) && f.endsWith('.jpg'))
          .sort();
        for (const img of generatedImages) {
          // BUG FIX: each page was previously pushed into imagePaths twice,
          // doubling the images sent to the AI service. Push exactly once.
          const imagePath = path.join(outputDir, img);
          imagePaths.push({ path: imagePath, mimetype: 'image/jpeg' });
          createdImagePaths.push(imagePath); // Track generated images for cleanup
        }
        logger.info(`[Worker] Converted PDF to ${imagePaths.length} images.`);
      } else {
        // BUG FIX: '.jpg' previously produced the non-standard 'image/jpg';
        // normalize it to 'image/jpeg'. Other extensions map directly
        // (e.g. '.png' -> 'image/png').
        const mimetype = fileExt === '.jpg' ? 'image/jpeg' : `image/${fileExt.slice(1)}`;
        imagePaths.push({ path: filePath, mimetype });
      }
      // 2. Call AI Service, supplying the master-item catalog for matching.
      const masterItems = await db.getAllMasterItems();
      const extractedData = await aiService.extractCoreDataFromFlyerImage(
        imagePaths,
        masterItems,
        submitterIp,
        userProfileAddress
      );
      logger.info(`[Worker] AI extracted ${extractedData.items.length} items.`);
      // 3. Save to Database. The first page doubles as the flyer's display image.
      const firstImage = imagePaths[0].path;
      const iconFileName = await generateFlyerIcon(firstImage, path.join(path.dirname(firstImage), 'icons'));
      const flyerData = {
        file_name: originalFileName,
        image_url: `/flyer-images/${path.basename(firstImage)}`,
        icon_url: `/flyer-images/icons/${iconFileName}`,
        checksum,
        store_name: extractedData.store_name || 'Unknown Store (auto)',
        valid_from: extractedData.valid_from,
        valid_to: extractedData.valid_to,
        store_address: extractedData.store_address,
        item_count: 0, // Set default to 0; the trigger will update it.
        uploaded_by: userId,
      };
      const newFlyer = await db.createFlyerAndItems(flyerData, extractedData.items);
      logger.info(`[Worker] Successfully saved new flyer ID: ${newFlyer.flyer_id}`);
      // 4. Log activity for the audit feed.
      await db.logActivity({
        userId,
        action: 'flyer_processed',
        displayText: `Processed a new flyer for ${flyerData.store_name}.`,
        details: { flyerId: newFlyer.flyer_id, storeName: flyerData.store_name },
      });
      jobSucceeded = true; // Mark the job as successful before the finally block.
      return { flyerId: newFlyer.flyer_id };
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
      logger.error(`[Worker] Job ${job.id} failed for file ${originalFileName}. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
        error: errorMessage,
        stack: error instanceof Error ? error.stack : undefined,
        jobData: job.data,
      });
      // Update the job's progress to show the error in the UI.
      await job.updateProgress({ message: `Error: ${errorMessage}` });
      // Re-throw so BullMQ retries the job or marks it as failed.
      throw error;
    } finally {
      // Runs after try/catch, regardless of success or failure.
      if (jobSucceeded) {
        logger.info(`[Worker] Job ${job.id} succeeded. File cleanup is disabled; original files will be kept.`);
        // Cleanup is intentionally disabled. To re-enable: unlink each entry
        // in createdImagePaths, then unlink filePath, logging (not re-throwing)
        // any cleanup failure since the job itself succeeded.
      } else {
        logger.warn(`[Worker] Job ${job.id} failed. Temporary files will not be cleaned up to allow for manual inspection.`);
      }
    }
  },
  {
    connection,
    // Control the number of concurrent jobs. This directly limits parallel calls to the AI API.
    // It's sourced from an environment variable for easy configuration without code changes.
    // The Google AI free tier limit is 60 RPM, so a low concurrency is safe.
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
  }
);
/**
 * Dedicated worker that drains the 'email-sending' queue by handing each
 * job's payload to the email service. Failures are re-thrown so BullMQ can
 * apply the queue's retry/backoff policy.
 */
export const emailWorker = new Worker<EmailJobData>(
  'email-sending',
  async (job: Job<EmailJobData>) => {
    logger.info(`[EmailWorker] Sending email for job ${job.id}`, {
      to: job.data.to,
      subject: job.data.subject,
    });
    try {
      await emailService.sendEmail(job.data);
    } catch (error: unknown) {
      const errorMessage =
        error instanceof Error ? error.message : 'An unknown email error occurred';
      logger.error(
        `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
        { error: errorMessage, jobData: job.data }
      );
      // Re-throw to let BullMQ handle the failure and retry.
      throw error;
    }
  },
  {
    connection,
    // Sending emails is less resource-intensive, so we can allow higher concurrency.
    concurrency: 10,
  }
);
/**
 * A dedicated worker for generating daily analytics reports.
 * The body is a placeholder: the real report generation would be a DB call.
 */
export const analyticsWorker = new Worker<AnalyticsJobData>(
  'analytics-reporting',
  async (job: Job<AnalyticsJobData>) => {
    const { reportDate } = job.data;
    logger.info(`[AnalyticsWorker] Starting report generation for job ${job.id}`, { reportDate });
    try {
      // Special case for testing the retry mechanism
      if (reportDate === 'FAIL') {
        throw new Error('This is a test failure for the analytics job.');
      }
      // In a real implementation, you would call a database function here.
      // For example: await db.generateDailyAnalyticsReport(reportDate);
      const simulatedWork = new Promise<void>(resolve => {
        setTimeout(resolve, 10000); // Simulate a 10-second task
      });
      await simulatedWork;
      logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
    } catch (error: unknown) {
      const errorMessage =
        error instanceof Error ? error.message : 'An unknown analytics error occurred';
      logger.error(
        `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
        { error: errorMessage, jobData: job.data }
      );
      throw error; // Re-throw to let BullMQ handle the failure and retry.
    }
  },
  {
    connection,
    // Analytics jobs are often resource-intensive, so process them one at a time.
    concurrency: 1,
  }
);
/**
 * A dedicated worker for cleaning up flyer-related files from the filesystem.
 * This is triggered manually by an admin after a flyer has been reviewed.
 *
 * Missing files are tolerated (warned, not fatal); any other error fails the
 * job so BullMQ can retry it.
 */
export const cleanupWorker = new Worker<CleanupJobData>(
  'file-cleanup',
  async (job: Job<CleanupJobData>) => {
    const { flyerId } = job.data;
    logger.info(`[CleanupWorker] Starting file cleanup for flyer ID: ${flyerId}`);
    try {
      // 1. Fetch the flyer from the database to get its file paths.
      const flyer = await db.getFlyerById(flyerId);
      if (!flyer) {
        throw new Error(`Flyer with ID ${flyerId} not found. Cannot perform cleanup.`);
      }
      // 2. Determine the base path for the flyer images.
      const storagePath = process.env.STORAGE_PATH!;
      // 3. Delete the main flyer image.
      // BUG FIX: previously the "Deleted main image" success log ran even when
      // fs.unlink failed (the .catch swallowed the error and execution fell
      // through). Now the success log only fires on an actual deletion.
      const mainImagePath = path.join(storagePath, path.basename(flyer.image_url));
      try {
        await fs.unlink(mainImagePath);
        logger.info(`[CleanupWorker] Deleted main image: ${mainImagePath}`);
      } catch (err: unknown) {
        logger.warn(`[CleanupWorker] Could not delete main image (may not exist): ${mainImagePath}`, {
          error: err instanceof Error ? err.message : String(err),
        });
      }
      // 4. Delete the flyer icon (same success/warn structure as above).
      if (flyer.icon_url) {
        const iconPath = path.join(storagePath, 'icons', path.basename(flyer.icon_url));
        try {
          await fs.unlink(iconPath);
          logger.info(`[CleanupWorker] Deleted icon: ${iconPath}`);
        } catch (err: unknown) {
          logger.warn(`[CleanupWorker] Could not delete icon (may not exist): ${iconPath}`, {
            error: err instanceof Error ? err.message : String(err),
          });
        }
      }
      // Note: This process does not delete the original PDF, as its path is not stored.
      // A more advanced implementation could store the original path in the job data and pass it here.
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown cleanup error occurred';
      logger.error(`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed.`, { error: errorMessage });
      throw error; // Re-throw to let BullMQ handle the failure and retry.
    }
  },
  {
    connection,
    concurrency: 10, // Cleanup is not very resource-intensive.
  }
);
// Emitted at module load: importing this module constructs (and thus starts)
// all four workers above.
logger.info('All workers started and listening for jobs.');