# ADR-037: Scheduled Jobs and Cron Pattern
**Date**: 2026-01-09

**Status**: Accepted

**Implemented**: 2026-01-09
## Context
Many business operations need to run on a recurring schedule without user intervention:
1. **Daily Deal Checks**: Scan watched items for price drops and notify users.
2. **Analytics Generation**: Compile daily and weekly statistics reports.
3. **Token Cleanup**: Remove expired password reset tokens from the database.
4. **Data Maintenance**: Archive old data and clean up temporary files.

These scheduled operations require:
- Reliable execution at specific times
- Protection against overlapping runs
- Graceful error handling that doesn't crash the server
- Integration with the existing job queue system (BullMQ)
## Decision
We will use `node-cron` for scheduling jobs and integrate with BullMQ for job execution. This provides:
1. **Cron Expressions**: Standard, well-understood scheduling syntax.
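2. **Job Queue Integration**: Scheduled jobs enqueue work to BullMQ for reliable processing. The daily deal check is the exception: it runs directly in the scheduler process.
3. **Idempotency**: Queued jobs use predictable, date-based job IDs, so enqueuing the same job twice does not create a duplicate run.
4. **Overlap Protection**: An in-memory lock prevents concurrent execution of the directly executed job within a single process.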
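
The idempotency point can be illustrated with a small sketch. `InMemoryQueue` is a hypothetical stand-in, not the BullMQ API; it mimics only the behaviour that adding a job whose ID is already present does not enqueue a second copy. `dailyJobId` is likewise an illustrative helper name, not a function from the codebase.

```typescript
// Hypothetical stand-in for a job queue; NOT the BullMQ API.
// It mimics one behaviour only: a job whose ID is already present is not added again.
interface QueuedJob<T> {
  id: string;
  jobName: string;
  data: T;
}

class InMemoryQueue<T> {
  private readonly jobs = new Map<string, QueuedJob<T>>();

  add(jobName: string, data: T, opts: { jobId: string }): QueuedJob<T> {
    const existing = this.jobs.get(opts.jobId);
    if (existing) {
      return existing; // Duplicate ID: keep the original job, enqueue nothing new.
    }
    const job: QueuedJob<T> = { id: opts.jobId, jobName, data };
    this.jobs.set(job.id, job);
    return job;
  }

  get size(): number {
    return this.jobs.size;
  }
}

// Illustrative helper: one predictable ID per job type per calendar day (UTC).
function dailyJobId(prefix: string, date: Date): string {
  return `${prefix}-${date.toISOString().split('T')[0]}`;
}

const reportQueue = new InMemoryQueue<{ reportDate: string }>();
const reportJobId = dailyJobId('daily-report', new Date('2026-01-09T03:00:00Z'));

// A second scheduler tick (or a second server) on the same day produces the same ID.
reportQueue.add('generate-daily-report', { reportDate: '2026-01-09' }, { jobId: reportJobId });
reportQueue.add('generate-daily-report', { reportDate: '2026-01-09' }, { jobId: reportJobId });

console.log(reportJobId, reportQueue.size); // daily-report-2026-01-09 1
```

In BullMQ itself, deduplication by job ID applies only while the earlier job still exists in the queue. If completed jobs are removed (for example through a `removeOnComplete` setting), a later `add` with the same ID is accepted again, so retention settings are part of the idempotency guarantee.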
### Architecture
```text
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    node-cron    │────▶│  BullMQ Queue   │────▶│     Worker      │
│   (Scheduler)   │     │   (Job Store)   │     │   (Processor)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │      Redis      │
                        │  (Persistence)  │
                        └─────────────────┘
```
## Implementation Details
### BackgroundJobService
Located in `src/services/backgroundJobService.ts`:
```typescript
import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';
// Project-local names used below (PersonalizationRepository, NotificationRepository,
// EmailJobData, analyticsQueue, getCurrentDateISOString) are imported from their own
// modules; those import lines are not shown in this excerpt.

export class BackgroundJobService {
  constructor(
    private personalizationRepo: PersonalizationRepository,
    private notificationRepo: NotificationRepository,
    private emailQueue: Queue<EmailJobData>,
    private logger: Logger,
  ) {}

  async runDailyDealCheck(): Promise<void> {
    this.logger.info('[BackgroundJob] Starting daily deal check...');

    // 1. Fetch all deals for all users in one efficient query
    const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);

    // 2. Group deals by user
    const dealsByUser = this.groupDealsByUser(allDeals);

    // 3. Process each user's deals in parallel; allSettled keeps one user's
    //    failure from aborting the others
    const results = await Promise.allSettled(
      Array.from(dealsByUser.values()).map((userGroup) => this._processDealsForUser(userGroup)),
    );

    // 4. Bulk insert notifications
    await this.bulkCreateNotifications(results);

    this.logger.info('[BackgroundJob] Daily deal check completed.');
  }

  async triggerAnalyticsReport(): Promise<string> {
    const reportDate = getCurrentDateISOString();
    // Date.now() makes every manual run unique, so it is never deduplicated
    // against the scheduled `daily-report-<date>` job.
    const jobId = `manual-report-${reportDate}-${Date.now()}`;
    const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
    // job.id is typed as optional in BullMQ; fall back to the ID we supplied.
    return job.id ?? jobId;
  }

  // groupDealsByUser, _processDealsForUser, and bulkCreateNotifications are not shown here.
}
```
### Cron Job Initialization
```typescript
// In-memory lock to prevent job overlap
let isDailyDealCheckRunning = false;

export function startBackgroundJobs(
  backgroundJobService: BackgroundJobService,
  analyticsQueue: Queue,
  weeklyAnalyticsQueue: Queue,
  tokenCleanupQueue: Queue,
  logger: Logger,
): void {
  // Daily deal check at 2:00 AM
  cron.schedule('0 2 * * *', () => {
    (async () => {
      if (isDailyDealCheckRunning) {
        logger.warn('[BackgroundJob] Daily deal check already running. Skipping.');
        return;
      }
      isDailyDealCheckRunning = true;
      try {
        await backgroundJobService.runDailyDealCheck();
      } catch (error) {
        logger.error({ err: error }, '[BackgroundJob] Daily deal check failed.');
      } finally {
        isDailyDealCheckRunning = false;
      }
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Unhandled rejection in cron wrapper.');
      isDailyDealCheckRunning = false;
    });
  });

  // Daily analytics at 3:00 AM
  cron.schedule('0 3 * * *', () => {
    (async () => {
      const reportDate = getCurrentDateISOString();
      await analyticsQueue.add(
        'generate-daily-report',
        { reportDate },
        { jobId: `daily-report-${reportDate}` }, // Prevents duplicates
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Analytics job enqueue failed.');
    });
  });

  // Weekly analytics at 4:00 AM on Sundays
  cron.schedule('0 4 * * 0', () => {
    (async () => {
      const { year, week } = getSimpleWeekAndYear();
      await weeklyAnalyticsQueue.add(
        'generate-weekly-report',
        { reportYear: year, reportWeek: week },
        { jobId: `weekly-report-${year}-${week}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Weekly analytics enqueue failed.');
    });
  });

  // Token cleanup at 5:00 AM
  cron.schedule('0 5 * * *', () => {
    (async () => {
      const timestamp = new Date().toISOString();
      await tokenCleanupQueue.add(
        'cleanup-tokens',
        { timestamp },
        { jobId: `token-cleanup-${timestamp.split('T')[0]}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Token cleanup enqueue failed.');
    });
  });

  logger.info('[BackgroundJob] All cron jobs scheduled successfully.');
}
```
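
The flag-based lock above could be factored into a reusable guard if more directly executed jobs are added later. The following is a minimal sketch under that assumption; `withOverlapLock` and `WarnLogger` are hypothetical names that do not exist in the codebase, and the lock remains per-process.

```typescript
// Minimal logger shape for this sketch (a subset of what a pino logger offers).
interface WarnLogger {
  warn(msg: string): void;
}

// Hypothetical helper: wraps an async task so that a call made while a previous
// call is still in flight is skipped instead of running concurrently.
function withOverlapLock(
  jobName: string,
  task: () => Promise<void>,
  logger: WarnLogger,
): () => Promise<boolean> {
  let running = false; // Per-process flag, same limitation as isDailyDealCheckRunning.

  return async (): Promise<boolean> => {
    if (running) {
      logger.warn(`[BackgroundJob] ${jobName} already running. Skipping.`);
      return false; // Skipped.
    }
    running = true;
    try {
      await task();
      return true; // Ran to completion.
    } finally {
      running = false; // Released on success and on failure.
    }
  };
}

// Usage: a task that stays pending until `releaseTask` is called.
const overlapWarnings: string[] = [];
let releaseTask: () => void = () => {};
const pendingTask = (): Promise<void> =>
  new Promise<void>((resolve) => {
    releaseTask = resolve;
  });

const guardedTask = withOverlapLock('Daily deal check', pendingTask, {
  warn: (msg) => {
    overlapWarnings.push(msg);
  },
});

const firstRun = guardedTask(); // Acquires the lock and waits on the task.
const secondRun = guardedTask(); // Sees the lock held and is skipped.
releaseTask();

Promise.all([firstRun, secondRun]).then((results) => {
  console.log(results, overlapWarnings.length); // [ true, false ] 1
});
```

Unlike the inline version, this sketch lets a task failure propagate to the caller, so it would still need the `.catch()` wrapper when passed to `cron.schedule`.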
### Job Schedule Reference
| Job | Schedule | Queue | Purpose |
| ---------------- | ---------------------------- | ---------------------- | --------------------------------- |
| Daily Deal Check | `0 2 * * *` (2:00 AM) | Direct execution | Find price drops on watched items |
| Daily Analytics | `0 3 * * *` (3:00 AM) | `analyticsQueue` | Generate daily statistics |
| Weekly Analytics | `0 4 * * 0` (4:00 AM Sunday) | `weeklyAnalyticsQueue` | Generate weekly reports |
| Token Cleanup | `0 5 * * *` (5:00 AM) | `tokenCleanupQueue` | Remove expired tokens |
### Cron Expression Reference
```text
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7, Sun = 0 or 7)
│ │ │ │ │
* * * * *

Examples:
0 2 * * *     = 2:00 AM every day
0 4 * * 0     = 4:00 AM every Sunday
*/15 * * * *  = Every 15 minutes
0 0 1 * *     = Midnight on the 1st of each month
```
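
To make the field semantics concrete, here is a small matcher for the five-field syntax. It is an illustration only and not node-cron's parser: it handles just `*`, `*/n`, and single integers, requires every field to match, and compares against the date's local time. `fieldMatches` and `matchesCron` are hypothetical names.

```typescript
// Checks one cron field against a value. `min` is the lowest legal value of the
// field, because steps such as */15 are counted from the start of the range.
function fieldMatches(field: string, value: number, min: number): boolean {
  if (field === '*') return true;
  if (field.startsWith('*/')) {
    const step = Number(field.slice(2));
    return Number.isInteger(step) && step > 0 && (value - min) % step === 0;
  }
  return Number(field) === value;
}

// Returns true when `date` (local time) falls on a minute selected by `expression`.
function matchesCron(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  // Both 0 and 7 mean Sunday; normalise 7 to 0 before comparing with getDay().
  const normalisedDayOfWeek = dayOfWeek === '7' ? '0' : dayOfWeek;
  return (
    fieldMatches(minute, date.getMinutes(), 0) &&
    fieldMatches(hour, date.getHours(), 0) &&
    fieldMatches(dayOfMonth, date.getDate(), 1) &&
    fieldMatches(month, date.getMonth() + 1, 1) && // JS months are 0-based
    fieldMatches(normalisedDayOfWeek, date.getDay(), 0)
  );
}

// 2026-01-11 is a Sunday; the Date constructor below uses local time.
const sundayAtFour = new Date(2026, 0, 11, 4, 0);
console.log(matchesCron('0 4 * * 0', sundayAtFour)); // true
console.log(matchesCron('0 4 * * 7', sundayAtFour)); // true
console.log(matchesCron('0 2 * * *', sundayAtFour)); // false
console.log(matchesCron('*/15 * * * *', new Date(2026, 0, 9, 10, 45))); // true
```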
### Error Handling Pattern
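Every cron callback wraps its work in an async IIFE with a trailing `.catch()`. This ensures that:
1. Errors in the job don't crash the server or the cron scheduler
2. Promise rejections are logged instead of going unhandled
3. For lock-guarded jobs, the lock is released in a `finally` block inside the IIFE, with the `.catch()` handler resetting it again as a safeguard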
```typescript
cron.schedule('0 2 * * *', () => {
  (async () => {
    // Job logic here
  })().catch((error) => {
    // Handle unhandled rejections from the async wrapper
    logger.error({ err: error }, 'Unhandled rejection');
  });
});
```
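
The same pattern can be expressed as a helper so that each schedule does not repeat the IIFE. This is a sketch, not code from the repository: `toCronCallback` and `ErrorLogger` are hypothetical names, and the logger interface is a narrowed stand-in for the pino logger used elsewhere.

```typescript
// Minimal logger shape for this sketch (a subset of what a pino logger offers).
interface ErrorLogger {
  error(context: { err: unknown }, msg: string): void;
}

// Hypothetical helper: turns an async job into a callback whose promise never
// rejects, so a failure is logged rather than surfacing as an unhandled rejection.
function toCronCallback(
  jobName: string,
  job: () => Promise<void>,
  logger: ErrorLogger,
): () => Promise<void> {
  return () =>
    // Promise.resolve().then(job) also captures errors thrown synchronously by `job`.
    Promise.resolve()
      .then(job)
      .catch((error: unknown) => {
        logger.error({ err: error }, `[BackgroundJob] ${jobName} failed.`);
      });
}

// Usage with a job that always fails and a logger that records messages.
const loggedMessages: string[] = [];
const failingJob = async (): Promise<void> => {
  throw new Error('queue unavailable');
};

const tokenCleanupCallback = toCronCallback('Token cleanup enqueue', failingJob, {
  error: (_context, msg) => {
    loggedMessages.push(msg);
  },
});

// In the service this would be passed to the scheduler, for example:
//   cron.schedule('0 5 * * *', tokenCleanupCallback);
const callbackSettled = tokenCleanupCallback();
callbackSettled.then(() => {
  console.log(loggedMessages); // [ '[BackgroundJob] Token cleanup enqueue failed.' ]
});
```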
### Manual Trigger API
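Admin endpoints allow manual triggering of scheduled jobs. As shown, the daily-deals endpoint calls `runDailyDealCheck()` directly, so it waits for the run to finish before responding and does not go through the `isDailyDealCheckRunning` lock used by the cron callback: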
```typescript
// src/routes/admin.routes.ts
router.post('/jobs/daily-deals', isAdmin, async (req, res, next) => {
  try {
    await backgroundJobService.runDailyDealCheck();
    res.json({ message: 'Daily deal check triggered' });
  } catch (error) {
    next(error); // Forward failures to the error-handling middleware
  }
});

router.post('/jobs/analytics', isAdmin, async (req, res, next) => {
  try {
    const jobId = await backgroundJobService.triggerAnalyticsReport();
    res.json({ message: 'Analytics report queued', jobId });
  } catch (error) {
    next(error); // Forward failures to the error-handling middleware
  }
});
```
## Consequences
### Positive
- **Reliability**: Jobs run at predictable times without manual intervention.
- **Idempotency**: Duplicate job prevention via job IDs.
- **Observability**: All job activity is logged with structured logging.
- **Flexibility**: Jobs can be triggered manually for testing or urgent runs.
- **Separation**: Scheduling is decoupled from job execution (cron vs BullMQ).
### Negative
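- **Single Server**: The design assumes one server instance runs the scheduler. In a multi-server deployment every instance would fire the same schedules; queued jobs are deduplicated by job ID, but the directly executed daily deal check would run once per instance. Consider distributed scheduling for that case.
- **Time Zone Dependency**: Cron times are interpreted in the server's local time zone unless the `timezone` option is passed to `cron.schedule`; consider UTC for distributed systems.
- **In-Memory Locks**: Overlap protection is per-process, not cluster-wide, and the lock state is lost on restart.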
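
One possible mitigation for the per-process lock, sketched here as an assumption and not as part of this decision, is a Redis-backed lock based on `SET ... NX` with an expiry. `RedisLockClient`, `FakeRedis`, and `runWithClusterLock` are hypothetical names; the `set` argument order mirrors the Redis command `SET key value PX <ms> NX` and should be checked against the client library in use.

```typescript
// Narrow view of a Redis client for this sketch.
interface RedisLockClient {
  set(key: string, value: string, px: 'PX', ttlMs: number, nx: 'NX'): Promise<'OK' | null>;
}

// Hypothetical helper: run `task` only if this process wins the lock.
// The key is left to expire via its TTL, so other instances firing the same
// schedule a little later still see the lock and skip the run.
async function runWithClusterLock(
  redis: RedisLockClient,
  lockKey: string,
  ttlMs: number,
  task: () => Promise<void>,
): Promise<boolean> {
  const acquired = await redis.set(lockKey, String(Date.now()), 'PX', ttlMs, 'NX');
  if (acquired !== 'OK') {
    return false; // Another instance holds the lock; skip this run.
  }
  await task();
  return true;
}

// In-memory fake used only to exercise the sketch; it ignores the TTL.
class FakeRedis implements RedisLockClient {
  private readonly keys = new Set<string>();

  async set(key: string): Promise<'OK' | null> {
    if (this.keys.has(key)) return null;
    this.keys.add(key);
    return 'OK';
  }
}

// Usage: two "instances" compete for the same daily lock.
const fakeRedis = new FakeRedis();
let dealCheckRuns = 0;
const countRun = async (): Promise<void> => {
  dealCheckRuns += 1;
};

const lockKey = 'lock:daily-deal-check:2026-01-09';
const lockDemo = Promise.all([
  runWithClusterLock(fakeRedis, lockKey, 60000, countRun),
  runWithClusterLock(fakeRedis, lockKey, 60000, countRun),
]);

lockDemo.then((results) => {
  console.log(results, dealCheckRuns); // [ true, false ] 1
});
```

For this approach the TTL would need to exceed the expected run time while staying shorter than the schedule interval, otherwise the next scheduled run could be skipped.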
## Key Files
- `src/services/backgroundJobService.ts` - BackgroundJobService class and `startBackgroundJobs`
- `src/services/queueService.server.ts` - BullMQ queue definitions
- `src/services/workers.server.ts` - BullMQ worker processors
## Related ADRs
- [ADR-006](./0006-background-job-processing-and-task-queues.md) - Background Job Processing
- [ADR-004](./0004-standardized-application-wide-structured-logging.md) - Structured Logging