ADR-037: Scheduled Jobs and Cron Pattern
Date: 2026-01-09
Status: Accepted
Implemented: 2026-01-09
Context
Many business operations need to run on a recurring schedule without user intervention:
- Daily Deal Checks: Scan watched items for price drops and notify users.
- Analytics Generation: Compile daily and weekly statistics reports.
- Token Cleanup: Remove expired password reset tokens from the database.
- Data Maintenance: Archive old data, clean up temporary files.
These scheduled operations require:
- Reliable execution at specific times
- Protection against overlapping runs
- Graceful error handling that doesn't crash the server
- Integration with the existing job queue system (BullMQ)
Decision
We will use node-cron for scheduling jobs and integrate with BullMQ for job execution. This provides:
- Cron Expressions: Standard, well-understood scheduling syntax.
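- Job Queue Integration: Scheduled jobs enqueue work to BullMQ for reliable processing. The daily deal check is the exception: it runs directly in the scheduler process.
- Idempotency: Queued jobs use predictable, date-based job IDs, so enqueuing the same job twice does not produce a duplicate run.
- Overlap Protection: An in-memory lock prevents concurrent execution of the directly executed daily deal check within a single process.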
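The idempotency point can be illustrated with a small sketch. It assumes that BullMQ skips an add whose job ID already exists in the queue, which is what the predictable IDs rely on. The helper name `datedJobId` and its validation are hypothetical and not part of the codebase.

```typescript
// Sketch only: `datedJobId` is a hypothetical helper, not part of the codebase.

/** Builds a deterministic job ID such as "daily-report-2026-01-09" from a UTC calendar date. */
function datedJobId(prefix: string, date: Date): string {
  // toISOString() is always UTC, so every server derives the same ID
  // for the same instant regardless of its local time zone.
  const day = date.toISOString().slice(0, 10); // "YYYY-MM-DD"
  const id = `${prefix}-${day}`;
  // BullMQ's documentation advises against ':' in custom job IDs,
  // because ':' is the separator used in its Redis keys.
  if (id.indexOf(':') !== -1) {
    throw new Error(`Job ID must not contain ':' (got "${id}")`);
  }
  return id;
}

// Two scheduler ticks on the same UTC day produce the same ID...
const firstTick = datedJobId('daily-report', new Date('2026-01-09T03:00:00Z'));
const retryTick = datedJobId('daily-report', new Date('2026-01-09T03:00:07Z'));
// ...while the next day produces a new one.
const nextDayTick = datedJobId('daily-report', new Date('2026-01-10T03:00:00Z'));
```

Because the ID depends only on the prefix and the calendar day, a retried or duplicated tick maps to the same job, while a manual trigger can opt out of deduplication by appending a timestamp.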
Architecture
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    node-cron    │────▶│  BullMQ Queue   │────▶│     Worker      │
│   (Scheduler)   │     │   (Job Store)   │     │   (Processor)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │      Redis      │
                        │  (Persistence)  │
                        └─────────────────┘
Implementation Details
BackgroundJobService
Located in src/services/backgroundJobService.ts:
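import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';

// Not shown in this excerpt: imports for the repository types, EmailJobData,
// and getCurrentDateISOString.

export class BackgroundJobService {
  constructor(
    private personalizationRepo: PersonalizationRepository,
    private notificationRepo: NotificationRepository,
    private emailQueue: Queue<EmailJobData>,
    private logger: Logger,
  ) {}

  async runDailyDealCheck(): Promise<void> {
    this.logger.info('[BackgroundJob] Starting daily deal check...');
    // 1. Fetch all deals for all users in one efficient query
    const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);
    // 2. Group deals by user
    const dealsByUser = this.groupDealsByUser(allDeals);
    // 3. Process each user's deals in parallel; allSettled keeps one user's failure from aborting the rest
    const results = await Promise.allSettled(
      Array.from(dealsByUser.values()).map((userGroup) => this._processDealsForUser(userGroup)),
    );
    // 4. Bulk insert notifications
    await this.bulkCreateNotifications(results);
    this.logger.info('[BackgroundJob] Daily deal check completed.');
  }

  async triggerAnalyticsReport(): Promise<string> {
    const reportDate = getCurrentDateISOString();
    // The timestamp suffix makes every manual run unique, so it is never
    // deduplicated against the scheduled daily-report-<date> job.
    const jobId = `manual-report-${reportDate}-${Date.now()}`;
    // analyticsQueue is not a constructor dependency, so it must be available
    // at module scope (for example, imported from the queue definitions).
    const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
    // job.id is typed as optional in BullMQ; fall back to the ID we supplied.
    return job.id ?? jobId;
  }

  // Private helpers (groupDealsByUser, _processDealsForUser, bulkCreateNotifications) are not shown.
}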
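The helpers `groupDealsByUser` and `bulkCreateNotifications` are referenced in the class but not shown in this ADR. The following is a minimal sketch of how steps 2 and 4 could work, under stated assumptions: the `UserDeal` shape and the `partitionSettled` helper are hypothetical, and the real repository types may differ.

```typescript
// Hypothetical row shape; the real return type of getBestSalePricesForAllUsers is not shown in this ADR.
interface UserDeal {
  userId: string;
  itemName: string;
  bestPriceInCents: number;
}

// Structurally matches the entries produced by Promise.allSettled.
type Settled<T> =
  | { status: 'fulfilled'; value: T }
  | { status: 'rejected'; reason: unknown };

/** Groups a flat list of deals into one bucket per user, preserving input order. */
function groupDealsByUser(deals: UserDeal[]): Map<string, UserDeal[]> {
  const byUser = new Map<string, UserDeal[]>();
  for (const deal of deals) {
    const bucket = byUser.get(deal.userId);
    if (bucket) {
      bucket.push(deal);
    } else {
      byUser.set(deal.userId, [deal]);
    }
  }
  return byUser;
}

/** Splits settled results so that one user's failure does not discard the others. */
function partitionSettled<T>(results: Settled<T>[]): { values: T[]; failures: unknown[] } {
  const values: T[] = [];
  const failures: unknown[] = [];
  for (const result of results) {
    if (result.status === 'fulfilled') {
      values.push(result.value);
    } else {
      failures.push(result.reason);
    }
  }
  return { values, failures };
}

const groupedDeals = groupDealsByUser([
  { userId: 'u1', itemName: 'Milk', bestPriceInCents: 199 },
  { userId: 'u2', itemName: 'Eggs', bestPriceInCents: 349 },
  { userId: 'u1', itemName: 'Bread', bestPriceInCents: 250 },
]);

const settledSummary = partitionSettled<string>([
  { status: 'fulfilled', value: 'notification for u1' },
  { status: 'rejected', reason: new Error('lookup failed for u2') },
]);
```

Separating fulfilled values from failures lets the bulk insert proceed with whatever succeeded while the failures are logged individually.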
Cron Job Initialization
// In-memory lock to prevent job overlap
let isDailyDealCheckRunning = false;

export function startBackgroundJobs(
  backgroundJobService: BackgroundJobService,
  analyticsQueue: Queue,
  weeklyAnalyticsQueue: Queue,
  tokenCleanupQueue: Queue,
  logger: Logger,
): void {
  // Daily deal check at 2:00 AM
  cron.schedule('0 2 * * *', () => {
    (async () => {
      if (isDailyDealCheckRunning) {
        logger.warn('[BackgroundJob] Daily deal check already running. Skipping.');
        return;
      }
      isDailyDealCheckRunning = true;
      try {
        await backgroundJobService.runDailyDealCheck();
      } catch (error) {
        logger.error({ err: error }, '[BackgroundJob] Daily deal check failed.');
      } finally {
        isDailyDealCheckRunning = false;
      }
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Unhandled rejection in cron wrapper.');
      isDailyDealCheckRunning = false;
    });
  });

  // Daily analytics at 3:00 AM
  cron.schedule('0 3 * * *', () => {
    (async () => {
      const reportDate = getCurrentDateISOString();
      await analyticsQueue.add(
        'generate-daily-report',
        { reportDate },
        { jobId: `daily-report-${reportDate}` }, // Prevents duplicates
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Analytics job enqueue failed.');
    });
  });

  // Weekly analytics at 4:00 AM on Sundays
  cron.schedule('0 4 * * 0', () => {
    (async () => {
      const { year, week } = getSimpleWeekAndYear();
      await weeklyAnalyticsQueue.add(
        'generate-weekly-report',
        { reportYear: year, reportWeek: week },
        { jobId: `weekly-report-${year}-${week}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Weekly analytics enqueue failed.');
    });
  });

  // Token cleanup at 5:00 AM
  cron.schedule('0 5 * * *', () => {
    (async () => {
      const timestamp = new Date().toISOString();
      await tokenCleanupQueue.add(
        'cleanup-tokens',
        { timestamp },
        { jobId: `token-cleanup-${timestamp.split('T')[0]}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Token cleanup enqueue failed.');
    });
  });

  logger.info('[BackgroundJob] All cron jobs scheduled successfully.');
}
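If more directly executed jobs are added later, the lock logic around the daily deal check could be factored into a reusable wrapper. The sketch below is one possible shape, not the project's implementation: `createOverlapGuard`, its `log` callback, and the `GuardedOutcome` type are hypothetical names, and the job is a stand-in for `backgroundJobService.runDailyDealCheck()`.

```typescript
// Sketch only: `createOverlapGuard` is a hypothetical helper, not part of the codebase.
type GuardedOutcome = 'completed' | 'failed' | 'skipped';

function createOverlapGuard(
  jobName: string,
  job: () => Promise<void>,
  log: (message: string, error?: unknown) => void,
): () => Promise<GuardedOutcome> {
  let isRunning = false; // per-process flag, same scope as isDailyDealCheckRunning

  return async () => {
    if (isRunning) {
      log(`[BackgroundJob] ${jobName} already running. Skipping.`);
      return 'skipped';
    }
    isRunning = true;
    try {
      await job();
      return 'completed';
    } catch (error) {
      // Log and swallow the error so the scheduler keeps ticking.
      log(`[BackgroundJob] ${jobName} failed.`, error);
      return 'failed';
    } finally {
      isRunning = false; // always release, even when the job throws
    }
  };
}

// Demonstration: the second call arrives while the first is still pending.
const guardMessages: string[] = [];
const guardedCheck = createOverlapGuard(
  'Daily deal check',
  () => Promise.resolve(), // stand-in for backgroundJobService.runDailyDealCheck()
  (message) => {
    guardMessages.push(message);
  },
);
void guardedCheck(); // takes the lock and suspends at `await job()`
void guardedCheck(); // sees the lock, logs a skip, and returns
```

The flag is checked and set before the first `await`, so two calls in the same process cannot both pass the check. Like the original flag, this guard is per-process and offers no protection across server instances.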
Job Schedule Reference
| Job | Schedule | Queue | Purpose |
|---|---|---|---|
| Daily Deal Check | 0 2 * * * (2:00 AM) | Direct execution | Find price drops on watched items |
| Daily Analytics | 0 3 * * * (3:00 AM) | analyticsQueue | Generate daily statistics |
| Weekly Analytics | 0 4 * * 0 (4:00 AM Sunday) | weeklyAnalyticsQueue | Generate weekly reports |
| Token Cleanup | 0 5 * * * (5:00 AM) | tokenCleanupQueue | Remove expired tokens |
Cron Expression Reference
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7, Sun = 0 or 7)
│ │ │ │ │
* * * * *
Examples:
0 2 * * * = 2:00 AM every day
0 4 * * 0 = 4:00 AM every Sunday
*/15 * * * * = Every 15 minutes
0 0 1 * * = Midnight on the 1st of each month
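To make the field semantics concrete, here is a deliberately small matcher for the subset of syntax used in the examples above. It is an illustration only and not how node-cron evaluates expressions: `matchesCron` and `fieldMatches` are hypothetical names, only `*`, `*/n`, and single integers are supported, and dates are read in UTC so the result does not depend on the machine's time zone.

```typescript
// Sketch only: supports '*', '*/n', and single integers. Lists ('1,15'), ranges ('1-5'),
// and names ('MON') are left out, and all five fields must match (classic cron ORs
// day-of-month with day-of-week when both are restricted).

function fieldMatches(field: string, value: number, min: number): boolean {
  if (field === '*') return true;
  const step = /^\*\/(\d+)$/.exec(field);
  if (step) {
    const n = Number(step[1]);
    // Steps count from the field's minimum: */15 on minutes hits 0, 15, 30, 45.
    return n > 0 && (value - min) % n === 0;
  }
  if (!/^\d+$/.test(field)) {
    throw new Error(`Unsupported cron field: "${field}"`);
  }
  return Number(field) === value;
}

/** Returns true when `date` (read in UTC) falls on a minute selected by `expression`. */
function matchesCron(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  // Both 0 and 7 mean Sunday, so fold 7 onto 0 before comparing.
  const normalizedDow = dayOfWeek === '7' ? '0' : dayOfWeek;
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0) &&
    fieldMatches(hour, date.getUTCHours(), 0) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1) &&
    fieldMatches(normalizedDow, date.getUTCDay(), 0)
  );
}

const sunday4am = new Date(Date.UTC(2026, 0, 11, 4, 0)); // Sunday, 11 January 2026
const friday2am = new Date(Date.UTC(2026, 0, 9, 2, 0)); // Friday, 9 January 2026

const weeklyFiresOnSunday = matchesCron('0 4 * * 0', sunday4am);
const weeklyFiresOnFriday = matchesCron('0 4 * * 0', friday2am);
const dailyFiresOnFriday = matchesCron('0 2 * * *', friday2am);
```

A matcher like this can be handy in unit tests that check a schedule string against known dates before it is handed to the scheduler.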
Error Handling Pattern
The async IIFE wrapper with .catch() ensures that:
- Errors in the job don't crash the cron scheduler
- Promise rejections that escape the job logic are logged rather than left unhandled
- Where a lock is used, it is always released in the finally block
cron.schedule('0 2 * * *', () => {
  (async () => {
    // Job logic here
  })().catch((error) => {
    // Handle unhandled rejections from the async wrapper
    logger.error({ err: error }, 'Unhandled rejection');
  });
});
Manual Trigger API
Admin endpoints allow manual triggering of scheduled jobs:
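// src/routes/admin.routes.ts
router.post('/jobs/daily-deals', isAdmin, async (req, res, next) => {
  try {
    // Runs the check in-process; the response is sent only after it completes
    await backgroundJobService.runDailyDealCheck();
    res.json({ message: 'Daily deal check triggered' });
  } catch (error) {
    next(error); // Forward failures to the Express error-handling middleware
  }
});
router.post('/jobs/analytics', isAdmin, async (req, res, next) => {
  try {
    const jobId = await backgroundJobService.triggerAnalyticsReport();
    res.json({ message: 'Analytics report queued', jobId });
  } catch (error) {
    next(error);
  }
});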
Consequences
Positive
- Reliability: Jobs run at predictable times without manual intervention.
- Idempotency: Duplicate job prevention via job IDs.
- Observability: All job activity is logged with structured logging.
- Flexibility: Jobs can be triggered manually for testing or urgent runs.
- Separation: Scheduling is decoupled from job execution (cron vs BullMQ).
Negative
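- Single Server: The design assumes cron runs on a single server instance. In a multi-server deployment every instance would fire its own schedule, so distributed scheduling (or a single designated scheduler instance) would be needed.
- Time Zone Dependency: Cron times are server-local by default. For distributed systems, consider scheduling in UTC; node-cron accepts a timezone option in cron.schedule.
- In-Memory Locks: Overlap protection is per-process, not cluster-wide. The lock also lives in the cron callback, so the manual trigger endpoint, which calls runDailyDealCheck() directly, is not covered by it.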
Key Files
- src/services/backgroundJobService.ts - BackgroundJobService class and startBackgroundJobs
- src/services/queueService.server.ts - BullMQ queue definitions
- src/services/workers.server.ts - BullMQ worker processors