Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
be5bda169e | ||
| 4ede403356 | |||
| 5d31605b80 | |||
| ddd4ad024e | |||
|
|
4e927f48bd | ||
| af5644d17a | |||
|
|
016c0a883a | ||
| c6a5f889b4 |
@@ -158,7 +158,7 @@ jobs:
|
||||
else
|
||||
echo "Version mismatch (Running: $RUNNING_VERSION -> Deployed: $NEW_VERSION) or app not running. Reloading PM2..."
|
||||
fi
|
||||
pm2 startOrReload ecosystem.config.cjs --env production && pm2 save
|
||||
pm2 startOrReload ecosystem.config.cjs --env production --update-env && pm2 save
|
||||
echo "Production backend server reloaded successfully."
|
||||
else
|
||||
echo "Version $NEW_VERSION is already running. Skipping PM2 reload."
|
||||
|
||||
@@ -90,10 +90,11 @@ jobs:
|
||||
# integration test suite can launch its own, fresh server instance.
|
||||
# '|| true' ensures the workflow doesn't fail if the process isn't running.
|
||||
run: |
|
||||
pm2 stop flyer-crawler-api-test || true
|
||||
pm2 stop flyer-crawler-worker-test || true
|
||||
pm2 delete flyer-crawler-api-test || true
|
||||
pm2 delete flyer-crawler-worker-test || true
|
||||
echo "--- Stopping and deleting all test processes ---"
|
||||
# Use a script to parse pm2's JSON output and delete any process whose name ends with '-test'.
|
||||
# This is safer than 'pm2 delete all' and more robust than naming each process individually.
|
||||
# It prevents the accumulation of duplicate processes from previous test runs.
|
||||
node -e "const exec = require('child_process').execSync; try { const list = JSON.parse(exec('pm2 jlist').toString()); list.forEach(p => { if (p.name && p.name.endsWith('-test')) { console.log('Deleting test process: ' + p.name + ' (' + p.pm2_env.pm_id + ')'); try { exec('pm2 delete ' + p.pm2_env.pm_id); } catch(e) { console.error('Failed to delete ' + p.pm2_env.pm_id, e.message); } } }); console.log('✅ Test process cleanup complete.'); } catch (e) { if (e.stdout.toString().includes('No process found')) { console.log('No PM2 processes running, cleanup not needed.'); } else { console.error('Error cleaning up test processes:', e.message); } }" || true
|
||||
|
||||
- name: Run All Tests and Generate Merged Coverage Report
|
||||
# This single step runs both unit and integration tests, then merges their
|
||||
@@ -405,7 +406,7 @@ jobs:
|
||||
# Use `startOrReload` with the ecosystem file. This is the standard, idempotent way to deploy.
|
||||
# It will START the process if it's not running, or RELOAD it if it is.
|
||||
# We also add `&& pm2 save` to persist the process list across server reboots.
|
||||
pm2 startOrReload ecosystem.config.cjs --env test && pm2 save
|
||||
pm2 startOrReload ecosystem.config.cjs --env test --update-env && pm2 save
|
||||
echo "Test backend server reloaded successfully."
|
||||
|
||||
# After a successful deployment, update the schema hash in the database.
|
||||
|
||||
@@ -157,7 +157,7 @@ jobs:
|
||||
else
|
||||
echo "Version mismatch (Running: $RUNNING_VERSION -> Deployed: $NEW_VERSION) or app not running. Reloading PM2..."
|
||||
fi
|
||||
pm2 startOrReload ecosystem.config.cjs --env production && pm2 save
|
||||
pm2 startOrReload ecosystem.config.cjs --env production --update-env && pm2 save
|
||||
echo "Production backend server reloaded successfully."
|
||||
else
|
||||
echo "Version $NEW_VERSION is already running. Skipping PM2 reload."
|
||||
|
||||
@@ -3,23 +3,37 @@
|
||||
// It allows us to define all the settings for our application in one place.
|
||||
// The .cjs extension is required because the project's package.json has "type": "module".
|
||||
|
||||
// --- Environment Variable Validation ---
|
||||
const requiredSecrets = ['DB_HOST', 'JWT_SECRET', 'GEMINI_API_KEY'];
|
||||
const missingSecrets = requiredSecrets.filter(key => !process.env[key]);
|
||||
|
||||
if (missingSecrets.length > 0) {
|
||||
console.warn('\n[ecosystem.config.cjs] ⚠️ WARNING: The following environment variables are MISSING in the shell:');
|
||||
missingSecrets.forEach(key => console.warn(` - ${key}`));
|
||||
console.warn('[ecosystem.config.cjs] The application may crash if these are required for startup.\n');
|
||||
} else {
|
||||
console.log('[ecosystem.config.cjs] ✅ Critical environment variables are present.');
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
apps: [
|
||||
{
|
||||
// --- API Server ---
|
||||
// The name is now dynamically set based on the environment.
|
||||
// This is a common pattern but requires you to call pm2 with the correct name.
|
||||
// The deploy script handles this by using 'flyer-crawler-api' for prod and 'flyer-crawler-api-test' for test.
|
||||
name: 'flyer-crawler-api',
|
||||
script: './node_modules/.bin/tsx',
|
||||
args: 'server.ts', // tsx will execute this file
|
||||
max_memory_restart: '500M', // Restart if memory usage exceeds 500MB
|
||||
args: 'server.ts',
|
||||
max_memory_restart: '500M',
|
||||
|
||||
// Restart Logic
|
||||
max_restarts: 40,
|
||||
exp_backoff_restart_delay: 100,
|
||||
min_uptime: '10s',
|
||||
|
||||
// Production Environment Settings
|
||||
env_production: {
|
||||
NODE_ENV: 'production', // Set the Node.js environment to production
|
||||
NODE_ENV: 'production',
|
||||
name: 'flyer-crawler-api',
|
||||
cwd: '/var/www/flyer-crawler.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -39,10 +53,9 @@ module.exports = {
|
||||
},
|
||||
// Test Environment Settings
|
||||
env_test: {
|
||||
NODE_ENV: 'test', // Set to 'test' to match the environment purpose and disable pino-pretty
|
||||
NODE_ENV: 'test',
|
||||
name: 'flyer-crawler-api-test',
|
||||
cwd: '/var/www/flyer-crawler-test.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -66,7 +79,6 @@ module.exports = {
|
||||
name: 'flyer-crawler-api-dev',
|
||||
watch: true,
|
||||
ignore_watch: ['node_modules', 'logs', '*.log', 'flyer-images', '.git'],
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -89,14 +101,19 @@ module.exports = {
|
||||
// --- General Worker ---
|
||||
name: 'flyer-crawler-worker',
|
||||
script: './node_modules/.bin/tsx',
|
||||
args: 'src/services/worker.ts', // tsx will execute this file
|
||||
max_memory_restart: '1G', // Restart if memory usage exceeds 1GB
|
||||
args: 'src/services/worker.ts',
|
||||
max_memory_restart: '1G',
|
||||
|
||||
// Restart Logic
|
||||
max_restarts: 40,
|
||||
exp_backoff_restart_delay: 100,
|
||||
min_uptime: '10s',
|
||||
|
||||
// Production Environment Settings
|
||||
env_production: {
|
||||
NODE_ENV: 'production',
|
||||
name: 'flyer-crawler-worker',
|
||||
cwd: '/var/www/flyer-crawler.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -119,7 +136,6 @@ module.exports = {
|
||||
NODE_ENV: 'test',
|
||||
name: 'flyer-crawler-worker-test',
|
||||
cwd: '/var/www/flyer-crawler-test.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -143,7 +159,6 @@ module.exports = {
|
||||
name: 'flyer-crawler-worker-dev',
|
||||
watch: true,
|
||||
ignore_watch: ['node_modules', 'logs', '*.log', 'flyer-images', '.git'],
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -166,14 +181,19 @@ module.exports = {
|
||||
// --- Analytics Worker ---
|
||||
name: 'flyer-crawler-analytics-worker',
|
||||
script: './node_modules/.bin/tsx',
|
||||
args: 'src/services/worker.ts', // tsx will execute this file
|
||||
max_memory_restart: '1G', // Restart if memory usage exceeds 1GB
|
||||
args: 'src/services/worker.ts',
|
||||
max_memory_restart: '1G',
|
||||
|
||||
// Restart Logic
|
||||
max_restarts: 40,
|
||||
exp_backoff_restart_delay: 100,
|
||||
min_uptime: '10s',
|
||||
|
||||
// Production Environment Settings
|
||||
env_production: {
|
||||
NODE_ENV: 'production',
|
||||
name: 'flyer-crawler-analytics-worker',
|
||||
cwd: '/var/www/flyer-crawler.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -196,7 +216,6 @@ module.exports = {
|
||||
NODE_ENV: 'test',
|
||||
name: 'flyer-crawler-analytics-worker-test',
|
||||
cwd: '/var/www/flyer-crawler-test.projectium.com',
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
@@ -220,7 +239,6 @@ module.exports = {
|
||||
name: 'flyer-crawler-analytics-worker-dev',
|
||||
watch: true,
|
||||
ignore_watch: ['node_modules', 'logs', '*.log', 'flyer-images', '.git'],
|
||||
// Inherit secrets from the deployment environment
|
||||
DB_HOST: process.env.DB_HOST,
|
||||
DB_USER: process.env.DB_USER,
|
||||
DB_PASSWORD: process.env.DB_PASSWORD,
|
||||
|
||||
4
package-lock.json
generated
4
package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "flyer-crawler",
|
||||
"version": "0.2.8",
|
||||
"version": "0.2.11",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "flyer-crawler",
|
||||
"version": "0.2.8",
|
||||
"version": "0.2.11",
|
||||
"dependencies": {
|
||||
"@bull-board/api": "^6.14.2",
|
||||
"@bull-board/express": "^6.14.2",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "flyer-crawler",
|
||||
"private": true,
|
||||
"version": "0.2.8",
|
||||
"version": "0.2.11",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "concurrently \"npm:start:dev\" \"vite\"",
|
||||
|
||||
@@ -21,7 +21,6 @@ import {
|
||||
mockUseFlyerItems,
|
||||
} from './tests/setup/mockHooks';
|
||||
import { useAppInitialization } from './hooks/useAppInitialization';
|
||||
import { useModal } from './hooks/useModal';
|
||||
|
||||
// Mock top-level components rendered by App's routes
|
||||
|
||||
@@ -57,10 +56,6 @@ vi.mock('./hooks/useFlyerItems', async () => {
|
||||
vi.mock('./hooks/useAppInitialization');
|
||||
const mockedUseAppInitialization = vi.mocked(useAppInitialization);
|
||||
|
||||
// Mock useModal directly in this file to avoid dependency on mockHooks.ts
|
||||
vi.mock('./hooks/useModal');
|
||||
const mockedUseModal = vi.mocked(useModal);
|
||||
|
||||
vi.mock('./hooks/useAuth', async () => {
|
||||
const hooks = await import('./tests/setup/mockHooks');
|
||||
return { useAuth: hooks.mockUseAuth };
|
||||
@@ -131,11 +126,21 @@ vi.mock('./layouts/MainLayout', async () => {
|
||||
return { MainLayout: MockMainLayout };
|
||||
});
|
||||
|
||||
vi.mock('./components/AppGuard', () => ({
|
||||
AppGuard: ({ children }: { children: React.ReactNode }) => (
|
||||
<div data-testid="app-guard-mock">{children}</div>
|
||||
),
|
||||
}));
|
||||
vi.mock('./components/AppGuard', async () => {
|
||||
// We need to use the real useModal hook inside our mock AppGuard
|
||||
const { useModal } = await vi.importActual<typeof import('./hooks/useModal')>('./hooks/useModal');
|
||||
return {
|
||||
AppGuard: ({ children }: { children: React.ReactNode }) => {
|
||||
const { isModalOpen } = useModal();
|
||||
return (
|
||||
<div data-testid="app-guard-mock">
|
||||
{children}
|
||||
{isModalOpen('whatsNew') && <div data-testid="whats-new-modal-mock" />}
|
||||
</div>
|
||||
);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const mockedAiApiClient = vi.mocked(aiApiClient);
|
||||
const mockedApiClient = vi.mocked(apiClient);
|
||||
@@ -196,11 +201,6 @@ describe('App Component', () => {
|
||||
error: null,
|
||||
});
|
||||
mockedUseAppInitialization.mockReturnValue({ isDarkMode: false, unitSystem: 'imperial' });
|
||||
mockedUseModal.mockReturnValue({
|
||||
isModalOpen: vi.fn(),
|
||||
openModal: vi.fn(),
|
||||
closeModal: vi.fn(),
|
||||
});
|
||||
|
||||
// Default mocks for API calls
|
||||
// Use mockImplementation to create a new Response object for each call,
|
||||
@@ -391,6 +391,7 @@ describe('App Component', () => {
|
||||
|
||||
describe('Modal Interactions', () => {
|
||||
it('should open and close the ProfileManager modal', async () => {
|
||||
console.log('[TEST DEBUG] Test Start: should open and close the ProfileManager modal');
|
||||
renderApp();
|
||||
expect(screen.queryByTestId('profile-manager-mock')).not.toBeInTheDocument();
|
||||
|
||||
@@ -398,11 +399,13 @@ describe('App Component', () => {
|
||||
fireEvent.click(screen.getByText('Open Profile'));
|
||||
expect(await screen.findByTestId('profile-manager-mock')).toBeInTheDocument();
|
||||
|
||||
console.log('[TEST DEBUG] ProfileManager modal opened. Now closing...');
|
||||
// Close modal
|
||||
fireEvent.click(screen.getByText('Close Profile'));
|
||||
await waitFor(() => {
|
||||
expect(screen.queryByTestId('profile-manager-mock')).not.toBeInTheDocument();
|
||||
});
|
||||
console.log('[TEST DEBUG] ProfileManager modal closed.');
|
||||
});
|
||||
|
||||
it('should open and close the VoiceAssistant modal for authenticated users', async () => {
|
||||
@@ -427,7 +430,7 @@ describe('App Component', () => {
|
||||
fireEvent.click(screen.getByText('Open Voice Assistant'));
|
||||
|
||||
console.log('[TEST DEBUG] Waiting for voice-assistant-mock');
|
||||
expect(await screen.findByTestId('voice-assistant-mock')).toBeInTheDocument();
|
||||
expect(await screen.findByTestId('voice-assistant-mock', {}, { timeout: 3000 })).toBeInTheDocument();
|
||||
|
||||
// Close modal
|
||||
fireEvent.click(screen.getByText('Close Voice Assistant'));
|
||||
@@ -586,7 +589,7 @@ describe('App Component', () => {
|
||||
renderApp();
|
||||
console.log('[TEST DEBUG] Opening Profile');
|
||||
fireEvent.click(screen.getByText('Open Profile'));
|
||||
const loginButton = await screen.findByText('Login');
|
||||
const loginButton = await screen.findByRole('button', { name: 'Login' });
|
||||
console.log('[TEST DEBUG] Clicking Login');
|
||||
fireEvent.click(loginButton);
|
||||
|
||||
@@ -622,7 +625,8 @@ describe('App Component', () => {
|
||||
renderApp();
|
||||
const openButton = await screen.findByTitle("Show what's new in this version");
|
||||
fireEvent.click(openButton);
|
||||
expect(mockedUseModal().openModal).toHaveBeenCalledWith('whatsNew');
|
||||
// The mock AppGuard now renders the modal when it's open
|
||||
expect(await screen.findByTestId('whats-new-modal-mock')).toBeInTheDocument();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@ const mockedUseAuth = vi.mocked(useAuth);
|
||||
const mockedUseModal = vi.mocked(useModal);
|
||||
const mockedUseNavigate = vi.mocked(useNavigate);
|
||||
|
||||
const mockLogin = vi.fn();
|
||||
const mockLogin = vi.fn().mockResolvedValue(undefined);
|
||||
const mockNavigate = vi.fn();
|
||||
const mockOpenModal = vi.fn();
|
||||
|
||||
@@ -61,7 +61,7 @@ describe('useAppInitialization Hook', () => {
|
||||
// Mock localStorage
|
||||
Object.defineProperty(window, 'localStorage', {
|
||||
value: {
|
||||
getItem: vi.fn(),
|
||||
getItem: vi.fn().mockReturnValue(null),
|
||||
setItem: vi.fn(),
|
||||
removeItem: vi.fn(),
|
||||
clear: vi.fn(),
|
||||
@@ -74,6 +74,7 @@ describe('useAppInitialization Hook', () => {
|
||||
matches: false, // default to light mode
|
||||
})),
|
||||
writable: true,
|
||||
configurable: true,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ export const useAppInitialization = () => {
|
||||
if (userProfile && userProfile.preferences?.darkMode !== undefined) {
|
||||
localStorage.setItem('darkMode', String(userProfile.preferences.darkMode));
|
||||
}
|
||||
}, [userProfile?.preferences?.darkMode, userProfile?.user.user_id]);
|
||||
}, [userProfile]);
|
||||
|
||||
// Effect to set initial unit system based on user profile or local storage
|
||||
useEffect(() => {
|
||||
|
||||
@@ -79,7 +79,7 @@ vi.mock('../pages/admin/ActivityLog', async () => {
|
||||
),
|
||||
};
|
||||
});
|
||||
vi.mock('../pages/admin/components/AnonymousUserBanner', () => ({
|
||||
vi.mock('../components/AnonymousUserBanner', () => ({
|
||||
AnonymousUserBanner: () => <div data-testid="anonymous-banner" />,
|
||||
}));
|
||||
vi.mock('../components/ErrorDisplay', () => ({
|
||||
|
||||
@@ -374,7 +374,11 @@ describe('AuthView', () => {
|
||||
fireEvent.submit(screen.getByTestId('auth-form'));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(screen.getByRole('button', { name: 'Register' })).toBeDisabled();
|
||||
const submitButton = screen.getByTestId('auth-form').querySelector('button[type="submit"]');
|
||||
expect(submitButton).toBeInTheDocument();
|
||||
expect(submitButton).toBeDisabled();
|
||||
// Verify the text 'Register' is gone from any button
|
||||
expect(screen.queryByRole('button', { name: 'Register' })).not.toBeInTheDocument();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
79
src/services/analyticsService.server.ts
Normal file
79
src/services/analyticsService.server.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
// src/services/analyticsService.server.ts
|
||||
import type { Job } from 'bullmq';
|
||||
import { logger as globalLogger } from './logger.server';
|
||||
import type { AnalyticsJobData, WeeklyAnalyticsJobData } from './queues.server';
|
||||
|
||||
/**
|
||||
* A service class to encapsulate business logic for analytics-related background jobs.
|
||||
*/
|
||||
export class AnalyticsService {
|
||||
/**
|
||||
* Processes a job to generate a daily analytics report.
|
||||
* This is currently a mock implementation.
|
||||
* @param job The BullMQ job object.
|
||||
*/
|
||||
async processDailyReportJob(job: Job<AnalyticsJobData>) {
|
||||
const { reportDate } = job.data;
|
||||
const logger = globalLogger.child({
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
reportDate,
|
||||
});
|
||||
|
||||
logger.info(`Picked up daily analytics job.`);
|
||||
|
||||
try {
|
||||
// This is mock logic, but we keep it in the service
|
||||
if (reportDate === 'FAIL') {
|
||||
throw new Error('This is a test failure for the analytics job.');
|
||||
}
|
||||
// Simulate work
|
||||
await new Promise((resolve) => setTimeout(resolve, 10000));
|
||||
logger.info(`Successfully generated report for ${reportDate}.`);
|
||||
return { status: 'success', reportDate };
|
||||
} catch (error) {
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
logger.error(
|
||||
{
|
||||
err: wrappedError,
|
||||
attemptsMade: job.attemptsMade,
|
||||
},
|
||||
`Daily analytics job failed.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a job to generate a weekly analytics report.
|
||||
* This is currently a mock implementation.
|
||||
* @param job The BullMQ job object.
|
||||
*/
|
||||
async processWeeklyReportJob(job: Job<WeeklyAnalyticsJobData>) {
|
||||
const { reportYear, reportWeek } = job.data;
|
||||
const logger = globalLogger.child({
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
reportYear,
|
||||
reportWeek,
|
||||
});
|
||||
|
||||
logger.info(`Picked up weekly analytics job.`);
|
||||
|
||||
try {
|
||||
// Mock logic
|
||||
await new Promise((resolve) => setTimeout(resolve, 30000));
|
||||
logger.info(`Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`);
|
||||
return { status: 'success', reportYear, reportWeek };
|
||||
} catch (error) {
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
logger.error(
|
||||
{ err: wrappedError, attemptsMade: job.attemptsMade },
|
||||
`Weekly analytics job failed.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const analyticsService = new AnalyticsService();
|
||||
@@ -4,8 +4,11 @@
|
||||
* It is configured via environment variables and should only be used on the server.
|
||||
*/
|
||||
import nodemailer from 'nodemailer';
|
||||
import type { Job } from 'bullmq';
|
||||
import type { Logger } from 'pino';
|
||||
import { logger as globalLogger } from './logger.server';
|
||||
import { WatchedItemDeal } from '../types';
|
||||
import type { EmailJobData } from './queues.server';
|
||||
|
||||
// 1. Create a Nodemailer transporter using SMTP configuration from environment variables.
|
||||
// For development, you can use a service like Ethereal (https://ethereal.email/)
|
||||
@@ -20,18 +23,11 @@ const transporter = nodemailer.createTransport({
|
||||
},
|
||||
});
|
||||
|
||||
interface EmailOptions {
|
||||
to: string;
|
||||
subject: string;
|
||||
text: string;
|
||||
html: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an email using the pre-configured transporter.
|
||||
* @param options The email options, including recipient, subject, and body.
|
||||
*/
|
||||
export const sendEmail = async (options: EmailOptions, logger: Logger) => {
|
||||
export const sendEmail = async (options: EmailJobData, logger: Logger) => {
|
||||
const mailOptions = {
|
||||
from: `"Flyer Crawler" <${process.env.SMTP_FROM_EMAIL}>`, // sender address
|
||||
to: options.to,
|
||||
@@ -40,16 +36,37 @@ export const sendEmail = async (options: EmailOptions, logger: Logger) => {
|
||||
html: options.html,
|
||||
};
|
||||
|
||||
const info = await transporter.sendMail(mailOptions);
|
||||
logger.info(
|
||||
{ to: options.to, subject: options.subject, messageId: info.messageId },
|
||||
`Email sent successfully.`,
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* Processes an email sending job from the queue.
|
||||
* This is the entry point for the email worker.
|
||||
* It encapsulates logging and error handling for the job.
|
||||
* @param job The BullMQ job object.
|
||||
*/
|
||||
export const processEmailJob = async (job: Job<EmailJobData>) => {
|
||||
const jobLogger = globalLogger.child({
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
recipient: job.data.to,
|
||||
});
|
||||
|
||||
jobLogger.info(`Picked up email job.`);
|
||||
|
||||
try {
|
||||
const info = await transporter.sendMail(mailOptions);
|
||||
logger.info(
|
||||
{ to: options.to, subject: options.subject, messageId: info.messageId },
|
||||
`Email sent successfully.`,
|
||||
);
|
||||
await sendEmail(job.data, jobLogger);
|
||||
} catch (error) {
|
||||
logger.error({ err: error, to: options.to, subject: options.subject }, 'Failed to send email.');
|
||||
// Re-throwing the error is important so the background job knows it failed.
|
||||
throw error;
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
jobLogger.error(
|
||||
{ err: wrappedError, jobData: job.data, attemptsMade: job.attemptsMade },
|
||||
`Email job failed.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -153,6 +153,9 @@ describe('FlyerDataTransformer', () => {
|
||||
expect(mockLogger.info).toHaveBeenCalledWith(
|
||||
'Starting data transformation from AI output to database format.',
|
||||
);
|
||||
expect(mockLogger.warn).toHaveBeenCalledWith(
|
||||
'AI did not return a store name. Using fallback "Unknown Store (auto)".',
|
||||
);
|
||||
expect(mockLogger.info).toHaveBeenCalledWith(
|
||||
{ itemCount: 0, storeName: 'Unknown Store (auto)' },
|
||||
'Data transformation complete.',
|
||||
|
||||
@@ -60,12 +60,17 @@ export class FlyerDataTransformer {
|
||||
updated_at: new Date().toISOString(),
|
||||
}));
|
||||
|
||||
const storeName = extractedData.store_name || 'Unknown Store (auto)';
|
||||
if (!extractedData.store_name) {
|
||||
logger.warn('AI did not return a store name. Using fallback "Unknown Store (auto)".');
|
||||
}
|
||||
|
||||
const flyerData: FlyerInsert = {
|
||||
file_name: originalFileName,
|
||||
image_url: `/flyer-images/${path.basename(firstImage)}`,
|
||||
icon_url: `/flyer-images/icons/${iconFileName}`,
|
||||
checksum,
|
||||
store_name: extractedData.store_name || 'Unknown Store (auto)',
|
||||
store_name: storeName,
|
||||
valid_from: extractedData.valid_from,
|
||||
valid_to: extractedData.valid_to,
|
||||
store_address: extractedData.store_address, // The number of items is now calculated directly from the transformed data.
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
// src/services/flyerProcessingService.server.test.ts
|
||||
import { describe, it, expect, vi, beforeEach, type Mocked } from 'vitest';
|
||||
import sharp from 'sharp';
|
||||
import { Job } from 'bullmq';
|
||||
import { Job, UnrecoverableError } from 'bullmq';
|
||||
import type { Dirent } from 'node:fs';
|
||||
import type { Logger } from 'pino';
|
||||
import { z } from 'zod';
|
||||
import { AiFlyerDataSchema } from './flyerProcessingService.server';
|
||||
import type { Flyer, FlyerInsert } from '../types';
|
||||
|
||||
import type { Flyer, FlyerInsert, FlyerItemInsert } from '../types';
|
||||
import type { CleanupJobData } from './flyerProcessingService.server';
|
||||
export interface FlyerJobData {
|
||||
filePath: string;
|
||||
originalFileName: string;
|
||||
@@ -55,7 +55,7 @@ import * as imageProcessor from '../utils/imageProcessor';
|
||||
import { createMockFlyer } from '../tests/utils/mockFactories';
|
||||
import { FlyerDataTransformer } from './flyerDataTransformer';
|
||||
import {
|
||||
AiDataValidationError,
|
||||
AiDataValidationError, // This was a duplicate, fixed.
|
||||
PdfConversionError,
|
||||
UnsupportedFileTypeError,
|
||||
} from './processingErrors';
|
||||
@@ -181,6 +181,16 @@ describe('FlyerProcessingService', () => {
|
||||
} as unknown as Job<FlyerJobData>;
|
||||
};
|
||||
|
||||
const createMockCleanupJob = (data: CleanupJobData): Job<CleanupJobData> => {
|
||||
return {
|
||||
id: `cleanup-job-${data.flyerId}`,
|
||||
data,
|
||||
opts: { attempts: 3 },
|
||||
attemptsMade: 1,
|
||||
updateProgress: vi.fn(),
|
||||
} as unknown as Job<CleanupJobData>;
|
||||
};
|
||||
|
||||
describe('processJob (Orchestrator)', () => {
|
||||
it('should process an image file successfully and enqueue a cleanup job', async () => {
|
||||
const job = createMockJob({ filePath: '/tmp/flyer.jpg', originalFileName: 'flyer.jpg' });
|
||||
@@ -243,6 +253,7 @@ describe('FlyerProcessingService', () => {
|
||||
|
||||
it('should throw an error and not enqueue cleanup if the AI service fails', async () => {
|
||||
const job = createMockJob({});
|
||||
const { logger } = await import('./logger.server');
|
||||
const aiError = new Error('AI model exploded');
|
||||
vi.mocked(mockedAiService.aiService.extractCoreDataFromFlyerImage).mockRejectedValue(aiError);
|
||||
|
||||
@@ -251,12 +262,37 @@ describe('FlyerProcessingService', () => {
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'AI model exploded',
|
||||
}); // This was a duplicate, fixed.
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw UnrecoverableError for quota issues and not enqueue cleanup', async () => {
|
||||
const job = createMockJob({});
|
||||
// Simulate an AI error that contains a keyword for unrecoverable errors
|
||||
const quotaError = new Error('AI model quota exceeded');
|
||||
const { logger } = await import('./logger.server');
|
||||
vi.mocked(mockedAiService.aiService.extractCoreDataFromFlyerImage).mockRejectedValue(
|
||||
quotaError,
|
||||
);
|
||||
|
||||
await expect(service.processJob(job)).rejects.toThrow(UnrecoverableError);
|
||||
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'QUOTA_EXCEEDED',
|
||||
message: 'An AI quota has been exceeded. Please try again later.',
|
||||
});
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw PdfConversionError and not enqueue cleanup if PDF conversion fails', async () => {
|
||||
const job = createMockJob({ filePath: '/tmp/bad.pdf', originalFileName: 'bad.pdf' });
|
||||
const { logger } = await import('./logger.server');
|
||||
const conversionError = new PdfConversionError('Conversion failed', 'pdftocairo error');
|
||||
// Make the conversion step fail
|
||||
mocks.execAsync.mockRejectedValue(conversionError);
|
||||
@@ -266,9 +302,12 @@ describe('FlyerProcessingService', () => {
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'PDF_CONVERSION_FAILED',
|
||||
message:
|
||||
'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.',
|
||||
'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.', // This was a duplicate, fixed.
|
||||
});
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw AiDataValidationError and not enqueue cleanup if AI validation fails', async () => {
|
||||
@@ -290,9 +329,12 @@ describe('FlyerProcessingService', () => {
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'AI_VALIDATION_FAILED',
|
||||
message:
|
||||
"The AI couldn't read the flyer's format. Please try a clearer image or a different flyer.",
|
||||
"The AI couldn't read the flyer's format. Please try a clearer image or a different flyer.", // This was a duplicate, fixed.
|
||||
});
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
// FIX: This test was incorrect. The service *does* support GIF conversion.
|
||||
@@ -358,6 +400,7 @@ describe('FlyerProcessingService', () => {
|
||||
|
||||
it('should throw an error and not enqueue cleanup if the database service fails', async () => {
|
||||
const job = createMockJob({});
|
||||
const { logger } = await import('./logger.server');
|
||||
const dbError = new Error('Database transaction failed');
|
||||
vi.mocked(createFlyerAndItems).mockRejectedValue(dbError);
|
||||
|
||||
@@ -366,8 +409,11 @@ describe('FlyerProcessingService', () => {
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'Database transaction failed',
|
||||
});
|
||||
}); // This was a duplicate, fixed.
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw UnsupportedFileTypeError for an unsupported file type', async () => {
|
||||
@@ -375,25 +421,23 @@ describe('FlyerProcessingService', () => {
|
||||
filePath: '/tmp/document.txt',
|
||||
originalFileName: 'document.txt',
|
||||
});
|
||||
const { logger } = await import('./logger.server');
|
||||
|
||||
await expect(service.processJob(job)).rejects.toThrow(UnsupportedFileTypeError);
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'UNSUPPORTED_FILE_TYPE',
|
||||
message:
|
||||
'Unsupported file type: .txt. Supported types are PDF, JPG, PNG, WEBP, HEIC, HEIF, GIF, TIFF, SVG, BMP.',
|
||||
'Unsupported file type: .txt. Supported types are PDF, JPG, PNG, WEBP, HEIC, HEIF, GIF, TIFF, SVG, BMP.', // This was a duplicate, fixed.
|
||||
});
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should log a warning and not enqueue cleanup if the job fails but a flyer ID was somehow generated', async () => {
|
||||
const job = createMockJob({});
|
||||
vi.mocked(createFlyerAndItems).mockRejectedValue(new Error('DB Error'));
|
||||
await expect(service.processJob(job)).rejects.toThrow();
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw an error and not enqueue cleanup if icon generation fails', async () => {
|
||||
const job = createMockJob({});
|
||||
const { logger } = await import('./logger.server');
|
||||
const iconError = new Error('Icon generation failed.');
|
||||
// The `transform` method calls `generateFlyerIcon`. In `beforeEach`, `transform` is mocked
|
||||
// to always succeed. For this test, we override that mock to simulate a failure
|
||||
@@ -405,8 +449,11 @@ describe('FlyerProcessingService', () => {
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'Icon generation failed.',
|
||||
});
|
||||
}); // This was a duplicate, fixed.
|
||||
expect(mockCleanupQueue.add).not.toHaveBeenCalled();
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.',
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -443,6 +490,26 @@ describe('FlyerProcessingService', () => {
|
||||
});
|
||||
|
||||
describe('_extractFlyerDataWithAI (private method)', () => {
|
||||
it('should call AI service and return validated data on success', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const jobData = createMockJob({}).data;
|
||||
const mockAiResponse = {
|
||||
store_name: 'AI Store',
|
||||
valid_from: '2024-01-01',
|
||||
valid_to: '2024-01-07',
|
||||
store_address: '123 AI St',
|
||||
items: [],
|
||||
};
|
||||
vi.mocked(mockedAiService.aiService.extractCoreDataFromFlyerImage).mockResolvedValue(
|
||||
mockAiResponse,
|
||||
);
|
||||
|
||||
const result = await (service as any)._extractFlyerDataWithAI([], jobData, logger);
|
||||
|
||||
expect(mockedAiService.aiService.extractCoreDataFromFlyerImage).toHaveBeenCalledTimes(1);
|
||||
expect(result).toEqual(mockAiResponse);
|
||||
});
|
||||
|
||||
it('should throw AiDataValidationError if AI response validation fails', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const jobData = createMockJob({}).data;
|
||||
@@ -458,6 +525,45 @@ describe('FlyerProcessingService', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('_validateAiData (private method)', () => {
|
||||
it('should return validated data on success', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const validData = {
|
||||
store_name: 'Test Store',
|
||||
valid_from: '2024-01-01',
|
||||
valid_to: '2024-01-07',
|
||||
store_address: '123 Test St',
|
||||
items: [
|
||||
{ item: 'Apple', price_display: '$1', price_in_cents: 100, quantity: '1', category_name: 'Produce' },
|
||||
],
|
||||
};
|
||||
|
||||
const result = (service as any)._validateAiData(validData, logger);
|
||||
|
||||
expect(result).toEqual(validData);
|
||||
expect(logger.info).toHaveBeenCalledWith('AI extracted 1 items.');
|
||||
});
|
||||
|
||||
it('should throw AiDataValidationError on invalid data', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const invalidData = {
|
||||
// store_name is missing, which will fail validation
|
||||
items: [],
|
||||
};
|
||||
|
||||
expect(() => (service as any)._validateAiData(invalidData, logger)).toThrow(
|
||||
AiDataValidationError,
|
||||
);
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
errors: expect.any(Object),
|
||||
rawData: invalidData,
|
||||
}),
|
||||
'AI response failed validation.',
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('_enqueueCleanup (private method)', () => {
|
||||
it('should enqueue a cleanup job with the correct parameters', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
@@ -492,43 +598,40 @@ describe('FlyerProcessingService', () => {
|
||||
});
|
||||
|
||||
describe('_saveProcessedFlyerData (private method)', () => {
|
||||
it('should transform data, create flyer in DB, and log activity', async () => {
|
||||
it('should save flyer to DB and log activity', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
// Arrange
|
||||
const mockExtractedData = {
|
||||
store_name: 'Test Store',
|
||||
valid_from: '2024-01-01',
|
||||
valid_to: '2024-01-07',
|
||||
store_address: '123 Mock St',
|
||||
items: [
|
||||
{
|
||||
item: 'Test Item',
|
||||
price_display: '$1.99',
|
||||
price_in_cents: 199,
|
||||
quantity: 'each',
|
||||
category_name: 'Test Category',
|
||||
master_item_id: 1,
|
||||
},
|
||||
],
|
||||
};
|
||||
const mockImagePaths = [{ path: '/tmp/flyer.jpg', mimetype: 'image/jpeg' }];
|
||||
const mockJobData = {
|
||||
filePath: '/tmp/flyer.jpg',
|
||||
originalFileName: 'flyer.jpg',
|
||||
const mockFlyerData: FlyerInsert = {
|
||||
file_name: 'flyer.jpg',
|
||||
image_url: '/flyer-images/flyer.jpg',
|
||||
icon_url: '/flyer-images/icons/icon-flyer.webp',
|
||||
checksum: 'checksum-123',
|
||||
userId: 'user-abc',
|
||||
store_name: 'Test Store',
|
||||
item_count: 1,
|
||||
valid_from: null,
|
||||
valid_to: null,
|
||||
store_address: null,
|
||||
uploaded_by: 'user-abc',
|
||||
};
|
||||
const mockItemsForDb: FlyerItemInsert[] = [
|
||||
{
|
||||
item: 'Test Item',
|
||||
price_display: '$1.99',
|
||||
price_in_cents: 199,
|
||||
quantity: 'each',
|
||||
category_name: 'Test Category',
|
||||
master_item_id: 1,
|
||||
view_count: 0,
|
||||
click_count: 0,
|
||||
},
|
||||
];
|
||||
const userId = 'user-abc';
|
||||
|
||||
// The DB create function is also mocked in beforeEach.
|
||||
// Create a complete mock that satisfies the Flyer type.
|
||||
const mockNewFlyer = createMockFlyer({
|
||||
flyer_id: 1,
|
||||
file_name: 'flyer.jpg',
|
||||
image_url: '/flyer-images/flyer.jpg',
|
||||
icon_url: '/flyer-images/icons/icon-flyer.webp',
|
||||
checksum: 'checksum-123',
|
||||
store_id: 1,
|
||||
item_count: 1,
|
||||
store: { name: 'Test Store' }, // Match the logActivity expectation
|
||||
});
|
||||
vi.mocked(createFlyerAndItems).mockResolvedValue({ flyer: mockNewFlyer, items: [] });
|
||||
|
||||
@@ -536,30 +639,22 @@ describe('FlyerProcessingService', () => {
|
||||
const result = await (
|
||||
service as unknown as {
|
||||
_saveProcessedFlyerData: (
|
||||
extractedData: z.infer<typeof AiFlyerDataSchema>,
|
||||
imagePaths: { path: string; mimetype: string }[],
|
||||
jobData: FlyerJobData,
|
||||
flyerData: FlyerInsert,
|
||||
itemsForDb: FlyerItemInsert[],
|
||||
userId: string | undefined,
|
||||
logger: Logger,
|
||||
) => Promise<Flyer>;
|
||||
}
|
||||
)._saveProcessedFlyerData(mockExtractedData, mockImagePaths, mockJobData, logger);
|
||||
)._saveProcessedFlyerData(mockFlyerData, mockItemsForDb, userId, logger);
|
||||
|
||||
// Assert
|
||||
// 1. Transformer was called correctly
|
||||
expect(FlyerDataTransformer.prototype.transform).toHaveBeenCalledWith(
|
||||
mockExtractedData,
|
||||
mockImagePaths,
|
||||
mockJobData.originalFileName,
|
||||
mockJobData.checksum,
|
||||
mockJobData.userId,
|
||||
logger,
|
||||
);
|
||||
// 1. Transformer should NOT be called from this method anymore.
|
||||
expect(FlyerDataTransformer.prototype.transform).not.toHaveBeenCalled();
|
||||
|
||||
// 2. DB function was called with the transformed data
|
||||
// The data comes from the mock defined in `beforeEach`.
|
||||
expect(createFlyerAndItems).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ store_name: 'Mock Store', checksum: 'checksum-123' }),
|
||||
[], // itemsForDb from the mock
|
||||
mockFlyerData,
|
||||
mockItemsForDb,
|
||||
logger,
|
||||
);
|
||||
|
||||
@@ -568,8 +663,8 @@ describe('FlyerProcessingService', () => {
|
||||
{
|
||||
userId: 'user-abc',
|
||||
action: 'flyer_processed' as const,
|
||||
displayText: 'Processed a new flyer for Mock Store.', // This was a duplicate, fixed.
|
||||
details: { flyerId: 1, storeName: 'Mock Store' },
|
||||
displayText: 'Processed a new flyer for Test Store.',
|
||||
details: { flyerId: 1, storeName: 'Test Store' },
|
||||
},
|
||||
logger,
|
||||
);
|
||||
@@ -579,6 +674,45 @@ describe('FlyerProcessingService', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('_logFlyerProcessedActivity (private method)', () => {
|
||||
it('should log the flyer processing activity with a user ID', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const mockFlyer = createMockFlyer({ flyer_id: 1, store: { name: 'Test Store' } });
|
||||
const userId = 'user-123';
|
||||
|
||||
// Access and call the private method for testing
|
||||
await (service as any)._logFlyerProcessedActivity(mockFlyer, userId, logger);
|
||||
|
||||
expect(mockedDb.adminRepo.logActivity).toHaveBeenCalledWith(
|
||||
{
|
||||
userId: 'user-123',
|
||||
action: 'flyer_processed',
|
||||
displayText: 'Processed a new flyer for Test Store.',
|
||||
details: { flyerId: 1, storeName: 'Test Store' },
|
||||
},
|
||||
logger,
|
||||
);
|
||||
});
|
||||
|
||||
it('should log the activity without a user ID for anonymous uploads', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
const mockFlyer = createMockFlyer({ flyer_id: 2, store: { name: 'Another Store' } });
|
||||
|
||||
// Call with undefined userId
|
||||
await (service as any)._logFlyerProcessedActivity(mockFlyer, undefined, logger);
|
||||
|
||||
expect(mockedDb.adminRepo.logActivity).toHaveBeenCalledWith(
|
||||
{
|
||||
userId: undefined,
|
||||
action: 'flyer_processed',
|
||||
displayText: 'Processed a new flyer for Another Store.',
|
||||
details: { flyerId: 2, storeName: 'Another Store' },
|
||||
},
|
||||
logger,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('_convertPdfToImages (private method)', () => {
|
||||
it('should call pdftocairo and return sorted image paths on success', async () => {
|
||||
const { logger } = await import('./logger.server');
|
||||
@@ -636,4 +770,106 @@ describe('FlyerProcessingService', () => {
|
||||
).rejects.toThrow(commandError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('_reportErrorAndThrow (private method)', () => {
|
||||
it('should update progress and throw UnrecoverableError for quota messages', async () => {
|
||||
const job = createMockJob({});
|
||||
const quotaError = new Error('RESOURCE_EXHAUSTED');
|
||||
const privateMethod = (service as any)._reportErrorAndThrow;
|
||||
|
||||
await expect(privateMethod(quotaError, job, {} as Logger)).rejects.toThrow(
|
||||
UnrecoverableError,
|
||||
);
|
||||
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'QUOTA_EXCEEDED',
|
||||
message: 'An AI quota has been exceeded. Please try again later.',
|
||||
});
|
||||
});
|
||||
|
||||
it('should update progress and re-throw standard errors', async () => {
|
||||
const job = createMockJob({});
|
||||
const genericError = new Error('A standard failure');
|
||||
const privateMethod = (service as any)._reportErrorAndThrow;
|
||||
|
||||
await expect(privateMethod(genericError, job, {} as Logger)).rejects.toThrow(genericError);
|
||||
|
||||
expect(job.updateProgress).toHaveBeenCalledWith({
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'A standard failure', // This was a duplicate, fixed.
|
||||
});
|
||||
});
|
||||
|
||||
it('should wrap and throw non-Error objects', async () => {
|
||||
const job = createMockJob({});
|
||||
const nonError = 'just a string error';
|
||||
const privateMethod = (service as any)._reportErrorAndThrow;
|
||||
|
||||
await expect(privateMethod(nonError, job, {} as Logger)).rejects.toThrow('just a string error');
|
||||
});
|
||||
});
|
||||
|
||||
describe('processCleanupJob', () => {
|
||||
it('should delete all files successfully', async () => {
|
||||
const job = createMockCleanupJob({ flyerId: 1, paths: ['/tmp/file1', '/tmp/file2'] });
|
||||
mocks.unlink.mockResolvedValue(undefined);
|
||||
|
||||
const result = await service.processCleanupJob(job);
|
||||
|
||||
expect(mocks.unlink).toHaveBeenCalledTimes(2);
|
||||
expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file1');
|
||||
expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file2');
|
||||
expect(result).toEqual({ status: 'success', deletedCount: 2 });
|
||||
});
|
||||
|
||||
it('should handle ENOENT errors gracefully and still succeed', async () => {
|
||||
const job = createMockCleanupJob({ flyerId: 1, paths: ['/tmp/file1', '/tmp/file2'] });
|
||||
const enoentError: NodeJS.ErrnoException = new Error('File not found');
|
||||
enoentError.code = 'ENOENT';
|
||||
|
||||
mocks.unlink.mockResolvedValueOnce(undefined).mockRejectedValueOnce(enoentError);
|
||||
|
||||
const result = await service.processCleanupJob(job);
|
||||
|
||||
expect(mocks.unlink).toHaveBeenCalledTimes(2);
|
||||
expect(result).toEqual({ status: 'success', deletedCount: 2 });
|
||||
// Check that the warning was logged
|
||||
const { logger } = await import('./logger.server');
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'File not found during cleanup (already deleted?): /tmp/file2',
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw an aggregate error if a non-ENOENT error occurs', async () => {
|
||||
const job = createMockCleanupJob({
|
||||
flyerId: 1,
|
||||
paths: ['/tmp/file1', '/tmp/permission-denied'],
|
||||
});
|
||||
const permissionError: NodeJS.ErrnoException = new Error('Permission denied');
|
||||
permissionError.code = 'EACCES';
|
||||
|
||||
mocks.unlink.mockResolvedValueOnce(undefined).mockRejectedValueOnce(permissionError);
|
||||
|
||||
await expect(service.processCleanupJob(job)).rejects.toThrow(
|
||||
'Failed to delete 1 file(s): /tmp/permission-denied',
|
||||
);
|
||||
|
||||
// Check that the error was logged
|
||||
const { logger } = await import('./logger.server');
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ err: permissionError, path: '/tmp/permission-denied' }),
|
||||
'Failed to delete temporary file.',
|
||||
);
|
||||
});
|
||||
|
||||
it('should skip processing and return "skipped" if paths array is empty', async () => {
|
||||
const job = createMockCleanupJob({ flyerId: 1, paths: [] });
|
||||
const result = await service.processCleanupJob(job);
|
||||
|
||||
expect(mocks.unlink).not.toHaveBeenCalled();
|
||||
expect(result).toEqual({ status: 'skipped', reason: 'no paths' });
|
||||
const { logger } = await import('./logger.server');
|
||||
expect(logger.warn).toHaveBeenCalledWith('Job received no paths to clean. Skipping.');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
// src/services/flyerProcessingService.server.ts
|
||||
import type { Job, JobsOptions } from 'bullmq';
|
||||
import { Job, JobsOptions, UnrecoverableError } from 'bullmq';
|
||||
import sharp from 'sharp';
|
||||
import path from 'path';
|
||||
import type { Dirent } from 'node:fs';
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
import { FlyerDataTransformer } from './flyerDataTransformer';
|
||||
import { logger as globalLogger } from './logger.server';
|
||||
import type { Logger } from 'pino';
|
||||
import type { Flyer, FlyerInsert, FlyerItemInsert } from '../types';
|
||||
|
||||
// Helper for consistent required string validation (handles missing/null/empty)
|
||||
const requiredString = (message: string) =>
|
||||
@@ -47,7 +48,7 @@ export interface FlyerJobData {
|
||||
userProfileAddress?: string;
|
||||
}
|
||||
|
||||
interface CleanupJobData {
|
||||
export interface CleanupJobData {
|
||||
flyerId: number;
|
||||
// An array of absolute file paths to be deleted. Made optional for manual cleanup triggers.
|
||||
paths?: string[];
|
||||
@@ -93,6 +94,49 @@ export class FlyerProcessingService {
|
||||
private transformer: FlyerDataTransformer,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Executes the pdftocairo command to convert the PDF.
|
||||
*/
|
||||
private async _executePdfConversion(
|
||||
filePath: string,
|
||||
outputFilePrefix: string,
|
||||
logger: Logger,
|
||||
): Promise<{ stdout: string; stderr: string }> {
|
||||
const command = `pdftocairo -jpeg -r 150 "${filePath}" "${outputFilePrefix}"`;
|
||||
logger.info(`Executing PDF conversion command`);
|
||||
logger.debug({ command });
|
||||
const { stdout, stderr } = await this.exec(command);
|
||||
|
||||
if (stdout) logger.debug({ stdout }, `[Worker] pdftocairo stdout for ${filePath}:`);
|
||||
if (stderr) logger.warn({ stderr }, `[Worker] pdftocairo stderr for ${filePath}:`);
|
||||
|
||||
return { stdout, stderr };
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans the output directory for generated JPEG images and returns their paths.
|
||||
*/
|
||||
private async _collectGeneratedImages(
|
||||
outputDir: string,
|
||||
outputFilePrefix: string,
|
||||
logger: Logger,
|
||||
): Promise<string[]> {
|
||||
logger.debug(`[Worker] Reading contents of output directory: ${outputDir}`);
|
||||
const filesInDir = await this.fs.readdir(outputDir, { withFileTypes: true });
|
||||
logger.debug(`[Worker] Found ${filesInDir.length} total entries in output directory.`);
|
||||
|
||||
const generatedImages = filesInDir
|
||||
.filter((f) => f.name.startsWith(path.basename(outputFilePrefix)) && f.name.endsWith('.jpg'))
|
||||
.sort((a, b) => a.name.localeCompare(b.name, undefined, { numeric: true }));
|
||||
|
||||
logger.debug(
|
||||
{ imageNames: generatedImages.map((f) => f.name) },
|
||||
`Filtered down to ${generatedImages.length} generated JPGs.`,
|
||||
);
|
||||
|
||||
return generatedImages.map((img) => path.join(outputDir, img.name));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a PDF file to a series of JPEG images using an external tool.
|
||||
* @param filePath The path to the PDF file.
|
||||
@@ -111,34 +155,17 @@ export class FlyerProcessingService {
|
||||
const outputFilePrefix = path.join(outputDir, path.basename(filePath, '.pdf'));
|
||||
logger.debug({ outputDir, outputFilePrefix }, `PDF output details`);
|
||||
|
||||
const command = `pdftocairo -jpeg -r 150 "${filePath}" "${outputFilePrefix}"`;
|
||||
logger.info(`Executing PDF conversion command`);
|
||||
logger.debug({ command });
|
||||
const { stdout, stderr } = await this.exec(command);
|
||||
const { stderr } = await this._executePdfConversion(filePath, outputFilePrefix, logger);
|
||||
|
||||
if (stdout) logger.debug({ stdout }, `[Worker] pdftocairo stdout for ${filePath}:`);
|
||||
if (stderr) logger.warn({ stderr }, `[Worker] pdftocairo stderr for ${filePath}:`);
|
||||
const imagePaths = await this._collectGeneratedImages(outputDir, outputFilePrefix, logger);
|
||||
|
||||
logger.debug(`[Worker] Reading contents of output directory: ${outputDir}`);
|
||||
const filesInDir = await this.fs.readdir(outputDir, { withFileTypes: true });
|
||||
logger.debug(`[Worker] Found ${filesInDir.length} total entries in output directory.`);
|
||||
|
||||
const generatedImages = filesInDir
|
||||
.filter((f) => f.name.startsWith(path.basename(outputFilePrefix)) && f.name.endsWith('.jpg'))
|
||||
.sort((a, b) => a.name.localeCompare(b.name, undefined, { numeric: true }));
|
||||
|
||||
logger.debug(
|
||||
{ imageNames: generatedImages.map((f) => f.name) },
|
||||
`Filtered down to ${generatedImages.length} generated JPGs.`,
|
||||
);
|
||||
|
||||
if (generatedImages.length === 0) {
|
||||
if (imagePaths.length === 0) {
|
||||
const errorMessage = `PDF conversion resulted in 0 images for file: ${filePath}. The PDF might be blank or corrupt.`;
|
||||
logger.error({ stderr }, `PdfConversionError: ${errorMessage}`);
|
||||
throw new PdfConversionError(errorMessage, stderr);
|
||||
}
|
||||
|
||||
return generatedImages.map((img) => path.join(outputDir, img.name));
|
||||
return imagePaths;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,33 +202,62 @@ export class FlyerProcessingService {
|
||||
logger: Logger,
|
||||
): Promise<{ imagePaths: { path: string; mimetype: string }[]; createdImagePaths: string[] }> {
|
||||
const fileExt = path.extname(filePath).toLowerCase();
|
||||
let imagePaths: { path: string; mimetype: string }[] = [];
|
||||
let createdImagePaths: string[] = [];
|
||||
|
||||
// Handle PDF conversion separately
|
||||
if (fileExt === '.pdf') {
|
||||
const createdImagePaths = await this._convertPdfToImages(filePath, job, logger);
|
||||
const imagePaths = createdImagePaths.map((p) => ({ path: p, mimetype: 'image/jpeg' }));
|
||||
createdImagePaths = await this._convertPdfToImages(filePath, job, logger);
|
||||
imagePaths = createdImagePaths.map((p) => ({ path: p, mimetype: 'image/jpeg' }));
|
||||
logger.info(`Converted PDF to ${imagePaths.length} images.`);
|
||||
return { imagePaths, createdImagePaths };
|
||||
// Handle directly supported single-image formats
|
||||
} else if (SUPPORTED_IMAGE_EXTENSIONS.includes(fileExt)) {
|
||||
logger.info(`Processing as a single image file: ${filePath}`);
|
||||
// Normalize .jpg to image/jpeg for consistency
|
||||
const mimetype =
|
||||
fileExt === '.jpg' || fileExt === '.jpeg' ? 'image/jpeg' : `image/${fileExt.slice(1)}`;
|
||||
const imagePaths = [{ path: filePath, mimetype }];
|
||||
return { imagePaths, createdImagePaths: [] };
|
||||
imagePaths = [{ path: filePath, mimetype }];
|
||||
// No new images created, so createdImagePaths remains empty.
|
||||
// Handle convertible image formats
|
||||
} else if (CONVERTIBLE_IMAGE_EXTENSIONS.includes(fileExt)) {
|
||||
const createdPngPath = await this._convertImageToPng(filePath, logger);
|
||||
const imagePaths = [{ path: createdPngPath, mimetype: 'image/png' }];
|
||||
imagePaths = [{ path: createdPngPath, mimetype: 'image/png' }];
|
||||
// The new PNG is a temporary file that needs to be cleaned up.
|
||||
return { imagePaths, createdImagePaths: [createdPngPath] };
|
||||
createdImagePaths = [createdPngPath];
|
||||
} else {
|
||||
// If the file is neither a PDF nor a supported image, throw an error.
|
||||
const errorMessage = `Unsupported file type: ${fileExt}. Supported types are PDF, JPG, PNG, WEBP, HEIC, HEIF, GIF, TIFF, SVG, BMP.`;
|
||||
logger.error({ originalFileName: job.data.originalFileName, fileExt }, errorMessage);
|
||||
throw new UnsupportedFileTypeError(errorMessage);
|
||||
}
|
||||
|
||||
return { imagePaths, createdImagePaths };
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the raw data from the AI against the Zod schema.
|
||||
* @param extractedData The raw, unknown data from the AI service.
|
||||
* @param logger The job-specific logger instance.
|
||||
* @returns The validated and typed data.
|
||||
* @throws {AiDataValidationError} If the data does not conform to the schema.
|
||||
*/
|
||||
private _validateAiData(
|
||||
extractedData: unknown,
|
||||
logger: Logger,
|
||||
): z.infer<typeof AiFlyerDataSchema> {
|
||||
const validationResult = AiFlyerDataSchema.safeParse(extractedData);
|
||||
if (!validationResult.success) {
|
||||
const errors = validationResult.error.flatten();
|
||||
logger.error({ errors, rawData: extractedData }, 'AI response failed validation.');
|
||||
throw new AiDataValidationError(
|
||||
'AI response validation failed. The returned data structure is incorrect.',
|
||||
errors,
|
||||
extractedData,
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(`AI extracted ${validationResult.data.items.length} items.`);
|
||||
return validationResult.data;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -228,19 +284,7 @@ export class FlyerProcessingService {
|
||||
logger,
|
||||
);
|
||||
|
||||
const validationResult = AiFlyerDataSchema.safeParse(extractedData);
|
||||
if (!validationResult.success) {
|
||||
const errors = validationResult.error.flatten();
|
||||
logger.error({ errors, rawData: extractedData }, 'AI response failed validation.');
|
||||
throw new AiDataValidationError(
|
||||
'AI response validation failed. The returned data structure is incorrect.',
|
||||
errors,
|
||||
extractedData,
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(`AI extracted ${validationResult.data.items.length} items.`);
|
||||
return validationResult.data;
|
||||
return this._validateAiData(extractedData, logger);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -251,47 +295,44 @@ export class FlyerProcessingService {
|
||||
* @returns A promise that resolves to the newly created flyer record.
|
||||
*/
|
||||
private async _saveProcessedFlyerData(
|
||||
extractedData: z.infer<typeof AiFlyerDataSchema>,
|
||||
imagePaths: { path: string; mimetype: string }[],
|
||||
jobData: FlyerJobData,
|
||||
flyerData: FlyerInsert,
|
||||
itemsForDb: FlyerItemInsert[],
|
||||
userId: string | undefined,
|
||||
logger: Logger,
|
||||
) {
|
||||
logger.info(`Preparing to save extracted data to database.`);
|
||||
|
||||
// Ensure store_name is a non-empty string before passing to the transformer.
|
||||
// This makes the handling of the nullable store_name explicit in this service.
|
||||
const dataForTransformer = { ...extractedData };
|
||||
if (!dataForTransformer.store_name) {
|
||||
logger.warn('AI did not return a store name. Using fallback "Unknown Store (auto)".');
|
||||
dataForTransformer.store_name = 'Unknown Store (auto)';
|
||||
}
|
||||
|
||||
// 1. Transform the AI data into database-ready records.
|
||||
const { flyerData, itemsForDb } = await this.transformer.transform(
|
||||
dataForTransformer,
|
||||
imagePaths,
|
||||
jobData.originalFileName,
|
||||
jobData.checksum,
|
||||
jobData.userId,
|
||||
// Pass the job-specific logger to the transformer
|
||||
logger,
|
||||
);
|
||||
|
||||
// 2. Save the transformed data to the database.
|
||||
// 1. Save the transformed data to the database.
|
||||
const { flyer: newFlyer } = await createFlyerAndItems(flyerData, itemsForDb, logger);
|
||||
logger.info({ newFlyerId: newFlyer.flyer_id }, `Successfully saved new flyer.`);
|
||||
|
||||
// 2. Log the activity.
|
||||
await this._logFlyerProcessedActivity(newFlyer, userId, logger);
|
||||
|
||||
return newFlyer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs the successful processing of a flyer to the admin activity log.
|
||||
* @param newFlyer The newly created flyer record from the database.
|
||||
* @param userId The ID of the user who uploaded the flyer, if available.
|
||||
* @param logger The job-specific logger instance.
|
||||
*/
|
||||
private async _logFlyerProcessedActivity(
|
||||
newFlyer: Flyer,
|
||||
userId: string | undefined,
|
||||
logger: Logger,
|
||||
) {
|
||||
const storeName = newFlyer.store?.name || 'Unknown Store';
|
||||
await this.database.adminRepo.logActivity(
|
||||
{
|
||||
userId: jobData.userId,
|
||||
userId: userId,
|
||||
action: 'flyer_processed',
|
||||
displayText: `Processed a new flyer for ${flyerData.store_name}.`,
|
||||
details: { flyerId: newFlyer.flyer_id, storeName: flyerData.store_name },
|
||||
displayText: `Processed a new flyer for ${storeName}.`,
|
||||
details: { flyerId: newFlyer.flyer_id, storeName },
|
||||
},
|
||||
logger,
|
||||
);
|
||||
|
||||
return newFlyer;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -313,10 +354,136 @@ export class FlyerProcessingService {
|
||||
logger.info({ flyerId }, `Enqueued cleanup job.`);
|
||||
}
|
||||
|
||||
async processJob(job: Job<FlyerJobData>) {
|
||||
const { filePath, originalFileName } = job.data;
|
||||
/**
|
||||
* Centralized error handler for the `processJob` method. It logs the error,
|
||||
* updates the job's progress with a user-friendly message, and re-throws the
|
||||
* error for the worker to handle retries or final failure. It also identifies
|
||||
* unrecoverable errors to prevent unnecessary retries.
|
||||
* @param error The error caught during processing.
|
||||
* @param job The BullMQ job instance.
|
||||
* @param logger The job-specific logger.
|
||||
*/
|
||||
private async _reportErrorAndThrow(
|
||||
error: unknown,
|
||||
job: Job<FlyerJobData>,
|
||||
logger: Logger,
|
||||
): Promise<never> {
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
const errorMessage = wrappedError.message || '';
|
||||
|
||||
// First, check for unrecoverable quota-related errors.
|
||||
if (
|
||||
errorMessage.includes('quota') ||
|
||||
errorMessage.includes('429') ||
|
||||
errorMessage.includes('RESOURCE_EXHAUSTED')
|
||||
) {
|
||||
logger.error(
|
||||
{ err: wrappedError, jobId: job.id },
|
||||
'[FlyerProcessingService] Unrecoverable quota error detected. Failing job immediately.',
|
||||
);
|
||||
await job.updateProgress({
|
||||
errorCode: 'QUOTA_EXCEEDED',
|
||||
message: 'An AI quota has been exceeded. Please try again later.',
|
||||
});
|
||||
// This specific error type tells the BullMQ worker to fail the job without retries.
|
||||
throw new UnrecoverableError(errorMessage);
|
||||
}
|
||||
|
||||
// Define a structured error payload for job progress updates.
|
||||
// This allows the frontend to provide more specific feedback.
|
||||
let errorPayload = {
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'An unexpected error occurred during processing.',
|
||||
};
|
||||
|
||||
if (error instanceof UnsupportedFileTypeError) {
|
||||
logger.error({ err: error }, `Unsupported file type error.`);
|
||||
errorPayload = {
|
||||
errorCode: 'UNSUPPORTED_FILE_TYPE',
|
||||
message: error.message, // The message is already user-friendly
|
||||
};
|
||||
} else if (error instanceof PdfConversionError) {
|
||||
logger.error({ err: error, stderr: error.stderr }, `PDF Conversion failed.`);
|
||||
errorPayload = {
|
||||
errorCode: 'PDF_CONVERSION_FAILED',
|
||||
message:
|
||||
'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.',
|
||||
};
|
||||
} else if (error instanceof AiDataValidationError) {
|
||||
logger.error(
|
||||
{ err: error, validationErrors: error.validationErrors, rawData: error.rawData },
|
||||
`AI Data Validation failed.`,
|
||||
);
|
||||
errorPayload = {
|
||||
errorCode: 'AI_VALIDATION_FAILED',
|
||||
message:
|
||||
"The AI couldn't read the flyer's format. Please try a clearer image or a different flyer.",
|
||||
};
|
||||
} else if (error instanceof Error) {
|
||||
logger.error(
|
||||
{ err: error, attemptsMade: job.attemptsMade, totalAttempts: job.opts.attempts },
|
||||
`A generic error occurred in job.`,
|
||||
);
|
||||
errorPayload.message = error.message;
|
||||
}
|
||||
|
||||
await job.updateProgress(errorPayload);
|
||||
throw wrappedError;
|
||||
}
|
||||
|
||||
/**
|
||||
* Orchestrates the series of steps involved in processing a flyer.
|
||||
* This "happy path" method is called by the main `processJob` method.
|
||||
* @param job The BullMQ job instance.
|
||||
* @param logger The job-specific logger.
|
||||
* @returns A promise that resolves with the new flyer's ID.
|
||||
*/
|
||||
private async _runProcessingSteps(
|
||||
job: Job<FlyerJobData>,
|
||||
logger: Logger,
|
||||
): Promise<{ flyerId: number }> {
|
||||
const { filePath } = job.data;
|
||||
const createdImagePaths: string[] = [];
|
||||
let newFlyerId: number | undefined;
|
||||
|
||||
await job.updateProgress({ message: 'Starting process...' });
|
||||
const { imagePaths, createdImagePaths: tempImagePaths } = await this._prepareImageInputs(
|
||||
filePath,
|
||||
job,
|
||||
logger,
|
||||
);
|
||||
createdImagePaths.push(...tempImagePaths);
|
||||
|
||||
await job.updateProgress({ message: 'Extracting data...' });
|
||||
const extractedData = await this._extractFlyerDataWithAI(imagePaths, job.data, logger);
|
||||
|
||||
await job.updateProgress({ message: 'Transforming data...' });
|
||||
const { flyerData, itemsForDb } = await this.transformer.transform(
|
||||
extractedData,
|
||||
imagePaths,
|
||||
job.data.originalFileName,
|
||||
job.data.checksum,
|
||||
job.data.userId,
|
||||
logger,
|
||||
);
|
||||
|
||||
await job.updateProgress({ message: 'Saving to database...' });
|
||||
const newFlyer = await this._saveProcessedFlyerData(
|
||||
flyerData,
|
||||
itemsForDb,
|
||||
job.data.userId,
|
||||
logger,
|
||||
);
|
||||
logger.info({ flyerId: newFlyer.flyer_id }, `Job processed successfully.`);
|
||||
|
||||
// On success, enqueue the cleanup job for all temporary files.
|
||||
const pathsToClean = [filePath, ...createdImagePaths];
|
||||
await this._enqueueCleanup(newFlyer.flyer_id, pathsToClean, logger);
|
||||
|
||||
return { flyerId: newFlyer.flyer_id };
|
||||
}
|
||||
|
||||
async processJob(job: Job<FlyerJobData>) {
|
||||
const { originalFileName } = job.data;
|
||||
|
||||
// Create a job-specific logger instance with context, as per ADR-004
|
||||
const logger = globalLogger.child({
|
||||
@@ -330,80 +497,62 @@ export class FlyerProcessingService {
|
||||
logger.info(`Picked up job.`);
|
||||
|
||||
try {
|
||||
await job.updateProgress({ message: 'Starting process...' });
|
||||
const { imagePaths, createdImagePaths: tempImagePaths } = await this._prepareImageInputs(
|
||||
filePath,
|
||||
job,
|
||||
logger,
|
||||
);
|
||||
createdImagePaths.push(...tempImagePaths);
|
||||
|
||||
await job.updateProgress({ message: 'Extracting data...' });
|
||||
const extractedData = await this._extractFlyerDataWithAI(imagePaths, job.data, logger);
|
||||
|
||||
await job.updateProgress({ message: 'Saving to database...' });
|
||||
const newFlyer = await this._saveProcessedFlyerData(
|
||||
extractedData,
|
||||
imagePaths,
|
||||
job.data,
|
||||
logger,
|
||||
); // Pass logger
|
||||
|
||||
newFlyerId = newFlyer.flyer_id;
|
||||
logger.info({ flyerId: newFlyerId }, `Job processed successfully.`);
|
||||
return { flyerId: newFlyer.flyer_id };
|
||||
return await this._runProcessingSteps(job, logger);
|
||||
} catch (error: unknown) {
|
||||
// Define a structured error payload for job progress updates.
|
||||
// This allows the frontend to provide more specific feedback.
|
||||
let errorPayload = {
|
||||
errorCode: 'UNKNOWN_ERROR',
|
||||
message: 'An unexpected error occurred during processing.',
|
||||
};
|
||||
// On failure, explicitly log that we are not cleaning up files to allow for manual inspection.
|
||||
logger.warn(
|
||||
`Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.`,
|
||||
);
|
||||
// Delegate all error handling to a separate, testable method.
|
||||
await this._reportErrorAndThrow(error, job, logger);
|
||||
}
|
||||
}
|
||||
|
||||
if (error instanceof UnsupportedFileTypeError) {
|
||||
logger.error({ err: error }, `Unsupported file type error.`);
|
||||
errorPayload = {
|
||||
errorCode: 'UNSUPPORTED_FILE_TYPE',
|
||||
message: error.message, // The message is already user-friendly
|
||||
};
|
||||
} else if (error instanceof PdfConversionError) {
|
||||
logger.error({ err: error, stderr: error.stderr }, `PDF Conversion failed.`);
|
||||
errorPayload = {
|
||||
errorCode: 'PDF_CONVERSION_FAILED',
|
||||
message:
|
||||
'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.',
|
||||
};
|
||||
} else if (error instanceof AiDataValidationError) {
|
||||
logger.error(
|
||||
{ err: error, validationErrors: error.validationErrors, rawData: error.rawData },
|
||||
`AI Data Validation failed.`,
|
||||
);
|
||||
errorPayload = {
|
||||
errorCode: 'AI_VALIDATION_FAILED',
|
||||
message:
|
||||
"The AI couldn't read the flyer's format. Please try a clearer image or a different flyer.",
|
||||
};
|
||||
} else if (error instanceof Error) {
|
||||
logger.error(
|
||||
{ err: error, attemptsMade: job.attemptsMade, totalAttempts: job.opts.attempts },
|
||||
`A generic error occurred in job.`,
|
||||
);
|
||||
// For generic errors, we can pass the message along, but still use a code.
|
||||
errorPayload.message = error.message;
|
||||
async processCleanupJob(job: Job<CleanupJobData>) {
|
||||
const { flyerId, paths } = job.data;
|
||||
const logger = globalLogger.child({
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
flyerId,
|
||||
});
|
||||
|
||||
logger.info({ paths }, `Picked up file cleanup job.`);
|
||||
|
||||
try {
|
||||
if (!paths || paths.length === 0) {
|
||||
logger.warn(`Job received no paths to clean. Skipping.`);
|
||||
return { status: 'skipped', reason: 'no paths' };
|
||||
}
|
||||
|
||||
// Update the job's progress with the structured error payload.
|
||||
await job.updateProgress(errorPayload);
|
||||
throw error;
|
||||
} finally {
|
||||
if (newFlyerId) {
|
||||
const pathsToClean = [filePath, ...createdImagePaths];
|
||||
await this._enqueueCleanup(newFlyerId, pathsToClean, logger);
|
||||
} else {
|
||||
logger.warn(
|
||||
`Job failed. Temporary files will NOT be cleaned up to allow for manual inspection.`,
|
||||
);
|
||||
for (const filePath of paths) {
|
||||
try {
|
||||
await this.fs.unlink(filePath);
|
||||
logger.info(`Deleted temporary file: ${filePath}`);
|
||||
} catch (unlinkError: unknown) {
|
||||
if (
|
||||
unlinkError instanceof Error &&
|
||||
'code' in unlinkError &&
|
||||
(unlinkError as NodeJS.ErrnoException).code === 'ENOENT'
|
||||
) {
|
||||
logger.warn(`File not found during cleanup (already deleted?): ${filePath}`);
|
||||
} else {
|
||||
// Re-throw other errors to be caught by the outer catch block
|
||||
throw unlinkError;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(`Successfully cleaned up ${paths.length} file(s).`);
|
||||
return { status: 'success', deletedCount: paths.length };
|
||||
} catch (error) {
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
logger.error(
|
||||
{
|
||||
err: wrappedError,
|
||||
attemptsMade: job.attemptsMade,
|
||||
},
|
||||
`File cleanup job failed.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,23 @@
|
||||
/**
|
||||
* Base class for all flyer processing errors.
|
||||
* This allows for catching all processing-related errors with a single `catch` block.
|
||||
* Each custom error should define its own `errorCode` and a user-friendly `message`.
|
||||
*/
|
||||
export class FlyerProcessingError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
public errorCode: string;
|
||||
public userMessage: string;
|
||||
|
||||
constructor(message: string, errorCode: string = 'UNKNOWN_ERROR', userMessage?: string) {
|
||||
super(message); // The 'message' property of Error is for internal/developer use.
|
||||
this.name = this.constructor.name;
|
||||
this.errorCode = errorCode;
|
||||
this.userMessage = userMessage || message; // User-friendly message for UI
|
||||
Object.setPrototypeOf(this, new.target.prototype);
|
||||
}
|
||||
|
||||
toErrorPayload(): { errorCode: string; message: string; [key: string]: any } {
|
||||
return { errorCode: this.errorCode, message: this.userMessage };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -18,9 +28,17 @@ export class FlyerProcessingError extends Error {
|
||||
export class PdfConversionError extends FlyerProcessingError {
|
||||
public stderr?: string;
|
||||
constructor(message: string, stderr?: string) {
|
||||
super(message);
|
||||
super(
|
||||
message,
|
||||
'PDF_CONVERSION_FAILED',
|
||||
'The uploaded PDF could not be processed. It might be blank, corrupt, or password-protected.',
|
||||
);
|
||||
this.stderr = stderr;
|
||||
}
|
||||
|
||||
toErrorPayload(): { errorCode: string; message: string; [key: string]: any } {
|
||||
return { ...super.toErrorPayload(), stderr: this.stderr };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -32,7 +50,15 @@ export class AiDataValidationError extends FlyerProcessingError {
|
||||
public validationErrors: object,
|
||||
public rawData: unknown,
|
||||
) {
|
||||
super(message);
|
||||
super(
|
||||
message,
|
||||
'AI_VALIDATION_FAILED',
|
||||
"The AI couldn't read the flyer's format. Please try a clearer image or a different flyer.",
|
||||
);
|
||||
}
|
||||
|
||||
toErrorPayload(): { errorCode: string; message: string; [key: string]: any } {
|
||||
return { ...super.toErrorPayload(), validationErrors: this.validationErrors, rawData: this.rawData };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +67,7 @@ export class AiDataValidationError extends FlyerProcessingError {
|
||||
*/
|
||||
export class GeocodingFailedError extends FlyerProcessingError {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
super(message, 'GEOCODING_FAILED', 'Failed to geocode the address.');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +76,6 @@ export class GeocodingFailedError extends FlyerProcessingError {
|
||||
*/
|
||||
export class UnsupportedFileTypeError extends FlyerProcessingError {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
super(message, 'UNSUPPORTED_FILE_TYPE', message); // The message is already user-friendly.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,12 +8,17 @@ const mocks = vi.hoisted(() => {
|
||||
const capturedProcessors: Record<string, (job: Job) => Promise<unknown>> = {};
|
||||
|
||||
return {
|
||||
sendEmail: vi.fn(),
|
||||
unlink: vi.fn(),
|
||||
// Service method mocks
|
||||
processFlyerJob: vi.fn(),
|
||||
processCleanupJob: vi.fn(),
|
||||
processEmailJob: vi.fn(),
|
||||
processDailyReportJob: vi.fn(),
|
||||
processWeeklyReportJob: vi.fn(),
|
||||
processTokenCleanupJob: vi.fn(),
|
||||
|
||||
// Test utilities
|
||||
capturedProcessors,
|
||||
deleteExpiredResetTokens: vi.fn(),
|
||||
// Mock the Worker constructor to capture the processor function. It must be a
|
||||
// Mock the Worker constructor to capture the processor function. It must be a`
|
||||
// `function` and not an arrow function so it can be called with `new`.
|
||||
MockWorker: vi.fn(function (name: string, processor: (job: Job) => Promise<unknown>) {
|
||||
if (processor) {
|
||||
@@ -26,23 +31,20 @@ const mocks = vi.hoisted(() => {
|
||||
});
|
||||
|
||||
// --- Mock Modules ---
|
||||
vi.mock('./emailService.server', async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import('./emailService.server')>();
|
||||
return {
|
||||
...actual,
|
||||
// We only need to mock the specific function being called by the worker.
|
||||
// The rest of the module can retain its original implementation if needed elsewhere.
|
||||
sendEmail: mocks.sendEmail,
|
||||
};
|
||||
});
|
||||
vi.mock('./emailService.server', () => ({
|
||||
processEmailJob: mocks.processEmailJob,
|
||||
}));
|
||||
|
||||
// The workers use an `fsAdapter`. We can mock the underlying `fsPromises`
|
||||
// that the adapter is built from in queueService.server.ts.
|
||||
vi.mock('node:fs/promises', () => ({
|
||||
default: {
|
||||
unlink: mocks.unlink,
|
||||
// Add other fs functions if needed by other tests
|
||||
readdir: vi.fn(),
|
||||
vi.mock('./analyticsService.server', () => ({
|
||||
analyticsService: {
|
||||
processDailyReportJob: mocks.processDailyReportJob,
|
||||
processWeeklyReportJob: mocks.processWeeklyReportJob,
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('./userService', () => ({
|
||||
userService: {
|
||||
processTokenCleanupJob: mocks.processTokenCleanupJob,
|
||||
},
|
||||
}));
|
||||
|
||||
@@ -56,26 +58,22 @@ vi.mock('./logger.server', () => ({
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock('./db/index.db', () => ({
|
||||
userRepo: {
|
||||
deleteExpiredResetTokens: mocks.deleteExpiredResetTokens,
|
||||
},
|
||||
}));
|
||||
|
||||
// Mock bullmq to capture the processor functions passed to the Worker constructor
|
||||
import { logger as mockLogger } from './logger.server';
|
||||
vi.mock('bullmq', () => ({
|
||||
Worker: mocks.MockWorker,
|
||||
// FIX: Use a standard function for the mock constructor to allow `new Queue(...)` to work.
|
||||
Queue: vi.fn(function () {
|
||||
return { add: vi.fn() };
|
||||
}),
|
||||
// Add UnrecoverableError to the mock so it can be used in tests
|
||||
UnrecoverableError: class UnrecoverableError extends Error {},
|
||||
}));
|
||||
|
||||
// Mock flyerProcessingService.server as flyerWorker depends on it
|
||||
// Mock flyerProcessingService.server as flyerWorker and cleanupWorker depend on it
|
||||
vi.mock('./flyerProcessingService.server', () => ({
|
||||
FlyerProcessingService: class {
|
||||
processJob = mocks.processFlyerJob;
|
||||
processCleanupJob = mocks.processCleanupJob;
|
||||
},
|
||||
}));
|
||||
|
||||
@@ -110,15 +108,16 @@ describe('Queue Workers', () => {
|
||||
let tokenCleanupProcessor: (job: Job) => Promise<unknown>;
|
||||
|
||||
beforeEach(async () => {
|
||||
// Reset default mock implementations for hoisted mocks
|
||||
mocks.processFlyerJob.mockResolvedValue({ flyerId: 123 });
|
||||
mocks.processCleanupJob.mockResolvedValue({ status: 'success' });
|
||||
mocks.processEmailJob.mockResolvedValue(undefined);
|
||||
mocks.processDailyReportJob.mockResolvedValue({ status: 'success' });
|
||||
mocks.processWeeklyReportJob.mockResolvedValue({ status: 'success' });
|
||||
mocks.processTokenCleanupJob.mockResolvedValue({ deletedCount: 5 });
|
||||
|
||||
vi.clearAllMocks();
|
||||
vi.resetModules();
|
||||
|
||||
// Reset default mock implementations for hoisted mocks
|
||||
mocks.sendEmail.mockResolvedValue(undefined);
|
||||
mocks.unlink.mockResolvedValue(undefined);
|
||||
mocks.processFlyerJob.mockResolvedValue({ flyerId: 123 }); // Default success for flyer processing
|
||||
mocks.deleteExpiredResetTokens.mockResolvedValue(5);
|
||||
|
||||
await import('./workers.server');
|
||||
|
||||
flyerProcessor = mocks.capturedProcessors['flyer-processing'];
|
||||
@@ -155,10 +154,24 @@ describe('Queue Workers', () => {
|
||||
|
||||
await expect(flyerProcessor(job)).rejects.toThrow('Flyer processing failed');
|
||||
});
|
||||
|
||||
it('should re-throw UnrecoverableError from the service layer', async () => {
|
||||
const { UnrecoverableError } = await import('bullmq');
|
||||
const job = createMockJob({
|
||||
filePath: '/tmp/fail.pdf',
|
||||
originalFileName: 'fail.pdf',
|
||||
checksum: 'def',
|
||||
});
|
||||
const unrecoverableError = new UnrecoverableError('Quota exceeded');
|
||||
mocks.processFlyerJob.mockRejectedValue(unrecoverableError);
|
||||
|
||||
// The worker should just let this specific error type pass through.
|
||||
await expect(flyerProcessor(job)).rejects.toThrow(unrecoverableError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('emailWorker', () => {
|
||||
it('should call emailService.sendEmail with the job data', async () => {
|
||||
it('should call emailService.processEmailJob with the job', async () => {
|
||||
const jobData = {
|
||||
to: 'test@example.com',
|
||||
subject: 'Test Email',
|
||||
@@ -166,173 +179,84 @@ describe('Queue Workers', () => {
|
||||
text: 'Hello',
|
||||
};
|
||||
const job = createMockJob(jobData);
|
||||
|
||||
await emailProcessor(job);
|
||||
|
||||
expect(mocks.sendEmail).toHaveBeenCalledTimes(1);
|
||||
// The implementation passes the logger as the second argument
|
||||
expect(mocks.sendEmail).toHaveBeenCalledWith(jobData, expect.anything());
|
||||
expect(mocks.processEmailJob).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.processEmailJob).toHaveBeenCalledWith(job);
|
||||
});
|
||||
|
||||
it('should log and re-throw an error if sendEmail fails with a non-Error object', async () => {
|
||||
const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' });
|
||||
const emailError = 'SMTP server is down'; // Reject with a string
|
||||
mocks.sendEmail.mockRejectedValue(emailError);
|
||||
|
||||
await expect(emailProcessor(job)).rejects.toThrow(emailError);
|
||||
|
||||
// The worker should wrap the string in an Error object for logging
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
{ err: new Error(emailError), jobData: job.data },
|
||||
`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
});
|
||||
|
||||
it('should re-throw an error if sendEmail fails', async () => {
|
||||
it('should re-throw an error if processEmailJob fails', async () => {
|
||||
const job = createMockJob({ to: 'fail@example.com', subject: 'fail', html: '', text: '' });
|
||||
const emailError = new Error('SMTP server is down');
|
||||
mocks.sendEmail.mockRejectedValue(emailError);
|
||||
|
||||
mocks.processEmailJob.mockRejectedValue(emailError);
|
||||
await expect(emailProcessor(job)).rejects.toThrow('SMTP server is down');
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
{ err: emailError, jobData: job.data },
|
||||
`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('analyticsWorker', () => {
|
||||
it('should complete successfully for a valid report date', async () => {
|
||||
vi.useFakeTimers();
|
||||
it('should call analyticsService.processDailyReportJob with the job', async () => {
|
||||
const job = createMockJob({ reportDate: '2024-01-01' });
|
||||
|
||||
const promise = analyticsProcessor(job);
|
||||
// Advance timers to simulate the 10-second task completing
|
||||
await vi.advanceTimersByTimeAsync(10000);
|
||||
await promise; // Wait for the promise to resolve
|
||||
|
||||
// No error should be thrown
|
||||
expect(true).toBe(true);
|
||||
vi.useRealTimers();
|
||||
await analyticsProcessor(job);
|
||||
expect(mocks.processDailyReportJob).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.processDailyReportJob).toHaveBeenCalledWith(job);
|
||||
});
|
||||
|
||||
it('should throw an error if reportDate is "FAIL"', async () => {
|
||||
it('should re-throw an error if processDailyReportJob fails', async () => {
|
||||
const job = createMockJob({ reportDate: 'FAIL' });
|
||||
|
||||
await expect(analyticsProcessor(job)).rejects.toThrow(
|
||||
'This is a test failure for the analytics job.',
|
||||
);
|
||||
const analyticsError = new Error('Analytics processing failed');
|
||||
mocks.processDailyReportJob.mockRejectedValue(analyticsError);
|
||||
await expect(analyticsProcessor(job)).rejects.toThrow('Analytics processing failed');
|
||||
});
|
||||
});
|
||||
|
||||
describe('cleanupWorker', () => {
|
||||
it('should call unlink for each path provided in the job data', async () => {
|
||||
it('should call flyerProcessingService.processCleanupJob with the job', async () => {
|
||||
const jobData = {
|
||||
flyerId: 123,
|
||||
paths: ['/tmp/file1.jpg', '/tmp/file2.pdf'],
|
||||
};
|
||||
const job = createMockJob(jobData);
|
||||
mocks.unlink.mockResolvedValue(undefined);
|
||||
|
||||
await cleanupProcessor(job);
|
||||
|
||||
expect(mocks.unlink).toHaveBeenCalledTimes(2);
|
||||
expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file1.jpg');
|
||||
expect(mocks.unlink).toHaveBeenCalledWith('/tmp/file2.pdf');
|
||||
expect(mocks.processCleanupJob).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.processCleanupJob).toHaveBeenCalledWith(job);
|
||||
});
|
||||
|
||||
it('should not throw an error if a file is already deleted (ENOENT)', async () => {
|
||||
const jobData = {
|
||||
flyerId: 123,
|
||||
paths: ['/tmp/existing.jpg', '/tmp/already-deleted.jpg'],
|
||||
};
|
||||
it('should re-throw an error if processCleanupJob fails', async () => {
|
||||
const jobData = { flyerId: 123, paths: ['/tmp/protected-file.jpg'] };
|
||||
const job = createMockJob(jobData);
|
||||
// Use the built-in NodeJS.ErrnoException type for mock system errors.
|
||||
const enoentError: NodeJS.ErrnoException = new Error('File not found');
|
||||
enoentError.code = 'ENOENT';
|
||||
|
||||
// First call succeeds, second call fails with ENOENT
|
||||
mocks.unlink.mockResolvedValueOnce(undefined).mockRejectedValueOnce(enoentError);
|
||||
|
||||
// The processor should complete without throwing
|
||||
await expect(cleanupProcessor(job)).resolves.toBeUndefined();
|
||||
|
||||
expect(mocks.unlink).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should re-throw an error for issues other than ENOENT (e.g., permissions)', async () => {
|
||||
const jobData = {
|
||||
flyerId: 123,
|
||||
paths: ['/tmp/protected-file.jpg'],
|
||||
};
|
||||
const job = createMockJob(jobData);
|
||||
// Use the built-in NodeJS.ErrnoException type for mock system errors.
|
||||
const permissionError: NodeJS.ErrnoException = new Error('Permission denied');
|
||||
permissionError.code = 'EACCES';
|
||||
|
||||
mocks.unlink.mockRejectedValue(permissionError);
|
||||
|
||||
const cleanupError = new Error('Permission denied');
|
||||
mocks.processCleanupJob.mockRejectedValue(cleanupError);
|
||||
await expect(cleanupProcessor(job)).rejects.toThrow('Permission denied');
|
||||
|
||||
// Verify the error was logged by the worker's catch block
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
{ err: permissionError },
|
||||
expect.stringContaining(
|
||||
`[CleanupWorker] Job ${job.id} for flyer ${job.data.flyerId} failed.`,
|
||||
),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('weeklyAnalyticsWorker', () => {
|
||||
it('should complete successfully for a valid report date', async () => {
|
||||
vi.useFakeTimers();
|
||||
it('should call analyticsService.processWeeklyReportJob with the job', async () => {
|
||||
const job = createMockJob({ reportYear: 2024, reportWeek: 1 });
|
||||
|
||||
const promise = weeklyAnalyticsProcessor(job);
|
||||
// Advance timers to simulate the 30-second task completing
|
||||
await vi.advanceTimersByTimeAsync(30000);
|
||||
await promise; // Wait for the promise to resolve
|
||||
|
||||
// No error should be thrown
|
||||
expect(true).toBe(true);
|
||||
vi.useRealTimers();
|
||||
await weeklyAnalyticsProcessor(job);
|
||||
expect(mocks.processWeeklyReportJob).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.processWeeklyReportJob).toHaveBeenCalledWith(job);
|
||||
});
|
||||
|
||||
it('should re-throw an error if the job fails', async () => {
|
||||
vi.useFakeTimers();
|
||||
it('should re-throw an error if processWeeklyReportJob fails', async () => {
|
||||
const job = createMockJob({ reportYear: 2024, reportWeek: 1 });
|
||||
// Mock the internal logic to throw an error
|
||||
const originalSetTimeout = setTimeout;
|
||||
vi.spyOn(global, 'setTimeout').mockImplementation((callback, ms) => {
|
||||
if (ms === 30000) {
|
||||
// Target the simulated delay
|
||||
throw new Error('Weekly analytics job failed');
|
||||
}
|
||||
return originalSetTimeout(callback, ms);
|
||||
});
|
||||
|
||||
const weeklyError = new Error('Weekly analytics job failed');
|
||||
mocks.processWeeklyReportJob.mockRejectedValue(weeklyError);
|
||||
await expect(weeklyAnalyticsProcessor(job)).rejects.toThrow('Weekly analytics job failed');
|
||||
vi.useRealTimers();
|
||||
vi.restoreAllMocks(); // Restore setTimeout mock
|
||||
});
|
||||
});
|
||||
|
||||
describe('tokenCleanupWorker', () => {
|
||||
it('should call userRepo.deleteExpiredResetTokens and return the count', async () => {
|
||||
it('should call userService.processTokenCleanupJob with the job', async () => {
|
||||
const job = createMockJob({ timestamp: new Date().toISOString() });
|
||||
mocks.deleteExpiredResetTokens.mockResolvedValue(10);
|
||||
|
||||
const result = await tokenCleanupProcessor(job);
|
||||
|
||||
expect(mocks.deleteExpiredResetTokens).toHaveBeenCalledTimes(1);
|
||||
expect(result).toEqual({ deletedCount: 10 });
|
||||
await tokenCleanupProcessor(job);
|
||||
expect(mocks.processTokenCleanupJob).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.processTokenCleanupJob).toHaveBeenCalledWith(job);
|
||||
});
|
||||
|
||||
it('should re-throw an error if the database call fails', async () => {
|
||||
it('should re-throw an error if processTokenCleanupJob fails', async () => {
|
||||
const job = createMockJob({ timestamp: new Date().toISOString() });
|
||||
const dbError = new Error('DB cleanup failed');
|
||||
mocks.deleteExpiredResetTokens.mockRejectedValue(dbError);
|
||||
mocks.processTokenCleanupJob.mockRejectedValue(dbError);
|
||||
await expect(tokenCleanupProcessor(job)).rejects.toThrow(dbError);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
// src/services/userService.ts
|
||||
import * as db from './db/index.db';
|
||||
import type { Job } from 'bullmq';
|
||||
import type { Logger } from 'pino';
|
||||
import { AddressRepository } from './db/address.db';
|
||||
import { UserRepository } from './db/user.db';
|
||||
import type { Address, UserProfile } from '../types';
|
||||
import { logger as globalLogger } from './logger.server';
|
||||
import type { TokenCleanupJobData } from './queues.server';
|
||||
|
||||
/**
|
||||
* Encapsulates user-related business logic that may involve multiple repository calls.
|
||||
@@ -44,6 +47,35 @@ class UserService {
|
||||
return addressId;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a job to clean up expired password reset tokens from the database.
|
||||
* @param job The BullMQ job object.
|
||||
* @returns An object containing the count of deleted tokens.
|
||||
*/
|
||||
async processTokenCleanupJob(
|
||||
job: Job<TokenCleanupJobData>,
|
||||
): Promise<{ deletedCount: number }> {
|
||||
const logger = globalLogger.child({
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
});
|
||||
|
||||
logger.info('Picked up expired token cleanup job.');
|
||||
|
||||
try {
|
||||
const deletedCount = await db.userRepo.deleteExpiredResetTokens(logger);
|
||||
logger.info(`Successfully deleted ${deletedCount} expired tokens.`);
|
||||
return { deletedCount };
|
||||
} catch (error) {
|
||||
const wrappedError = error instanceof Error ? error : new Error(String(error));
|
||||
logger.error(
|
||||
{ err: wrappedError, attemptsMade: job.attemptsMade },
|
||||
'Expired token cleanup job failed.',
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const userService = new UserService();
|
||||
|
||||
@@ -6,6 +6,8 @@ import { promisify } from 'util';
|
||||
import { logger } from './logger.server';
|
||||
import { connection } from './redis.server';
|
||||
import { aiService } from './aiService.server';
|
||||
import { analyticsService } from './analyticsService.server';
|
||||
import { userService } from './userService';
|
||||
import * as emailService from './emailService.server';
|
||||
import * as db from './db/index.db';
|
||||
import {
|
||||
@@ -69,20 +71,11 @@ export const flyerWorker = new Worker<FlyerJobData>(
|
||||
try {
|
||||
return await flyerProcessingService.processJob(job);
|
||||
} catch (error: unknown) {
|
||||
const wrappedError = normalizeError(error);
|
||||
const errorMessage = wrappedError.message || '';
|
||||
if (
|
||||
errorMessage.includes('quota') ||
|
||||
errorMessage.includes('429') ||
|
||||
errorMessage.includes('RESOURCE_EXHAUSTED')
|
||||
) {
|
||||
logger.error(
|
||||
{ err: wrappedError, jobId: job.id },
|
||||
'[FlyerWorker] Unrecoverable quota error detected. Failing job immediately.',
|
||||
);
|
||||
throw new UnrecoverableError(errorMessage);
|
||||
}
|
||||
throw error;
|
||||
// The service layer now handles identifying unrecoverable errors and throws
|
||||
// a `BullMQ.UnrecoverableError` which the worker respects. This catch block
|
||||
// simply ensures any non-Error exceptions are properly wrapped before being
|
||||
// re-thrown for BullMQ to handle retries.
|
||||
throw normalizeError(error);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -94,20 +87,13 @@ export const flyerWorker = new Worker<FlyerJobData>(
|
||||
export const emailWorker = new Worker<EmailJobData>(
|
||||
'email-sending',
|
||||
async (job: Job<EmailJobData>) => {
|
||||
const { to, subject } = job.data;
|
||||
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
||||
jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);
|
||||
try {
|
||||
await emailService.sendEmail(job.data, jobLogger);
|
||||
// Delegate all logic to the service layer
|
||||
return await emailService.processEmailJob(job);
|
||||
} catch (error: unknown) {
|
||||
// The service layer now handles logging. This block just ensures
|
||||
// any unexpected errors are normalized before BullMQ handles them.
|
||||
const wrappedError = normalizeError(error);
|
||||
logger.error(
|
||||
{
|
||||
err: wrappedError,
|
||||
jobData: job.data,
|
||||
},
|
||||
`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
}
|
||||
},
|
||||
@@ -120,20 +106,12 @@ export const emailWorker = new Worker<EmailJobData>(
|
||||
export const analyticsWorker = new Worker<AnalyticsJobData>(
|
||||
'analytics-reporting',
|
||||
async (job: Job<AnalyticsJobData>) => {
|
||||
const { reportDate } = job.data;
|
||||
logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);
|
||||
try {
|
||||
if (reportDate === 'FAIL') {
|
||||
throw new Error('This is a test failure for the analytics job.');
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 10000));
|
||||
logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
|
||||
return await analyticsService.processDailyReportJob(job);
|
||||
} catch (error: unknown) {
|
||||
const wrappedError = normalizeError(error);
|
||||
logger.error({ err: wrappedError, jobData: job.data },
|
||||
`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
// The service layer now handles logging. This block just ensures
|
||||
// any unexpected errors are normalized before BullMQ handles them.
|
||||
throw normalizeError(error);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -145,48 +123,12 @@ export const analyticsWorker = new Worker<AnalyticsJobData>(
|
||||
export const cleanupWorker = new Worker<CleanupJobData>(
|
||||
'file-cleanup',
|
||||
async (job: Job<CleanupJobData>) => {
|
||||
const { flyerId, paths } = job.data;
|
||||
logger.info(
|
||||
{ paths },
|
||||
`[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`,
|
||||
);
|
||||
|
||||
try {
|
||||
if (!paths || paths.length === 0) {
|
||||
logger.warn(
|
||||
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} received no paths to clean. Skipping.`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const filePath of paths) {
|
||||
try {
|
||||
await fsAdapter.unlink(filePath);
|
||||
logger.info(`[CleanupWorker] Deleted temporary file: ${filePath}`);
|
||||
} catch (unlinkError: unknown) {
|
||||
if (
|
||||
unlinkError instanceof Error &&
|
||||
'code' in unlinkError &&
|
||||
(unlinkError as any).code === 'ENOENT'
|
||||
) {
|
||||
logger.warn(
|
||||
`[CleanupWorker] File not found during cleanup (already deleted?): ${filePath}`,
|
||||
);
|
||||
} else {
|
||||
throw unlinkError;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(
|
||||
`[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`,
|
||||
);
|
||||
return await flyerProcessingService.processCleanupJob(job);
|
||||
} catch (error: unknown) {
|
||||
const wrappedError = normalizeError(error);
|
||||
logger.error(
|
||||
{ err: wrappedError },
|
||||
`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
// The service layer now handles logging. This block just ensures
|
||||
// any unexpected errors are normalized before BullMQ handles them.
|
||||
throw normalizeError(error);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -198,23 +140,12 @@ export const cleanupWorker = new Worker<CleanupJobData>(
|
||||
export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
|
||||
'weekly-analytics-reporting',
|
||||
async (job: Job<WeeklyAnalyticsJobData>) => {
|
||||
const { reportYear, reportWeek } = job.data;
|
||||
logger.info(
|
||||
{ reportYear, reportWeek },
|
||||
`[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`,
|
||||
);
|
||||
try {
|
||||
await new Promise((resolve) => setTimeout(resolve, 30000));
|
||||
logger.info(
|
||||
`[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`,
|
||||
);
|
||||
return await analyticsService.processWeeklyReportJob(job);
|
||||
} catch (error: unknown) {
|
||||
const wrappedError = normalizeError(error);
|
||||
logger.error(
|
||||
{ err: wrappedError, jobData: job.data },
|
||||
`[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`,
|
||||
);
|
||||
throw wrappedError;
|
||||
// The service layer now handles logging. This block just ensures
|
||||
// any unexpected errors are normalized before BullMQ handles them.
|
||||
throw normalizeError(error);
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -226,16 +157,12 @@ export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
|
||||
export const tokenCleanupWorker = new Worker<TokenCleanupJobData>(
|
||||
'token-cleanup',
|
||||
async (job: Job<TokenCleanupJobData>) => {
|
||||
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
|
||||
jobLogger.info('[TokenCleanupWorker] Starting cleanup of expired password reset tokens.');
|
||||
try {
|
||||
const deletedCount = await db.userRepo.deleteExpiredResetTokens(jobLogger);
|
||||
jobLogger.info(`[TokenCleanupWorker] Successfully deleted ${deletedCount} expired tokens.`);
|
||||
return { deletedCount };
|
||||
return await userService.processTokenCleanupJob(job);
|
||||
} catch (error: unknown) {
|
||||
const wrappedError = normalizeError(error);
|
||||
jobLogger.error({ err: wrappedError }, `[TokenCleanupWorker] Job ${job.id} failed.`);
|
||||
throw wrappedError;
|
||||
// The service layer now handles logging. This block just ensures
|
||||
// any unexpected errors are normalized before BullMQ handles them.
|
||||
throw normalizeError(error);
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user