// src/services/backgroundJobService.ts
import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';

import { formatCurrency } from '../utils/formatUtils';
import { getSimpleWeekAndYear, getCurrentDateISOString } from '../utils/dateUtils';
import type { Notification, WatchedItemDeal } from '../types';
// Import types for repositories from their source files
import type { PersonalizationRepository } from './db/personalization.db';
import type { NotificationRepository } from './db/notification.db';
import {
  analyticsQueue,
  weeklyAnalyticsQueue,
  tokenCleanupQueue,
  emailQueue,
} from './queueService.server';
import { personalizationRepo, notificationRepo } from './db/index.db';
import { logger } from './logger.server';

/** The user fields needed to address and personalise a deal notification. */
type DealRecipient = { user_id: string; email: string; full_name: string | null };

/** All deals found for a single user, together with that user's contact details. */
type UserDealGroup = {
  userProfile: DealRecipient;
  deals: WatchedItemDeal[];
};

/** Payload of a `send-deal-notification` job on the email queue. */
interface EmailJobData {
  to: string;
  subject: string;
  html: string;
  text: string;
}

/** The subset of `Notification` fields this service populates for a new in-app notification. */
type NewDealNotification = Pick<Notification, 'user_id' | 'content' | 'link_url'>;

const HTML_ESCAPES: Readonly<Record<string, string>> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;',
};

/**
 * Escapes the characters that are significant in HTML so that database-sourced
 * text (user names, item names, store names) cannot inject markup into an email body.
 */
