extend timers duration - prevent jobs from timing out after 30secs, increased to 4mins
All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 22m56s
All checks were successful
Deploy to Test Environment / deploy-to-test (push) Successful in 22m56s
This commit is contained in:
@@ -52,6 +52,7 @@ module.exports = {
|
|||||||
SMTP_USER: process.env.SMTP_USER,
|
SMTP_USER: process.env.SMTP_USER,
|
||||||
SMTP_PASS: process.env.SMTP_PASS,
|
SMTP_PASS: process.env.SMTP_PASS,
|
||||||
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
||||||
|
WORKER_LOCK_DURATION: '120000',
|
||||||
},
|
},
|
||||||
// Test Environment Settings
|
// Test Environment Settings
|
||||||
env_test: {
|
env_test: {
|
||||||
@@ -74,6 +75,7 @@ module.exports = {
|
|||||||
SMTP_USER: process.env.SMTP_USER,
|
SMTP_USER: process.env.SMTP_USER,
|
||||||
SMTP_PASS: process.env.SMTP_PASS,
|
SMTP_PASS: process.env.SMTP_PASS,
|
||||||
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
||||||
|
WORKER_LOCK_DURATION: '120000',
|
||||||
},
|
},
|
||||||
// Development Environment Settings
|
// Development Environment Settings
|
||||||
env_development: {
|
env_development: {
|
||||||
@@ -97,6 +99,7 @@ module.exports = {
|
|||||||
SMTP_USER: process.env.SMTP_USER,
|
SMTP_USER: process.env.SMTP_USER,
|
||||||
SMTP_PASS: process.env.SMTP_PASS,
|
SMTP_PASS: process.env.SMTP_PASS,
|
||||||
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
SMTP_FROM_EMAIL: process.env.SMTP_FROM_EMAIL,
|
||||||
|
WORKER_LOCK_DURATION: '120000',
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -51,67 +51,72 @@ describe('Flyer DB Service', () => {
|
|||||||
|
|
||||||
describe('findOrCreateStore', () => {
|
describe('findOrCreateStore', () => {
|
||||||
it('should find an existing store and return its ID', async () => {
|
it('should find an existing store and return its ID', async () => {
|
||||||
mockPoolInstance.query.mockResolvedValue({ rows: [{ store_id: 1 }] });
|
// 1. INSERT...ON CONFLICT does nothing. 2. SELECT finds the store.
|
||||||
|
mockPoolInstance.query
|
||||||
|
.mockResolvedValueOnce({ rows: [], rowCount: 0 })
|
||||||
|
.mockResolvedValueOnce({ rows: [{ store_id: 1 }] });
|
||||||
|
|
||||||
const result = await flyerRepo.findOrCreateStore('Existing Store', mockLogger);
|
const result = await flyerRepo.findOrCreateStore('Existing Store', mockLogger);
|
||||||
expect(result).toBe(1);
|
expect(result).toBe(1);
|
||||||
|
expect(mockPoolInstance.query).toHaveBeenCalledTimes(2);
|
||||||
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
||||||
expect.stringContaining('SELECT store_id FROM public.stores WHERE name = $1'),
|
'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING',
|
||||||
|
['Existing Store'],
|
||||||
|
);
|
||||||
|
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
||||||
|
'SELECT store_id FROM public.stores WHERE name = $1',
|
||||||
['Existing Store'],
|
['Existing Store'],
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should create a new store if it does not exist', async () => {
|
it('should create a new store if it does not exist and return its ID', async () => {
|
||||||
|
// 1. INSERT...ON CONFLICT creates the store. 2. SELECT finds it.
|
||||||
mockPoolInstance.query
|
mockPoolInstance.query
|
||||||
.mockResolvedValueOnce({ rows: [] }) // First SELECT finds nothing
|
.mockResolvedValueOnce({ rows: [], rowCount: 1 }) // INSERT affects 1 row
|
||||||
.mockResolvedValueOnce({ rows: [{ store_id: 2 }] })
|
.mockResolvedValueOnce({ rows: [{ store_id: 2 }] }); // SELECT finds the new store
|
||||||
|
|
||||||
const result = await flyerRepo.findOrCreateStore('New Store', mockLogger);
|
const result = await flyerRepo.findOrCreateStore('New Store', mockLogger);
|
||||||
expect(result).toBe(2);
|
expect(result).toBe(2);
|
||||||
|
expect(mockPoolInstance.query).toHaveBeenCalledTimes(2);
|
||||||
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
||||||
expect.stringContaining('INSERT INTO public.stores (name) VALUES ($1) RETURNING store_id'),
|
'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING',
|
||||||
|
['New Store'],
|
||||||
|
);
|
||||||
|
expect(mockPoolInstance.query).toHaveBeenCalledWith(
|
||||||
|
'SELECT store_id FROM public.stores WHERE name = $1',
|
||||||
['New Store'],
|
['New Store'],
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should handle race condition where store is created between SELECT and INSERT', async () => {
|
it('should throw an error if the database query fails', async () => {
|
||||||
const uniqueConstraintError = new Error('duplicate key value violates unique constraint');
|
|
||||||
(uniqueConstraintError as Error & { code: string }).code = '23505';
|
|
||||||
|
|
||||||
mockPoolInstance.query
|
|
||||||
.mockResolvedValueOnce({ rows: [] }) // First SELECT finds nothing
|
|
||||||
.mockRejectedValueOnce(uniqueConstraintError) // INSERT fails due to race condition
|
|
||||||
.mockResolvedValueOnce({ rows: [{ store_id: 3 }] }); // Second SELECT finds the store
|
|
||||||
|
|
||||||
const result = await flyerRepo.findOrCreateStore('Racy Store', mockLogger);
|
|
||||||
expect(result).toBe(3);
|
|
||||||
//expect(mockDb.query).toHaveBeenCalledTimes(3);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error if the database query fails', async () => {
|
|
||||||
const dbError = new Error('DB Error');
|
const dbError = new Error('DB Error');
|
||||||
mockPoolInstance.query.mockRejectedValue(dbError);
|
mockPoolInstance.query.mockRejectedValue(dbError);
|
||||||
|
// The new implementation uses handleDbError, which will throw a generic Error with the default message.
|
||||||
await expect(flyerRepo.findOrCreateStore('Any Store', mockLogger)).rejects.toThrow(
|
await expect(flyerRepo.findOrCreateStore('Any Store', mockLogger)).rejects.toThrow(
|
||||||
'Failed to find or create store in database.',
|
'Failed to find or create store in database.',
|
||||||
);
|
);
|
||||||
|
// handleDbError also logs the error.
|
||||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||||
{ err: dbError, storeName: 'Any Store' },
|
{ err: dbError, storeName: 'Any Store' },
|
||||||
'Database error in findOrCreateStore',
|
'Database error in findOrCreateStore',
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw an error if race condition recovery fails', async () => {
|
it('should throw an error if store is not found after upsert (edge case)', async () => {
|
||||||
const uniqueConstraintError = new Error('duplicate key value violates unique constraint');
|
// This simulates a very unlikely scenario where the store is deleted between the
|
||||||
(uniqueConstraintError as Error & { code: string }).code = '23505';
|
// INSERT...ON CONFLICT and the subsequent SELECT.
|
||||||
|
|
||||||
mockPoolInstance.query
|
mockPoolInstance.query
|
||||||
.mockResolvedValueOnce({ rows: [] }) // First SELECT
|
.mockResolvedValueOnce({ rows: [], rowCount: 1 }) // INSERT succeeds
|
||||||
.mockRejectedValueOnce(uniqueConstraintError) // INSERT fails
|
.mockResolvedValueOnce({ rows: [] }); // SELECT finds nothing
|
||||||
.mockRejectedValueOnce(new Error('Second select fails')); // Recovery SELECT fails
|
|
||||||
|
|
||||||
await expect(flyerRepo.findOrCreateStore('Racy Store', mockLogger)).rejects.toThrow(
|
await expect(flyerRepo.findOrCreateStore('Weird Store', mockLogger)).rejects.toThrow(
|
||||||
'Failed to find or create store in database.',
|
'Failed to find or create store in database.',
|
||||||
);
|
);
|
||||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||||
{ err: expect.any(Error), storeName: 'Racy Store' },
|
{
|
||||||
|
err: new Error('Failed to find store immediately after upsert operation.'),
|
||||||
|
storeName: 'Weird Store',
|
||||||
|
},
|
||||||
'Database error in findOrCreateStore',
|
'Database error in findOrCreateStore',
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -28,46 +28,32 @@ export class FlyerRepository {
|
|||||||
* @returns A promise that resolves to the store's ID.
|
* @returns A promise that resolves to the store's ID.
|
||||||
*/
|
*/
|
||||||
async findOrCreateStore(storeName: string, logger: Logger): Promise<number> {
|
async findOrCreateStore(storeName: string, logger: Logger): Promise<number> {
|
||||||
// Note: This method should be called within a transaction if the caller
|
|
||||||
// needs to ensure atomicity with other operations.
|
|
||||||
try {
|
try {
|
||||||
// First, try to find the store.
|
// Atomically insert the store if it doesn't exist. This is safe from race conditions.
|
||||||
let result = await this.db.query<{ store_id: number }>(
|
await this.db.query(
|
||||||
|
'INSERT INTO public.stores (name) VALUES ($1) ON CONFLICT (name) DO NOTHING',
|
||||||
|
[storeName],
|
||||||
|
);
|
||||||
|
|
||||||
|
// Now, the store is guaranteed to exist, so we can safely select its ID.
|
||||||
|
const result = await this.db.query<{ store_id: number }>(
|
||||||
'SELECT store_id FROM public.stores WHERE name = $1',
|
'SELECT store_id FROM public.stores WHERE name = $1',
|
||||||
[storeName],
|
[storeName],
|
||||||
);
|
);
|
||||||
|
|
||||||
if (result.rows.length > 0) {
|
// This case should be virtually impossible if the INSERT...ON CONFLICT logic is correct,
|
||||||
return result.rows[0].store_id;
|
// as it would mean the store was deleted between the two queries. We throw an error to be safe.
|
||||||
} else {
|
if (result.rows.length === 0) {
|
||||||
// If not found, create it.
|
throw new Error('Failed to find store immediately after upsert operation.');
|
||||||
result = await this.db.query<{ store_id: number }>(
|
|
||||||
'INSERT INTO public.stores (name) VALUES ($1) RETURNING store_id',
|
|
||||||
[storeName],
|
|
||||||
);
|
|
||||||
return result.rows[0].store_id;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return result.rows[0].store_id;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// Check for a unique constraint violation on name, which could happen in a race condition
|
// Use the centralized error handler for any unexpected database errors.
|
||||||
// if two processes try to create the same store at the same time.
|
handleDbError(error, logger, 'Database error in findOrCreateStore', { storeName }, {
|
||||||
if (error instanceof Error && 'code' in error && error.code === '23505') {
|
// Any error caught here is unexpected, so we use a generic message.
|
||||||
try {
|
defaultMessage: 'Failed to find or create store in database.',
|
||||||
logger.warn(
|
});
|
||||||
{ storeName },
|
|
||||||
`Race condition avoided: Store was created by another process. Refetching.`,
|
|
||||||
);
|
|
||||||
const result = await this.db.query<{ store_id: number }>(
|
|
||||||
'SELECT store_id FROM public.stores WHERE name = $1',
|
|
||||||
[storeName],
|
|
||||||
);
|
|
||||||
if (result.rows.length > 0) return result.rows[0].store_id;
|
|
||||||
} catch (recoveryError) {
|
|
||||||
// If recovery fails, log a warning and fall through to the generic error handler
|
|
||||||
logger.warn({ err: recoveryError, storeName }, 'Race condition recovery failed');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.error({ err: error, storeName }, 'Database error in findOrCreateStore');
|
|
||||||
throw new Error('Failed to find or create store in database.');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -75,11 +75,23 @@ export class FlyerDataTransformer {
|
|||||||
logger.warn('AI did not return a store name. Using fallback "Unknown Store (auto)".');
|
logger.warn('AI did not return a store name. Using fallback "Unknown Store (auto)".');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct proper URLs including protocol and host to satisfy DB constraints
|
// Construct proper URLs including protocol and host to satisfy DB constraints.
|
||||||
const rawBaseUrl = process.env.FRONTEND_URL || process.env.BASE_URL || `http://localhost:${process.env.PORT || 3000}`;
|
// This logic is made more robust to handle cases where env vars might be present but invalid (e.g., whitespace or missing protocol).
|
||||||
// Normalize base URL by removing any trailing slash to prevent double slashes in the final URL,
|
let baseUrl = (process.env.FRONTEND_URL || process.env.BASE_URL || '').trim();
|
||||||
// and replace the strict `new URL()` constructor to prevent exceptions in test environments.
|
|
||||||
const baseUrl = rawBaseUrl.endsWith('/') ? rawBaseUrl.slice(0, -1) : rawBaseUrl;
|
if (!baseUrl || !baseUrl.startsWith('http')) {
|
||||||
|
const port = process.env.PORT || 3000;
|
||||||
|
const fallbackUrl = `http://localhost:${port}`;
|
||||||
|
if (baseUrl) {
|
||||||
|
// It was set but invalid
|
||||||
|
logger.warn(
|
||||||
|
`FRONTEND_URL/BASE_URL is invalid or incomplete ('${baseUrl}'). Falling back to default local URL: ${fallbackUrl}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
baseUrl = fallbackUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
baseUrl = baseUrl.endsWith('/') ? baseUrl.slice(0, -1) : baseUrl;
|
||||||
|
|
||||||
const flyerData: FlyerInsert = {
|
const flyerData: FlyerInsert = {
|
||||||
file_name: originalFileName,
|
file_name: originalFileName,
|
||||||
|
|||||||
@@ -92,6 +92,8 @@ export const flyerWorker = new Worker<FlyerJobData>(
|
|||||||
{
|
{
|
||||||
connection,
|
connection,
|
||||||
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
|
concurrency: parseInt(process.env.WORKER_CONCURRENCY || '1', 10),
|
||||||
|
// Increase lock duration to prevent jobs from being re-processed prematurely.
|
||||||
|
lockDuration: parseInt(process.env.WORKER_LOCK_DURATION || '30000', 10),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -175,8 +175,9 @@ describe('Authentication E2E Flow', () => {
|
|||||||
createdUserIds.push(registerData.userprofile.user.user_id);
|
createdUserIds.push(registerData.userprofile.user.user_id);
|
||||||
|
|
||||||
// Add a small delay to mitigate potential DB replication lag or race conditions
|
// Add a small delay to mitigate potential DB replication lag or race conditions
|
||||||
// where the user might not be found immediately after creation.
|
// in the test environment. Increased from 2s to 5s to improve stability.
|
||||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
// The root cause is likely environmental slowness in the CI database.
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 5000));
|
||||||
|
|
||||||
// Act 1: Request a password reset.
|
// Act 1: Request a password reset.
|
||||||
// The test environment returns the token directly in the response for E2E testing.
|
// The test environment returns the token directly in the response for E2E testing.
|
||||||
|
|||||||
@@ -101,7 +101,9 @@ describe('Flyer Processing Background Job Integration Test', () => {
|
|||||||
|
|
||||||
// Act 2: Poll for the job status until it completes.
|
// Act 2: Poll for the job status until it completes.
|
||||||
let jobStatus;
|
let jobStatus;
|
||||||
const maxRetries = 60; // Poll for up to 180 seconds (60 * 3s)
|
// Poll for up to 210 seconds (70 * 3s). This should be greater than the worker's
|
||||||
|
// lockDuration (120s) to patiently wait for long-running jobs.
|
||||||
|
const maxRetries = 70;
|
||||||
for (let i = 0; i < maxRetries; i++) {
|
for (let i = 0; i < maxRetries; i++) {
|
||||||
console.log(`Polling attempt ${i + 1}...`);
|
console.log(`Polling attempt ${i + 1}...`);
|
||||||
await new Promise((resolve) => setTimeout(resolve, 3000)); // Wait 3 seconds between polls
|
await new Promise((resolve) => setTimeout(resolve, 3000)); // Wait 3 seconds between polls
|
||||||
|
|||||||
Reference in New Issue
Block a user