Files
flyer-crawler.projectium.com/src/services/monitoringService.server.ts
Torben Sorensen d250932c05
All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 15m28s
all tests fixed? can it be?
2026-01-10 22:58:38 -08:00

127 lines
3.7 KiB
TypeScript

// src/services/monitoringService.server.ts
import {
flyerQueue,
emailQueue,
analyticsQueue,
cleanupQueue,
weeklyAnalyticsQueue,
} from './queueService.server';
import {
analyticsWorker,
cleanupWorker,
emailWorker,
flyerWorker,
weeklyAnalyticsWorker,
flyerProcessingService,
} from './workers.server';
import type { Queue } from 'bullmq';
// Re-export flyerProcessingService for integration tests that need to inject mocks.
// This ensures tests get the SAME instance that the workers use, rather than creating
// a new instance by importing workers.server.ts directly.
export { flyerProcessingService };
import { NotFoundError, ValidationError } from './db/errors.db';
import { logger } from './logger.server';
/**
 * Monitoring and admin helpers for the BullMQ queues and workers imported by
 * this module: status snapshots, manual retry of failed jobs, and single-job
 * status lookup on the flyer queue.
 */
class MonitoringService {
  /**
   * Retrieves the current running status of all registered BullMQ workers.
   * @returns A promise that resolves to an array of `{ name, isRunning }` entries, one per worker.
   */
  async getWorkerStatuses() {
    const workers = [
      flyerWorker,
      emailWorker,
      analyticsWorker,
      cleanupWorker,
      weeklyAnalyticsWorker,
    ];
    // Nothing is awaited per worker, so a plain map is enough; the async
    // method still hands callers a promise.
    return workers.map((worker) => ({
      name: worker.name,
      isRunning: worker.isRunning(),
    }));
  }

  /**
   * Retrieves job counts for all registered BullMQ queues.
   * Counts are requested for the waiting, active, completed, failed, delayed
   * and paused states, and all queues are queried concurrently.
   * @returns A promise that resolves to an array of `{ name, counts }` entries, one per queue.
   */
  async getQueueStatuses() {
    const queues = [flyerQueue, emailQueue, analyticsQueue, cleanupQueue, weeklyAnalyticsQueue];
    return Promise.all(
      queues.map(async (queue) => ({
        name: queue.name,
        counts: await queue.getJobCounts(
          'waiting',
          'active',
          'completed',
          'failed',
          'delayed',
          'paused',
        ),
      })),
    );
  }

  /**
   * Retries a specific failed job in a given queue.
   * @param queueName The name of the queue.
   * @param jobId The ID of the job to retry.
   * @param userId The ID of the user initiating the retry (recorded in the log line).
   * @throws NotFoundError If `queueName` is not a known queue, or the job does not exist in it.
   * @throws ValidationError If the job exists but is not currently in the 'failed' state.
   */
  async retryFailedJob(queueName: string, jobId: string, userId: string) {
    // A Map (rather than a plain object) keeps names inherited from
    // Object.prototype, such as 'constructor' or '__proto__', from being
    // mistaken for queues and slipping past the not-found check below.
    const queuesByName = new Map<string, Queue>([
      ['flyer-processing', flyerQueue],
      ['email-sending', emailQueue],
      ['analytics-reporting', analyticsQueue],
      ['file-cleanup', cleanupQueue],
      ['weekly-analytics-reporting', weeklyAnalyticsQueue],
    ]);

    const queue = queuesByName.get(queueName);
    if (!queue) {
      throw new NotFoundError(`Queue '${queueName}' not found.`);
    }

    const job = await queue.getJob(jobId);
    if (!job) {
      throw new NotFoundError(`Job with ID '${jobId}' not found in queue '${queueName}'.`);
    }

    // Only jobs that are currently failed may be retried.
    const jobState = await job.getState();
    if (jobState !== 'failed') {
      throw new ValidationError([], `Job is not in a 'failed' state. Current state: ${jobState}.`);
    }

    await job.retry();
    logger.info(`[Admin] User ${userId} manually retried job ${jobId} in queue ${queueName}.`);
  }

  /**
   * Retrieves the status of a single job from the flyer processing queue.
   * @param jobId The ID of the job to retrieve.
   * @returns A promise that resolves to a simplified job status object.
   * @throws NotFoundError If the flyer queue has no job with this ID.
   */
  async getFlyerJobStatus(jobId: string): Promise<{
    id: string;
    state: string;
    progress: number | object | string | boolean;
    returnValue: unknown;
    failedReason: string | null;
  }> {
    const job = await flyerQueue.getJob(jobId);
    if (!job) {
      throw new NotFoundError('Job not found.');
    }

    const state = await job.getState();
    return {
      // The job was looked up by `jobId`, so fall back to it instead of
      // asserting that `job.id` is set.
      id: job.id ?? jobId,
      state,
      progress: job.progress,
      returnValue: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}
/**
 * Module-level `MonitoringService` instance, created once when this module is
 * evaluated and exported for use by other modules.
 */
export const monitoringService = new MonitoringService();