function escapeHtml(value: string): string {
  return value.replace(/[&<>"']/g, (char) => HTML_ESCAPES[char] ?? char);
}

/**
 * Returns the ID of a freshly enqueued job, or throws if the queue did not report one.
 * @param jobId The `id` of the job returned by `Queue.add`.
 * @param jobLabel Human-readable job name used in the error message.
 * @throws Error when `jobId` is missing or empty.
 */
function requireJobId(jobId: string | undefined, jobLabel: string): string {
  if (!jobId) {
    throw new Error(`Failed to enqueue ${jobLabel} job: No job ID returned`);
  }
  return jobId;
}

/**
 * Finds deals on users' watched items and fans them out as email jobs,
 * WebSocket pushes and in-app notifications. Also exposes manual triggers for
 * the analytics and token-cleanup queue jobs.
 */
export class BackgroundJobService {
  constructor(
    private readonly personalizationRepo: PersonalizationRepository,
    private readonly notificationRepo: NotificationRepository,
    // Use the imported type here
    private readonly emailQueue: Queue,
    private readonly logger: Logger,
  ) {}

  /**
   * Manually enqueues a daily analytics report job for the current date.
   * The job ID includes `Date.now()` so repeated manual triggers are never deduplicated.
   * @returns The ID of the enqueued job.
   * @throws Error if the queue returns a job without an ID.
   */
  public async triggerAnalyticsReport(): Promise<string> {
    const reportDate = getCurrentDateISOString(); // YYYY-MM-DD
    const jobId = `manual-report-${reportDate}-${Date.now()}`;
    const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
    return requireJobId(job.id, 'daily report');
  }

  /**
   * Manually enqueues a weekly analytics report job for the current week.
   * @returns The ID of the enqueued job.
   * @throws Error if the queue returns a job without an ID.
   */
  public async triggerWeeklyAnalyticsReport(): Promise<string> {
    const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
    const jobId = `manual-weekly-report-${reportYear}-${reportWeek}-${Date.now()}`;
    const job = await weeklyAnalyticsQueue.add(
      'generate-weekly-report',
      { reportYear, reportWeek },
      { jobId },
    );
    return requireJobId(job.id, 'weekly report');
  }

  /**
   * Manually enqueues a token cleanup job stamped with the current time.
   * @returns The ID of the enqueued job.
   * @throws Error if the queue returns a job without an ID.
   */
  public async triggerTokenCleanup(): Promise<string> {
    const timestamp = new Date().toISOString();
    const jobId = `manual-token-cleanup-${Date.now()}`;
    const job = await tokenCleanupQueue.add('cleanup-tokens', { timestamp }, { jobId });
    return requireJobId(job.id, 'token cleanup');
  }

  /**
   * Prepares the data for an email notification job based on a user's deals.
   * @param userProfile The user to whom the email will be sent.
   * @param deals The list of deals found for the user.
   * @returns An object containing the email job data and a unique job ID.
   */
  private _prepareDealEmail(
    userProfile: DealRecipient,
    deals: WatchedItemDeal[],
  ): { jobData: EmailJobData; jobId: string } {
    // `||` (not `??`) so that an empty name also falls back to the generic greeting.
    const recipientName = userProfile.full_name || 'there';
    const subject = `New Deals Found on Your Watched Items!`;

    // Every interpolated text value is escaped: names come from stored data and
    // must not be able to inject markup into the HTML body.
    const dealsListHtml = deals
      .map(
        (deal) =>
          `<li>${escapeHtml(deal.item_name)} is on sale for ${formatCurrency(
            deal.best_price_in_cents,
          )} at ${escapeHtml(deal.store.name)}!</li>`,
      )
      .join('');

    const html = `
      <p>Hi ${escapeHtml(recipientName)},</p>
      <p>We found some great deals on items you're watching:</p>
      <ul>
        ${dealsListHtml}
      </ul>
    `;
    const text = `Hi ${recipientName},\n\nWe found some great deals on items you're watching. Visit the deals page on the site to learn more.`;

    // Use a predictable Job ID to prevent duplicate email notifications for the same user on the same day.
    const today = getCurrentDateISOString();
    const jobId = `deal-email-${userProfile.user_id}-${today}`;

    return {
      jobData: { to: userProfile.email, subject, html, text },
      jobId,
    };
  }

  /**
   * Prepares the data for an in-app notification.
   * @param userId The ID of the user to notify.
   * @param dealCount The number of deals found.
   * @returns The notification object ready for database insertion.
   */
  private _prepareInAppNotification(userId: string, dealCount: number): NewDealNotification {
    return {
      user_id: userId,
      content: `You have ${dealCount} new deal(s) on your watched items!`,
      link_url: '/dashboard/deals', // A link to the future "My Deals" page
    };
  }

  /**
   * Pushes a real-time deal notification to the user over WebSocket (ADR-022).
   * This is best-effort: a failure is logged and swallowed so that it cannot
   * discard the in-app notification for a user whose email job is already queued.
   * @param userId The ID of the user to notify.
   * @param deals The deals to include in the push payload.
   */
  private async _broadcastDealsOverWebSocket(
    userId: string,
    deals: WatchedItemDeal[],
  ): Promise<void> {
    try {
      // Loaded lazily, at call time, rather than as a static import.
      const { websocketService } = await import('./websocketService.server');
      websocketService.broadcastDealNotification(userId, {
        user_id: userId,
        deals: deals.map((deal) => ({
          item_name: deal.item_name,
          best_price_in_cents: deal.best_price_in_cents,
          store_name: deal.store.name,
          store_id: deal.store.store_id,
        })),
        message: `You have ${deals.length} new deal(s) on your watched items!`,
      });
      this.logger.info(`[BackgroundJob] Sent WebSocket notification to user ${userId}`);
    } catch (wsError) {
      this.logger.warn(
        { err: wsError },
        `[BackgroundJob] Failed to send WebSocket notification to user ${userId}`,
      );
    }
  }

  /**
   * Sends every notification channel for one user's deals.
   * @returns The in-app notification to be bulk-inserted by the caller, or
   * `null` if preparing or enqueuing the email failed for this user.
   */
  private async _processDealsForUser({
    userProfile,
    deals,
  }: UserDealGroup): Promise<NewDealNotification | null> {
    try {
      this.logger.info(
        `[BackgroundJob] Found ${deals.length} deals for user ${userProfile.user_id}.`,
      );

      // Prepare in-app and email notifications.
      const notification = this._prepareInAppNotification(userProfile.user_id, deals.length);
      const { jobData, jobId } = this._prepareDealEmail(userProfile, deals);

      // Enqueue an email notification job.
      await this.emailQueue.add('send-deal-notification', jobData, { jobId });

      // Send real-time WebSocket notification (never throws).
      await this._broadcastDealsOverWebSocket(userProfile.user_id, deals);

      // Return the notification to be collected for bulk insertion.
      return notification;
    } catch (userError) {
      this.logger.error(
        { err: userError },
        `[BackgroundJob] Failed to process deals for user ${userProfile.user_id}`,
      );
      return null; // Return null on error for this user.
    }
  }

  /**
   * Checks for new deals on watched items for all users and sends notifications.
   * This function is designed to be run periodically (e.g., daily).
   * @throws Re-throws any error from fetching deals or bulk-inserting
   * notifications, after logging it. Per-user failures are logged and skipped.
   */
  async runDailyDealCheck(): Promise<void> {
    this.logger.info('[BackgroundJob] Starting daily deal check for all users...');

    try {
      // 1. Get all deals for all users in a single, efficient query.
      const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);

      if (allDeals.length === 0) {
        this.logger.info('[BackgroundJob] No deals found for any watched items. Skipping.');
        return;
      }

      this.logger.info(`[BackgroundJob] Found ${allDeals.length} total deals across all users.`);

      // 2. Group deals by user in memory.
      const dealsByUser = new Map<string, UserDealGroup>();
      for (const deal of allDeals) {
        let userGroup = dealsByUser.get(deal.user_id);
        if (!userGroup) {
          userGroup = {
            userProfile: { user_id: deal.user_id, email: deal.email, full_name: deal.full_name },
            deals: [],
          };
          dealsByUser.set(deal.user_id, userGroup);
        }
        userGroup.deals.push(deal);
      }

      // 3. Process each user's deals in parallel and wait for all of them to settle.
      const results = await Promise.allSettled(
        Array.from(dealsByUser.values(), (userGroup) => this._processDealsForUser(userGroup)),
      );

      // 4. Collect all successfully created notifications.
      const successfulNotifications: NewDealNotification[] = [];
      for (const result of results) {
        if (result.status === 'rejected') {
          // _processDealsForUser handles its own errors, so this is unexpected; don't lose it.
          this.logger.error(
            { err: result.reason },
            "[BackgroundJob] Unexpected rejection while processing a user's deals",
          );
        } else if (result.value !== null) {
          successfulNotifications.push(result.value);
        }
      }

      // 5. Bulk insert all in-app notifications in a single query.
      if (successfulNotifications.length > 0) {
        // One timestamp for the whole batch.
        const updatedAt = new Date().toISOString();
        const notificationsForDb = successfulNotifications.map((n) => ({
          ...n,
          updated_at: updatedAt,
        }));
        await this.notificationRepo.createBulkNotifications(notificationsForDb, this.logger);
        this.logger.info(
          `[BackgroundJob] Successfully created ${successfulNotifications.length} in-app notifications.`,
        );
      }

      this.logger.info('[BackgroundJob] Daily deal check completed successfully.');
    } catch (error) {
      this.logger.error(
        { err: error },
        '[BackgroundJob] A critical error occurred during the daily deal check',
      );
      // Re-throw the error so the cron wrapper knows it failed.
      throw error;
    }
  }
}

// A simple in-memory lock to prevent job overlaps. It is a module-level
// variable, so it only guards against overlapping runs within this process.
let isDailyDealCheckRunning = false;

/**
 * Initializes and starts the cron jobs for the daily deal check, the daily and
 * weekly analytics reports, and the expired token cleanup.
 * This should be called once when the server starts.
 *
 * Scheduling failures are logged, not thrown.
 *
 * @param backgroundJobService An instance of BackgroundJobService.
 * @param analyticsQueue An instance of the BullMQ analytics queue.
 * @param weeklyAnalyticsQueue An instance of the BullMQ weekly analytics queue.
 * @param tokenCleanupQueue An instance of the BullMQ token cleanup queue.
 * @param logger Logger used by the scheduled callbacks.
 */
export function startBackgroundJobs(
  backgroundJobService: BackgroundJobService,
  analyticsQueue: Queue,
  weeklyAnalyticsQueue: Queue,
  tokenCleanupQueue: Queue,
  logger: Logger,
): void {
  /**
   * Starts an async task from a synchronous cron callback without leaving a
   * floating promise: any rejection that escapes the task is logged.
   */
  const runDetached = (task: () => Promise<void>, rejectionMessage: string): void => {
    task().catch((error: unknown) => {
      logger.error({ err: error }, rejectionMessage);
    });
  };

  try {
    // Schedule the deal check job to run once every day at 2:00 AM.
    cron.schedule('0 2 * * *', () => {
      runDetached(async () => {
        if (isDailyDealCheckRunning) {
          logger.warn(
            '[BackgroundJob] Daily deal check is already running. Skipping this scheduled run.',
          );
          return;
        }
        isDailyDealCheckRunning = true;
        try {
          await backgroundJobService.runDailyDealCheck();
        } catch (error) {
          // The method itself logs details, this is a final catch-all.
          logger.error(
            { err: error },
            '[BackgroundJob] Cron job for daily deal check failed unexpectedly.',
          );
        } finally {
          // The only place the lock is released. The rejection handler must not
          // release it: a failure in the "already running" branch would otherwise
          // clear a lock that is still held by the in-flight run.
          isDailyDealCheckRunning = false;
        }
      }, '[BackgroundJob] Unhandled rejection in daily deal check cron wrapper.');
    });
    logger.info('[BackgroundJob] Cron job for daily deal checks has been scheduled.');

    // Schedule the analytics report generation job to run at 3:00 AM.
    cron.schedule('0 3 * * *', () => {
      runDetached(async () => {
        logger.info('[BackgroundJob] Enqueuing daily analytics report generation job.');
        try {
          const reportDate = getCurrentDateISOString(); // YYYY-MM-DD
          // We use a deterministic per-day job ID to prevent duplicate jobs for the same day if the scheduler restarts.
          await analyticsQueue.add(
            'generate-daily-report',
            { reportDate },
            { jobId: `daily-report-${reportDate}` },
          );
        } catch (error) {
          logger.error({ err: error }, '[BackgroundJob] Failed to enqueue daily analytics job.');
        }
      }, '[BackgroundJob] Unhandled rejection in analytics report cron wrapper.');
    });
    logger.info('[BackgroundJob] Cron job for daily analytics reports has been scheduled.');

    // Schedule the weekly analytics report generation job to run every Sunday at 4:00 AM.
    cron.schedule('0 4 * * 0', () => {
      // 0 4 * * 0 means 4:00 AM on Sunday
      runDetached(async () => {
        logger.info('[BackgroundJob] Enqueuing weekly analytics report generation job.');
        try {
          const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
          await weeklyAnalyticsQueue.add(
            'generate-weekly-report',
            { reportYear, reportWeek },
            { jobId: `weekly-report-${reportYear}-${reportWeek}` },
          );
        } catch (error) {
          logger.error({ err: error }, '[BackgroundJob] Failed to enqueue weekly analytics job.');
        }
      }, '[BackgroundJob] Unhandled rejection in weekly analytics report cron wrapper.');
    });
    logger.info('[BackgroundJob] Cron job for weekly analytics reports has been scheduled.');

    // Schedule the expired token cleanup job to run every day at 5:00 AM.
    cron.schedule('0 5 * * *', () => {
      runDetached(async () => {
        logger.info('[BackgroundJob] Enqueuing expired password reset token cleanup job.');
        try {
          const timestamp = new Date().toISOString();
          await tokenCleanupQueue.add(
            'cleanup-tokens',
            { timestamp },
            // The first 10 characters of the ISO timestamp are its YYYY-MM-DD date part.
            { jobId: `token-cleanup-${timestamp.slice(0, 10)}` },
          );
        } catch (error) {
          logger.error({ err: error }, '[BackgroundJob] Failed to enqueue token cleanup job.');
        }
      }, '[BackgroundJob] Unhandled rejection in token cleanup cron wrapper.');
    });
    logger.info('[BackgroundJob] Cron job for expired token cleanup has been scheduled.');
  } catch (error) {
    logger.error(
      { err: error },
      '[BackgroundJob] Failed to schedule a cron job. This is a critical setup error.',
    );
  }
}

// Instantiate the service with its real dependencies for use in the application.
export const backgroundJobService = new BackgroundJobService(
  personalizationRepo,
  notificationRepo,
  emailQueue,
  logger,
);