Compare commits

...

2 Commits

Author SHA1 Message Date
Gitea Actions
e0b13f26fb ci: Bump version to 0.9.53 [skip ci] 2026-01-07 09:57:37 +05:00
eee7f36756 switch to instantiating the pm2 worker in the testing threads
All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 30m53s
2026-01-06 20:56:39 -08:00
14 changed files with 418 additions and 105 deletions

4
package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "flyer-crawler",
"version": "0.9.52",
"version": "0.9.53",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "flyer-crawler",
"version": "0.9.52",
"version": "0.9.53",
"dependencies": {
"@bull-board/api": "^6.14.2",
"@bull-board/express": "^6.14.2",

View File

@@ -1,7 +1,7 @@
{
"name": "flyer-crawler",
"private": true,
"version": "0.9.52",
"version": "0.9.53",
"type": "module",
"scripts": {
"dev": "concurrently \"npm:start:dev\" \"vite\"",

View File

@@ -544,19 +544,6 @@ export class AIService {
`[extractCoreDataFromFlyerImage] Entering method with ${imagePaths.length} image(s).`,
);
// [TEST HOOK] Simulate an AI failure if the filename contains specific text.
// This allows integration tests to verify error handling.
if (imagePaths.some((f) => f.path.includes('ai-fail-test'))) {
logger.warn('[TEST HOOK] Simulating AI failure for test file.');
throw new Error('AI model failed to extract data.');
}
// [TEST HOOK] Simulate a specific failure for the cleanup test
if (imagePaths.some((f) => f.path.includes('cleanup-fail-test'))) {
logger.warn('[TEST HOOK] Simulating AI failure for cleanup test.');
throw new Error('Simulated AI failure for cleanup test.');
}
const prompt = this._buildFlyerExtractionPrompt(masterItems, submitterIp, userProfileAddress);
const imageParts = await Promise.all(

View File

@@ -64,13 +64,6 @@ export class FlyerRepository {
*/
async insertFlyer(flyerData: FlyerDbInsert, logger: Logger): Promise<Flyer> {
console.error('[DEBUG] FlyerRepository.insertFlyer called with:', JSON.stringify(flyerData, null, 2));
// [TEST HOOK] Simulate a database failure if the filename contains specific text.
// This allows integration tests to verify error handling without mocking the entire DB connection.
if (flyerData.file_name.includes('db-fail-test')) {
logger.warn('[TEST HOOK] Simulating DB transaction failure for test file.');
throw new Error('DB transaction failed for test.');
}
try {
// Sanitize icon_url: Ensure empty strings become NULL to avoid regex constraint violations
const iconUrl = flyerData.icon_url && flyerData.icon_url.trim() !== '' ? flyerData.icon_url : null;

View File

@@ -55,6 +55,7 @@ describe('FlyerFileHandler', () => {
mockFs = {
readdir: vi.fn().mockResolvedValue([]),
unlink: vi.fn(),
rename: vi.fn(),
};
mockExec = vi.fn().mockResolvedValue({ stdout: 'success', stderr: '' });
@@ -182,4 +183,20 @@ describe('FlyerFileHandler', () => {
await expect(service.prepareImageInputs('/tmp/flyer.png', job, logger)).rejects.toThrow(ImageConversionError);
});
});
describe('optimizeImages', () => {
it('should optimize images and rename them', async () => {
// Arrange: a single JPEG input plus the sharp pipeline the service will drive.
const sourcePath = '/tmp/image1.jpg';
const tempPath = `${sourcePath}.tmp`;
const inputs = [{ path: sourcePath, mimetype: 'image/jpeg' }];
const sharpInstance = sharp(sourcePath);
vi.mocked(sharpInstance.toFile).mockResolvedValue({} as any);
// Act
await service.optimizeImages(inputs, logger);
// Assert: resize -> jpeg -> toFile(.tmp) -> rename(.tmp, original) in order.
expect(sharp).toHaveBeenCalledWith(sourcePath);
expect(sharpInstance.resize).toHaveBeenCalledWith({ width: 2000, withoutEnlargement: true });
expect(sharpInstance.jpeg).toHaveBeenCalledWith({ quality: 80, mozjpeg: true });
expect(sharpInstance.toFile).toHaveBeenCalledWith(tempPath);
expect(mockFs.rename).toHaveBeenCalledWith(tempPath, sourcePath);
});
});
});

View File

@@ -14,6 +14,7 @@ const CONVERTIBLE_IMAGE_EXTENSIONS = ['.gif', '.tiff', '.svg', '.bmp'];
export interface IFileSystem {
readdir(path: string, options: { withFileTypes: true }): Promise<Dirent[]>;
unlink(path: string): Promise<void>;
rename(oldPath: string, newPath: string): Promise<void>;
}
export interface ICommandExecutor {
@@ -269,4 +270,33 @@ export class FlyerFileHandler {
return this._handleUnsupportedInput(fileExt, job.data.originalFileName, logger);
}
/**
 * Optimizes images for web delivery (compression, resizing).
 * This is a distinct processing stage.
 *
 * Each image is resized to a max width of 2000px (aspect ratio preserved),
 * re-encoded as JPEG, written to a sibling `.tmp` file, and then atomically
 * swapped in over the original via rename.
 *
 * NOTE(review): the output is always JPEG-encoded even though `mimetype` is
 * carried on each input — a non-JPEG source keeps its original extension but
 * gains JPEG content. Confirm whether callers rely on this.
 *
 * @param imagePaths - Files to optimize in place.
 * @param logger - Scoped logger for progress and failure reporting.
 * @throws ImageConversionError when optimization of any image fails.
 */
public async optimizeImages(
imagePaths: { path: string; mimetype: string }[],
logger: Logger,
): Promise<void> {
logger.info(`Starting image optimization for ${imagePaths.length} images.`);
for (const image of imagePaths) {
const tempPath = `${image.path}.tmp`;
try {
// Optimize: Resize to max width 2000px (preserving aspect ratio) and compress
await sharp(image.path)
.resize({ width: 2000, withoutEnlargement: true })
.jpeg({ quality: 80, mozjpeg: true }) // Use mozjpeg for better compression
.toFile(tempPath);
// Replace the original file with the optimized version
await this.fs.rename(tempPath, image.path);
} catch (error) {
// FIX: don't leak a stale `.tmp` file when sharp or the rename fails.
// Cleanup is best-effort — the file may never have been created.
await this.fs.unlink(tempPath).catch(() => {});
logger.error({ err: error, path: image.path }, 'Failed to optimize image.');
throw new ImageConversionError(`Image optimization failed for ${path.basename(image.path)}.`);
}
}
logger.info('Image optimization complete.');
}
}

View File

@@ -0,0 +1,142 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { FlyerPersistenceService } from './flyerPersistenceService.server';
import { withTransaction } from './db/connection.db';
import { createFlyerAndItems } from './db/flyer.db';
import { AdminRepository } from './db/admin.db';
import type { FlyerInsert, FlyerItemInsert, Flyer } from '../types';
import type { Logger } from 'pino';
import type { PoolClient } from 'pg';
// Mock dependencies
vi.mock('./db/connection.db', () => ({
withTransaction: vi.fn(),
}));
vi.mock('./db/flyer.db', () => ({
createFlyerAndItems: vi.fn(),
}));
vi.mock('./db/admin.db', () => ({
AdminRepository: vi.fn(),
}));
// Unit tests for FlyerPersistenceService.saveFlyer. All DB collaborators
// (withTransaction, createFlyerAndItems, AdminRepository) are module-mocked
// above, so no real database is touched.
describe('FlyerPersistenceService', () => {
let service: FlyerPersistenceService;
let mockLogger: Logger;
let mockClient: PoolClient;
beforeEach(() => {
vi.clearAllMocks();
service = new FlyerPersistenceService();
// Minimal pino-compatible logger stub; child() returns itself so scoped
// loggers created inside the service still resolve to these spies.
mockLogger = {
info: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
debug: vi.fn(),
child: vi.fn().mockReturnThis(),
} as unknown as Logger;
mockClient = { query: vi.fn() } as unknown as PoolClient;
// Mock withTransaction to execute the callback immediately with a mock client
vi.mocked(withTransaction).mockImplementation(async (callback) => {
return callback(mockClient);
});
});
describe('saveFlyer', () => {
// Shared fixtures: a minimal flyer insert payload, an empty item list, and
// the flyer row the mocked createFlyerAndItems will "return".
const mockFlyerData = {
file_name: 'test.jpg',
store_name: 'Test Store',
image_url: 'http://example.com/image.jpg',
icon_url: 'http://example.com/icon.jpg',
checksum: 'abc',
status: 'processed',
item_count: 0,
} as FlyerInsert;
const mockItemsForDb: FlyerItemInsert[] = [];
const mockCreatedFlyer = {
flyer_id: 1,
file_name: 'test.jpg',
store_id: 10,
// ... other fields
} as Flyer;
const mockCreatedItems: any[] = [];
// Happy path with a user: the flyer is saved inside the transaction and an
// activity entry is logged via an AdminRepository bound to the same client.
it('should save flyer and items, and log activity if userId is provided', async () => {
const userId = 'user-123';
vi.mocked(createFlyerAndItems).mockResolvedValue({
flyer: mockCreatedFlyer,
items: mockCreatedItems,
});
const mockLogActivity = vi.fn();
// Mock the AdminRepository constructor to return an object with logActivity
vi.mocked(AdminRepository).mockImplementation(() => ({
logActivity: mockLogActivity,
} as any));
const result = await service.saveFlyer(mockFlyerData, mockItemsForDb, userId, mockLogger);
expect(withTransaction).toHaveBeenCalled();
// The transactional client must be threaded through to the DB helper.
expect(createFlyerAndItems).toHaveBeenCalledWith(
mockFlyerData,
mockItemsForDb,
mockLogger,
mockClient
);
expect(mockLogger.info).toHaveBeenCalledWith(
expect.stringContaining('Successfully processed flyer')
);
// Verify AdminRepository usage
expect(AdminRepository).toHaveBeenCalledWith(mockClient);
expect(mockLogActivity).toHaveBeenCalledWith(
expect.objectContaining({
userId,
action: 'flyer_processed',
displayText: `Processed a new flyer for ${mockFlyerData.store_name}.`,
details: { flyerId: mockCreatedFlyer.flyer_id, storeName: mockFlyerData.store_name },
}),
mockLogger
);
expect(result).toEqual(mockCreatedFlyer);
});
// Anonymous path: with no userId, persistence still happens but no activity
// is logged and AdminRepository is never constructed.
it('should save flyer and items, but NOT log activity if userId is undefined', async () => {
const userId = undefined;
vi.mocked(createFlyerAndItems).mockResolvedValue({
flyer: mockCreatedFlyer,
items: mockCreatedItems,
});
const mockLogActivity = vi.fn();
vi.mocked(AdminRepository).mockImplementation(() => ({
logActivity: mockLogActivity,
} as any));
const result = await service.saveFlyer(mockFlyerData, mockItemsForDb, userId, mockLogger);
expect(createFlyerAndItems).toHaveBeenCalled();
expect(AdminRepository).not.toHaveBeenCalled();
expect(mockLogActivity).not.toHaveBeenCalled();
expect(result).toEqual(mockCreatedFlyer);
});
// Failure path: DB errors from createFlyerAndItems must bubble up unwrapped
// so the caller (FlyerProcessingService) can normalize/report them.
it('should propagate errors from createFlyerAndItems', async () => {
const error = new Error('DB Error');
vi.mocked(createFlyerAndItems).mockRejectedValue(error);
await expect(
service.saveFlyer(mockFlyerData, mockItemsForDb, 'user-1', mockLogger)
).rejects.toThrow(error);
});
});
});

View File

@@ -0,0 +1,42 @@
// src/services/flyerPersistenceService.server.ts
import type { Logger } from 'pino';
import { withTransaction } from './db/connection.db';
import { createFlyerAndItems } from './db/flyer.db';
import { AdminRepository } from './db/admin.db';
import type { FlyerInsert, FlyerItemInsert, Flyer } from '../types';
/**
 * Persists a fully-processed flyer to the database.
 *
 * Encapsulates the write-side transaction so the processing pipeline does not
 * need to know about clients, repositories, or activity logging.
 */
export class FlyerPersistenceService {
/**
 * Saves the flyer and its items to the database within a transaction.
 * When a user is associated with the upload, an activity-log entry is
 * written inside the same transaction so it commits (or rolls back)
 * atomically with the flyer itself.
 *
 * @param flyerData - Flyer row to insert.
 * @param itemsForDb - Item rows belonging to the flyer.
 * @param userId - Uploading user, if any; anonymous uploads skip activity logging.
 * @param logger - Scoped logger for this job.
 * @returns The newly created flyer row.
 */
async saveFlyer(
flyerData: FlyerInsert,
itemsForDb: FlyerItemInsert[],
userId: string | undefined,
logger: Logger,
): Promise<Flyer> {
return withTransaction(async (client) => {
const { flyer, items } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);
logger.info(
`Successfully processed flyer: ${flyer.file_name} (ID: ${flyer.flyer_id}) with ${items.length} items.`,
);
// Anonymous upload: nothing to attribute, so we are done.
if (!userId) {
return flyer;
}
// Bind the repository to the transactional client so the activity row
// shares the flyer's transaction.
const activityRepo = new AdminRepository(client);
await activityRepo.logActivity(
{
userId,
action: 'flyer_processed',
displayText: `Processed a new flyer for ${flyerData.store_name}.`,
details: { flyerId: flyer.flyer_id, storeName: flyerData.store_name },
},
logger,
);
return flyer;
});
}
}

View File

@@ -8,6 +8,7 @@ import type { CleanupJobData, FlyerJobData } from '../types/job-data';
// 1. Create hoisted mocks FIRST
const mocks = vi.hoisted(() => ({
unlink: vi.fn(),
rename: vi.fn(),
readdir: vi.fn(),
execAsync: vi.fn(),
mockAdminLogActivity: vi.fn(),
@@ -22,13 +23,13 @@ vi.mock('node:fs/promises', async (importOriginal) => {
default: actual, // Ensure default export exists
unlink: mocks.unlink,
readdir: mocks.readdir,
rename: mocks.rename,
};
});
// Import service and dependencies (FlyerJobData already imported from types above)
import { FlyerProcessingService } from './flyerProcessingService.server';
import * as db from './db/index.db';
import { createFlyerAndItems } from './db/flyer.db';
import { createMockFlyer } from '../tests/utils/mockFactories';
import { FlyerDataTransformer } from './flyerDataTransformer';
import {
@@ -44,6 +45,7 @@ import { FlyerAiProcessor } from './flyerAiProcessor.server';
import type { IFileSystem, ICommandExecutor } from './flyerFileHandler.server';
import { generateFlyerIcon } from '../utils/imageProcessor';
import type { AIService } from './aiService.server';
import { FlyerPersistenceService } from './flyerPersistenceService.server';
// Mock image processor functions
vi.mock('../utils/imageProcessor', () => ({
@@ -56,9 +58,6 @@ vi.mock('./aiService.server', () => ({
extractCoreDataFromFlyerImage: vi.fn(),
},
}));
vi.mock('./db/flyer.db', () => ({
createFlyerAndItems: vi.fn(),
}));
vi.mock('./db/index.db', () => ({
personalizationRepo: { getAllMasterItems: vi.fn() },
adminRepo: { logActivity: vi.fn() },
@@ -81,6 +80,7 @@ vi.mock('./logger.server', () => ({
}));
vi.mock('./flyerFileHandler.server');
vi.mock('./flyerAiProcessor.server');
vi.mock('./flyerPersistenceService.server');
const mockedDb = db as Mocked<typeof db>;
@@ -88,6 +88,7 @@ describe('FlyerProcessingService', () => {
let service: FlyerProcessingService;
let mockFileHandler: Mocked<FlyerFileHandler>;
let mockAiProcessor: Mocked<FlyerAiProcessor>;
let mockPersistenceService: Mocked<FlyerPersistenceService>;
const mockCleanupQueue = {
add: vi.fn(),
};
@@ -123,6 +124,7 @@ describe('FlyerProcessingService', () => {
const mockFs: IFileSystem = {
readdir: mocks.readdir,
unlink: mocks.unlink,
rename: mocks.rename,
};
mockFileHandler = new FlyerFileHandler(mockFs, vi.fn()) as Mocked<FlyerFileHandler>;
@@ -130,6 +132,7 @@ describe('FlyerProcessingService', () => {
{} as AIService,
mockedDb.personalizationRepo,
) as Mocked<FlyerAiProcessor>;
mockPersistenceService = new FlyerPersistenceService() as Mocked<FlyerPersistenceService>;
// Instantiate the service with all its dependencies mocked
service = new FlyerProcessingService(
@@ -138,6 +141,7 @@ describe('FlyerProcessingService', () => {
mockFs,
mockCleanupQueue,
new FlyerDataTransformer(),
mockPersistenceService,
);
// Provide default successful mock implementations for dependencies
@@ -165,15 +169,12 @@ describe('FlyerProcessingService', () => {
createdImagePaths: [],
});
vi.mocked(createFlyerAndItems).mockResolvedValue({
flyer: createMockFlyer({
flyer_id: 1,
file_name: 'test.jpg',
image_url: 'https://example.com/test.jpg',
item_count: 1,
}),
items: [],
});
mockPersistenceService.saveFlyer.mockResolvedValue(createMockFlyer({
flyer_id: 1,
file_name: 'test.jpg',
image_url: 'https://example.com/test.jpg',
item_count: 1,
}));
vi.mocked(mockedDb.adminRepo.logActivity).mockResolvedValue();
// FIX: Provide a default mock for getAllMasterItems to prevent a TypeError on `.length`.
vi.mocked(mockedDb.personalizationRepo.getAllMasterItems).mockResolvedValue([]);
@@ -226,13 +227,16 @@ describe('FlyerProcessingService', () => {
// 1. File handler was called
expect(mockFileHandler.prepareImageInputs).toHaveBeenCalledWith(job.data.filePath, job, expect.any(Object));
// 2. AI processor was called
// 2. Optimization was called
expect(mockFileHandler.optimizeImages).toHaveBeenCalledWith(expect.any(Array), expect.any(Object));
// 3. AI processor was called
expect(mockAiProcessor.extractAndValidateData).toHaveBeenCalledTimes(1);
// 3. Icon was generated from the processed image
// 4. Icon was generated from the processed image
expect(generateFlyerIcon).toHaveBeenCalledWith('/tmp/flyer-processed.jpeg', '/tmp/icons', expect.any(Object));
// 4. Transformer was called with the correct filenames
// 5. Transformer was called with the correct filenames
expect(FlyerDataTransformer.prototype.transform).toHaveBeenCalledWith(
expect.any(Object), // aiResult
'flyer.jpg', // originalFileName
@@ -244,12 +248,15 @@ describe('FlyerProcessingService', () => {
'https://example.com', // baseUrl
);
// 5. DB transaction was initiated
expect(mockedDb.withTransaction).toHaveBeenCalledTimes(1);
expect(createFlyerAndItems).toHaveBeenCalledTimes(1);
expect(mocks.mockAdminLogActivity).toHaveBeenCalledTimes(1);
// 6. Persistence service was called
expect(mockPersistenceService.saveFlyer).toHaveBeenCalledWith(
expect.any(Object), // flyerData
[], // itemsForDb
undefined, // userId
expect.any(Object), // logger
);
// 6. Cleanup job was enqueued with all generated files
// 7. Cleanup job was enqueued with all generated files
expect(mockCleanupQueue.add).toHaveBeenCalledWith(
'cleanup-flyer-files',
{
@@ -281,10 +288,8 @@ describe('FlyerProcessingService', () => {
await service.processJob(job);
// Verify transaction and inner calls
expect(mockedDb.withTransaction).toHaveBeenCalledTimes(1);
expect(mockFileHandler.prepareImageInputs).toHaveBeenCalledWith('/tmp/flyer.pdf', job, expect.any(Object));
expect(mockAiProcessor.extractAndValidateData).toHaveBeenCalledTimes(1);
expect(createFlyerAndItems).toHaveBeenCalledTimes(1);
// Verify icon generation was called for the first page
expect(generateFlyerIcon).toHaveBeenCalledWith('/tmp/flyer-1.jpg', '/tmp/icons', expect.any(Object));
// Verify cleanup job includes original PDF and all generated/processed images
@@ -316,6 +321,7 @@ describe('FlyerProcessingService', () => {
message: 'AI model exploded',
stages: [
{ name: 'Preparing Inputs', status: 'completed', critical: true, detail: '1 page(s) ready for AI.' },
{ name: 'Image Optimization', status: 'completed', critical: true },
{ name: 'Extracting Data with AI', status: 'failed', critical: true, detail: 'AI model exploded' },
{ name: 'Transforming AI Data', status: 'skipped', critical: true },
{ name: 'Saving to Database', status: 'skipped', critical: true },
@@ -341,6 +347,7 @@ describe('FlyerProcessingService', () => {
message: 'An AI quota has been exceeded. Please try again later.',
stages: [
{ name: 'Preparing Inputs', status: 'completed', critical: true, detail: '1 page(s) ready for AI.' },
{ name: 'Image Optimization', status: 'completed', critical: true },
{ name: 'Extracting Data with AI', status: 'failed', critical: true, detail: 'AI model quota exceeded' },
{ name: 'Transforming AI Data', status: 'skipped', critical: true },
{ name: 'Saving to Database', status: 'skipped', critical: true },
@@ -368,6 +375,7 @@ describe('FlyerProcessingService', () => {
stderr: 'pdftocairo error',
stages: [
{ name: 'Preparing Inputs', status: 'failed', critical: true, detail: 'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.' },
{ name: 'Image Optimization', status: 'skipped', critical: true },
{ name: 'Extracting Data with AI', status: 'skipped', critical: true },
{ name: 'Transforming AI Data', status: 'skipped', critical: true },
{ name: 'Saving to Database', status: 'skipped', critical: true },
@@ -409,6 +417,7 @@ describe('FlyerProcessingService', () => {
rawData: {},
stages: [
{ name: 'Preparing Inputs', status: 'completed', critical: true, detail: '1 page(s) ready for AI.' },
{ name: 'Image Optimization', status: 'completed', critical: true },
{ name: 'Extracting Data with AI', status: 'failed', critical: true, detail: "The AI couldn't read the flyer's format. Please try a clearer image or a different flyer." },
{ name: 'Transforming AI Data', status: 'skipped', critical: true },
{ name: 'Saving to Database', status: 'skipped', critical: true },
@@ -434,7 +443,6 @@ describe('FlyerProcessingService', () => {
await service.processJob(job);
// Verify transaction and inner calls
expect(mockedDb.withTransaction).toHaveBeenCalledTimes(1);
expect(mockFileHandler.prepareImageInputs).toHaveBeenCalledWith('/tmp/flyer.gif', job, expect.any(Object));
expect(mockAiProcessor.extractAndValidateData).toHaveBeenCalledTimes(1);
// Verify icon generation was called for the converted image
@@ -458,9 +466,7 @@ describe('FlyerProcessingService', () => {
const { logger } = await import('./logger.server');
const dbError = new Error('Database transaction failed');
// To test the DB failure, we make the transaction itself fail when called.
// This is more realistic than mocking the inner function `createFlyerAndItems`.
vi.mocked(mockedDb.withTransaction).mockRejectedValue(dbError);
mockPersistenceService.saveFlyer.mockRejectedValue(new DatabaseError('Database transaction failed'));
// The service wraps the generic DB error in a DatabaseError.
await expect(service.processJob(job)).rejects.toThrow(DatabaseError);
@@ -471,6 +477,7 @@ describe('FlyerProcessingService', () => {
message: 'A database operation failed. Please try again later.',
stages: [
{ name: 'Preparing Inputs', status: 'completed', critical: true, detail: '1 page(s) ready for AI.' },
{ name: 'Image Optimization', status: 'completed', critical: true },
{ name: 'Extracting Data with AI', status: 'completed', critical: true, detail: 'Communicating with AI model...' },
{ name: 'Transforming AI Data', status: 'completed', critical: true },
{ name: 'Saving to Database', status: 'failed', critical: true, detail: 'A database operation failed. Please try again later.' },

View File

@@ -5,7 +5,6 @@ import type { Logger } from 'pino';
import type { FlyerFileHandler, IFileSystem, ICommandExecutor } from './flyerFileHandler.server';
import type { FlyerAiProcessor } from './flyerAiProcessor.server';
import * as db from './db/index.db';
import { AdminRepository } from './db/admin.db';
import { FlyerDataTransformer } from './flyerDataTransformer';
import type { FlyerJobData, CleanupJobData } from '../types/job-data';
import {
@@ -13,12 +12,11 @@ import {
PdfConversionError,
AiDataValidationError,
UnsupportedFileTypeError,
DatabaseError, // This is from processingErrors
} from './processingErrors';
import { NotFoundError } from './db/errors.db';
import { createFlyerAndItems } from './db/flyer.db';
import { logger as globalLogger } from './logger.server'; // This was a duplicate, fixed.
import { generateFlyerIcon } from '../utils/imageProcessor';
import type { FlyerPersistenceService } from './flyerPersistenceService.server';
// Define ProcessingStage locally as it's not exported from the types file.
export type ProcessingStage = {
@@ -43,6 +41,7 @@ export class FlyerProcessingService {
// This decouples the service from the full BullMQ Queue implementation, making it more modular and easier to test.
private cleanupQueue: Pick<Queue<CleanupJobData>, 'add'>,
private transformer: FlyerDataTransformer,
private persistenceService: FlyerPersistenceService,
) {}
/**
@@ -57,6 +56,7 @@ export class FlyerProcessingService {
const stages: ProcessingStage[] = [
{ name: 'Preparing Inputs', status: 'pending', critical: true, detail: 'Validating and preparing file...' },
{ name: 'Image Optimization', status: 'pending', critical: true, detail: 'Compressing and resizing images...' },
{ name: 'Extracting Data with AI', status: 'pending', critical: true, detail: 'Communicating with AI model...' },
{ name: 'Transforming AI Data', status: 'pending', critical: true },
{ name: 'Saving to Database', status: 'pending', critical: true },
@@ -82,18 +82,26 @@ export class FlyerProcessingService {
stages[0].detail = `${imagePaths.length} page(s) ready for AI.`;
await job.updateProgress({ stages });
// Stage 2: Extract Data with AI
// Stage 2: Image Optimization
stages[1].status = 'in-progress';
await job.updateProgress({ stages });
await this.fileHandler.optimizeImages(imagePaths, logger);
stages[1].status = 'completed';
await job.updateProgress({ stages });
// Stage 3: Extract Data with AI
stages[2].status = 'in-progress';
await job.updateProgress({ stages });
console.error(`[WORKER DEBUG] ProcessingService: Calling aiProcessor.extractAndValidateData`);
const aiResult = await this.aiProcessor.extractAndValidateData(imagePaths, job.data, logger);
console.error(`[WORKER DEBUG] ProcessingService: aiProcessor returned data for store: ${aiResult.data.store_name}`);
stages[1].status = 'completed';
stages[2].status = 'completed';
await job.updateProgress({ stages });
// Stage 3: Transform AI Data into DB format
stages[2].status = 'in-progress';
// Stage 4: Transform AI Data into DB format
stages[3].status = 'in-progress';
await job.updateProgress({ stages });
// The fileHandler has already prepared the primary image (e.g., by stripping EXIF data).
@@ -127,47 +135,29 @@ export class FlyerProcessingService {
);
console.error('[DEBUG] FlyerProcessingService transformer output URLs:', { imageUrl: flyerData.image_url, iconUrl: flyerData.icon_url });
console.error('[DEBUG] Full Flyer Data to be saved:', JSON.stringify(flyerData, null, 2));
stages[2].status = 'completed';
stages[3].status = 'completed';
await job.updateProgress({ stages });
// Stage 4: Save to Database
stages[3].status = 'in-progress';
// Stage 5: Save to Database
stages[4].status = 'in-progress';
await job.updateProgress({ stages });
let flyerId: number;
try {
const { flyer } = await db.withTransaction(async (client) => {
// This assumes createFlyerAndItems is refactored to accept a transactional client.
const { flyer: newFlyer } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);
// Instantiate a new AdminRepository with the transactional client to ensure
// the activity log is part of the same transaction.
const transactionalAdminRepo = new AdminRepository(client);
await transactionalAdminRepo.logActivity(
{
action: 'flyer_processed',
displayText: `Processed flyer for ${flyerData.store_name}`,
details: { flyer_id: newFlyer.flyer_id, store_name: flyerData.store_name },
userId: job.data.userId,
},
logger,
);
return { flyer: newFlyer };
});
const flyer = await this.persistenceService.saveFlyer(
flyerData,
itemsForDb,
job.data.userId,
logger,
);
flyerId = flyer.flyer_id;
} catch (error) {
// Capture specific validation errors and append context for debugging
if (error instanceof Error && error.message.includes('Invalid URL')) {
const msg = `DB Validation Failed: ${error.message}. ImageURL: '${flyerData.image_url}', IconURL: '${flyerData.icon_url}'`;
console.error('[ERROR] ' + msg);
throw new Error(msg);
}
if (error instanceof FlyerProcessingError) throw error;
throw new DatabaseError(error instanceof Error ? error.message : String(error));
// Errors are already normalized by the persistence service or are critical.
// We re-throw to trigger the catch block below which handles reporting.
throw error;
}
stages[3].status = 'completed';
stages[4].status = 'completed';
await job.updateProgress({ stages });
// Enqueue a job to clean up the original and any generated files.
@@ -294,6 +284,7 @@ export class FlyerProcessingService {
const errorCodeToStageMap = new Map<string, string>([
['PDF_CONVERSION_FAILED', 'Preparing Inputs'],
['UNSUPPORTED_FILE_TYPE', 'Preparing Inputs'],
['IMAGE_CONVERSION_FAILED', 'Image Optimization'],
['AI_VALIDATION_FAILED', 'Extracting Data with AI'],
['TRANSFORMATION_FAILED', 'Transforming AI Data'],
['DATABASE_ERROR', 'Saving to Database'],

View File

@@ -14,6 +14,7 @@ import * as db from './db/index.db';
import { FlyerProcessingService } from './flyerProcessingService.server';
import { FlyerAiProcessor } from './flyerAiProcessor.server';
import { FlyerDataTransformer } from './flyerDataTransformer';
import { FlyerPersistenceService } from './flyerPersistenceService.server';
import {
cleanupQueue,
flyerQueue,
@@ -39,6 +40,7 @@ const execAsync = promisify(exec);
// Thin adapter over node:fs/promises satisfying the IFileSystem port, so
// services receive the filesystem via injection and tests can substitute
// mocks for readdir/unlink/rename without patching the fs module.
export const fsAdapter: IFileSystem = {
readdir: (path: string, options: { withFileTypes: true }) => fsPromises.readdir(path, options),
unlink: (path: string) => fsPromises.unlink(path),
rename: (oldPath: string, newPath: string) => fsPromises.rename(oldPath, newPath),
};
const flyerProcessingService = new FlyerProcessingService(
@@ -47,6 +49,7 @@ const flyerProcessingService = new FlyerProcessingService(
fsAdapter,
cleanupQueue,
new FlyerDataTransformer(),
new FlyerPersistenceService(),
);
const normalizeError = (error: unknown): Error => {
@@ -152,6 +155,21 @@ logger.info('All workers started and listening for jobs.');
const SHUTDOWN_TIMEOUT = 30000; // 30 seconds
/**
 * Closes all workers. Used primarily for integration testing to ensure clean teardown
 * without exiting the process.
 *
 * All workers are shut down concurrently; the returned promise settles once
 * every close() has completed.
 */
export const closeWorkers = async (): Promise<void> => {
const allWorkers = [
flyerWorker,
emailWorker,
analyticsWorker,
cleanupWorker,
weeklyAnalyticsWorker,
tokenCleanupWorker,
];
await Promise.all(allWorkers.map((worker) => worker.close()));
};
export const gracefulShutdown = async (signal: string) => {
logger.info(
`[Shutdown] Received ${signal}. Initiating graceful shutdown (timeout: ${SHUTDOWN_TIMEOUT / 1000}s)...`,

View File

@@ -45,6 +45,7 @@ describe('Flyer Processing Background Job Integration Test', () => {
const createdUserIds: string[] = [];
const createdFlyerIds: number[] = [];
const createdFilePaths: string[] = [];
let workersModule: typeof import('../../services/workers.server');
beforeAll(async () => {
// FIX: Stub FRONTEND_URL to ensure valid absolute URLs (http://...) are generated
@@ -57,6 +58,11 @@ describe('Flyer Processing Background Job Integration Test', () => {
// imports 'aiService', it gets the instance we are controlling here.
vi.spyOn(aiService, 'extractCoreDataFromFlyerImage').mockImplementation(mockExtractCoreData);
// NEW: Import workers to start them IN-PROCESS.
// This ensures they run in the same memory space as our mocks.
console.log('[TEST SETUP] Starting in-process workers...');
workersModule = await import('../../services/workers.server');
const appModule = await import('../../../server');
const app = appModule.default;
request = supertest(app);
@@ -104,6 +110,16 @@ describe('Flyer Processing Background Job Integration Test', () => {
// Use the centralized file cleanup utility.
await cleanupFiles(createdFilePaths);
// NEW: Clean up workers and Redis connection to prevent tests from hanging.
if (workersModule) {
console.log('[TEST TEARDOWN] Closing in-process workers...');
await workersModule.closeWorkers();
}
// Close the shared redis connection used by the workers/queues
const { connection } = await import('../../services/redis.server');
await connection.quit();
});
/**
@@ -413,8 +429,8 @@ it(
// Arrange: Prepare a unique flyer file for upload.
const imagePath = path.resolve(__dirname, '../assets/test-flyer-image.jpg');
const imageBuffer = await fs.readFile(imagePath);
const uniqueContent = Buffer.concat([imageBuffer, Buffer.from(`fail-test-${Date.now()}`)]);
const uniqueFileName = `ai-fail-test-${Date.now()}.jpg`;
const uniqueContent = Buffer.concat([imageBuffer, Buffer.from(`ai-error-test-${Date.now()}`)]);
const uniqueFileName = `ai-error-test-${Date.now()}.jpg`;
const mockImageFile = new File([new Uint8Array(uniqueContent)], uniqueFileName, { type: 'image/jpeg' });
const checksum = await generateFileChecksum(mockImageFile);
@@ -465,8 +481,8 @@ it(
// Arrange: Prepare a unique flyer file for upload.
const imagePath = path.resolve(__dirname, '../assets/test-flyer-image.jpg');
const imageBuffer = await fs.readFile(imagePath);
const uniqueContent = Buffer.concat([imageBuffer, Buffer.from(`db-fail-test-${Date.now()}`)]);
const uniqueFileName = `db-fail-test-${Date.now()}.jpg`;
const uniqueContent = Buffer.concat([imageBuffer, Buffer.from(`db-error-test-${Date.now()}`)]);
const uniqueFileName = `db-error-test-${Date.now()}.jpg`;
const mockImageFile = new File([new Uint8Array(uniqueContent)], uniqueFileName, { type: 'image/jpeg' });
const checksum = await generateFileChecksum(mockImageFile);
@@ -517,9 +533,9 @@ it(
const imageBuffer = await fs.readFile(imagePath);
const uniqueContent = Buffer.concat([
imageBuffer,
Buffer.from(`cleanup-fail-test-${Date.now()}`),
Buffer.from(`cleanup-test-${Date.now()}`),
]);
const uniqueFileName = `cleanup-fail-test-${Date.now()}.jpg`;
const uniqueFileName = `cleanup-test-${Date.now()}.jpg`;
const mockImageFile = new File([new Uint8Array(uniqueContent)], uniqueFileName, { type: 'image/jpeg' });
const checksum = await generateFileChecksum(mockImageFile);

View File

@@ -20,6 +20,7 @@ import type {
} from '../../types';
import type { Flyer } from '../../types';
import { cleanupFiles } from '../utils/cleanupFiles';
import { aiService } from '../../services/aiService.server';
/**
* @vitest-environment node
@@ -29,17 +30,6 @@ const { mockExtractCoreData } = vi.hoisted(() => ({
mockExtractCoreData: vi.fn(),
}));
// Mock the AI service to prevent real API calls during integration tests.
// This is crucial for making the tests reliable and fast. We don't want to
// depend on the external Gemini API.
vi.mock('../../services/aiService.server', async (importOriginal) => {
const actual = await importOriginal<typeof import('../../services/aiService.server')>();
// To preserve the class instance methods of `aiService`, we must modify the
// instance directly rather than creating a new plain object with spread syntax.
actual.aiService.extractCoreDataFromFlyerImage = mockExtractCoreData;
return actual;
});
// Mock the image processor to control icon generation for legacy uploads
vi.mock('../../utils/imageProcessor', async () => {
const actual = await vi.importActual<typeof imageProcessor>('../../utils/imageProcessor');
@@ -56,11 +46,21 @@ describe('Gamification Flow Integration Test', () => {
const createdFlyerIds: number[] = [];
const createdFilePaths: string[] = [];
const createdStoreIds: number[] = [];
let workersModule: typeof import('../../services/workers.server');
beforeAll(async () => {
// Stub environment variables for URL generation in the background worker.
// This needs to be in beforeAll to ensure it's set before any code that might use it is imported.
vi.stubEnv('FRONTEND_URL', 'https://example.com');
// Spy on the actual singleton instance. This ensures that when the worker
// imports 'aiService', it gets the instance we are controlling here.
vi.spyOn(aiService, 'extractCoreDataFromFlyerImage').mockImplementation(mockExtractCoreData);
// Import workers to start them IN-PROCESS.
// This ensures they run in the same memory space as our mocks.
workersModule = await import('../../services/workers.server');
const app = (await import('../../../server')).default;
request = supertest(app);
@@ -91,12 +91,23 @@ describe('Gamification Flow Integration Test', () => {
afterAll(async () => {
vi.unstubAllEnvs();
vi.restoreAllMocks(); // Restore the AI spy
await cleanupDb({
userIds: testUser ? [testUser.user.user_id] : [],
flyerIds: createdFlyerIds,
storeIds: createdStoreIds,
});
await cleanupFiles(createdFilePaths);
// Clean up workers and Redis connection to prevent tests from hanging.
if (workersModule) {
await workersModule.closeWorkers();
}
// Close the shared redis connection used by the workers/queues
const { connection } = await import('../../services/redis.server');
await connection.quit();
});
it(

View File

@@ -0,0 +1,59 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { Request } from 'express';
describe('rateLimit utils', () => {
// The module under test reads NODE_ENV, so every test stubs the env first
// and then re-imports a fresh copy of the module.
const loadShouldSkip = async () => {
const { shouldSkipRateLimit } = await import('./rateLimit');
return shouldSkipRateLimit;
};
// Builds a minimal Request carrying only the headers the util inspects.
const makeRequest = (headers: Record<string, string> = {}) =>
({ headers } as unknown as Request);
beforeEach(() => {
vi.resetModules();
vi.unstubAllEnvs();
});
afterEach(() => {
vi.unstubAllEnvs();
});
describe('shouldSkipRateLimit', () => {
it('should return false (do not skip) when NODE_ENV is "production"', async () => {
vi.stubEnv('NODE_ENV', 'production');
const shouldSkipRateLimit = await loadShouldSkip();
expect(shouldSkipRateLimit(makeRequest())).toBe(false);
});
it('should return false (do not skip) when NODE_ENV is "development"', async () => {
vi.stubEnv('NODE_ENV', 'development');
const shouldSkipRateLimit = await loadShouldSkip();
expect(shouldSkipRateLimit(makeRequest())).toBe(false);
});
it('should return true (skip) when NODE_ENV is "test" and header is missing', async () => {
vi.stubEnv('NODE_ENV', 'test');
const shouldSkipRateLimit = await loadShouldSkip();
expect(shouldSkipRateLimit(makeRequest())).toBe(true);
});
it('should return false (do not skip) when NODE_ENV is "test" and header is "true"', async () => {
vi.stubEnv('NODE_ENV', 'test');
const shouldSkipRateLimit = await loadShouldSkip();
expect(shouldSkipRateLimit(makeRequest({ 'x-test-rate-limit-enable': 'true' }))).toBe(false);
});
it('should return true (skip) when NODE_ENV is "test" and header is "false"', async () => {
vi.stubEnv('NODE_ENV', 'test');
const shouldSkipRateLimit = await loadShouldSkip();
expect(shouldSkipRateLimit(makeRequest({ 'x-test-rate-limit-enable': 'false' }))).toBe(true);
});
});
});