// src/services/flyerProcessingService.server.ts
import type { Job, Queue } from 'bullmq';
import { UnrecoverableError } from 'bullmq';
import type { Logger } from 'pino';
import type { FlyerFileHandler, IFileSystem } from './flyerFileHandler.server';
import type { FlyerAiProcessor } from './flyerAiProcessor.server';
import type { AdminRepository } from './db/admin.db';
import { FlyerDataTransformer } from './flyerDataTransformer';
import type { FlyerJobData, CleanupJobData } from '../types/job-data';
import {
FlyerProcessingError,
PdfConversionError,
AiDataValidationError,
} from './processingErrors';
import { createFlyerAndItems } from './db/flyer.db';
import { logger as globalLogger } from './logger.server';
// Define ProcessingStage locally as it's not exported from the types file.
export type ProcessingStage = {
name: string;
status: 'pending' | 'in-progress' | 'completed' | 'failed' | 'skipped';
critical: boolean;
detail?: string;
};
/**
* This service orchestrates the entire flyer processing workflow. It's responsible for
* coordinating various sub-services (file handling, AI processing, data transformation,
* and database operations) to process a flyer from upload to completion.
*/
export class FlyerProcessingService {
constructor(
private fileHandler: FlyerFileHandler,
private aiProcessor: FlyerAiProcessor,
// This service only needs the `logActivity` method from the `adminRepo`.
// By using `Pick`, we create a more focused and testable dependency.
private db: { adminRepo: Pick<AdminRepository, 'logActivity'> },
private fs: IFileSystem,
// By depending on `Pick<Queue, 'add'>`, we specify that this service only needs
// an object with an `add` method that matches the Queue's `add` method signature.
// This decouples the service from the full BullMQ Queue implementation, making it more modular and easier to test.
private cleanupQueue: Pick<Queue<CleanupJobData>, 'add'>,
private transformer: FlyerDataTransformer,
) {}
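// Illustrative wiring only (the real worker setup lives elsewhere in the codebase); names such as
// `fileHandler`, `adminRepo`, and `cleanupQueue` below are placeholders:
//
//   const service = new FlyerProcessingService(
//     fileHandler,
//     aiProcessor,
//     { adminRepo },      // anything exposing a compatible `logActivity`
//     fs,
//     cleanupQueue,       // anything exposing a Queue-compatible `add`
//     new FlyerDataTransformer(),
//   );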
/**
* Orchestrates the processing of a flyer job.
* @param job The BullMQ job containing flyer data.
* @returns An object containing the ID of the newly created flyer.
*/
async processJob(job: Job<FlyerJobData>): Promise<{ flyerId: number }> {
// Create a logger instance with job-specific context for better traceability.
const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
logger.info('Picked up flyer processing job.');
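// Each stage below is reported via `job.updateProgress` as it starts and completes, so whatever
// consumes the job's progress can show a per-stage status.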
const stages: ProcessingStage[] = [
{ name: 'Preparing Inputs', status: 'pending', critical: true, detail: 'Validating and preparing file...' },
{ name: 'Extracting Data with AI', status: 'pending', critical: true, detail: 'Communicating with AI model...' },
{ name: 'Transforming AI Data', status: 'pending', critical: true },
{ name: 'Saving to Database', status: 'pending', critical: true },
];
// Keep track of all created file paths for eventual cleanup.
const allFilePaths: string[] = [job.data.filePath];
try {
// Stage 1: Prepare Inputs (e.g., convert PDF to images)
stages[0].status = 'in-progress';
await job.updateProgress({ stages });
const { imagePaths, createdImagePaths } = await this.fileHandler.prepareImageInputs(
job.data.filePath,
job,
logger,
);
allFilePaths.push(...createdImagePaths);
stages[0].status = 'completed';
stages[0].detail = `${imagePaths.length} page(s) ready for AI.`;
await job.updateProgress({ stages });
// Stage 2: Extract Data with AI
stages[1].status = 'in-progress';
await job.updateProgress({ stages });
const aiResult = await this.aiProcessor.extractAndValidateData(imagePaths, job.data, logger);
stages[1].status = 'completed';
await job.updateProgress({ stages });
// Stage 3: Transform AI Data into DB format
stages[2].status = 'in-progress';
await job.updateProgress({ stages });
const { flyerData, itemsForDb } = await this.transformer.transform(
aiResult,
imagePaths,
job.data.originalFileName,
job.data.checksum,
job.data.userId,
logger,
);
stages[2].status = 'completed';
await job.updateProgress({ stages });
// Stage 4: Save to Database
stages[3].status = 'in-progress';
await job.updateProgress({ stages });
const { flyer } = await createFlyerAndItems(flyerData, itemsForDb, logger);
stages[3].status = 'completed';
await job.updateProgress({ stages });
// Stage 5: Log Activity
await this.db.adminRepo.logActivity(
{
action: 'flyer_processed',
displayText: `Processed flyer for ${flyerData.store_name}`,
details: { flyer_id: flyer.flyer_id, store_name: flyerData.store_name },
userId: job.data.userId,
},
logger,
);
// Enqueue a job to clean up the original and any generated files.
await this.cleanupQueue.add(
'cleanup-flyer-files',
{ flyerId: flyer.flyer_id, paths: allFilePaths },
{ removeOnComplete: true },
);
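// `removeOnComplete: true` tells BullMQ to drop the cleanup job's record once it completes,
// so successful cleanups don't accumulate in the queue.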
logger.info(`Successfully processed job and enqueued cleanup for flyer ID: ${flyer.flyer_id}`);
return { flyerId: flyer.flyer_id };
} catch (error) {
logger.warn('Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.');
// This private method handles error reporting and re-throwing.
await this._reportErrorAndThrow(error, job, logger, stages);
// This line is unreachable at runtime because the method above always throws, but TypeScript's
// control-flow analysis does not treat an awaited `Promise<never>` as terminating, so an explicit
// throw is needed to satisfy the declared return type.
throw error;
}
}
/**
* Processes a job to clean up temporary files associated with a flyer.
* @param job The BullMQ job containing cleanup data.
* @returns An object indicating the status of the cleanup operation.
*/
async processCleanupJob(job: Job<CleanupJobData>): Promise<{ status: string; deletedCount?: number; reason?: string }> {
const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
logger.info('Picked up file cleanup job.');
const { paths } = job.data;
if (!paths || paths.length === 0) {
logger.warn('Job received no paths to clean. Skipping.');
return { status: 'skipped', reason: 'no paths' };
}
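// Use `Promise.allSettled` so every path gets a deletion attempt even if some fail; any failures
// are collected and reported together afterwards.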
const results = await Promise.allSettled(
paths.map(async (filePath) => {
try {
await this.fs.unlink(filePath);
logger.info(`Successfully deleted temporary file: ${filePath}`);
} catch (error) {
const nodeError = error as NodeJS.ErrnoException;
if (nodeError.code === 'ENOENT') {
// This is not a critical error; the file might have been deleted already.
logger.warn(`File not found during cleanup (already deleted?): ${filePath}`);
} else {
logger.error({ err: nodeError, path: filePath }, 'Failed to delete temporary file.');
throw error; // Re-throw to mark this specific deletion as failed.
}
}
}),
);
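// If any deletion failed for a reason other than ENOENT, fail the job and list the offending paths.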
const failedDeletions = results.filter((r) => r.status === 'rejected');
if (failedDeletions.length > 0) {
const failedPaths = paths.filter((_, i) => results[i].status === 'rejected');
throw new Error(`Failed to delete ${failedDeletions.length} file(s): ${failedPaths.join(', ')}`);
}
logger.info(`Successfully deleted all ${paths.length} temporary files.`);
return { status: 'success', deletedCount: paths.length };
}
/**
* A private helper to normalize errors, update job progress with an error state,
* and re-throw the error to be handled by BullMQ.
* @param error The error that was caught.
* @param job The BullMQ job instance.
* @param logger The logger instance.
* @param initialStages The stage list accumulated so far; used to mark which stage failed and which were skipped.
*/
private async _reportErrorAndThrow(
error: unknown,
job: Job,
logger: Logger,
initialStages: ProcessingStage[],
): Promise<never> {
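// The `Promise<never>` return type documents that this method never resolves normally; it always throws.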
const normalizedError = error instanceof Error ? error : new Error(String(error));
let errorPayload: { errorCode: string; message: string; [key: string]: any };
const stagesToReport: ProcessingStage[] = [...initialStages]; // Shallow copy; failed/skipped entries are replaced with new objects below.
if (normalizedError instanceof FlyerProcessingError) {
errorPayload = normalizedError.toErrorPayload();
} else {
const message = normalizedError.message || 'An unknown error occurred.';
errorPayload = { errorCode: 'UNKNOWN_ERROR', message };
}
// Determine which stage failed
let errorStageIndex = -1;
// 1. Try to map specific error codes/messages to stages
if (errorPayload.errorCode === 'PDF_CONVERSION_FAILED' || errorPayload.errorCode === 'UNSUPPORTED_FILE_TYPE') {
errorStageIndex = stagesToReport.findIndex(s => s.name === 'Preparing Inputs');
} else if (errorPayload.errorCode === 'AI_VALIDATION_FAILED') {
errorStageIndex = stagesToReport.findIndex(s => s.name === 'Extracting Data with AI');
} else if (errorPayload.message.includes('Icon generation failed')) {
errorStageIndex = stagesToReport.findIndex(s => s.name === 'Transforming AI Data');
} else if (errorPayload.message.includes('Database transaction failed')) {
errorStageIndex = stagesToReport.findIndex(s => s.name === 'Saving to Database');
}
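// Note: the message-based matches above are heuristics tied to messages thrown further down the
// pipeline (presumably the transformer and the database layer); keep them in sync if those change.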
// 2. If not mapped, find the currently running stage
if (errorStageIndex === -1) {
errorStageIndex = stagesToReport.findIndex(s => s.status === 'in-progress');
}
// 3. Fallback to the last stage
if (errorStageIndex === -1 && stagesToReport.length > 0) {
errorStageIndex = stagesToReport.length - 1;
}
// Update stages
if (errorStageIndex !== -1) {
stagesToReport[errorStageIndex] = {
...stagesToReport[errorStageIndex],
status: 'failed',
detail: errorPayload.message, // Use the user-friendly message as detail
};
// Mark subsequent critical stages as skipped
for (let i = errorStageIndex + 1; i < stagesToReport.length; i++) {
if (stagesToReport[i].critical) {
// When a stage is skipped, we don't need its previous 'detail' property.
// This creates a clean 'skipped' state object by removing `detail` and keeping the rest.
const { detail, ...restOfStage } = stagesToReport[i];
stagesToReport[i] = { ...restOfStage, status: 'skipped' };
}
}
}
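// Attach the per-stage breakdown so the failure payload shows exactly where the pipeline stopped.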
errorPayload.stages = stagesToReport;
// Logging logic
if (normalizedError instanceof FlyerProcessingError) {
const logDetails: Record<string, any> = { err: normalizedError };
if (normalizedError instanceof AiDataValidationError) {
logDetails.validationErrors = normalizedError.validationErrors;
logDetails.rawData = normalizedError.rawData;
}
// Also include stderr for PdfConversionError in logs
if (normalizedError instanceof PdfConversionError) {
logDetails.stderr = normalizedError.stderr;
}
// Merge the errorPayload details into the log context, then re-attach the original error
// under `err` so the logger always receives the full Error object.
Object.assign(logDetails, errorPayload);
logDetails.err = normalizedError;
logger.error(logDetails, `A known processing error occurred: ${normalizedError.name}`);
} else {
logger.error({ err: normalizedError, ...errorPayload }, `An unknown error occurred: ${errorPayload.message}`);
}
// Check for specific error messages that indicate a non-retriable failure, like quota exhaustion.
if (errorPayload.message.toLowerCase().includes('quota') || errorPayload.message.toLowerCase().includes('resource_exhausted')) {
const unrecoverablePayload = { errorCode: 'QUOTA_EXCEEDED', message: 'An AI quota has been exceeded. Please try again later.', stages: errorPayload.stages };
await job.updateProgress(unrecoverablePayload);
throw new UnrecoverableError(unrecoverablePayload.message);
}
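// For all other errors, report the failure via job progress and re-throw so BullMQ marks the
// attempt as failed (and retries it if the job has attempts remaining).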
await job.updateProgress(errorPayload);
throw normalizedError;
}
}