Files
flyer-crawler.projectium.com/src/services/flyerProcessingService.server.ts
Torben Sorensen 11aeac5edd
Some checks failed
Deploy to Test Environment / deploy-to-test (push) Failing after 1m10s
whoa - so much - new features (UPC,etc) - Sentry for app logging! so much more !
2026-01-11 19:07:02 -08:00

479 lines
18 KiB
TypeScript

// src/services/flyerProcessingService.server.ts
import { UnrecoverableError, type Job, type Queue } from 'bullmq';
import path from 'path';
import type { Logger } from 'pino';
import type { FlyerFileHandler, IFileSystem } from './flyerFileHandler.server';
import type { FlyerAiProcessor } from './flyerAiProcessor.server';
import * as db from './db/index.db';
import { FlyerDataTransformer } from './flyerDataTransformer';
import type { FlyerJobData, CleanupJobData } from '../types/job-data';
import {
FlyerProcessingError,
PdfConversionError,
AiDataValidationError,
} from './processingErrors';
import { NotFoundError } from './db/errors.db';
import { createScopedLogger } from './logger.server';
import { generateFlyerIcon } from '../utils/imageProcessor';
import type { FlyerPersistenceService } from './flyerPersistenceService.server';
// Module-level logger scoped to this service. Each job handler derives a child
// logger from it (via `.child(...)`) so log lines carry job-specific context.
const globalLogger = createScopedLogger('flyer-processing-service');
/** The lifecycle states a single processing stage can report. */
type ProcessingStageStatus = 'pending' | 'in-progress' | 'completed' | 'failed' | 'skipped';

/**
 * Describes one step of the flyer processing pipeline as reported through job progress.
 * Declared in this module because the shared types file does not export it.
 */
export type ProcessingStage = {
  /** Human-readable stage label; also used to look a stage up by name. */
  name: string;
  /** Current lifecycle state of the stage. */
  status: ProcessingStageStatus;
  /** Critical stages that follow a failed stage are reported as 'skipped'. */
  critical: boolean;
  /** Optional free-text detail, e.g. a progress hint or an error message. */
  detail?: string;
};
/**
 * This service orchestrates the entire flyer processing workflow. It's responsible for
 * coordinating various sub-services (file handling, AI processing, data transformation,
 * and database operations) to process a flyer from upload to completion.
 */
export class FlyerProcessingService {
  constructor(
    private fileHandler: FlyerFileHandler,
    private aiProcessor: FlyerAiProcessor,
    private fs: IFileSystem,
    // By depending on `Pick<Queue, 'add'>`, we specify that this service only needs
    // an object with an `add` method that matches the Queue's `add` method signature.
    // This decouples the service from the full BullMQ Queue implementation, making it
    // more modular and easier to test.
    private cleanupQueue: Pick<Queue<CleanupJobData>, 'add'>,
    private transformer: FlyerDataTransformer,
    private persistenceService: FlyerPersistenceService,
  ) {}

  /**
   * Provides access to the persistence service for testing purposes.
   * @internal
   */
  _getPersistenceService(): FlyerPersistenceService {
    return this.persistenceService;
  }

  /**
   * Provides access to the AI processor for testing purposes.
   * @internal
   */
  _getAiProcessor(): FlyerAiProcessor {
    return this.aiProcessor;
  }

  /**
   * Replaces the cleanup queue for testing purposes.
   * This allows tests to prevent file cleanup to verify file contents.
   * @param queue The replacement queue; only its `add` method is used.
   * @internal
   */
  _setCleanupQueue(queue: Pick<Queue<CleanupJobData>, 'add'>): void {
    globalLogger.debug('FlyerProcessingService._setCleanupQueue called');
    this.cleanupQueue = queue;
  }

