All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 26m27s
357 lines
15 KiB
TypeScript
357 lines
15 KiB
TypeScript
// src/services/flyerProcessingService.server.ts
|
|
import { UnrecoverableError, type Job, type Queue } from 'bullmq';
|
|
import path from 'path';
|
|
import type { Logger } from 'pino';
|
|
import type { FlyerFileHandler, IFileSystem, ICommandExecutor } from './flyerFileHandler.server';
|
|
import type { FlyerAiProcessor } from './flyerAiProcessor.server';
|
|
import * as db from './db/index.db';
|
|
import { AdminRepository } from './db/admin.db';
|
|
import { FlyerDataTransformer } from './flyerDataTransformer';
|
|
import type { FlyerJobData, CleanupJobData } from '../types/job-data';
|
|
import {
|
|
FlyerProcessingError,
|
|
PdfConversionError,
|
|
AiDataValidationError,
|
|
UnsupportedFileTypeError,
|
|
DatabaseError, // This is from processingErrors
|
|
} from './processingErrors';
|
|
import { NotFoundError } from './db/errors.db';
|
|
import { createFlyerAndItems } from './db/flyer.db';
|
|
import { logger as globalLogger } from './logger.server';
|
|
import { generateFlyerIcon } from '../utils/imageProcessor';
|
|
|
|
// Define ProcessingStage locally as it's not exported from the types file.
/** A single step of the flyer-processing pipeline, reported to BullMQ via job progress updates. */
export type ProcessingStage = {
  // Human-readable stage name; also used to map error codes back to the failed stage.
  name: string;
  // Lifecycle state of the stage as the pipeline advances.
  status: 'pending' | 'in-progress' | 'completed' | 'failed' | 'skipped';
  // When true, stages after a failed stage are marked 'skipped' during error reporting.
  critical: boolean;
  // Optional progress or error detail shown alongside the stage.
  detail?: string;
};
|
|
|
|
/**
|
|
* This service orchestrates the entire flyer processing workflow. It's responsible for
|
|
* coordinating various sub-services (file handling, AI processing, data transformation,
|
|
* and database operations) to process a flyer from upload to completion.
|
|
*/
|
|
export class FlyerProcessingService {
|
|
constructor(
|
|
private fileHandler: FlyerFileHandler,
|
|
private aiProcessor: FlyerAiProcessor,
|
|
private fs: IFileSystem,
|
|
// By depending on `Pick<Queue, 'add'>`, we specify that this service only needs
|
|
// an object with an `add` method that matches the Queue's `add` method signature.
|
|
// This decouples the service from the full BullMQ Queue implementation, making it more modular and easier to test.
|
|
private cleanupQueue: Pick<Queue<CleanupJobData>, 'add'>,
|
|
private transformer: FlyerDataTransformer,
|
|
) {}
|
|
|
|
/**
|
|
* Orchestrates the processing of a flyer job.
|
|
* @param job The BullMQ job containing flyer data.
|
|
* @returns An object containing the ID of the newly created flyer.
|
|
*/
|
|
async processJob(job: Job<FlyerJobData>): Promise<{ flyerId: number }> {
|
|
// Create a logger instance with job-specific context for better traceability.
|
|
const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
|
|
logger.info('Picked up flyer processing job.');
|
|
|
|
const stages: ProcessingStage[] = [
|
|
{ name: 'Preparing Inputs', status: 'pending', critical: true, detail: 'Validating and preparing file...' },
|
|
{ name: 'Extracting Data with AI', status: 'pending', critical: true, detail: 'Communicating with AI model...' },
|
|
{ name: 'Transforming AI Data', status: 'pending', critical: true },
|
|
{ name: 'Saving to Database', status: 'pending', critical: true },
|
|
];
|
|
|
|
// Keep track of all created file paths for eventual cleanup.
|
|
const allFilePaths: string[] = [job.data.filePath];
|
|
|
|
try {
|
|
// Stage 1: Prepare Inputs (e.g., convert PDF to images)
|
|
stages[0].status = 'in-progress';
|
|
await job.updateProgress({ stages });
|
|
|
|
const { imagePaths, createdImagePaths } = await this.fileHandler.prepareImageInputs(
|
|
job.data.filePath,
|
|
job,
|
|
logger,
|
|
);
|
|
allFilePaths.push(...createdImagePaths);
|
|
stages[0].status = 'completed';
|
|
stages[0].detail = `${imagePaths.length} page(s) ready for AI.`;
|
|
await job.updateProgress({ stages });
|
|
|
|
// Stage 2: Extract Data with AI
|
|
stages[1].status = 'in-progress';
|
|
await job.updateProgress({ stages });
|
|
|
|
const aiResult = await this.aiProcessor.extractAndValidateData(imagePaths, job.data, logger);
|
|
stages[1].status = 'completed';
|
|
await job.updateProgress({ stages });
|
|
|
|
// Stage 3: Transform AI Data into DB format
|
|
stages[2].status = 'in-progress';
|
|
await job.updateProgress({ stages });
|
|
|
|
// The fileHandler has already prepared the primary image (e.g., by stripping EXIF data).
|
|
// We now generate an icon from it and prepare the filenames for the transformer.
|
|
const primaryImagePath = imagePaths[0].path;
|
|
const imageFileName = path.basename(primaryImagePath);
|
|
const iconsDir = path.join(path.dirname(primaryImagePath), 'icons');
|
|
const iconFileName = await generateFlyerIcon(primaryImagePath, iconsDir, logger);
|
|
|
|
// Add the newly generated icon to the list of files to be cleaned up.
|
|
// The main processed image path is already in `allFilePaths` via `createdImagePaths`.
|
|
allFilePaths.push(path.join(iconsDir, iconFileName));
|
|
|
|
console.log('[DEBUG] FlyerProcessingService calling transformer with:', { originalFileName: job.data.originalFileName, imageFileName, iconFileName, checksum: job.data.checksum, baseUrl: job.data.baseUrl });
|
|
|
|
const { flyerData, itemsForDb } = await this.transformer.transform(
|
|
aiResult,
|
|
job.data.originalFileName,
|
|
imageFileName,
|
|
iconFileName,
|
|
job.data.checksum,
|
|
job.data.userId,
|
|
logger,
|
|
job.data.baseUrl,
|
|
);
|
|
stages[2].status = 'completed';
|
|
await job.updateProgress({ stages });
|
|
|
|
// Stage 4: Save to Database
|
|
stages[3].status = 'in-progress';
|
|
await job.updateProgress({ stages });
|
|
|
|
let flyerId: number;
|
|
try {
|
|
const { flyer } = await db.withTransaction(async (client) => {
|
|
// This assumes createFlyerAndItems is refactored to accept a transactional client.
|
|
const { flyer: newFlyer } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);
|
|
|
|
// Instantiate a new AdminRepository with the transactional client to ensure
|
|
// the activity log is part of the same transaction.
|
|
const transactionalAdminRepo = new AdminRepository(client);
|
|
await transactionalAdminRepo.logActivity(
|
|
{
|
|
action: 'flyer_processed',
|
|
displayText: `Processed flyer for ${flyerData.store_name}`,
|
|
details: { flyer_id: newFlyer.flyer_id, store_name: flyerData.store_name },
|
|
userId: job.data.userId,
|
|
},
|
|
logger,
|
|
);
|
|
|
|
return { flyer: newFlyer };
|
|
});
|
|
flyerId = flyer.flyer_id;
|
|
} catch (error) {
|
|
if (error instanceof FlyerProcessingError) throw error;
|
|
throw new DatabaseError(error instanceof Error ? error.message : String(error));
|
|
}
|
|
|
|
stages[3].status = 'completed';
|
|
await job.updateProgress({ stages });
|
|
|
|
// Enqueue a job to clean up the original and any generated files.
|
|
await this.cleanupQueue.add(
|
|
'cleanup-flyer-files',
|
|
{ flyerId, paths: allFilePaths },
|
|
{ removeOnComplete: true },
|
|
);
|
|
logger.info(`Successfully processed job and enqueued cleanup for flyer ID: ${flyerId}`);
|
|
|
|
return { flyerId };
|
|
} catch (error) {
|
|
logger.warn('Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.');
|
|
// Add detailed logging of the raw error object
|
|
if (error instanceof Error) {
|
|
logger.error({ err: error, stack: error.stack }, 'Raw error object in processJob catch block');
|
|
} else {
|
|
logger.error({ error }, 'Raw non-Error object in processJob catch block');
|
|
}
|
|
// This private method handles error reporting and re-throwing.
|
|
await this._reportErrorAndThrow(error, job, logger, stages);
|
|
// This line is technically unreachable because the above method always throws,
|
|
// but it's required to satisfy TypeScript's control flow analysis.
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Processes a job to clean up temporary files associated with a flyer.
|
|
* @param job The BullMQ job containing cleanup data.
|
|
* @returns An object indicating the status of the cleanup operation.
|
|
*/
|
|
async processCleanupJob(job: Job<CleanupJobData>): Promise<{ status: string; deletedCount?: number; reason?: string }> {
|
|
const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
|
|
logger.info('Picked up file cleanup job.');
|
|
|
|
const { flyerId, paths } = job.data;
|
|
let pathsToDelete = paths;
|
|
|
|
// If no paths are provided (e.g., from a manual trigger), attempt to derive them from the database.
|
|
if (!pathsToDelete || pathsToDelete.length === 0) {
|
|
logger.warn(`Cleanup job for flyer ${flyerId} received no paths. Attempting to derive paths from DB.`);
|
|
try {
|
|
const flyer = await db.flyerRepo.getFlyerById(flyerId);
|
|
const derivedPaths: string[] = [];
|
|
// This path needs to be configurable and match where multer saves files.
|
|
const storagePath = process.env.STORAGE_PATH || '/var/www/flyer-crawler.projectium.com/flyer-images';
|
|
|
|
if (flyer.image_url) {
|
|
try {
|
|
const imageName = path.basename(new URL(flyer.image_url).pathname);
|
|
derivedPaths.push(path.join(storagePath, imageName));
|
|
} catch (urlError) {
|
|
logger.error({ err: urlError, url: flyer.image_url }, 'Failed to parse flyer.image_url to derive file path.');
|
|
}
|
|
}
|
|
if (flyer.icon_url) {
|
|
try {
|
|
const iconName = path.basename(new URL(flyer.icon_url).pathname);
|
|
derivedPaths.push(path.join(storagePath, 'icons', iconName));
|
|
} catch (urlError) {
|
|
logger.error({ err: urlError, url: flyer.icon_url }, 'Failed to parse flyer.icon_url to derive file path.');
|
|
}
|
|
}
|
|
pathsToDelete = derivedPaths;
|
|
} catch (error) {
|
|
if (error instanceof NotFoundError) {
|
|
logger.error({ flyerId }, 'Cannot derive cleanup paths because flyer was not found in DB.');
|
|
// Do not throw. Allow the job to be marked as skipped if no paths are found.
|
|
} else {
|
|
throw error; // Re-throw other DB errors to allow for retries.
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!pathsToDelete || pathsToDelete.length === 0) {
|
|
logger.warn('Job received no paths and could not derive any from the database. Skipping.');
|
|
return { status: 'skipped', reason: 'no paths derived' };
|
|
}
|
|
|
|
const results = await Promise.allSettled(
|
|
pathsToDelete.map(async (filePath) => {
|
|
try {
|
|
await this.fs.unlink(filePath);
|
|
logger.info(`Successfully deleted temporary file: ${filePath}`);
|
|
} catch (error) {
|
|
const nodeError = error as NodeJS.ErrnoException;
|
|
if (nodeError.code === 'ENOENT') {
|
|
// This is not a critical error; the file might have been deleted already.
|
|
logger.warn(`File not found during cleanup (already deleted?): ${filePath}`);
|
|
} else {
|
|
logger.error({ err: nodeError, path: filePath }, 'Failed to delete temporary file.');
|
|
throw error; // Re-throw to mark this specific deletion as failed.
|
|
}
|
|
}
|
|
}),
|
|
);
|
|
|
|
const failedDeletions = results.filter((r) => r.status === 'rejected');
|
|
if (failedDeletions.length > 0) {
|
|
const failedPaths = pathsToDelete.filter((_, i) => results[i].status === 'rejected');
|
|
throw new Error(`Failed to delete ${failedDeletions.length} file(s): ${failedPaths.join(', ')}`);
|
|
}
|
|
|
|
logger.info(`Successfully deleted all ${pathsToDelete.length} temporary files.`);
|
|
return { status: 'success', deletedCount: pathsToDelete.length };
|
|
}
|
|
|
|
/**
|
|
* A private helper to normalize errors, update job progress with an error state,
|
|
* and re-throw the error to be handled by BullMQ.
|
|
* @param error The error that was caught.
|
|
* @param job The BullMQ job instance.
|
|
* @param logger The logger instance.
|
|
*/
|
|
private async _reportErrorAndThrow(
|
|
error: unknown,
|
|
job: Job,
|
|
logger: Logger,
|
|
initialStages: ProcessingStage[],
|
|
): Promise<never> {
|
|
// Map specific error codes to their corresponding processing stage names.
|
|
// This is more maintainable than a long if/else if chain.
|
|
const errorCodeToStageMap = new Map<string, string>([
|
|
['PDF_CONVERSION_FAILED', 'Preparing Inputs'],
|
|
['UNSUPPORTED_FILE_TYPE', 'Preparing Inputs'],
|
|
['AI_VALIDATION_FAILED', 'Extracting Data with AI'],
|
|
['TRANSFORMATION_FAILED', 'Transforming AI Data'],
|
|
['DATABASE_ERROR', 'Saving to Database'],
|
|
]);
|
|
const normalizedError = error instanceof Error ? error : new Error(String(error));
|
|
let errorPayload: { errorCode: string; message: string; [key: string]: any };
|
|
let stagesToReport: ProcessingStage[] = [...initialStages]; // Create a mutable copy
|
|
|
|
if (normalizedError instanceof FlyerProcessingError) {
|
|
errorPayload = normalizedError.toErrorPayload();
|
|
} else {
|
|
const message = normalizedError.message || 'An unknown error occurred.';
|
|
errorPayload = { errorCode: 'UNKNOWN_ERROR', message };
|
|
}
|
|
|
|
// Determine which stage failed
|
|
const failedStageName = errorCodeToStageMap.get(errorPayload.errorCode);
|
|
let errorStageIndex = failedStageName ? stagesToReport.findIndex(s => s.name === failedStageName) : -1;
|
|
|
|
// 2. If not mapped, find the currently running stage
|
|
if (errorStageIndex === -1) {
|
|
errorStageIndex = stagesToReport.findIndex(s => s.status === 'in-progress');
|
|
}
|
|
|
|
// 3. Fallback to the last stage
|
|
if (errorStageIndex === -1 && stagesToReport.length > 0) {
|
|
errorStageIndex = stagesToReport.length - 1;
|
|
}
|
|
|
|
// Update stages
|
|
if (errorStageIndex !== -1) {
|
|
stagesToReport[errorStageIndex] = {
|
|
...stagesToReport[errorStageIndex],
|
|
status: 'failed',
|
|
detail: errorPayload.message, // Use the user-friendly message as detail
|
|
};
|
|
// Mark subsequent critical stages as skipped
|
|
for (let i = errorStageIndex + 1; i < stagesToReport.length; i++) {
|
|
if (stagesToReport[i].critical) {
|
|
// When a stage is skipped, we don't need its previous 'detail' property.
|
|
// This creates a clean 'skipped' state object by removing `detail` and keeping the rest.
|
|
const { detail, ...restOfStage } = stagesToReport[i];
|
|
stagesToReport[i] = { ...restOfStage, status: 'skipped' };
|
|
}
|
|
}
|
|
}
|
|
|
|
errorPayload.stages = stagesToReport;
|
|
|
|
// Logging logic
|
|
if (normalizedError instanceof FlyerProcessingError) {
|
|
// Simplify log object creation
|
|
const logDetails: Record<string, any> = { ...errorPayload, err: normalizedError };
|
|
|
|
if (normalizedError instanceof AiDataValidationError) {
|
|
logDetails.validationErrors = normalizedError.validationErrors;
|
|
logDetails.rawData = normalizedError.rawData;
|
|
}
|
|
if (normalizedError instanceof PdfConversionError) {
|
|
logDetails.stderr = normalizedError.stderr;
|
|
}
|
|
|
|
logger.error(logDetails, `A known processing error occurred: ${normalizedError.name}`);
|
|
} else {
|
|
logger.error({ err: normalizedError, ...errorPayload }, `An unknown error occurred: ${errorPayload.message}`);
|
|
}
|
|
|
|
// Check for specific error messages that indicate a non-retriable failure, like quota exhaustion.
|
|
if (errorPayload.message.toLowerCase().includes('quota') || errorPayload.message.toLowerCase().includes('resource_exhausted')) {
|
|
const unrecoverablePayload = { errorCode: 'QUOTA_EXCEEDED', message: 'An AI quota has been exceeded. Please try again later.', stages: errorPayload.stages };
|
|
await job.updateProgress(unrecoverablePayload);
|
|
throw new UnrecoverableError(unrecoverablePayload.message);
|
|
}
|
|
|
|
await job.updateProgress(errorPayload);
|
|
throw normalizedError;
|
|
}
|
|
}
|