Refactor geocoding services and improve logging

- Updated the Nominatim geocoding service to use a class-based structure and accept a logger instance for better logging control.
- Modified tests for the Nominatim service to align with the new structure and improved logging assertions.
- Removed the disabled notification service test file.
- Added a new GeocodingFailedError class to handle geocoding failures more explicitly.
- Enhanced error logging in the queue service to include structured error objects.
- Updated user service to accept a logger instance for better logging in address upsert operations.
- Added request-scoped logger to Express Request interface for type-safe logging in route handlers.
- Improved logging in utility functions for better debugging and error tracking.
- Created a new GoogleGeocodingService class for Google Maps geocoding with structured logging.
- Added tests for the useAiAnalysis hook to ensure proper functionality and error handling.
This commit is contained in:
2025-12-13 17:52:30 -08:00
parent 728f4a5f7e
commit 2affda25dc
62 changed files with 1928 additions and 1329 deletions

View File

@@ -31,8 +31,8 @@ connection.on('connect', () => {
});
connection.on('error', (err) => {
// This is crucial for diagnosing Redis connection issues.
logger.error('[Redis] Connection error.', { error: err });
// This is crucial for diagnosing Redis connection issues. // The patch requested this specific error handling.
logger.error({ err }, '[Redis] Connection error.');
});
const execAsync = promisify(exec);
@@ -152,24 +152,27 @@ const flyerProcessingService = new FlyerProcessingService(
*/
const attachWorkerEventListeners = (worker: Worker) => {
worker.on('completed', (job: Job, returnValue: unknown) => {
logger.info(`[${worker.name}] Job ${job.id} completed successfully.`, { returnValue });
logger.info({ returnValue }, `[${worker.name}] Job ${job.id} completed successfully.`);
});
worker.on('failed', (job: Job | undefined, error: Error) => {
// This event fires after all retries have failed.
logger.error(`[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`, { error: error.message, stack: error.stack, jobData: job?.data });
logger.error({ err: error, jobData: job?.data }, `[${worker.name}] Job ${job?.id} has ultimately failed after all attempts.`);
});
};
export const flyerWorker = new Worker<FlyerJobData>(
'flyer-processing', // Must match the queue name
(job) => flyerProcessingService.processJob(job), // The processor function
(job) => {
// Create a job-specific logger instance
const jobLogger = logger.child({ jobId: job.id, jobName: job.name, userId: job.data.userId });
return flyerProcessingService.processJob(job, jobLogger);
},
{
connection,
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
}
);
/**
* A dedicated worker process for sending emails.
*/
@@ -177,18 +180,20 @@ export const emailWorker = new Worker<EmailJobData>(
'email-sending',
async (job: Job<EmailJobData>) => {
const { to, subject } = job.data;
logger.info(`[EmailWorker] Sending email for job ${job.id}`, { to, subject });
// Create a job-specific logger instance
const jobLogger = logger.child({ jobId: job.id, jobName: job.name });
jobLogger.info({ to, subject }, `[EmailWorker] Sending email for job ${job.id}`);
try {
await emailService.sendEmail(job.data);
await emailService.sendEmail(job.data, jobLogger);
} catch (error: unknown) {
// Standardize error logging to capture the full error object, including the stack trace.
// This provides more context for debugging than just logging the message.
logger.error(`[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
// Log the full error object for better diagnostics.
error: error instanceof Error ? error : new Error(String(error)),
logger.error({
// Log the full error object for better diagnostics. // The patch requested this specific error handling.
err: error instanceof Error ? error : new Error(String(error)),
// Also include the job data for context.
jobData: job.data,
});
}, `[EmailWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`);
// Re-throw to let BullMQ handle the failure and retry.
throw error;
}
@@ -207,7 +212,7 @@ export const analyticsWorker = new Worker<AnalyticsJobData>(
'analytics-reporting',
async (job: Job<AnalyticsJobData>) => {
const { reportDate } = job.data;
logger.info(`[AnalyticsWorker] Starting report generation for job ${job.id}`, { reportDate });
logger.info({ reportDate }, `[AnalyticsWorker] Starting report generation for job ${job.id}`);
try {
// Special case for testing the retry mechanism
if (reportDate === 'FAIL') {
@@ -220,10 +225,10 @@ export const analyticsWorker = new Worker<AnalyticsJobData>(
logger.info(`[AnalyticsWorker] Successfully generated report for ${reportDate}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
logger.error({
err: error instanceof Error ? error : new Error(String(error)),
jobData: job.data,
});
}, `[AnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`);
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},
@@ -243,7 +248,7 @@ export const cleanupWorker = new Worker<CleanupJobData>(
async (job: Job<CleanupJobData>) => {
// Destructure the data from the job payload.
const { flyerId, paths } = job.data;
logger.info(`[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`, { paths });
logger.info({ paths }, `[CleanupWorker] Starting file cleanup for job ${job.id} (Flyer ID: ${flyerId})`);
try {
if (!paths || paths.length === 0) {
@@ -269,9 +274,9 @@ export const cleanupWorker = new Worker<CleanupJobData>(
logger.info(`[CleanupWorker] Successfully cleaned up ${paths.length} file(s) for flyer ${flyerId}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
});
logger.error({
err: error instanceof Error ? error : new Error(String(error)),
}, `[CleanupWorker] Job ${job.id} for flyer ${flyerId} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`);
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},
@@ -289,17 +294,17 @@ export const weeklyAnalyticsWorker = new Worker<WeeklyAnalyticsJobData>(
'weekly-analytics-reporting',
async (job: Job<WeeklyAnalyticsJobData>) => {
const { reportYear, reportWeek } = job.data;
logger.info(`[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`, { reportYear, reportWeek });
logger.info({ reportYear, reportWeek }, `[WeeklyAnalyticsWorker] Starting weekly report generation for job ${job.id}`);
try {
// Simulate a longer-running task for weekly reports
await new Promise(resolve => setTimeout(resolve, 30000)); // Simulate 30-second task
logger.info(`[WeeklyAnalyticsWorker] Successfully generated weekly report for week ${reportWeek}, ${reportYear}.`);
} catch (error: unknown) {
// Standardize error logging.
logger.error(`[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`, {
error: error instanceof Error ? error : new Error(String(error)),
logger.error({
err: error instanceof Error ? error : new Error(String(error)),
jobData: job.data,
});
}, `[WeeklyAnalyticsWorker] Job ${job.id} failed. Attempt ${job.attemptsMade}/${job.opts.attempts}.`);
throw error; // Re-throw to let BullMQ handle the failure and retry.
}
},