  /**
   * Orchestrates the processing of a flyer job through five stages: preparing inputs,
   * image optimization, AI extraction, data transformation and database persistence.
   * Stage progress is reported through `job.updateProgress` after every transition.
   *
   * @param job The BullMQ job containing flyer data.
   * @returns An object containing the ID of the newly created flyer.
   * @throws The (normalized) processing error, or an `UnrecoverableError` when the
   *   failure indicates an exhausted AI quota. On failure, files are intentionally
   *   left in place for manual inspection.
   */
  async processJob(job: Job<FlyerJobData>): Promise<{ flyerId: number }> {
    // Extract context metadata (ADR-051) for request tracing.
    const { meta, ...jobDataWithoutMeta } = job.data;

    // Create a logger instance with job-specific context for better traceability.
    // Uses request_id from the original API request if available (ADR-051).
    const logger = globalLogger.child({
      jobId: job.id,
      jobName: job.name,
      request_id: meta?.requestId, // Propagate original request ID
      user_id: meta?.userId,
      origin: meta?.origin || 'unknown',
      service: 'flyer-worker',
      ...jobDataWithoutMeta,
    });
    logger.info('Picked up flyer processing job.');

    const stages: ProcessingStage[] = [
      {
        name: 'Preparing Inputs',
        status: 'pending',
        critical: true,
        detail: 'Validating and preparing file...',
      },
      {
        name: 'Image Optimization',
        status: 'pending',
        critical: true,
        detail: 'Compressing and resizing images...',
      },
      {
        name: 'Extracting Data with AI',
        status: 'pending',
        critical: true,
        detail: 'Communicating with AI model...',
      },
      { name: 'Transforming AI Data', status: 'pending', critical: true },
      { name: 'Saving to Database', status: 'pending', critical: true },
    ];

    // Keep track of all created file paths for eventual cleanup.
    const allFilePaths: string[] = [job.data.filePath];

    try {
      // Stage 1: Prepare Inputs (e.g., convert PDF to images)
      stages[0].status = 'in-progress';
      await job.updateProgress({ stages });
      logger.debug({ filePath: job.data.filePath }, 'Calling fileHandler.prepareImageInputs.');
      const { imagePaths, createdImagePaths } = await this.fileHandler.prepareImageInputs(
        job.data.filePath,
        job,
        logger,
      );
      allFilePaths.push(...createdImagePaths);
      logger.debug({ imageCount: imagePaths.length }, 'fileHandler returned prepared images.');
      stages[0].status = 'completed';
      stages[0].detail = `${imagePaths.length} page(s) ready for AI.`;
      await job.updateProgress({ stages });

      // Stage 2: Image Optimization
      stages[1].status = 'in-progress';
      await job.updateProgress({ stages });
      await this.fileHandler.optimizeImages(imagePaths, logger);
      stages[1].status = 'completed';
      await job.updateProgress({ stages });

      // Stage 3: Extract Data with AI
      stages[2].status = 'in-progress';
      await job.updateProgress({ stages });
      logger.debug('Calling aiProcessor.extractAndValidateData.');
      const aiResult = await this.aiProcessor.extractAndValidateData(imagePaths, job.data, logger);
      logger.debug({ storeName: aiResult.data.store_name }, 'aiProcessor returned data.');
      stages[2].status = 'completed';
      await job.updateProgress({ stages });

      // Stage 4: Transform AI Data into DB format
      stages[3].status = 'in-progress';
      await job.updateProgress({ stages });

      // The fileHandler has already prepared the primary image (e.g., by stripping EXIF data).
      // We now generate an icon from it and prepare the filenames for the transformer.
      const primaryImage = imagePaths[0];
      if (!primaryImage) {
        // Fail with an explicit message instead of an opaque TypeError on `undefined.path`.
        throw new Error('No images are available to generate the flyer icon from.');
      }
      const primaryImagePath = primaryImage.path;
      const imageFileName = path.basename(primaryImagePath);
      const iconsDir = path.join(path.dirname(primaryImagePath), 'icons');
      logger.debug({ primaryImagePath, iconsDir }, 'Generating flyer icon.');
      const iconFileName = await generateFlyerIcon(primaryImagePath, iconsDir, logger);
      logger.debug({ iconFileName }, 'Flyer icon generated.');

      // Add the newly generated icon to the list of files to be cleaned up.
      // The main processed image path is already in `allFilePaths` via `createdImagePaths`.
      allFilePaths.push(path.join(iconsDir, iconFileName));

      // Ensure we have a valid base URL, preferring the one from the job data.
      // This is critical for workers where process.env.FRONTEND_URL might be undefined.
      const baseUrl = job.data.baseUrl || process.env.FRONTEND_URL || 'http://localhost:3000';
      logger.debug(
        {
          baseUrl,
          jobBaseUrl: job.data.baseUrl,
          envFrontendUrl: process.env.FRONTEND_URL,
          originalFileName: job.data.originalFileName,
          imageFileName,
          iconFileName,
          checksum: job.data.checksum,
        },
        'Resolved baseUrl; calling transformer.',
      );

      const { flyerData, itemsForDb } = await this.transformer.transform(
        aiResult,
        job.data.originalFileName,
        imageFileName,
        iconFileName,
        job.data.checksum,
        job.data.userId,
        logger,
        baseUrl,
      );
      // Pass the object itself so serialization only happens when debug logging is enabled.
      logger.debug(
        { imageUrl: flyerData.image_url, iconUrl: flyerData.icon_url, flyerData },
        'Transformer output; flyer data to be saved.',
      );
      stages[3].status = 'completed';
      await job.updateProgress({ stages });

      // Stage 5: Save to Database
      stages[4].status = 'in-progress';
      await job.updateProgress({ stages });
      // Any error thrown here propagates to the catch block below, which handles reporting.
      const flyer = await this.persistenceService.saveFlyer(
        flyerData,
        itemsForDb,
        job.data.userId,
        logger,
      );
      const flyerId = flyer.flyer_id;
      stages[4].status = 'completed';
      await job.updateProgress({ stages });

      // Enqueue a job to clean up the original and any generated files.
      // NOTE(review): if this enqueue rejects, the catch block below reports the job as
      // failed even though the flyer has already been saved — confirm this is intended.
      await this.cleanupQueue.add(
        'cleanup-flyer-files',
        { flyerId, paths: allFilePaths },
        { removeOnComplete: true },
      );
      logger.info(`Successfully processed job and enqueued cleanup for flyer ID: ${flyerId}`);
      return { flyerId };
    } catch (error) {
      logger.warn(
        'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
      );
      // Add detailed logging of the raw error object
      if (error instanceof Error) {
        logger.error(
          { err: error, stack: error.stack },
          'Raw error object in processJob catch block',
        );
      } else {
        logger.error({ error }, 'Raw non-Error object in processJob catch block');
      }
      // This private method handles error reporting and re-throwing.
      await this._reportErrorAndThrow(error, job, logger, stages);
      // This line is technically unreachable because the above method always throws,
      // but it's required to satisfy TypeScript's control flow analysis.
      throw error;
    }
  }

