Files
flyer-crawler.projectium.com/docs/adr/0037-scheduled-jobs-and-cron-pattern.md

9.2 KiB

ADR-037: Scheduled Jobs and Cron Pattern

Date: 2026-01-09

Status: Accepted

Implemented: 2026-01-09

Context

Many business operations need to run on a recurring schedule without user intervention:

  1. Daily Deal Checks: Scan watched items for price drops and notify users.
  2. Analytics Generation: Compile daily and weekly statistics reports.
  3. Token Cleanup: Remove expired password reset tokens from the database.
  4. Data Maintenance: Archive old data, clean up temporary files.

These scheduled operations require:

  • Reliable execution at specific times
  • Protection against overlapping runs
  • Graceful error handling that doesn't crash the server
  • Integration with the existing job queue system (BullMQ)

Decision

We will use node-cron for scheduling jobs and integrate with BullMQ for job execution. This provides:

  1. Cron Expressions: Standard, well-understood scheduling syntax.
  2. Job Queue Integration: Scheduled jobs enqueue work to BullMQ for reliable processing.
  3. Idempotency: Jobs use predictable IDs to prevent duplicate runs.
  4. Overlap Protection: In-memory locks prevent concurrent execution of the same job.

Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   node-cron     │────▶│   BullMQ Queue  │────▶│   Worker        │
│   (Scheduler)   │     │   (Job Store)   │     │   (Processor)   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                              │
                              ▼
                        ┌─────────────────┐
                        │   Redis         │
                        │   (Persistence) │
                        └─────────────────┘

Implementation Details

BackgroundJobService

Located in src/services/backgroundJobService.ts:

import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';

export class BackgroundJobService {
  constructor(
    private personalizationRepo: PersonalizationRepository,
    private notificationRepo: NotificationRepository,
    private emailQueue: Queue<EmailJobData>,
    private logger: Logger,
  ) {}

  async runDailyDealCheck(): Promise<void> {
    this.logger.info('[BackgroundJob] Starting daily deal check...');

    // 1. Fetch all deals for all users in one efficient query
    const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);

    // 2. Group deals by user
    const dealsByUser = this.groupDealsByUser(allDeals);

    // 3. Process each user's deals in parallel
    const results = await Promise.allSettled(
      Array.from(dealsByUser.values()).map((userGroup) => this._processDealsForUser(userGroup)),
    );

    // 4. Bulk insert notifications
    await this.bulkCreateNotifications(results);

    this.logger.info('[BackgroundJob] Daily deal check completed.');
  }

  async triggerAnalyticsReport(): Promise<string> {
    const reportDate = getCurrentDateISOString();
    const jobId = `manual-report-${reportDate}-${Date.now()}`;
    const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
    return job.id;
  }
}

Cron Job Initialization

// In-memory lock to prevent job overlap
let isDailyDealCheckRunning = false;

export function startBackgroundJobs(
  backgroundJobService: BackgroundJobService,
  analyticsQueue: Queue,
  weeklyAnalyticsQueue: Queue,
  tokenCleanupQueue: Queue,
  logger: Logger,
): void {
  // Daily deal check at 2:00 AM
  cron.schedule('0 2 * * *', () => {
    (async () => {
      if (isDailyDealCheckRunning) {
        logger.warn('[BackgroundJob] Daily deal check already running. Skipping.');
        return;
      }
      isDailyDealCheckRunning = true;
      try {
        await backgroundJobService.runDailyDealCheck();
      } catch (error) {
        logger.error({ err: error }, '[BackgroundJob] Daily deal check failed.');
      } finally {
        isDailyDealCheckRunning = false;
      }
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Unhandled rejection in cron wrapper.');
      isDailyDealCheckRunning = false;
    });
  });

  // Daily analytics at 3:00 AM
  cron.schedule('0 3 * * *', () => {
    (async () => {
      const reportDate = getCurrentDateISOString();
      await analyticsQueue.add(
        'generate-daily-report',
        { reportDate },
        { jobId: `daily-report-${reportDate}` }, // Prevents duplicates
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Analytics job enqueue failed.');
    });
  });

  // Weekly analytics at 4:00 AM on Sundays
  cron.schedule('0 4 * * 0', () => {
    (async () => {
      const { year, week } = getSimpleWeekAndYear();
      await weeklyAnalyticsQueue.add(
        'generate-weekly-report',
        { reportYear: year, reportWeek: week },
        { jobId: `weekly-report-${year}-${week}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Weekly analytics enqueue failed.');
    });
  });

  // Token cleanup at 5:00 AM
  cron.schedule('0 5 * * *', () => {
    (async () => {
      const timestamp = new Date().toISOString();
      await tokenCleanupQueue.add(
        'cleanup-tokens',
        { timestamp },
        { jobId: `token-cleanup-${timestamp.split('T')[0]}` },
      );
    })().catch((error) => {
      logger.error({ err: error }, '[BackgroundJob] Token cleanup enqueue failed.');
    });
  });

  logger.info('[BackgroundJob] All cron jobs scheduled successfully.');
}

Job Schedule Reference

Job Schedule Queue Purpose
Daily Deal Check 0 2 * * * (2:00 AM) Direct execution Find price drops on watched items
Daily Analytics 0 3 * * * (3:00 AM) analyticsQueue Generate daily statistics
Weekly Analytics 0 4 * * 0 (4:00 AM Sunday) weeklyAnalyticsQueue Generate weekly reports
Token Cleanup 0 5 * * * (5:00 AM) tokenCleanupQueue Remove expired tokens

Cron Expression Reference

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7, Sun = 0 or 7)
│ │ │ │ │
* * * * *

Examples:
0 2 * * *     = 2:00 AM every day
0 4 * * 0     = 4:00 AM every Sunday
*/15 * * * *  = Every 15 minutes
0 0 1 * *     = Midnight on the 1st of each month

Error Handling Pattern

The async IIFE wrapper with .catch() ensures that:

  1. Errors in the job don't crash the cron scheduler
  2. Unhandled promise rejections are logged
  3. The lock is always released in the finally block
cron.schedule('0 2 * * *', () => {
  (async () => {
    // Job logic here
  })().catch((error) => {
    // Handle unhandled rejections from the async wrapper
    logger.error({ err: error }, 'Unhandled rejection');
  });
});

Manual Trigger API

Admin endpoints allow manual triggering of scheduled jobs:

// src/routes/admin.routes.ts
router.post('/jobs/daily-deals', isAdmin, async (req, res, next) => {
  await backgroundJobService.runDailyDealCheck();
  res.json({ message: 'Daily deal check triggered' });
});

router.post('/jobs/analytics', isAdmin, async (req, res, next) => {
  const jobId = await backgroundJobService.triggerAnalyticsReport();
  res.json({ message: 'Analytics report queued', jobId });
});

Consequences

Positive

  • Reliability: Jobs run at predictable times without manual intervention.
  • Idempotency: Duplicate job prevention via job IDs.
  • Observability: All job activity is logged with structured logging.
  • Flexibility: Jobs can be triggered manually for testing or urgent runs.
  • Separation: Scheduling is decoupled from job execution (cron vs BullMQ).

Negative

  • Single Server: Cron runs on a single server instance. For multi-server deployments, consider distributed scheduling.
  • Time Zone Dependency: Cron times are server-local; consider UTC for distributed systems.
  • In-Memory Locks: Overlap protection is per-process, not cluster-wide.

Key Files

  • src/services/backgroundJobService.ts - BackgroundJobService class and startBackgroundJobs
  • src/services/queueService.server.ts - BullMQ queue definitions
  • src/services/workers.server.ts - BullMQ worker processors