// src/services/backgroundJobService.ts
import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';
import { Notification, WatchedItemDeal } from '../types';
import { getSimpleWeekAndYear } from '../utils/dateUtils';
// Import types for repositories from their source files
import type { PersonalizationRepository } from './db/personalization.db';
import type { NotificationRepository } from './db/notification.db';
import { analyticsQueue, weeklyAnalyticsQueue } from './queueService.server';

/**
 * Payload for a BullMQ email job consumed by the email worker.
 * Both HTML and plain-text bodies are provided so the mailer can send
 * a multipart message.
 */
interface EmailJobData {
  // Recipient email address.
  to: string;
  // Email subject line.
  subject: string;
  // Rich HTML body.
  html: string;
  // Plain-text fallback body.
  text: string;
}
export class BackgroundJobService {
|
|
constructor(
|
|
private personalizationRepo: PersonalizationRepository,
|
|
private notificationRepo: NotificationRepository, // Use the imported type here
|
|
private emailQueue: Queue<EmailJobData>,
|
|
private logger: Logger,
|
|
) {}
|
|
|
|
public async triggerAnalyticsReport(): Promise<string> {
|
|
const reportDate = new Date().toISOString().split('T')[0]; // YYYY-MM-DD
|
|
const jobId = `manual-report-${reportDate}-${Date.now()}`;
|
|
const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
|
|
return job.id!;
|
|
}
|
|
|
|
public async triggerWeeklyAnalyticsReport(): Promise<string> {
|
|
const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
|
|
const jobId = `manual-weekly-report-${reportYear}-${reportWeek}-${Date.now()}`;
|
|
const job = await weeklyAnalyticsQueue.add(
|
|
'generate-weekly-report',
|
|
{ reportYear, reportWeek },
|
|
{ jobId },
|
|
);
|
|
return job.id!;
|
|
}
|
|
|
|
/**
|
|
* Prepares the data for an email notification job based on a user's deals.
|
|
* @param user The user to whom the email will be sent.
|
|
* @param deals The list of deals found for the user.
|
|
* @returns An object containing the email job data and a unique job ID.
|
|
*/
|
|
private _prepareDealEmail(
|
|
userProfile: { user_id: string; email: string; full_name: string | null },
|
|
deals: WatchedItemDeal[],
|
|
): { jobData: EmailJobData; jobId: string } {
|
|
const recipientName = userProfile.full_name || 'there';
|
|
const subject = `New Deals Found on Your Watched Items!`;
|
|
const dealsListHtml = deals
|
|
.map(
|
|
(deal) =>
|
|
`<li><strong>${deal.item_name}</strong> is on sale for <strong>$${(deal.best_price_in_cents / 100).toFixed(2)}</strong> at ${deal.store_name}!</li>`,
|
|
)
|
|
.join('');
|
|
const html = `<p>Hi ${recipientName},</p><p>We found some great deals on items you're watching:</p><ul>${dealsListHtml}</ul>`;
|
|
const text = `Hi ${recipientName},\n\nWe found some great deals on items you're watching. Visit the deals page on the site to learn more.`;
|
|
|
|
// Use a predictable Job ID to prevent duplicate email notifications for the same user on the same day.
|
|
const today = new Date().toISOString().split('T')[0];
|
|
const jobId = `deal-email-${userProfile.user_id}-${today}`;
|
|
|
|
return {
|
|
jobData: { to: userProfile.email, subject, html, text },
|
|
jobId,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Prepares the data for an in-app notification.
|
|
* @param userId The ID of the user to notify.
|
|
* @param dealCount The number of deals found.
|
|
* @returns The notification object ready for database insertion.
|
|
*/
|
|
private _prepareInAppNotification(
|
|
userId: string,
|
|
dealCount: number,
|
|
): Omit<Notification, 'notification_id' | 'is_read' | 'created_at'> {
|
|
return {
|
|
user_id: userId,
|
|
content: `You have ${dealCount} new deal(s) on your watched items!`,
|
|
link_url: '/dashboard/deals', // A link to the future "My Deals" page
|
|
updated_at: new Date().toISOString(),
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Checks for new deals on watched items for all users and sends notifications.
|
|
* This function is designed to be run periodically (e.g., daily).
|
|
*/
|
|
async runDailyDealCheck(): Promise<void> {
|
|
this.logger.info('[BackgroundJob] Starting daily deal check for all users...');
|
|
|
|
try {
|
|
// 1. Get all deals for all users in a single, efficient query.
|
|
const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);
|
|
|
|
if (allDeals.length === 0) {
|
|
this.logger.info('[BackgroundJob] No deals found for any watched items. Skipping.');
|
|
return;
|
|
}
|
|
|
|
this.logger.info(`[BackgroundJob] Found ${allDeals.length} total deals across all users.`);
|
|
|
|
// 2. Group deals by user in memory.
|
|
const dealsByUser = allDeals.reduce<
|
|
Record<
|
|
string,
|
|
{
|
|
userProfile: { user_id: string; email: string; full_name: string | null };
|
|
deals: WatchedItemDeal[];
|
|
}
|
|
>
|
|
>((acc, deal) => {
|
|
if (!acc[deal.user_id]) {
|
|
acc[deal.user_id] = {
|
|
userProfile: { user_id: deal.user_id, email: deal.email, full_name: deal.full_name },
|
|
deals: [],
|
|
};
|
|
}
|
|
acc[deal.user_id].deals.push(deal);
|
|
return acc;
|
|
}, {});
|
|
|
|
const allNotifications: Omit<Notification, 'notification_id' | 'is_read' | 'created_at'>[] =
|
|
[];
|
|
|
|
// 3. Process each user's deals in parallel.
|
|
const userProcessingPromises = Object.values(dealsByUser).map(
|
|
async ({ userProfile, deals }) => {
|
|
try {
|
|
this.logger.info(
|
|
`[BackgroundJob] Found ${deals.length} deals for user ${userProfile.user_id}.`,
|
|
);
|
|
|
|
// 4. Prepare in-app and email notifications.
|
|
const notification = this._prepareInAppNotification(userProfile.user_id, deals.length);
|
|
const { jobData, jobId } = this._prepareDealEmail(userProfile, deals);
|
|
|
|
// 5. Enqueue an email notification job.
|
|
await this.emailQueue.add('send-deal-notification', jobData, { jobId });
|
|
|
|
// Return the notification to be collected for bulk insertion.
|
|
return notification;
|
|
} catch (userError) {
|
|
this.logger.error(
|
|
{ err: userError },
|
|
`[BackgroundJob] Failed to process deals for user ${userProfile.user_id}`,
|
|
);
|
|
return null; // Return null on error for this user.
|
|
}
|
|
},
|
|
);
|
|
|
|
// Wait for all user processing to complete.
|
|
const results = await Promise.allSettled(userProcessingPromises);
|
|
|
|
// 6. Collect all successfully created notifications.
|
|
results.forEach((result) => {
|
|
if (result.status === 'fulfilled' && result.value) {
|
|
allNotifications.push(result.value);
|
|
}
|
|
});
|
|
|
|
// 7. Bulk insert all in-app notifications in a single query.
|
|
if (allNotifications.length > 0) {
|
|
await this.notificationRepo.createBulkNotifications(allNotifications, this.logger);
|
|
this.logger.info(
|
|
`[BackgroundJob] Successfully created ${allNotifications.length} in-app notifications.`,
|
|
);
|
|
}
|
|
|
|
this.logger.info('[BackgroundJob] Daily deal check completed successfully.');
|
|
} catch (error) {
|
|
this.logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] A critical error occurred during the daily deal check',
|
|
);
|
|
// Re-throw the error so the cron wrapper knows it failed.
|
|
throw error;
|
|
}
|
|
}
|
|
}
// A simple in-memory lock to prevent job overlaps within this process.
// NOTE(review): this does not guard against multiple server instances each
// running the cron — confirm single-instance deployment or move the lock
// into a shared store if that changes.
let isDailyDealCheckRunning = false;
/**
|
|
* Initializes and starts the cron job for daily deal checks.
|
|
* This should be called once when the server starts.
|
|
* @param backgroundJobService An instance of BackgroundJobService.
|
|
* @param analyticsQueue An instance of the BullMQ analytics queue.
|
|
*/
|
|
export function startBackgroundJobs(
|
|
backgroundJobService: BackgroundJobService,
|
|
analyticsQueue: Queue,
|
|
weeklyAnalyticsQueue: Queue,
|
|
tokenCleanupQueue: Queue,
|
|
logger: Logger,
|
|
): void {
|
|
try {
|
|
// Schedule the deal check job to run once every day at 2:00 AM.
|
|
cron.schedule('0 2 * * *', () => {
|
|
// Self-invoking async function to handle the promise and errors gracefully.
|
|
(async () => {
|
|
if (isDailyDealCheckRunning) {
|
|
logger.warn(
|
|
'[BackgroundJob] Daily deal check is already running. Skipping this scheduled run.',
|
|
);
|
|
return;
|
|
}
|
|
isDailyDealCheckRunning = true;
|
|
try {
|
|
await backgroundJobService.runDailyDealCheck();
|
|
} catch (error) {
|
|
// The method itself logs details, this is a final catch-all.
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Cron job for daily deal check failed unexpectedly.',
|
|
);
|
|
} finally {
|
|
isDailyDealCheckRunning = false;
|
|
}
|
|
})().catch((error: unknown) => {
|
|
// This catch is for unhandled promise rejections from the async wrapper itself.
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Unhandled rejection in daily deal check cron wrapper.',
|
|
);
|
|
isDailyDealCheckRunning = false;
|
|
});
|
|
});
|
|
logger.info('[BackgroundJob] Cron job for daily deal checks has been scheduled.');
|
|
|
|
// Schedule the analytics report generation job to run at 3:00 AM.
|
|
cron.schedule('0 3 * * *', () => {
|
|
(async () => {
|
|
logger.info('[BackgroundJob] Enqueuing daily analytics report generation job.');
|
|
try {
|
|
const reportDate = new Date().toISOString().split('T')[0]; // YYYY-MM-DD
|
|
// We use a unique job ID to prevent duplicate jobs for the same day if the scheduler restarts.
|
|
await analyticsQueue.add(
|
|
'generate-daily-report',
|
|
{ reportDate },
|
|
{
|
|
jobId: `daily-report-${reportDate}`,
|
|
},
|
|
);
|
|
} catch (error) {
|
|
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue daily analytics job.');
|
|
}
|
|
})().catch((error: unknown) => {
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Unhandled rejection in analytics report cron wrapper.',
|
|
);
|
|
});
|
|
});
|
|
logger.info('[BackgroundJob] Cron job for daily analytics reports has been scheduled.');
|
|
|
|
// Schedule the weekly analytics report generation job to run every Sunday at 4:00 AM.
|
|
cron.schedule('0 4 * * 0', () => {
|
|
// 0 4 * * 0 means 4:00 AM on Sunday
|
|
(async () => {
|
|
logger.info('[BackgroundJob] Enqueuing weekly analytics report generation job.');
|
|
try {
|
|
const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
|
|
|
|
await weeklyAnalyticsQueue.add(
|
|
'generate-weekly-report',
|
|
{ reportYear, reportWeek },
|
|
{
|
|
jobId: `weekly-report-${reportYear}-${reportWeek}`,
|
|
},
|
|
);
|
|
} catch (error) {
|
|
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue weekly analytics job.');
|
|
}
|
|
})().catch((error: unknown) => {
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Unhandled rejection in weekly analytics report cron wrapper.',
|
|
);
|
|
});
|
|
});
|
|
logger.info('[BackgroundJob] Cron job for weekly analytics reports has been scheduled.');
|
|
|
|
// Schedule the expired token cleanup job to run every day at 5:00 AM.
|
|
cron.schedule('0 5 * * *', () => {
|
|
(async () => {
|
|
logger.info('[BackgroundJob] Enqueuing expired password reset token cleanup job.');
|
|
try {
|
|
const timestamp = new Date().toISOString();
|
|
await tokenCleanupQueue.add(
|
|
'cleanup-tokens',
|
|
{ timestamp },
|
|
{
|
|
jobId: `token-cleanup-${timestamp.split('T')[0]}`,
|
|
},
|
|
);
|
|
} catch (error) {
|
|
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue token cleanup job.');
|
|
}
|
|
})().catch((error: unknown) => {
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Unhandled rejection in token cleanup cron wrapper.',
|
|
);
|
|
});
|
|
});
|
|
logger.info('[BackgroundJob] Cron job for expired token cleanup has been scheduled.');
|
|
} catch (error) {
|
|
logger.error(
|
|
{ err: error },
|
|
'[BackgroundJob] Failed to schedule a cron job. This is a critical setup error.',
|
|
);
|
|
}
|
|
}
// Instantiate the service with its real dependencies for use in the application.
// NOTE(review): these imports sit at the bottom of the file; ES module imports
// are hoisted so this works, but convention is to keep them at the top with the
// others — confirm there is no circular-import reason for this placement.
import { personalizationRepo, notificationRepo } from './db/index.db';
import { logger } from './logger.server';
import { emailQueue } from './queueService.server';

// Application-wide singleton instance of the background job service.
export const backgroundJobService = new BackgroundJobService(
  personalizationRepo,
  notificationRepo,
  emailQueue,
  logger,
);
|