Compare commits

2 Commits

| Author | SHA1 | Date |
|---|---|---|
| | a9e56bc707 | |
| | e5d09c73b7 | |
package-lock.json (generated; 4 changed lines)

```diff
@@ -1,12 +1,12 @@
 {
   "name": "flyer-crawler",
-  "version": "0.9.18",
+  "version": "0.9.19",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "flyer-crawler",
-      "version": "0.9.18",
+      "version": "0.9.19",
       "dependencies": {
         "@bull-board/api": "^6.14.2",
         "@bull-board/express": "^6.14.2",
```
package.json

```diff
@@ -1,7 +1,7 @@
 {
   "name": "flyer-crawler",
   "private": true,
-  "version": "0.9.18",
+  "version": "0.9.19",
   "type": "module",
   "scripts": {
     "dev": "concurrently \"npm:start:dev\" \"vite\"",
```
errorHandler middleware tests

```diff
@@ -3,8 +3,8 @@ import { describe, it, expect, vi, beforeEach, afterAll, afterEach } from 'vites
 import supertest from 'supertest';
 import express, { Request, Response, NextFunction } from 'express';
 import { errorHandler } from './errorHandler'; // This was a duplicate, fixed.
-import { DatabaseError } from '../services/processingErrors';
 import {
+  DatabaseError,
   ForeignKeyConstraintError,
   UniqueConstraintError,
   ValidationError,
@@ -69,7 +69,7 @@ app.get('/unique-error', (req, res, next) => {
 });

 app.get('/db-error-500', (req, res, next) => {
-  next(new DatabaseError('A database connection issue occurred.', 500));
+  next(new DatabaseError('A database connection issue occurred.'));
 });

 app.get('/unauthorized-error-no-status', (req, res, next) => {
```
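The `/db-error-500` test route now constructs `DatabaseError` without an explicit status code, which only works if the class defaults its status to 500. A minimal sketch of that assumption (the real class lives in `../services/processingErrors` and may differ):

```typescript
// Hypothetical sketch only; property names are assumed, not taken from the repo.
export class DatabaseError extends Error {
  constructor(
    message: string,
    // Defaulting to 500 lets callers drop the explicit status argument,
    // which is what the updated /db-error-500 route relies on.
    public readonly statusCode: number = 500,
  ) {
    super(message);
    this.name = 'DatabaseError';
  }
}
```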
User DB Service tests

```diff
@@ -115,14 +115,14 @@ describe('User DB Service', () => {
   });

   describe('createUser', () => {
-    it('should execute a transaction to create a user and profile', async () => {
+    it('should create a user and profile using the provided client', async () => {
       const mockUser = {
         user_id: 'new-user-id',
         email: 'new@example.com',
         created_at: new Date().toISOString(),
         updated_at: new Date().toISOString(),
       };
-      // This is the flat structure returned by the DB query inside createUser
+
       const mockDbProfile = {
         user_id: 'new-user-id',
         email: 'new@example.com',
@@ -136,7 +136,7 @@ describe('User DB Service', () => {
         user_created_at: new Date().toISOString(),
         user_updated_at: new Date().toISOString(),
       };
-      // This is the nested structure the function is expected to return
+
       const expectedProfile: UserProfile = {
         user: {
           user_id: mockDbProfile.user_id,
@@ -696,14 +696,14 @@ describe('User DB Service', () => {

   describe('createPasswordResetToken', () => {
     it('should execute DELETE and INSERT queries', async () => {
-      mockPoolInstance.query.mockResolvedValue({ rows: [] });
+      const mockClient = { query: vi.fn().mockResolvedValue({ rows: [] }) };
       const expires = new Date();
-      await userRepo.createPasswordResetToken('123', 'token-hash', expires, mockLogger);
-      expect(mockPoolInstance.query).toHaveBeenCalledWith(
+      await userRepo.createPasswordResetToken('123', 'token-hash', expires, mockLogger, mockClient as unknown as PoolClient);
+      expect(mockClient.query).toHaveBeenCalledWith(
         'DELETE FROM public.password_reset_tokens WHERE user_id = $1',
         ['123'],
       );
-      expect(mockPoolInstance.query).toHaveBeenCalledWith(
+      expect(mockClient.query).toHaveBeenCalledWith(
         expect.stringContaining('INSERT INTO public.password_reset_tokens'),
         ['123', 'token-hash', expires],
       );
@@ -712,18 +712,18 @@ describe('User DB Service', () => {
     it('should throw ForeignKeyConstraintError if user does not exist', async () => {
       const dbError = new Error('violates foreign key constraint');
       (dbError as Error & { code: string }).code = '23503';
-      mockPoolInstance.query.mockRejectedValue(dbError);
+      const mockClient = { query: vi.fn().mockRejectedValue(dbError) };
       await expect(
-        userRepo.createPasswordResetToken('non-existent-user', 'hash', new Date(), mockLogger),
+        userRepo.createPasswordResetToken('non-existent-user', 'hash', new Date(), mockLogger, mockClient as unknown as PoolClient),
       ).rejects.toThrow(ForeignKeyConstraintError);
     });

     it('should throw a generic error if the database query fails', async () => {
       const dbError = new Error('DB Error');
-      mockPoolInstance.query.mockRejectedValue(dbError);
+      const mockClient = { query: vi.fn().mockRejectedValue(dbError) };
       const expires = new Date();
       await expect(
-        userRepo.createPasswordResetToken('123', 'token-hash', expires, mockLogger),
+        userRepo.createPasswordResetToken('123', 'token-hash', expires, mockLogger, mockClient as unknown as PoolClient),
       ).rejects.toThrow('Failed to create password reset token.');
       expect(mockLogger.error).toHaveBeenCalledWith(
         { err: dbError, userId: '123' },
```
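These tests now pass a mock `PoolClient` as a fifth argument and assert against `mockClient.query` instead of the pool, implying the repository runs its DELETE and INSERT on a caller-supplied client. A hedged sketch of the signature the tests imply; column names, the error-class constructor, and import paths are assumptions:

```typescript
import type { PoolClient } from 'pg';
import type { Logger } from 'pino';
import { ForeignKeyConstraintError } from './errors.db'; // assumed import path

// Sketch of the shape implied by the updated tests; the real repository
// implementation may differ in details.
export async function createPasswordResetToken(
  userId: string,
  tokenHash: string,
  expiresAt: Date,
  logger: Logger,
  client: PoolClient, // queries run on the caller-supplied (transactional) client
): Promise<void> {
  try {
    // Replace any existing token for this user before inserting the new one.
    await client.query('DELETE FROM public.password_reset_tokens WHERE user_id = $1', [userId]);
    await client.query(
      // Column list is assumed; the test only checks the INSERT target table.
      'INSERT INTO public.password_reset_tokens (user_id, token_hash, expires_at) VALUES ($1, $2, $3)',
      [userId, tokenHash, expiresAt],
    );
  } catch (err) {
    if ((err as { code?: string }).code === '23503') {
      // Postgres foreign-key violation: the user does not exist.
      throw new ForeignKeyConstraintError('User does not exist.');
    }
    logger.error({ err, userId }, 'Failed to create password reset token.');
    throw new Error('Failed to create password reset token.');
  }
}
```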
FlyerProcessingService tests

```diff
@@ -115,7 +115,6 @@ describe('FlyerProcessingService', () => {
     service = new FlyerProcessingService(
       mockFileHandler,
       mockAiProcessor,
-      mockedDb,
       mockFs,
       mockCleanupQueue,
       new FlyerDataTransformer(),
```
src/services/flyerProcessingService.server.ts

```diff
@@ -1,10 +1,11 @@
 // src/services/flyerProcessingService.server.ts
 import type { Job, Queue } from 'bullmq';
 import { UnrecoverableError } from 'bullmq';
+import path from 'path';
 import type { Logger } from 'pino';
 import type { FlyerFileHandler, IFileSystem, ICommandExecutor } from './flyerFileHandler.server';
 import type { FlyerAiProcessor } from './flyerAiProcessor.server';
-import type * as Db from './db/index.db';
+import * as db from './db/index.db';
 import { AdminRepository } from './db/admin.db';
 import { FlyerDataTransformer } from './flyerDataTransformer';
 import type { FlyerJobData, CleanupJobData } from '../types/job-data';
```
```diff
@@ -13,11 +14,11 @@ import {
   PdfConversionError,
   AiDataValidationError,
   UnsupportedFileTypeError,
-  DatabaseError,
+  DatabaseError, // This is from processingErrors
 } from './processingErrors';
+import { NotFoundError } from './db/errors.db';
 import { createFlyerAndItems } from './db/flyer.db';
 import { logger as globalLogger } from './logger.server';
-import { withTransaction } from './db/index.db';

 // Define ProcessingStage locally as it's not exported from the types file.
 export type ProcessingStage = {
```
```diff
@@ -36,9 +37,6 @@ export class FlyerProcessingService {
   constructor(
     private fileHandler: FlyerFileHandler,
     private aiProcessor: FlyerAiProcessor,
-    // This service only needs the `logActivity` method from the `adminRepo`.
-    // By using `Pick`, we create a more focused and testable dependency.
-    private db: { adminRepo: Pick<AdminRepository, 'logActivity'> },
     private fs: IFileSystem,
     // By depending on `Pick<Queue, 'add'>`, we specify that this service only needs
     // an object with an `add` method that matches the Queue's `add` method signature.
```
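The remaining constructor comments describe the narrowing pattern used for the queue dependency. An illustrative sketch of that pattern; the class and parameter names here are placeholders, not the repository's declarations:

```typescript
import type { Queue } from 'bullmq';

// Illustrative only: depending on Pick<Queue, 'add'> means tests can inject
// a plain stub such as { add: vi.fn() } instead of constructing a real queue.
class NeedsOnlyAdd {
  constructor(private readonly cleanupQueue: Pick<Queue, 'add'>) {}

  async scheduleCleanup(flyerId: number): Promise<void> {
    // Only the `add` method is available on this narrowed type.
    await this.cleanupQueue.add('cleanup-flyer-files', { flyerId });
  }
}
```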
```diff
@@ -110,7 +108,7 @@ export class FlyerProcessingService {
       stages[3].status = 'in-progress';
       await job.updateProgress({ stages });

-      const { flyer } = await withTransaction(async (client) => {
+      const { flyer } = await db.withTransaction(async (client) => {
         // This assumes createFlyerAndItems is refactored to accept a transactional client.
         const { flyer: newFlyer } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);

```
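The transaction wrapper is now referenced through the namespace import (`db.withTransaction`), which makes it straightforward to spy on or replace in tests. A generic sketch of what such a helper usually does, assuming a node-postgres pool; the project's actual helper in `./db/index.db` may differ:

```typescript
import { Pool, type PoolClient } from 'pg';

const pool = new Pool(); // assumed module-level pool, configured elsewhere

// Generic sketch of a withTransaction helper: acquire a client, wrap the
// callback in BEGIN/COMMIT, roll back on error, and always release the client.
export async function withTransaction<T>(
  callback: (client: PoolClient) => Promise<T>,
): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const result = await callback(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```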
```diff
@@ -167,14 +165,51 @@
     const logger = globalLogger.child({ jobId: job.id, jobName: job.name, ...job.data });
     logger.info('Picked up file cleanup job.');

-    const { paths } = job.data;
-    if (!paths || paths.length === 0) {
-      logger.warn('Job received no paths to clean. Skipping.');
-      return { status: 'skipped', reason: 'no paths' };
+    const { flyerId, paths } = job.data;
+    let pathsToDelete = paths;
+
+    // If no paths are provided (e.g., from a manual trigger), attempt to derive them from the database.
+    if (!pathsToDelete || pathsToDelete.length === 0) {
+      logger.warn(`Cleanup job for flyer ${flyerId} received no paths. Attempting to derive paths from DB.`);
+      try {
+        const flyer = await db.flyerRepo.getFlyerById(flyerId);
+        const derivedPaths: string[] = [];
+        // This path needs to be configurable and match where multer saves files.
+        const storagePath = process.env.STORAGE_PATH || '/var/www/flyer-crawler.projectium.com/flyer-images';
+
+        if (flyer.image_url) {
+          try {
+            const imageName = path.basename(new URL(flyer.image_url).pathname);
+            derivedPaths.push(path.join(storagePath, imageName));
+          } catch (urlError) {
+            logger.error({ err: urlError, url: flyer.image_url }, 'Failed to parse flyer.image_url to derive file path.');
+          }
+        }
+        if (flyer.icon_url) {
+          try {
+            const iconName = path.basename(new URL(flyer.icon_url).pathname);
+            derivedPaths.push(path.join(storagePath, 'icons', iconName));
+          } catch (urlError) {
+            logger.error({ err: urlError, url: flyer.icon_url }, 'Failed to parse flyer.icon_url to derive file path.');
+          }
+        }
+        pathsToDelete = derivedPaths;
+      } catch (error) {
+        if (error instanceof NotFoundError) {
+          logger.error({ flyerId }, 'Cannot derive cleanup paths because flyer was not found in DB.');
+          throw new UnrecoverableError(`Cleanup failed: Flyer with ID ${flyerId} not found.`);
+        }
+        throw error; // Re-throw other DB errors to allow for retries.
+      }
+    }
+
+    if (!pathsToDelete || pathsToDelete.length === 0) {
+      logger.warn('Job received no paths and could not derive any from the database. Skipping.');
+      return { status: 'skipped', reason: 'no paths derived' };
     }

     const results = await Promise.allSettled(
-      paths.map(async (filePath) => {
+      pathsToDelete.map(async (filePath) => {
         try {
           await this.fs.unlink(filePath);
           logger.info(`Successfully deleted temporary file: ${filePath}`);
```
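Because the worker can now derive file paths from the flyer record, a cleanup job can be enqueued with only the flyer id. Illustrative usage; the queue name, job name, and connection settings are assumptions:

```typescript
import { Queue } from 'bullmq';
import type { CleanupJobData } from '../types/job-data';

// Queue name and Redis connection are placeholders for illustration.
const cleanupQueue = new Queue<CleanupJobData>('flyer-cleanup', {
  connection: { host: 'localhost', port: 6379 },
});

// No `paths` given: the processor falls back to deriving them from the DB,
// and throws an UnrecoverableError if the flyer no longer exists.
await cleanupQueue.add('cleanup-flyer-files', { flyerId: 42 });
```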
```diff
@@ -193,12 +228,12 @@ export class FlyerProcessingService {

     const failedDeletions = results.filter((r) => r.status === 'rejected');
     if (failedDeletions.length > 0) {
-      const failedPaths = paths.filter((_, i) => results[i].status === 'rejected');
+      const failedPaths = pathsToDelete.filter((_, i) => results[i].status === 'rejected');
       throw new Error(`Failed to delete ${failedDeletions.length} file(s): ${failedPaths.join(', ')}`);
     }

-    logger.info(`Successfully deleted all ${paths.length} temporary files.`);
-    return { status: 'success', deletedCount: paths.length };
+    logger.info(`Successfully deleted all ${pathsToDelete.length} temporary files.`);
+    return { status: 'success', deletedCount: pathsToDelete.length };
   }

   /**
```
Worker setup (FlyerProcessingService instantiation)

```diff
@@ -44,7 +44,6 @@ const fsAdapter: IFileSystem = {
 const flyerProcessingService = new FlyerProcessingService(
   new FlyerFileHandler(fsAdapter, execAsync),
   new FlyerAiProcessor(aiService, db.personalizationRepo),
-  db,
   fsAdapter,
   cleanupQueue,
   new FlyerDataTransformer(),
```
Job data types (the ../types/job-data module)

```diff
@@ -19,7 +19,7 @@ export interface FlyerJobData {
  */
 export interface CleanupJobData {
   flyerId: number;
-  paths: string[];
+  paths?: string[];
 }

 /**
@@ -27,4 +27,29 @@ export interface CleanupJobData {
  */
 export interface TokenCleanupJobData {
   timestamp: string;
 }
+
+/**
+ * Defines the shape of the data payload for a daily analytics report job.
+ */
+export interface AnalyticsJobData {
+  reportDate: string;
+}
+
+/**
+ * Defines the shape of the data payload for a weekly analytics report job.
+ */
+export interface WeeklyAnalyticsJobData {
+  reportYear: number;
+  reportWeek: number;
+}
+
+/**
+ * Defines the shape of the data payload for an email sending job.
+ */
+export interface EmailJobData {
+  to: string;
+  subject: string;
+  text: string;
+  html: string;
+}
```
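The new payload interfaces can be used to type queues and producers. An illustrative sketch; queue names, job names, connection settings, and payload values are assumptions, not taken from the repository:

```typescript
import { Queue } from 'bullmq';
import type { EmailJobData, WeeklyAnalyticsJobData } from '../types/job-data';

// Placeholder queue names and Redis connection, for illustration only.
const connection = { host: 'localhost', port: 6379 };
const emailQueue = new Queue<EmailJobData>('email', { connection });
const weeklyAnalyticsQueue = new Queue<WeeklyAnalyticsJobData>('weekly-analytics', { connection });

await emailQueue.add('send-email', {
  to: 'user@example.com',
  subject: 'Your weekly flyer digest',
  text: 'Plain-text body',
  html: '<p>HTML body</p>',
});

await weeklyAnalyticsQueue.add('weekly-report', { reportYear: 2025, reportWeek: 14 });
```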