All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 13m4s
111 lines
3.4 KiB
TypeScript
111 lines
3.4 KiB
TypeScript
// src/services/monitoringService.server.ts
|
|
import {
|
|
flyerQueue,
|
|
emailQueue,
|
|
analyticsQueue,
|
|
cleanupQueue,
|
|
weeklyAnalyticsQueue,
|
|
} from './queueService.server';
|
|
import {
|
|
analyticsWorker,
|
|
cleanupWorker,
|
|
emailWorker,
|
|
flyerWorker,
|
|
weeklyAnalyticsWorker,
|
|
} from './workers.server';
|
|
import type { Job, Queue } from 'bullmq';
|
|
import { NotFoundError, ValidationError } from './db/errors.db';
|
|
import { logger } from './logger.server';
|
|
|
|
class MonitoringService {
|
|
/**
|
|
* Retrieves the current running status of all registered BullMQ workers.
|
|
* @returns A promise that resolves to an array of worker statuses.
|
|
*/
|
|
async getWorkerStatuses() {
|
|
const workers = [flyerWorker, emailWorker, analyticsWorker, cleanupWorker, weeklyAnalyticsWorker];
|
|
return Promise.all(
|
|
workers.map(async (worker) => ({
|
|
name: worker.name,
|
|
isRunning: worker.isRunning(),
|
|
})),
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Retrieves job counts for all registered BullMQ queues.
|
|
* @returns A promise that resolves to an array of queue statuses.
|
|
*/
|
|
async getQueueStatuses() {
|
|
const queues = [flyerQueue, emailQueue, analyticsQueue, cleanupQueue, weeklyAnalyticsQueue];
|
|
return Promise.all(
|
|
queues.map(async (queue) => ({
|
|
name: queue.name,
|
|
counts: await queue.getJobCounts(
|
|
'waiting',
|
|
'active',
|
|
'completed',
|
|
'failed',
|
|
'delayed',
|
|
'paused',
|
|
),
|
|
})),
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Retries a specific failed job in a given queue.
|
|
* @param queueName The name of the queue.
|
|
* @param jobId The ID of the job to retry.
|
|
* @param userId The ID of the user initiating the retry.
|
|
*/
|
|
async retryFailedJob(queueName: string, jobId: string, userId: string) {
|
|
const queueMap: { [key: string]: Queue } = {
|
|
'flyer-processing': flyerQueue,
|
|
'email-sending': emailQueue,
|
|
'analytics-reporting': analyticsQueue,
|
|
'file-cleanup': cleanupQueue,
|
|
'weekly-analytics-reporting': weeklyAnalyticsQueue, // This was a duplicate, fixed.
|
|
};
|
|
|
|
const queue = queueMap[queueName];
|
|
if (!queue) {
|
|
throw new NotFoundError(`Queue '${queueName}' not found.`);
|
|
}
|
|
|
|
const job = await queue.getJob(jobId);
|
|
if (!job) {
|
|
throw new NotFoundError(`Job with ID '${jobId}' not found in queue '${queueName}'.`);
|
|
}
|
|
|
|
const jobState = await job.getState();
|
|
if (jobState !== 'failed') {
|
|
throw new ValidationError(
|
|
[],
|
|
`Job is not in a 'failed' state. Current state: ${jobState}.`,
|
|
);
|
|
}
|
|
|
|
await job.retry();
|
|
logger.info(`[Admin] User ${userId} manually retried job ${jobId} in queue ${queueName}.`);
|
|
}
|
|
|
|
/**
|
|
* Retrieves the status of a single job from the flyer processing queue.
|
|
* @param jobId The ID of the job to retrieve.
|
|
* @returns A promise that resolves to a simplified job status object.
|
|
*/
|
|
async getFlyerJobStatus(jobId: string): Promise<{ id: string; state: string; progress: number | object | string | boolean; returnValue: any; failedReason: string | null; }> {
|
|
const job = await flyerQueue.getJob(jobId);
|
|
if (!job) {
|
|
throw new NotFoundError('Job not found.');
|
|
}
|
|
const state = await job.getState();
|
|
const progress = job.progress;
|
|
const returnValue = job.returnvalue;
|
|
const failedReason = job.failedReason;
|
|
return { id: job.id!, state, progress, returnValue, failedReason };
|
|
}
|
|
}
|
|
|
|
export const monitoringService = new MonitoringService(); |