From 1fc70e391561f684dc7ba0270cd2268ed2af60bb Mon Sep 17 00:00:00 2001 From: Torben Sorensen Date: Fri, 2 Jan 2026 22:33:44 -0800 Subject: [PATCH] extend timers duration - prevent jobs from timing out after 30secs, increased to 4mins --- ecosystem.config.cjs | 3 + src/services/db/flyer.db.test.ts | 65 ++++++++++--------- src/services/db/flyer.db.ts | 52 ++++++--------- src/services/flyerDataTransformer.ts | 22 +++++-- src/services/workers.server.ts | 2 + src/tests/e2e/auth.e2e.test.ts | 5 +- .../flyer-processing.integration.test.ts | 4 +- 7 files changed, 82 insertions(+), 71 deletions(-) diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs index cf885757..3c41cff4 100644 --- a/ecosystem.config.cjs +++ b/ecosystem.config.cjs @@ -52,6 +52,7 @@ module.exports = { SMTP_USER: process.env.SMTP_USER, SMTP_PASS: process.env.SMTP_PASS, SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL, + WORKER_LOCK_DURATION: '120000', }, // Test Environment Settings env_test: { @@ -74,6 +75,7 @@ module.exports = { SMTP_USER: process.env.SMTP_USER, SMTP_PASS: process.env.SMTP_PASS, SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL, + WORKER_LOCK_DURATION: '120000', }, // Development Environment Settings env_development: { @@ -97,6 +99,7 @@ module.exports = { SMTP_USER: process.env.SMTP_USER, SMTP_PASS: process.env.SMTP_PASS, SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL, + WORKER_LOCK_DURATION: '120000', }, }, { diff --git a/src/services/db/flyer.db.test.ts b/src/services/db/flyer.db.test.ts index 5ab656fd..7a65a149 100644 --- a/src/services/db/flyer.db.test.ts +++ b/src/services/db/flyer.db.test.ts @@ -51,67 +51,72 @@ describe('Flyer DB Service', () => { describe('findOrCreateStore', () => { it('should find an existing store and return its ID', async () => { - mockPoolInstance.query.mockResolvedValue({ rows: [{ store_id: 1 }] }); + // 1. INSERT...ON CONFLICT does nothing. 2. SELECT finds the store. + mockPoolInstance.query + .mockResolvedValueOnce({ rows: [], rowCount: 0 }) + .mockResolvedValueOnce({ rows: [{ store_id: 1 }] }); + const result = await flyerRepo.findOrCreateStore('Existing Store', mockLogger); expect(result).toBe(1); + expect(mockPoolInstance.query).toHaveBeenCalledTimes(2); expect(mockPoolInstance.query).toHaveBeenCalledWith( - expect.stringContaining('SELECT store_id FROM public.stores WHERE name = $1'), + 'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING', + ['Existing Store'], + ); + expect(mockPoolInstance.query).toHaveBeenCalledWith( + 'SELECT store_id FROM public.stores WHERE name = $1', ['Existing Store'], ); }); - it('should create a new store if it does not exist', async () => { + it('should create a new store if it does not exist and return its ID', async () => { + // 1. INSERT...ON CONFLICT creates the store. 2. SELECT finds it. mockPoolInstance.query - .mockResolvedValueOnce({ rows: [] }) // First SELECT finds nothing - .mockResolvedValueOnce({ rows: [{ store_id: 2 }] }) + .mockResolvedValueOnce({ rows: [], rowCount: 1 }) // INSERT affects 1 row + .mockResolvedValueOnce({ rows: [{ store_id: 2 }] }); // SELECT finds the new store + const result = await flyerRepo.findOrCreateStore('New Store', mockLogger); expect(result).toBe(2); + expect(mockPoolInstance.query).toHaveBeenCalledTimes(2); expect(mockPoolInstance.query).toHaveBeenCalledWith( - expect.stringContaining('INSERT INTO public.stores (name) VALUES ($1) RETURNING store_id'), + 'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING', + ['New Store'], + ); + expect(mockPoolInstance.query).toHaveBeenCalledWith( + 'SELECT store_id FROM public.stores WHERE name = $1', ['New Store'], ); }); - it('should handle race condition where store is created between SELECT and INSERT', async () => { - const uniqueConstraintError = new Error('duplicate key value violates unique constraint'); - (uniqueConstraintError as Error & { code: string }).code = '23505'; - - mockPoolInstance.query - .mockResolvedValueOnce({ rows: [] }) // First SELECT finds nothing - .mockRejectedValueOnce(uniqueConstraintError) // INSERT fails due to race condition - .mockResolvedValueOnce({ rows: [{ store_id: 3 }] }); // Second SELECT finds the store - - const result = await flyerRepo.findOrCreateStore('Racy Store', mockLogger); - expect(result).toBe(3); - //expect(mockDb.query).toHaveBeenCalledTimes(3); - }); - - it('should throw an error if the database query fails', async () => { + it('should throw an error if the database query fails', async () => { const dbError = new Error('DB Error'); mockPoolInstance.query.mockRejectedValue(dbError); + // The new implementation uses handleDbError, which will throw a generic Error with the default message. await expect(flyerRepo.findOrCreateStore('Any Store', mockLogger)).rejects.toThrow( 'Failed to find or create store in database.', ); + // handleDbError also logs the error. expect(mockLogger.error).toHaveBeenCalledWith( { err: dbError, storeName: 'Any Store' }, 'Database error in findOrCreateStore', ); }); - it('should throw an error if race condition recovery fails', async () => { - const uniqueConstraintError = new Error('duplicate key value violates unique constraint'); - (uniqueConstraintError as Error & { code: string }).code = '23505'; - + it('should throw an error if store is not found after upsert (edge case)', async () => { + // This simulates a very unlikely scenario where the store is deleted between the + // INSERT...ON CONFLICT and the subsequent SELECT. mockPoolInstance.query - .mockResolvedValueOnce({ rows: [] }) // First SELECT - .mockRejectedValueOnce(uniqueConstraintError) // INSERT fails - .mockRejectedValueOnce(new Error('Second select fails')); // Recovery SELECT fails + .mockResolvedValueOnce({ rows: [], rowCount: 1 }) // INSERT succeeds + .mockResolvedValueOnce({ rows: [] }); // SELECT finds nothing - await expect(flyerRepo.findOrCreateStore('Racy Store', mockLogger)).rejects.toThrow( + await expect(flyerRepo.findOrCreateStore('Weird Store', mockLogger)).rejects.toThrow( 'Failed to find or create store in database.', ); expect(mockLogger.error).toHaveBeenCalledWith( - { err: expect.any(Error), storeName: 'Racy Store' }, + { + err: new Error('Failed to find store immediately after upsert operation.'), + storeName: 'Weird Store', + }, 'Database error in findOrCreateStore', ); }); diff --git a/src/services/db/flyer.db.ts b/src/services/db/flyer.db.ts index 6c751195..0443bc1a 100644 --- a/src/services/db/flyer.db.ts +++ b/src/services/db/flyer.db.ts @@ -28,46 +28,32 @@ export class FlyerRepository { * @returns A promise that resolves to the store's ID. */ async findOrCreateStore(storeName: string, logger: Logger): Promise { - // Note: This method should be called within a transaction if the caller - // needs to ensure atomicity with other operations. try { - // First, try to find the store. - let result = await this.db.query<{ store_id: number }>( + // Atomically insert the store if it doesn't exist. This is safe from race conditions. + await this.db.query( + 'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING', + [storeName], + ); + + // Now, the store is guaranteed to exist, so we can safely select its ID. + const result = await this.db.query<{ store_id: number }>( 'SELECT store_id FROM public.stores WHERE name = $1', [storeName], ); - if (result.rows.length > 0) { - return result.rows[0].store_id; - } else { - // If not found, create it. - result = await this.db.query<{ store_id: number }>( - 'INSERT INTO public.stores (name) VALUES ($1) RETURNING store_id', - [storeName], - ); - return result.rows[0].store_id; + // This case should be virtually impossible if the INSERT...ON CONFLICT logic is correct, + // as it would mean the store was deleted between the two queries. We throw an error to be safe. + if (result.rows.length === 0) { + throw new Error('Failed to find store immediately after upsert operation.'); } + + return result.rows[0].store_id; } catch (error) { - // Check for a unique constraint violation on name, which could happen in a race condition - // if two processes try to create the same store at the same time. - if (error instanceof Error && 'code' in error && error.code === '23505') { - try { - logger.warn( - { storeName }, - `Race condition avoided: Store was created by another process. Refetching.`, - ); - const result = await this.db.query<{ store_id: number }>( - 'SELECT store_id FROM public.stores WHERE name = $1', - [storeName], - ); - if (result.rows.length > 0) return result.rows[0].store_id; - } catch (recoveryError) { - // If recovery fails, log a warning and fall through to the generic error handler - logger.warn({ err: recoveryError, storeName }, 'Race condition recovery failed'); - } - } - logger.error({ err: error, storeName }, 'Database error in findOrCreateStore'); - throw new Error('Failed to find or create store in database.'); + // Use the centralized error handler for any unexpected database errors. + handleDbError(error, logger, 'Database error in findOrCreateStore', { storeName }, { + // Any error caught here is unexpected, so we use a generic message. + defaultMessage: 'Failed to find or create store in database.', + }); } } diff --git a/src/services/flyerDataTransformer.ts b/src/services/flyerDataTransformer.ts index f3feec59..be2b6f39 100644 --- a/src/services/flyerDataTransformer.ts +++ b/src/services/flyerDataTransformer.ts @@ -75,11 +75,23 @@ export class FlyerDataTransformer { logger.warn('AI did not return a store name. Using fallback "Unknown Store (auto)".'); } - // Construct proper URLs including protocol and host to satisfy DB constraints - const rawBaseUrl = process.env.FRONTEND_URL || process.env.BASE_URL || `http://localhost:${process.env.PORT || 3000}`; - // Normalize base URL by removing any trailing slash to prevent double slashes in the final URL, - // and replace the strict `new URL()` constructor to prevent exceptions in test environments. - const baseUrl = rawBaseUrl.endsWith('/') ? rawBaseUrl.slice(0, -1) : rawBaseUrl; + // Construct proper URLs including protocol and host to satisfy DB constraints. + // This logic is made more robust to handle cases where env vars might be present but invalid (e.g., whitespace or missing protocol). + let baseUrl = (process.env.FRONTEND_URL || process.env.BASE_URL || '').trim(); + + if (!baseUrl || !baseUrl.startsWith('http')) { + const port = process.env.PORT || 3000; + const fallbackUrl = `http://localhost:${port}`; + if (baseUrl) { + // It was set but invalid + logger.warn( + `FRONTEND_URL/BASE_URL is invalid or incomplete ('${baseUrl}'). Falling back to default local URL: ${fallbackUrl}`, + ); + } + baseUrl = fallbackUrl; + } + + baseUrl = baseUrl.endsWith('/') ? baseUrl.slice(0, -1) : baseUrl; const flyerData: FlyerInsert = { file_name: originalFileName, diff --git a/src/services/workers.server.ts b/src/services/workers.server.ts index 47b80574..f31bed75 100644 --- a/src/services/workers.server.ts +++ b/src/services/workers.server.ts @@ -92,6 +92,8 @@ export const flyerWorker = new Worker( { connection, concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10), + // Increase lock duration to prevent jobs from being re-processed prematurely. + lockDuration: parseInt(process.env.WORKER_LOCK_DURATION || '30000', 10), }, ); diff --git a/src/tests/e2e/auth.e2e.test.ts b/src/tests/e2e/auth.e2e.test.ts index 1f5c7cd1..40bc81b1 100644 --- a/src/tests/e2e/auth.e2e.test.ts +++ b/src/tests/e2e/auth.e2e.test.ts @@ -175,8 +175,9 @@ describe('Authentication E2E Flow', () => { createdUserIds.push(registerData.userprofile.user.user_id); // Add a small delay to mitigate potential DB replication lag or race conditions - // where the user might not be found immediately after creation. - await new Promise((resolve) => setTimeout(resolve, 2000)); + // in the test environment. Increased from 2s to 5s to improve stability. + // The root cause is likely environmental slowness in the CI database. + await new Promise((resolve) => setTimeout(resolve, 5000)); // Act 1: Request a password reset. // The test environment returns the token directly in the response for E2E testing. diff --git a/src/tests/integration/flyer-processing.integration.test.ts b/src/tests/integration/flyer-processing.integration.test.ts index 5a046135..7be2deae 100644 --- a/src/tests/integration/flyer-processing.integration.test.ts +++ b/src/tests/integration/flyer-processing.integration.test.ts @@ -101,7 +101,9 @@ describe('Flyer Processing Background Job Integration Test', () => { // Act 2: Poll for the job status until it completes. let jobStatus; - const maxRetries = 60; // Poll for up to 180 seconds (60 * 3s) + // Poll for up to 210 seconds (70 * 3s). This should be greater than the worker's + // lockDuration (120s) to patiently wait for long-running jobs. + const maxRetries = 70; for (let i = 0; i < maxRetries; i++) { console.log(`Polling attempt ${i + 1}...`); await new Promise((resolve) => setTimeout(resolve, 3000)); // Wait 3 seconds between polls