  /**
   * Processes a job to clean up temporary files associated with a flyer.
   * When the job carries no paths, the image and icon paths are derived from the
   * flyer's stored URLs and the configured storage directory.
   *
   * @param job The BullMQ job containing cleanup data.
   * @returns An object indicating the status of the cleanup operation.
   * @throws If the flyer lookup fails with anything other than `NotFoundError`, or if
   *   at least one file could not be deleted for a reason other than `ENOENT`.
   */
  async processCleanupJob(
    job: Job<CleanupJobData>,
  ): Promise<{ status: string; deletedCount?: number; reason?: string }> {
    const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
    logger.info('Picked up file cleanup job.');

    const { flyerId, paths } = job.data;
    let pathsToDelete = paths;

    // If no paths are provided (e.g., from a manual trigger), attempt to derive them from the database.
    if (!pathsToDelete || pathsToDelete.length === 0) {
      logger.warn(
        `Cleanup job for flyer ${flyerId} received no paths. Attempting to derive paths from DB.`,
      );
      try {
        const flyer = await db.flyerRepo.getFlyerById(flyerId);
        const derivedPaths: string[] = [];
        // This path needs to be configurable and match where multer saves files.
        const storagePath =
          process.env.STORAGE_PATH || '/var/www/flyer-crawler.projectium.com/flyer-images';

        if (flyer.image_url) {
          try {
            const imageName = path.basename(new URL(flyer.image_url).pathname);
            derivedPaths.push(path.join(storagePath, imageName));
          } catch (urlError) {
            logger.error(
              { err: urlError, url: flyer.image_url },
              'Failed to parse flyer.image_url to derive file path.',
            );
          }
        }
        if (flyer.icon_url) {
          try {
            const iconName = path.basename(new URL(flyer.icon_url).pathname);
            derivedPaths.push(path.join(storagePath, 'icons', iconName));
          } catch (urlError) {
            logger.error(
              { err: urlError, url: flyer.icon_url },
              'Failed to parse flyer.icon_url to derive file path.',
            );
          }
        }
        pathsToDelete = derivedPaths;
      } catch (error) {
        if (error instanceof NotFoundError) {
          logger.error(
            { flyerId },
            'Cannot derive cleanup paths because flyer was not found in DB.',
          );
          // Do not throw. Allow the job to be marked as skipped if no paths are found.
        } else {
          throw error; // Re-throw other DB errors to allow for retries.
        }
      }
    }

    if (!pathsToDelete || pathsToDelete.length === 0) {
      logger.warn('Job received no paths and could not derive any from the database. Skipping.');
      return { status: 'skipped', reason: 'no paths derived' };
    }

    // Attempt every deletion even if some fail, then report all failures together.
    const results = await Promise.allSettled(
      pathsToDelete.map(async (filePath) => {
        try {
          await this.fs.unlink(filePath);
          logger.info(`Successfully deleted temporary file: ${filePath}`);
        } catch (error) {
          const nodeError = error as NodeJS.ErrnoException;
          if (nodeError.code === 'ENOENT') {
            // This is not a critical error; the file might have been deleted already.
            logger.warn(`File not found during cleanup (already deleted?): ${filePath}`);
          } else {
            logger.error({ err: nodeError, path: filePath }, 'Failed to delete temporary file.');
            throw error; // Re-throw to mark this specific deletion as failed.
          }
        }
      }),
    );

    // `results` is index-aligned with `pathsToDelete`.
    const failedPaths = pathsToDelete.filter((_, i) => results[i].status === 'rejected');
    if (failedPaths.length > 0) {
      throw new Error(`Failed to delete ${failedPaths.length} file(s): ${failedPaths.join(', ')}`);
    }

    logger.info(`Successfully deleted all ${pathsToDelete.length} temporary files.`);
    return { status: 'success', deletedCount: pathsToDelete.length };
  }

