// src/services/queueService.server.ts
import { Queue, Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import path from 'path';
import fs from 'fs/promises';
// Use pdfjs-dist for PDF parsing, a server-side canvas to rasterize pages, and sharp for image output.
import * as pdfjs from 'pdfjs-dist/legacy/build/pdf.mjs';
// pdf.js needs a real 2D canvas implementation on the server. This assumes '@napi-rs/canvas'
// is installed; the 'canvas' (node-canvas) package exposes a compatible createCanvas as an alternative.
import { createCanvas } from '@napi-rs/canvas';
import sharp from 'sharp';
import { logger } from './logger.server';
import * as aiService from './aiService.server';
import * as emailService from './emailService.server';
import * as db from './db';
import { generateFlyerIcon } from '../utils/imageProcessor';

export const connection = new IORedis(process.env.REDIS_URL || 'redis://127.0.0.1:6379', {
  maxRetriesPerRequest: null, // Required by BullMQ: workers use blocking Redis commands.
  password: process.env.REDIS_PASSWORD, // Redis password from environment variables, if set.
});

export const flyerQueue = new Queue<FlyerJobData>('flyer-processing', {
  connection,
  defaultJobOptions: {
    attempts: 3, // Attempt a job 3 times before marking it as failed.
    backoff: {
      type: 'exponential',
      delay: 5000, // Start with a 5-second delay for the first retry.
    },
  },
});
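
// Illustrative usage (not part of this module): an upload handler that has written the file to
// disk would enqueue processing roughly like this. The job name 'process-flyer' and the local
// variables are placeholders, not names defined elsewhere in this codebase.
//
//   await flyerQueue.add('process-flyer', {
//     filePath: '/tmp/uploads/flyer.pdf',
//     originalFileName: 'flyer.pdf',
//     checksum: 'sha256-of-file',
//     userId: currentUser?.id,
//   });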

export const emailQueue = new Queue<EmailJobData>('email-sending', {
  connection,
  defaultJobOptions: {
    attempts: 5, // Emails can be retried more aggressively.
    backoff: {
      type: 'exponential',
      delay: 10000, // Start with a 10-second delay.
    },
  },
});
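
// Illustrative usage (not part of this module): any service can queue an email instead of sending
// it inline, e.g. after a password reset. The job name 'send-email' and the values are placeholders.
//
//   await emailQueue.add('send-email', {
//     to: 'user@example.com',
//     subject: 'Your report is ready',
//     text: 'Plain-text body',
//     html: '<p>HTML body</p>',
//   });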

export const analyticsQueue = new Queue<AnalyticsJobData>('analytics-reporting', {
  connection,
  defaultJobOptions: {
    attempts: 2, // Analytics can be intensive, so fewer retries might be desired.
    backoff: {
      type: 'exponential',
      delay: 60000, // Wait a minute before retrying.
    },
    // Remove job from queue on completion to save space, as results are in the DB.
    removeOnComplete: true,
    removeOnFail: 50, // Keep the last 50 failed jobs for inspection.
  },
});
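
// Illustrative usage (not part of this module): the daily report can be scheduled as a BullMQ
// repeatable job. The job name 'daily-report' and the 24-hour interval are placeholders; cron-style
// schedules are also supported via the repeat option.
//
//   await analyticsQueue.add(
//     'daily-report',
//     { reportDate: new Date().toISOString().slice(0, 10) },
//     { repeat: { every: 24 * 60 * 60 * 1000 } },
//   );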

// --- Job Data Interfaces ---
interface FlyerJobData {
  filePath: string;
  originalFileName: string;
  checksum: string;
  userId?: string;
}

interface EmailJobData {
  to: string;
  subject: string;
  text: string;
  html: string;
}

/**
 * Defines the data for an analytics job.
 */
interface AnalyticsJobData {
  reportDate: string; // e.g., '2024-10-26'
}

/**
 * The main worker process for handling flyer jobs.
 * This should be run as a separate process.
 */
export const flyerWorker = new Worker<FlyerJobData>(
  'flyer-processing',
  async (job: Job<FlyerJobData>) => {
    const { filePath, originalFileName, checksum, userId } = job.data;
    logger.info(`[Worker] Processing job ${job.id} for file: ${originalFileName}`);
    try {
      await job.updateProgress({ message: 'Starting process...' });

      // 1. Convert PDF to images if necessary
      const imagePaths: { path: string; mimetype: string }[] = [];
      const fileExt = path.extname(filePath).toLowerCase();
      if (fileExt === '.pdf') {
        await job.updateProgress({ message: 'Converting PDF to images...' });
        // Load the PDF document using pdfjs-dist
        const data = new Uint8Array(await fs.readFile(filePath));
        const pdfDocument = await pdfjs.getDocument({ data }).promise;
        const outputDir = path.dirname(filePath);
        for (let i = 1; i <= pdfDocument.numPages; i++) {
          const page = await pdfDocument.getPage(i);
          const viewport = page.getViewport({ scale: 1.5 }); // ~150 DPI
          // Render the page into a real 2D canvas. pdf.js calls the full CanvasRenderingContext2D
          // API while rendering, so a plain object stub is not sufficient; createCanvas provides a
          // working implementation on the server.
          const width = Math.ceil(viewport.width);
          const height = Math.ceil(viewport.height);
          const canvas = createCanvas(width, height);
          const context = canvas.getContext('2d');
          await page.render({ canvasContext: context as any, viewport }).promise;
          const imageFileName = `${path.basename(filePath, '.pdf')}_page_${i}.jpeg`;
          const imageOutputPath = path.join(outputDir, imageFileName);
          // Encode the rendered page as PNG and let sharp produce the final JPEG.
          await sharp(canvas.toBuffer('image/png')).jpeg().toFile(imageOutputPath);
          imagePaths.push({ path: imageOutputPath, mimetype: 'image/jpeg' });
        }
        logger.info(`[Worker] Converted PDF to ${imagePaths.length} images.`);
      } else {
        // Normalize '.jpg' to the standard 'image/jpeg' MIME type.
        const mimetype = fileExt === '.jpg' ? 'image/jpeg' : `image/${fileExt.slice(1)}`;
        imagePaths.push({ path: filePath, mimetype });
      }

      // 2. Call AI Service
      const masterItems = await db.getAllMasterItems(); // Fetch master items for the AI
      const extractedData = await aiService.extractCoreDataFromFlyerImage(imagePaths, masterItems);
      logger.info(`[Worker] AI extracted ${extractedData.items.length} items.`);

      // 3. Save to Database
      const firstImage = imagePaths[0].path;
      const iconFileName = await generateFlyerIcon(firstImage, path.join(path.dirname(firstImage), 'icons'));
      const flyerData = {
        file_name: originalFileName,
        image_url: `/assets/${path.basename(firstImage)}`,
        icon_url: `/assets/icons/${iconFileName}`,
        checksum,
        store_name: extractedData.store_name || 'Unknown Store (auto)',
        valid_from: extractedData.valid_from,
        valid_to: extractedData.valid_to,
        store_address: extractedData.store_address,
        uploaded_by: userId,
      };
      const newFlyer = await db.createFlyerAndItems(flyerData, extractedData.items);
      logger.info(`[Worker] Successfully saved new flyer ID: ${newFlyer.flyer_id}`);

      // 4. Log activity
      await db.logActivity({
        userId,
        action: 'flyer_processed',
        displayText: `Processed a new flyer for ${flyerData.store_name}.`,
        details: { flyerId: newFlyer.flyer_id, storeName: flyerData.store_name },
      });

      // TODO: Clean up temporary files (original PDF and generated images)
      return { flyerId: newFlyer.flyer_id };
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
      logger.error(`[Worker] Job ${job.id} failed for file ${originalFileName}. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
        error: errorMessage,
        stack: error instanceof Error ? error.stack : undefined,
        jobData: job.data,
      });
      // Update the job's progress to show the error in the UI
      await job.updateProgress({ message: `Error: ${errorMessage}` });
      // Re-throw the error to let BullMQ know the job has failed and should be retried or marked as failed.
      throw error;
    }
  },
  {
    connection,
    // Control the number of concurrent jobs. This directly limits parallel calls to the AI API.
    // It's sourced from an environment variable for easy configuration without code changes.
    // The Google AI free tier limit is 60 RPM, so a low concurrency is safe.
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5', 10),
  }
);
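
// Illustrative usage (not part of this module): a separate process, e.g. an API server pushing
// updates to the UI, can observe the progress payloads set above via BullMQ's QueueEvents. The
// handlers shown here are a sketch, not code used elsewhere in this project.
//
//   import { QueueEvents } from 'bullmq';
//   const flyerEvents = new QueueEvents('flyer-processing', { connection });
//   flyerEvents.on('progress', ({ jobId, data }) => {
//     logger.info(`Flyer job ${jobId} progress`, { data });
//   });
//   flyerEvents.on('failed', ({ jobId, failedReason }) => {
//     logger.warn(`Flyer job ${jobId} failed: ${failedReason}`);
//   });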

/**
 * A dedicated worker process for sending emails.
 */
export const emailWorker = new Worker<EmailJobData>(
  'email-sending',
  async (job: Job<EmailJobData>) => {
    const { to, subject } = job.data;
    logger.info(`[EmailWorker] Sending email for job ${job.id}`, { to, subject });
    try {
      await emailService.sendEmail(job.data);
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown email error occurred';
      logger.error(`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
        error: errorMessage,
        jobData: job.data,
      });
      // Re-throw to let BullMQ handle the failure and retry.
      throw error;
    }
  },
  {
    connection,
    // Sending emails is less resource-intensive, so we can allow higher concurrency.
    concurrency: 10,
  }
);

/**
 * A dedicated worker for generating daily analytics reports.
 * This is a placeholder for the actual report generation logic.
 */
export const analyticsWorker = new Worker<AnalyticsJobData>(
  'analytics-reporting',
  async (job: Job<AnalyticsJobData>) => {
    const { reportDate } = job.data;
    logger.info(`[AnalyticsWorker] Starting report generation for job ${job.id}`, { reportDate });
    try {
      // In a real implementation, you would call a database function here.
      // For example: await db.generateDailyAnalyticsReport(reportDate);
      await new Promise(resolve => setTimeout(resolve, 10000)); // Simulate a 10-second task
      logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown analytics error occurred';
      logger.error(`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
        error: errorMessage,
        jobData: job.data,
      });
      throw error; // Re-throw to let BullMQ handle the failure and retry.
    }
  },
  {
    connection,
    concurrency: 1, // Analytics jobs are often resource-intensive, so process them one at a time.
  }
);

logger.info('All workers started and listening for jobs.');
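
// Illustrative shutdown handling (not wired up in this module): on SIGTERM, close the workers so
// in-flight jobs can finish before the Redis connection is dropped. The signal choice and exit code
// are assumptions about the deployment environment.
//
//   process.on('SIGTERM', async () => {
//     await Promise.all([flyerWorker.close(), emailWorker.close(), analyticsWorker.close()]);
//     await connection.quit();
//     process.exit(0);
//   });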