  /**
   * A private helper to normalize errors, update job progress with an error state,
   * and re-throw the error to be handled by BullMQ.
   *
   * Reporting the error state through job progress is best-effort: if that update
   * itself fails, the failure is logged and the processing error is still thrown.
   *
   * @param error The error that was caught.
   * @param job The BullMQ job instance.
   * @param logger The logger instance.
   * @param initialStages The stage list as it stood when the error occurred; it is
   *   copied, not mutated.
   * @throws `UnrecoverableError` when the message indicates quota exhaustion,
   *   otherwise the normalized error.
   */
  private async _reportErrorAndThrow(
    error: unknown,
    job: Job,
    logger: Logger,
    initialStages: ProcessingStage[],
  ): Promise<never> {
    // Map specific error codes to their corresponding processing stage names.
    // This is more maintainable than a long if/else if chain.
    const errorCodeToStageMap = new Map<string, string>([
      ['PDF_CONVERSION_FAILED', 'Preparing Inputs'],
      ['UNSUPPORTED_FILE_TYPE', 'Preparing Inputs'],
      ['IMAGE_CONVERSION_FAILED', 'Image Optimization'],
      ['AI_VALIDATION_FAILED', 'Extracting Data with AI'],
      ['TRANSFORMATION_FAILED', 'Transforming AI Data'],
      ['DATABASE_ERROR', 'Saving to Database'],
    ]);

    const normalizedError = error instanceof Error ? error : new Error(String(error));
    let errorPayload: {
      errorCode: string;
      message: string;
      stages?: ProcessingStage[];
      [key: string]: unknown;
    };
    const stagesToReport: ProcessingStage[] = [...initialStages]; // Create a mutable copy

    if (normalizedError instanceof FlyerProcessingError) {
      errorPayload = normalizedError.toErrorPayload();
    } else {
      const message = normalizedError.message || 'An unknown error occurred.';
      errorPayload = { errorCode: 'UNKNOWN_ERROR', message };
    }

    // 1. Determine which stage failed from the error code, if it is mapped.
    const failedStageName = errorCodeToStageMap.get(errorPayload.errorCode);
    let errorStageIndex = failedStageName
      ? stagesToReport.findIndex((s) => s.name === failedStageName)
      : -1;
    // 2. If not mapped, find the currently running stage
    if (errorStageIndex === -1) {
      errorStageIndex = stagesToReport.findIndex((s) => s.status === 'in-progress');
    }
    // 3. Fallback to the last stage
    if (errorStageIndex === -1 && stagesToReport.length > 0) {
      errorStageIndex = stagesToReport.length - 1;
    }

    // Update stages
    if (errorStageIndex !== -1) {
      stagesToReport[errorStageIndex] = {
        ...stagesToReport[errorStageIndex],
        status: 'failed',
        detail: errorPayload.message, // Use the user-friendly message as detail
      };
      // Mark subsequent critical stages as skipped
      for (let i = errorStageIndex + 1; i < stagesToReport.length; i++) {
        if (stagesToReport[i].critical) {
          // When a stage is skipped, we create a clean 'skipped' state object without the 'detail' property.
          stagesToReport[i] = {
            name: stagesToReport[i].name,
            status: 'skipped',
            critical: stagesToReport[i].critical,
          };
        }
      }
    }
    errorPayload.stages = stagesToReport;

    // Logging logic
    if (normalizedError instanceof FlyerProcessingError) {
      const logDetails: Record<string, unknown> = { ...errorPayload, err: normalizedError };
      if (normalizedError instanceof AiDataValidationError) {
        logDetails.validationErrors = normalizedError.validationErrors;
        logDetails.rawData = normalizedError.rawData;
      }
      if (normalizedError instanceof PdfConversionError) {
        logDetails.stderr = normalizedError.stderr;
      }
      logger.error(logDetails, `A known processing error occurred: ${normalizedError.name}`);
    } else {
      logger.error(
        { err: normalizedError, ...errorPayload },
        `An unknown error occurred: ${errorPayload.message}`,
      );
    }

    // Check for specific error messages that indicate a non-retriable failure, like quota exhaustion.
    const lowerCaseMessage = errorPayload.message.toLowerCase();
    if (lowerCaseMessage.includes('quota') || lowerCaseMessage.includes('resource_exhausted')) {
      const unrecoverablePayload = {
        errorCode: 'QUOTA_EXCEEDED',
        message: 'An AI quota has been exceeded. Please try again later.',
        stages: errorPayload.stages,
      };
      await this._updateProgressBestEffort(job, unrecoverablePayload, logger);
      throw new UnrecoverableError(unrecoverablePayload.message);
    }

    await this._updateProgressBestEffort(job, errorPayload, logger);
    throw normalizedError;
  }

  /**
   * Publishes an error payload via `job.updateProgress` without letting a failure of
   * that update replace the error that is about to be thrown by the caller.
   *
   * @param job The BullMQ job instance.
   * @param payload The progress payload to publish.
   * @param logger The logger used to record a failed progress update.
   */
  private async _updateProgressBestEffort(
    job: Job,
    payload: object,
    logger: Logger,
  ): Promise<void> {
    try {
      await job.updateProgress(payload);
    } catch (progressError) {
      logger.error(
        { err: progressError },
        'Failed to update job progress with the error state; continuing with the original error.',
      );
    }
  }
}