Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65cb54500c | ||
| 664ad291be |
@@ -56,7 +56,8 @@
|
|||||||
"mcp__memory__delete_entities",
|
"mcp__memory__delete_entities",
|
||||||
"mcp__sequential-thinking__sequentialthinking",
|
"mcp__sequential-thinking__sequentialthinking",
|
||||||
"mcp__filesystem__list_directory",
|
"mcp__filesystem__list_directory",
|
||||||
"mcp__filesystem__read_multiple_files"
|
"mcp__filesystem__read_multiple_files",
|
||||||
|
"mcp__filesystem__directory_tree"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,6 +96,23 @@ jobs:
|
|||||||
# It prevents the accumulation of duplicate processes from previous test runs.
|
# It prevents the accumulation of duplicate processes from previous test runs.
|
||||||
node -e "const exec = require('child_process').execSync; try { const list = JSON.parse(exec('pm2 jlist').toString()); list.forEach(p => { if (p.name && p.name.endsWith('-test')) { console.log('Deleting test process: ' + p.name + ' (' + p.pm2_env.pm_id + ')'); try { exec('pm2 delete ' + p.pm2_env.pm_id); } catch(e) { console.error('Failed to delete ' + p.pm2_env.pm_id, e.message); } } }); console.log('✅ Test process cleanup complete.'); } catch (e) { if (e.stdout.toString().includes('No process found')) { console.log('No PM2 processes running, cleanup not needed.'); } else { console.error('Error cleaning up test processes:', e.message); } }" || true
|
node -e "const exec = require('child_process').execSync; try { const list = JSON.parse(exec('pm2 jlist').toString()); list.forEach(p => { if (p.name && p.name.endsWith('-test')) { console.log('Deleting test process: ' + p.name + ' (' + p.pm2_env.pm_id + ')'); try { exec('pm2 delete ' + p.pm2_env.pm_id); } catch(e) { console.error('Failed to delete ' + p.pm2_env.pm_id, e.message); } } }); console.log('✅ Test process cleanup complete.'); } catch (e) { if (e.stdout.toString().includes('No process found')) { console.log('No PM2 processes running, cleanup not needed.'); } else { console.error('Error cleaning up test processes:', e.message); } }" || true
|
||||||
|
|
||||||
|
- name: Flush Redis Before Tests
|
||||||
|
# CRITICAL: Clear all Redis data to remove stale BullMQ jobs from previous test runs.
|
||||||
|
# This prevents old jobs with outdated error messages from polluting test results.
|
||||||
|
env:
|
||||||
|
REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD_TEST }}
|
||||||
|
run: |
|
||||||
|
echo "--- Flushing Redis database to remove stale jobs ---"
|
||||||
|
if [ -z "$REDIS_PASSWORD" ]; then
|
||||||
|
echo "⚠️ REDIS_PASSWORD_TEST not set, attempting flush without password..."
|
||||||
|
redis-cli FLUSHDB || echo "Redis flush failed (no password)"
|
||||||
|
else
|
||||||
|
redis-cli -a "$REDIS_PASSWORD" FLUSHDB 2>/dev/null && echo "✅ Redis database flushed successfully." || echo "⚠️ Redis flush failed"
|
||||||
|
fi
|
||||||
|
# Verify the flush worked by checking key count
|
||||||
|
KEY_COUNT=$(redis-cli -a "$REDIS_PASSWORD" DBSIZE 2>/dev/null | grep -oE '[0-9]+' || echo "unknown")
|
||||||
|
echo "Redis key count after flush: $KEY_COUNT"
|
||||||
|
|
||||||
- name: Run All Tests and Generate Merged Coverage Report
|
- name: Run All Tests and Generate Merged Coverage Report
|
||||||
# This single step runs both unit and integration tests, then merges their
|
# This single step runs both unit and integration tests, then merges their
|
||||||
# coverage data into a single report. It combines the environment variables
|
# coverage data into a single report. It combines the environment variables
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
**Date**: 2025-12-12
|
**Date**: 2025-12-12
|
||||||
|
|
||||||
**Status**: Proposed
|
**Status**: Accepted
|
||||||
|
|
||||||
## Context
|
## Context
|
||||||
|
|
||||||
@@ -16,3 +16,82 @@ We will implement a dedicated background job processing system using a task queu
|
|||||||
|
|
||||||
**Positive**: Decouples the API from heavy processing, allows for retries on failure, and enables scaling the processing workers independently. Increases application reliability and resilience.
|
**Positive**: Decouples the API from heavy processing, allows for retries on failure, and enables scaling the processing workers independently. Increases application reliability and resilience.
|
||||||
**Negative**: Introduces a new dependency (Redis) into the infrastructure. Requires refactoring of the flyer processing logic to work within a job queue structure.
|
**Negative**: Introduces a new dependency (Redis) into the infrastructure. Requires refactoring of the flyer processing logic to work within a job queue structure.
|
||||||
|
|
||||||
|
## Implementation Details
|
||||||
|
|
||||||
|
### Queue Infrastructure
|
||||||
|
|
||||||
|
The implementation uses **BullMQ v5.65.1** with **ioredis v5.8.2** for Redis connectivity. Six distinct queues handle different job types:
|
||||||
|
|
||||||
|
| Queue Name | Purpose | Retry Attempts | Backoff Strategy |
|
||||||
|
| ---------------------------- | --------------------------- | -------------- | ---------------------- |
|
||||||
|
| `flyer-processing` | OCR/AI processing of flyers | 3 | Exponential (5s base) |
|
||||||
|
| `email-sending` | Email delivery | 5 | Exponential (10s base) |
|
||||||
|
| `analytics-reporting` | Daily report generation | 2 | Exponential (60s base) |
|
||||||
|
| `weekly-analytics-reporting` | Weekly report generation | 2 | Exponential (1h base) |
|
||||||
|
| `file-cleanup` | Temporary file cleanup | 3 | Exponential (30s base) |
|
||||||
|
| `token-cleanup` | Expired token removal | 2 | Exponential (1h base) |
|
||||||
|
|
||||||
|
### Key Files
|
||||||
|
|
||||||
|
- `src/services/queues.server.ts` - Queue definitions and configuration
|
||||||
|
- `src/services/workers.server.ts` - Worker implementations with configurable concurrency
|
||||||
|
- `src/services/redis.server.ts` - Redis connection management
|
||||||
|
- `src/services/queueService.server.ts` - Queue lifecycle and graceful shutdown
|
||||||
|
- `src/services/flyerProcessingService.server.ts` - 5-stage flyer processing pipeline
|
||||||
|
- `src/types/job-data.ts` - TypeScript interfaces for all job data types
|
||||||
|
|
||||||
|
### API Design
|
||||||
|
|
||||||
|
Endpoints for long-running tasks return **202 Accepted** immediately with a job ID:
|
||||||
|
|
||||||
|
```text
|
||||||
|
POST /api/ai/upload-and-process → 202 { jobId: "..." }
|
||||||
|
GET /api/ai/jobs/:jobId/status → { state: "...", progress: ... }
|
||||||
|
```
|
||||||
|
|
||||||
|
### Worker Configuration
|
||||||
|
|
||||||
|
Workers are configured via environment variables:
|
||||||
|
|
||||||
|
- `WORKER_CONCURRENCY` - Flyer processing parallelism (default: 1)
|
||||||
|
- `EMAIL_WORKER_CONCURRENCY` - Email worker parallelism (default: 10)
|
||||||
|
- `ANALYTICS_WORKER_CONCURRENCY` - Analytics worker parallelism (default: 1)
|
||||||
|
- `CLEANUP_WORKER_CONCURRENCY` - Cleanup worker parallelism (default: 10)
|
||||||
|
|
||||||
|
### Monitoring
|
||||||
|
|
||||||
|
- **Bull Board UI** available at `/api/admin/jobs` for admin users
|
||||||
|
- Worker status endpoint: `GET /api/admin/workers/status`
|
||||||
|
- Queue status endpoint: `GET /api/admin/queues/status`
|
||||||
|
|
||||||
|
### Graceful Shutdown
|
||||||
|
|
||||||
|
Both API and worker processes implement graceful shutdown with a 30-second timeout, ensuring in-flight jobs complete before process termination.
|
||||||
|
|
||||||
|
## Compliance Notes
|
||||||
|
|
||||||
|
### Deprecated Synchronous Endpoints
|
||||||
|
|
||||||
|
The following endpoints process flyers synchronously and are **deprecated**:
|
||||||
|
|
||||||
|
- `POST /api/ai/upload-legacy` - For integration testing only
|
||||||
|
- `POST /api/ai/flyers/process` - Legacy workflow, should migrate to queue-based approach
|
||||||
|
|
||||||
|
New integrations MUST use `POST /api/ai/upload-and-process` for queue-based processing.
|
||||||
|
|
||||||
|
### Email Handling
|
||||||
|
|
||||||
|
- **Bulk emails** (deal notifications): Enqueued via `emailQueue`
|
||||||
|
- **Transactional emails** (password reset): Sent synchronously for immediate user feedback
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
Potential improvements for consideration:
|
||||||
|
|
||||||
|
1. **Dead Letter Queue (DLQ)**: Move permanently failed jobs to a dedicated queue for analysis
|
||||||
|
2. **Job Priority Levels**: Allow priority-based processing for different job types
|
||||||
|
3. **Real-time Progress**: WebSocket/SSE for live job progress updates to clients
|
||||||
|
4. **Per-Queue Rate Limiting**: Throttle job processing based on external API limits
|
||||||
|
5. **Job Dependencies**: Support for jobs that depend on completion of other jobs
|
||||||
|
6. **Prometheus Metrics**: Export queue metrics for observability dashboards
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
**Date**: 2025-12-12
|
**Date**: 2025-12-12
|
||||||
|
|
||||||
**Status**: Proposed
|
**Status**: Accepted
|
||||||
|
|
||||||
## Context
|
## Context
|
||||||
|
|
||||||
@@ -20,3 +20,107 @@ We will implement a multi-layered caching strategy using an in-memory data store
|
|||||||
|
|
||||||
**Positive**: Directly addresses application performance and scalability. Reduces database load and improves API response times for common requests.
|
**Positive**: Directly addresses application performance and scalability. Reduces database load and improves API response times for common requests.
|
||||||
**Negative**: Introduces Redis as a dependency if not already used. Adds complexity to the data-fetching logic and requires careful management of cache invalidation to prevent stale data.
|
**Negative**: Introduces Redis as a dependency if not already used. Adds complexity to the data-fetching logic and requires careful management of cache invalidation to prevent stale data.
|
||||||
|
|
||||||
|
## Implementation Details
|
||||||
|
|
||||||
|
### Cache Service
|
||||||
|
|
||||||
|
A centralized cache service (`src/services/cacheService.server.ts`) provides reusable caching functionality:
|
||||||
|
|
||||||
|
- **`getOrSet<T>(key, fetcher, options)`**: Cache-aside pattern implementation
|
||||||
|
- **`get<T>(key)`**: Retrieve cached value
|
||||||
|
- **`set<T>(key, value, ttl)`**: Store value with TTL
|
||||||
|
- **`del(key)`**: Delete specific key
|
||||||
|
- **`invalidatePattern(pattern)`**: Delete keys matching a pattern
|
||||||
|
|
||||||
|
All cache operations are fail-safe - cache failures do not break the application.
|
||||||
|
|
||||||
|
### TTL Configuration
|
||||||
|
|
||||||
|
Different data types use different TTL values based on volatility:
|
||||||
|
|
||||||
|
| Data Type | TTL | Rationale |
|
||||||
|
| ------------------- | --------- | -------------------------------------- |
|
||||||
|
| Brands/Stores | 1 hour | Rarely changes, safe to cache longer |
|
||||||
|
| Flyer lists | 5 minutes | Changes when new flyers are added |
|
||||||
|
| Individual flyers | 10 minutes| Stable once created |
|
||||||
|
| Flyer items | 10 minutes| Stable once created |
|
||||||
|
| Statistics | 5 minutes | Can be slightly stale |
|
||||||
|
| Frequent sales | 15 minutes| Aggregated data, updated periodically |
|
||||||
|
| Categories | 1 hour | Rarely changes |
|
||||||
|
|
||||||
|
### Cache Key Strategy
|
||||||
|
|
||||||
|
Cache keys follow a consistent prefix pattern for pattern-based invalidation:
|
||||||
|
|
||||||
|
- `cache:brands` - All brands list
|
||||||
|
- `cache:flyers:{limit}:{offset}` - Paginated flyer lists
|
||||||
|
- `cache:flyer:{id}` - Individual flyer data
|
||||||
|
- `cache:flyer-items:{flyerId}` - Items for a specific flyer
|
||||||
|
- `cache:stats:*` - Statistics data
|
||||||
|
- `geocode:{address}` - Geocoding results (30-day TTL)
|
||||||
|
|
||||||
|
### Cached Endpoints
|
||||||
|
|
||||||
|
The following repository methods implement server-side caching:
|
||||||
|
|
||||||
|
| Method | Cache Key Pattern | TTL |
|
||||||
|
| ------ | ----------------- | --- |
|
||||||
|
| `FlyerRepository.getAllBrands()` | `cache:brands` | 1 hour |
|
||||||
|
| `FlyerRepository.getFlyers()` | `cache:flyers:{limit}:{offset}` | 5 minutes |
|
||||||
|
| `FlyerRepository.getFlyerItems()` | `cache:flyer-items:{flyerId}` | 10 minutes |
|
||||||
|
|
||||||
|
### Cache Invalidation
|
||||||
|
|
||||||
|
**Event-based invalidation** is triggered on write operations:
|
||||||
|
|
||||||
|
- **Flyer creation** (`FlyerPersistenceService.saveFlyer`): Invalidates all `cache:flyers*` keys
|
||||||
|
- **Flyer deletion** (`FlyerRepository.deleteFlyer`): Invalidates specific flyer and flyer items cache, plus flyer lists
|
||||||
|
|
||||||
|
**Manual invalidation** via admin endpoints:
|
||||||
|
|
||||||
|
- `POST /api/admin/system/clear-cache` - Clears all application cache (flyers, brands, stats)
|
||||||
|
- `POST /api/admin/system/clear-geocode-cache` - Clears geocoding cache
|
||||||
|
|
||||||
|
### Client-Side Caching
|
||||||
|
|
||||||
|
TanStack React Query provides client-side caching with configurable stale times:
|
||||||
|
|
||||||
|
| Query Type | Stale Time |
|
||||||
|
| ----------------- | ----------- |
|
||||||
|
| Categories | 1 hour |
|
||||||
|
| Master Items | 10 minutes |
|
||||||
|
| Flyer Items | 5 minutes |
|
||||||
|
| Flyers | 2 minutes |
|
||||||
|
| Shopping Lists | 1 minute |
|
||||||
|
| Activity Log | 30 seconds |
|
||||||
|
|
||||||
|
### Multi-Layer Cache Architecture
|
||||||
|
|
||||||
|
```text
|
||||||
|
Client Request
|
||||||
|
↓
|
||||||
|
[TanStack React Query] ← Client-side cache (staleTime-based)
|
||||||
|
↓
|
||||||
|
[Express API]
|
||||||
|
↓
|
||||||
|
[CacheService.getOrSet()] ← Server-side Redis cache (TTL-based)
|
||||||
|
↓
|
||||||
|
[PostgreSQL Database]
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key Files
|
||||||
|
|
||||||
|
- `src/services/cacheService.server.ts` - Centralized cache service
|
||||||
|
- `src/services/db/flyer.db.ts` - Repository with caching for brands, flyers, flyer items
|
||||||
|
- `src/services/flyerPersistenceService.server.ts` - Cache invalidation on flyer creation
|
||||||
|
- `src/routes/admin.routes.ts` - Admin cache management endpoints
|
||||||
|
- `src/config/queryClient.ts` - Client-side query cache configuration
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
1. **Recipe caching**: Add caching to expensive recipe queries (by-sale-percentage, etc.)
|
||||||
|
2. **Cache warming**: Pre-populate cache on startup for frequently accessed static data
|
||||||
|
3. **Cache metrics**: Add hit/miss rate monitoring for observability
|
||||||
|
4. **Conditional caching**: Skip cache for authenticated user-specific data
|
||||||
|
5. **Cache compression**: Compress large cached payloads to reduce Redis memory usage
|
||||||
|
|||||||
4
package-lock.json
generated
4
package-lock.json
generated
@@ -1,12 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "flyer-crawler",
|
"name": "flyer-crawler",
|
||||||
"version": "0.9.67",
|
"version": "0.9.68",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "flyer-crawler",
|
"name": "flyer-crawler",
|
||||||
"version": "0.9.67",
|
"version": "0.9.68",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@bull-board/api": "^6.14.2",
|
"@bull-board/api": "^6.14.2",
|
||||||
"@bull-board/express": "^6.14.2",
|
"@bull-board/express": "^6.14.2",
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "flyer-crawler",
|
"name": "flyer-crawler",
|
||||||
"private": true,
|
"private": true,
|
||||||
"version": "0.9.67",
|
"version": "0.9.68",
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"dev": "concurrently \"npm:start:dev\" \"vite\"",
|
"dev": "concurrently \"npm:start:dev\" \"vite\"",
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import { z } from 'zod';
|
|||||||
import * as db from '../services/db/index.db';
|
import * as db from '../services/db/index.db';
|
||||||
import type { UserProfile } from '../types';
|
import type { UserProfile } from '../types';
|
||||||
import { geocodingService } from '../services/geocodingService.server';
|
import { geocodingService } from '../services/geocodingService.server';
|
||||||
|
import { cacheService } from '../services/cacheService.server';
|
||||||
import { requireFileUpload } from '../middleware/fileUpload.middleware'; // This was a duplicate, fixed.
|
import { requireFileUpload } from '../middleware/fileUpload.middleware'; // This was a duplicate, fixed.
|
||||||
import {
|
import {
|
||||||
createUploadMiddleware,
|
createUploadMiddleware,
|
||||||
@@ -635,6 +636,44 @@ router.post(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POST /api/admin/system/clear-cache - Clears the application data cache.
|
||||||
|
* Clears cached flyers, brands, and stats data from Redis.
|
||||||
|
* Requires admin privileges.
|
||||||
|
*/
|
||||||
|
router.post(
|
||||||
|
'/system/clear-cache',
|
||||||
|
adminTriggerLimiter,
|
||||||
|
validateRequest(emptySchema),
|
||||||
|
async (req: Request, res: Response, next: NextFunction) => {
|
||||||
|
const userProfile = req.user as UserProfile;
|
||||||
|
req.log.info(
|
||||||
|
`[Admin] Manual cache clear received from user: ${userProfile.user.user_id}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const [flyersDeleted, brandsDeleted, statsDeleted] = await Promise.all([
|
||||||
|
cacheService.invalidateFlyers(req.log),
|
||||||
|
cacheService.invalidateBrands(req.log),
|
||||||
|
cacheService.invalidateStats(req.log),
|
||||||
|
]);
|
||||||
|
|
||||||
|
const totalDeleted = flyersDeleted + brandsDeleted + statsDeleted;
|
||||||
|
res.status(200).json({
|
||||||
|
message: `Successfully cleared the application cache. ${totalDeleted} keys were removed.`,
|
||||||
|
details: {
|
||||||
|
flyers: flyersDeleted,
|
||||||
|
brands: brandsDeleted,
|
||||||
|
stats: statsDeleted,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
req.log.error({ error }, '[Admin] Failed to clear application cache.');
|
||||||
|
next(error);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
/* Catches errors from multer (e.g., file size, file filter) */
|
/* Catches errors from multer (e.g., file size, file filter) */
|
||||||
router.use(handleMulterError);
|
router.use(handleMulterError);
|
||||||
|
|
||||||
|
|||||||
@@ -234,6 +234,9 @@ router.post(
|
|||||||
* POST /api/ai/upload-legacy - Process a flyer upload from a legacy client.
|
* POST /api/ai/upload-legacy - Process a flyer upload from a legacy client.
|
||||||
* This is an authenticated route that processes the flyer synchronously.
|
* This is an authenticated route that processes the flyer synchronously.
|
||||||
* This is used for integration testing the legacy upload flow.
|
* This is used for integration testing the legacy upload flow.
|
||||||
|
*
|
||||||
|
* @deprecated Use POST /api/ai/upload-and-process instead for async queue-based processing (ADR-0006).
|
||||||
|
* This synchronous endpoint is retained only for integration testing purposes.
|
||||||
*/
|
*/
|
||||||
router.post(
|
router.post(
|
||||||
'/upload-legacy',
|
'/upload-legacy',
|
||||||
@@ -282,9 +285,12 @@ router.get(
|
|||||||
);
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This endpoint saves the processed flyer data to the database. It is the final step
|
* POST /api/ai/flyers/process - Saves the processed flyer data to the database.
|
||||||
* in the flyer upload workflow after the AI has extracted the data.
|
* This is the final step in the flyer upload workflow after the AI has extracted the data.
|
||||||
* It uses `optionalAuth` to handle submissions from both anonymous and authenticated users.
|
* It uses `optionalAuth` to handle submissions from both anonymous and authenticated users.
|
||||||
|
*
|
||||||
|
* @deprecated Use POST /api/ai/upload-and-process instead for async queue-based processing (ADR-0006).
|
||||||
|
* This synchronous endpoint processes flyers inline and should be migrated to the queue-based approach.
|
||||||
*/
|
*/
|
||||||
router.post(
|
router.post(
|
||||||
'/flyers/process',
|
'/flyers/process',
|
||||||
|
|||||||
226
src/services/cacheService.server.ts
Normal file
226
src/services/cacheService.server.ts
Normal file
@@ -0,0 +1,226 @@
|
|||||||
|
// src/services/cacheService.server.ts
|
||||||
|
/**
|
||||||
|
* @file Centralized caching service implementing the Cache-Aside pattern.
|
||||||
|
* This service provides a reusable wrapper around Redis for caching read-heavy operations.
|
||||||
|
* See ADR-009 for the caching strategy documentation.
|
||||||
|
*/
|
||||||
|
import type { Logger } from 'pino';
|
||||||
|
import { connection as redis } from './redis.server';
|
||||||
|
import { logger as globalLogger } from './logger.server';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TTL values in seconds for different cache types.
|
||||||
|
* These can be tuned based on data volatility and freshness requirements.
|
||||||
|
*/
|
||||||
|
export const CACHE_TTL = {
|
||||||
|
/** Brand/store list - rarely changes, safe to cache for 1 hour */
|
||||||
|
BRANDS: 60 * 60,
|
||||||
|
/** Flyer list - changes when new flyers are added, cache for 5 minutes */
|
||||||
|
FLYERS: 5 * 60,
|
||||||
|
/** Individual flyer data - cache for 10 minutes */
|
||||||
|
FLYER: 10 * 60,
|
||||||
|
/** Flyer items - cache for 10 minutes */
|
||||||
|
FLYER_ITEMS: 10 * 60,
|
||||||
|
/** Statistics - can be slightly stale, cache for 5 minutes */
|
||||||
|
STATS: 5 * 60,
|
||||||
|
/** Most frequent sales - aggregated data, cache for 15 minutes */
|
||||||
|
FREQUENT_SALES: 15 * 60,
|
||||||
|
/** Categories - rarely changes, cache for 1 hour */
|
||||||
|
CATEGORIES: 60 * 60,
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache key prefixes for different data types.
|
||||||
|
* Using consistent prefixes allows for pattern-based invalidation.
|
||||||
|
*/
|
||||||
|
export const CACHE_PREFIX = {
|
||||||
|
BRANDS: 'cache:brands',
|
||||||
|
FLYERS: 'cache:flyers',
|
||||||
|
FLYER: 'cache:flyer',
|
||||||
|
FLYER_ITEMS: 'cache:flyer-items',
|
||||||
|
STATS: 'cache:stats',
|
||||||
|
FREQUENT_SALES: 'cache:frequent-sales',
|
||||||
|
CATEGORIES: 'cache:categories',
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
export interface CacheOptions {
|
||||||
|
/** Time-to-live in seconds */
|
||||||
|
ttl: number;
|
||||||
|
/** Optional logger for this operation */
|
||||||
|
logger?: Logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Centralized cache service implementing the Cache-Aside pattern.
|
||||||
|
* All cache operations are fail-safe - cache failures do not break the application.
|
||||||
|
*/
|
||||||
|
class CacheService {
|
||||||
|
/**
|
||||||
|
* Retrieves a value from cache.
|
||||||
|
* @param key The cache key
|
||||||
|
* @param logger Optional logger for this operation
|
||||||
|
* @returns The cached value or null if not found/error
|
||||||
|
*/
|
||||||
|
async get<T>(key: string, logger: Logger = globalLogger): Promise<T | null> {
|
||||||
|
try {
|
||||||
|
const cached = await redis.get(key);
|
||||||
|
if (cached) {
|
||||||
|
logger.debug({ cacheKey: key }, 'Cache hit');
|
||||||
|
return JSON.parse(cached) as T;
|
||||||
|
}
|
||||||
|
logger.debug({ cacheKey: key }, 'Cache miss');
|
||||||
|
return null;
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn({ err: error, cacheKey: key }, 'Redis GET failed, proceeding without cache');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores a value in cache with TTL.
|
||||||
|
* @param key The cache key
|
||||||
|
* @param value The value to cache (will be JSON stringified)
|
||||||
|
* @param ttl Time-to-live in seconds
|
||||||
|
* @param logger Optional logger for this operation
|
||||||
|
*/
|
||||||
|
async set<T>(key: string, value: T, ttl: number, logger: Logger = globalLogger): Promise<void> {
|
||||||
|
try {
|
||||||
|
await redis.set(key, JSON.stringify(value), 'EX', ttl);
|
||||||
|
logger.debug({ cacheKey: key, ttl }, 'Value cached');
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn({ err: error, cacheKey: key }, 'Redis SET failed, value not cached');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes a specific key from cache.
|
||||||
|
* @param key The cache key to delete
|
||||||
|
* @param logger Optional logger for this operation
|
||||||
|
*/
|
||||||
|
async del(key: string, logger: Logger = globalLogger): Promise<void> {
|
||||||
|
try {
|
||||||
|
await redis.del(key);
|
||||||
|
logger.debug({ cacheKey: key }, 'Cache key deleted');
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn({ err: error, cacheKey: key }, 'Redis DEL failed');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates all cache keys matching a pattern.
|
||||||
|
* Uses SCAN for safe iteration over large key sets.
|
||||||
|
* @param pattern The pattern to match (e.g., 'cache:flyers*')
|
||||||
|
* @param logger Optional logger for this operation
|
||||||
|
* @returns The number of keys deleted
|
||||||
|
*/
|
||||||
|
async invalidatePattern(pattern: string, logger: Logger = globalLogger): Promise<number> {
|
||||||
|
let cursor = '0';
|
||||||
|
let totalDeleted = 0;
|
||||||
|
|
||||||
|
try {
|
||||||
|
do {
|
||||||
|
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
|
||||||
|
cursor = nextCursor;
|
||||||
|
if (keys.length > 0) {
|
||||||
|
const deletedCount = await redis.del(...keys);
|
||||||
|
totalDeleted += deletedCount;
|
||||||
|
}
|
||||||
|
} while (cursor !== '0');
|
||||||
|
|
||||||
|
logger.info({ pattern, totalDeleted }, 'Cache invalidation completed');
|
||||||
|
return totalDeleted;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error({ err: error, pattern }, 'Cache invalidation failed');
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements the Cache-Aside pattern: try cache first, fall back to fetcher, cache result.
|
||||||
|
* This is the primary method for adding caching to existing repository methods.
|
||||||
|
*
|
||||||
|
* @param key The cache key
|
||||||
|
* @param fetcher Function that retrieves data from the source (e.g., database)
|
||||||
|
* @param options Cache options including TTL
|
||||||
|
* @returns The data (from cache or fetcher)
|
||||||
|
*
|
||||||
|
* @example
|
||||||
|
* ```typescript
|
||||||
|
* const brands = await cacheService.getOrSet(
|
||||||
|
* CACHE_PREFIX.BRANDS,
|
||||||
|
* () => this.db.query('SELECT * FROM stores'),
|
||||||
|
* { ttl: CACHE_TTL.BRANDS, logger }
|
||||||
|
* );
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
async getOrSet<T>(
|
||||||
|
key: string,
|
||||||
|
fetcher: () => Promise<T>,
|
||||||
|
options: CacheOptions,
|
||||||
|
): Promise<T> {
|
||||||
|
const logger = options.logger ?? globalLogger;
|
||||||
|
|
||||||
|
// Try to get from cache first
|
||||||
|
const cached = await this.get<T>(key, logger);
|
||||||
|
if (cached !== null) {
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache miss - fetch from source
|
||||||
|
const data = await fetcher();
|
||||||
|
|
||||||
|
// Cache the result (fire-and-forget, don't await)
|
||||||
|
this.set(key, data, options.ttl, logger).catch(() => {
|
||||||
|
// Error already logged in set()
|
||||||
|
});
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Convenience methods for specific cache types ---
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates all brand-related cache entries.
|
||||||
|
*/
|
||||||
|
async invalidateBrands(logger: Logger = globalLogger): Promise<number> {
|
||||||
|
return this.invalidatePattern(`${CACHE_PREFIX.BRANDS}*`, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates all flyer-related cache entries.
|
||||||
|
*/
|
||||||
|
async invalidateFlyers(logger: Logger = globalLogger): Promise<number> {
|
||||||
|
const patterns = [
|
||||||
|
`${CACHE_PREFIX.FLYERS}*`,
|
||||||
|
`${CACHE_PREFIX.FLYER}*`,
|
||||||
|
`${CACHE_PREFIX.FLYER_ITEMS}*`,
|
||||||
|
];
|
||||||
|
|
||||||
|
let total = 0;
|
||||||
|
for (const pattern of patterns) {
|
||||||
|
total += await this.invalidatePattern(pattern, logger);
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates cache for a specific flyer and its items.
|
||||||
|
*/
|
||||||
|
async invalidateFlyer(flyerId: number, logger: Logger = globalLogger): Promise<void> {
|
||||||
|
await Promise.all([
|
||||||
|
this.del(`${CACHE_PREFIX.FLYER}:${flyerId}`, logger),
|
||||||
|
this.del(`${CACHE_PREFIX.FLYER_ITEMS}:${flyerId}`, logger),
|
||||||
|
// Also invalidate the flyers list since it may contain this flyer
|
||||||
|
this.invalidatePattern(`${CACHE_PREFIX.FLYERS}*`, logger),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates all statistics cache entries.
|
||||||
|
*/
|
||||||
|
async invalidateStats(logger: Logger = globalLogger): Promise<number> {
|
||||||
|
return this.invalidatePattern(`${CACHE_PREFIX.STATS}*`, logger);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const cacheService = new CacheService();
|
||||||
@@ -3,6 +3,7 @@ import type { Pool, PoolClient } from 'pg';
|
|||||||
import { getPool, withTransaction } from './connection.db';
|
import { getPool, withTransaction } from './connection.db';
|
||||||
import type { Logger } from 'pino';
|
import type { Logger } from 'pino';
|
||||||
import { UniqueConstraintError, NotFoundError, handleDbError } from './errors.db';
|
import { UniqueConstraintError, NotFoundError, handleDbError } from './errors.db';
|
||||||
|
import { cacheService, CACHE_TTL, CACHE_PREFIX } from '../cacheService.server';
|
||||||
import type {
|
import type {
|
||||||
Flyer,
|
Flyer,
|
||||||
FlyerItem,
|
FlyerItem,
|
||||||
@@ -229,22 +230,31 @@ export class FlyerRepository {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves all distinct brands from the stores table.
|
* Retrieves all distinct brands from the stores table.
|
||||||
|
* Uses cache-aside pattern with 1-hour TTL (brands rarely change).
|
||||||
* @returns A promise that resolves to an array of Brand objects.
|
* @returns A promise that resolves to an array of Brand objects.
|
||||||
*/
|
*/
|
||||||
async getAllBrands(logger: Logger): Promise<Brand[]> {
|
async getAllBrands(logger: Logger): Promise<Brand[]> {
|
||||||
try {
|
const cacheKey = CACHE_PREFIX.BRANDS;
|
||||||
const query = `
|
|
||||||
SELECT s.store_id as brand_id, s.name, s.logo_url, s.created_at, s.updated_at
|
return cacheService.getOrSet<Brand[]>(
|
||||||
FROM public.stores s
|
cacheKey,
|
||||||
ORDER BY s.name;
|
async () => {
|
||||||
`;
|
try {
|
||||||
const res = await this.db.query<Brand>(query);
|
const query = `
|
||||||
return res.rows;
|
SELECT s.store_id as brand_id, s.name, s.logo_url, s.created_at, s.updated_at
|
||||||
} catch (error) {
|
FROM public.stores s
|
||||||
handleDbError(error, logger, 'Database error in getAllBrands', {}, {
|
ORDER BY s.name;
|
||||||
defaultMessage: 'Failed to retrieve brands from database.',
|
`;
|
||||||
});
|
const res = await this.db.query<Brand>(query);
|
||||||
}
|
return res.rows;
|
||||||
|
} catch (error) {
|
||||||
|
handleDbError(error, logger, 'Database error in getAllBrands', {}, {
|
||||||
|
defaultMessage: 'Failed to retrieve brands from database.',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ ttl: CACHE_TTL.BRANDS, logger },
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -262,49 +272,67 @@ export class FlyerRepository {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves all flyers from the database, ordered by creation date.
|
* Retrieves all flyers from the database, ordered by creation date.
|
||||||
|
* Uses cache-aside pattern with 5-minute TTL.
|
||||||
* @param limit The maximum number of flyers to return.
|
* @param limit The maximum number of flyers to return.
|
||||||
* @param offset The number of flyers to skip.
|
* @param offset The number of flyers to skip.
|
||||||
* @returns A promise that resolves to an array of Flyer objects.
|
* @returns A promise that resolves to an array of Flyer objects.
|
||||||
*/
|
*/
|
||||||
async getFlyers(logger: Logger, limit: number = 20, offset: number = 0): Promise<Flyer[]> {
|
async getFlyers(logger: Logger, limit: number = 20, offset: number = 0): Promise<Flyer[]> {
|
||||||
try {
|
const cacheKey = `${CACHE_PREFIX.FLYERS}:${limit}:${offset}`;
|
||||||
const query = `
|
|
||||||
SELECT
|
return cacheService.getOrSet<Flyer[]>(
|
||||||
f.*,
|
cacheKey,
|
||||||
json_build_object(
|
async () => {
|
||||||
'store_id', s.store_id,
|
try {
|
||||||
'name', s.name,
|
const query = `
|
||||||
'logo_url', s.logo_url
|
SELECT
|
||||||
) as store
|
f.*,
|
||||||
FROM public.flyers f
|
json_build_object(
|
||||||
JOIN public.stores s ON f.store_id = s.store_id
|
'store_id', s.store_id,
|
||||||
ORDER BY f.created_at DESC LIMIT $1 OFFSET $2`;
|
'name', s.name,
|
||||||
const res = await this.db.query<Flyer>(query, [limit, offset]);
|
'logo_url', s.logo_url
|
||||||
return res.rows;
|
) as store
|
||||||
} catch (error) {
|
FROM public.flyers f
|
||||||
handleDbError(error, logger, 'Database error in getFlyers', { limit, offset }, {
|
JOIN public.stores s ON f.store_id = s.store_id
|
||||||
defaultMessage: 'Failed to retrieve flyers from database.',
|
ORDER BY f.created_at DESC LIMIT $1 OFFSET $2`;
|
||||||
});
|
const res = await this.db.query<Flyer>(query, [limit, offset]);
|
||||||
}
|
return res.rows;
|
||||||
|
} catch (error) {
|
||||||
|
handleDbError(error, logger, 'Database error in getFlyers', { limit, offset }, {
|
||||||
|
defaultMessage: 'Failed to retrieve flyers from database.',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ ttl: CACHE_TTL.FLYERS, logger },
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves all items for a specific flyer.
|
* Retrieves all items for a specific flyer.
|
||||||
|
* Uses cache-aside pattern with 10-minute TTL.
|
||||||
* @param flyerId The ID of the flyer.
|
* @param flyerId The ID of the flyer.
|
||||||
* @returns A promise that resolves to an array of FlyerItem objects.
|
* @returns A promise that resolves to an array of FlyerItem objects.
|
||||||
*/
|
*/
|
||||||
async getFlyerItems(flyerId: number, logger: Logger): Promise<FlyerItem[]> {
|
async getFlyerItems(flyerId: number, logger: Logger): Promise<FlyerItem[]> {
|
||||||
try {
|
const cacheKey = `${CACHE_PREFIX.FLYER_ITEMS}:${flyerId}`;
|
||||||
const res = await this.db.query<FlyerItem>(
|
|
||||||
'SELECT * FROM public.flyer_items WHERE flyer_id = $1 ORDER BY flyer_item_id ASC',
|
return cacheService.getOrSet<FlyerItem[]>(
|
||||||
[flyerId],
|
cacheKey,
|
||||||
);
|
async () => {
|
||||||
return res.rows;
|
try {
|
||||||
} catch (error) {
|
const res = await this.db.query<FlyerItem>(
|
||||||
handleDbError(error, logger, 'Database error in getFlyerItems', { flyerId }, {
|
'SELECT * FROM public.flyer_items WHERE flyer_id = $1 ORDER BY flyer_item_id ASC',
|
||||||
defaultMessage: 'Failed to retrieve flyer items from database.',
|
[flyerId],
|
||||||
});
|
);
|
||||||
}
|
return res.rows;
|
||||||
|
} catch (error) {
|
||||||
|
handleDbError(error, logger, 'Database error in getFlyerItems', { flyerId }, {
|
||||||
|
defaultMessage: 'Failed to retrieve flyer items from database.',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ ttl: CACHE_TTL.FLYER_ITEMS, logger },
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -399,6 +427,7 @@ export class FlyerRepository {
|
|||||||
/**
|
/**
|
||||||
* Deletes a flyer and all its associated items in a transaction.
|
* Deletes a flyer and all its associated items in a transaction.
|
||||||
* This should typically be an admin-only action.
|
* This should typically be an admin-only action.
|
||||||
|
* Invalidates related cache entries after successful deletion.
|
||||||
* @param flyerId The ID of the flyer to delete.
|
* @param flyerId The ID of the flyer to delete.
|
||||||
*/
|
*/
|
||||||
async deleteFlyer(flyerId: number, logger: Logger): Promise<void> {
|
async deleteFlyer(flyerId: number, logger: Logger): Promise<void> {
|
||||||
@@ -413,6 +442,9 @@ export class FlyerRepository {
|
|||||||
}
|
}
|
||||||
logger.info(`Successfully deleted flyer with ID: ${flyerId}`);
|
logger.info(`Successfully deleted flyer with ID: ${flyerId}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Invalidate cache after successful deletion
|
||||||
|
await cacheService.invalidateFlyer(flyerId, logger);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
handleDbError(error, logger, 'Database transaction error in deleteFlyer', { flyerId }, {
|
handleDbError(error, logger, 'Database transaction error in deleteFlyer', { flyerId }, {
|
||||||
defaultMessage: 'Failed to delete flyer.',
|
defaultMessage: 'Failed to delete flyer.',
|
||||||
|
|||||||
@@ -4,12 +4,13 @@ import { withTransaction } from './db/connection.db';
|
|||||||
import { createFlyerAndItems } from './db/flyer.db';
|
import { createFlyerAndItems } from './db/flyer.db';
|
||||||
import { AdminRepository } from './db/admin.db';
|
import { AdminRepository } from './db/admin.db';
|
||||||
import { GamificationRepository } from './db/gamification.db';
|
import { GamificationRepository } from './db/gamification.db';
|
||||||
|
import { cacheService } from './cacheService.server';
|
||||||
import type { FlyerInsert, FlyerItemInsert, Flyer } from '../types';
|
import type { FlyerInsert, FlyerItemInsert, Flyer } from '../types';
|
||||||
|
|
||||||
export class FlyerPersistenceService {
|
export class FlyerPersistenceService {
|
||||||
/**
|
/**
|
||||||
* Saves the flyer and its items to the database within a transaction.
|
* Saves the flyer and its items to the database within a transaction.
|
||||||
* Also logs the activity.
|
* Also logs the activity and invalidates related cache entries.
|
||||||
*/
|
*/
|
||||||
async saveFlyer(
|
async saveFlyer(
|
||||||
flyerData: FlyerInsert,
|
flyerData: FlyerInsert,
|
||||||
@@ -17,7 +18,7 @@ export class FlyerPersistenceService {
|
|||||||
userId: string | undefined,
|
userId: string | undefined,
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
): Promise<Flyer> {
|
): Promise<Flyer> {
|
||||||
return withTransaction(async (client) => {
|
const flyer = await withTransaction(async (client) => {
|
||||||
const { flyer, items } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);
|
const { flyer, items } = await createFlyerAndItems(flyerData, itemsForDb, logger, client);
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -43,5 +44,12 @@ export class FlyerPersistenceService {
|
|||||||
}
|
}
|
||||||
return flyer;
|
return flyer;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Invalidate flyer list cache after successful creation (fire-and-forget)
|
||||||
|
cacheService.invalidateFlyers(logger).catch(() => {
|
||||||
|
// Error already logged in invalidateFlyers
|
||||||
|
});
|
||||||
|
|
||||||
|
return flyer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -14,22 +14,34 @@ let globalPool: ReturnType<typeof getPool> | null = null;
|
|||||||
* This is critical because old jobs with outdated error messages can pollute test results.
|
* This is critical because old jobs with outdated error messages can pollute test results.
|
||||||
*/
|
*/
|
||||||
async function cleanAllQueues() {
|
async function cleanAllQueues() {
|
||||||
console.log(`[PID:${process.pid}] Cleaning all BullMQ queues...`);
|
// Use console.error for visibility in CI logs (stderr is often more reliable)
|
||||||
const { flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue } = await import('../../services/queues.server');
|
console.error(`[PID:${process.pid}] [QUEUE CLEANUP] Starting BullMQ queue cleanup...`);
|
||||||
|
|
||||||
const queues = [flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue];
|
try {
|
||||||
|
const { flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue } = await import('../../services/queues.server');
|
||||||
|
console.error(`[QUEUE CLEANUP] Successfully imported queue modules`);
|
||||||
|
|
||||||
for (const queue of queues) {
|
const queues = [flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue];
|
||||||
try {
|
|
||||||
// obliterate() removes ALL data associated with the queue from Redis
|
for (const queue of queues) {
|
||||||
await queue.obliterate({ force: true });
|
try {
|
||||||
console.log(` ✅ Cleaned queue: ${queue.name}`);
|
// Log queue state before cleanup
|
||||||
} catch (error) {
|
const jobCounts = await queue.getJobCounts();
|
||||||
// Log but don't fail - the queue might not exist yet
|
console.error(`[QUEUE CLEANUP] Queue "${queue.name}" before cleanup: ${JSON.stringify(jobCounts)}`);
|
||||||
console.log(` ⚠️ Could not clean queue ${queue.name}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
|
||||||
|
// obliterate() removes ALL data associated with the queue from Redis
|
||||||
|
await queue.obliterate({ force: true });
|
||||||
|
console.error(` ✅ [QUEUE CLEANUP] Cleaned queue: ${queue.name}`);
|
||||||
|
} catch (error) {
|
||||||
|
// Log but don't fail - the queue might not exist yet
|
||||||
|
console.error(` ⚠️ [QUEUE CLEANUP] Could not clean queue ${queue.name}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
console.error(`✅ [PID:${process.pid}] [QUEUE CLEANUP] All queues cleaned successfully.`);
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`❌ [PID:${process.pid}] [QUEUE CLEANUP] CRITICAL ERROR during queue cleanup:`, error);
|
||||||
|
// Don't throw - we want the tests to continue even if cleanup fails
|
||||||
}
|
}
|
||||||
console.log(`✅ [PID:${process.pid}] All queues cleaned.`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function setup() {
|
export async function setup() {
|
||||||
@@ -38,11 +50,15 @@ export async function setup() {
|
|||||||
// Fix: Set the FRONTEND_URL globally for the test server instance
|
// Fix: Set the FRONTEND_URL globally for the test server instance
|
||||||
process.env.FRONTEND_URL = 'https://example.com';
|
process.env.FRONTEND_URL = 'https://example.com';
|
||||||
|
|
||||||
console.log(`\n--- [PID:${process.pid}] Running Integration Test GLOBAL Setup ---`);
|
console.error(`\n--- [PID:${process.pid}] Running Integration Test GLOBAL Setup ---`);
|
||||||
|
console.error(`[SETUP] REDIS_URL: ${process.env.REDIS_URL}`);
|
||||||
|
console.error(`[SETUP] REDIS_PASSWORD is set: ${!!process.env.REDIS_PASSWORD}`);
|
||||||
|
|
||||||
// CRITICAL: Clean all queues BEFORE running any tests to remove stale jobs
|
// CRITICAL: Clean all queues BEFORE running any tests to remove stale jobs
|
||||||
// from previous test runs that may have outdated error messages.
|
// from previous test runs that may have outdated error messages.
|
||||||
|
console.error(`[SETUP] About to call cleanAllQueues()...`);
|
||||||
await cleanAllQueues();
|
await cleanAllQueues();
|
||||||
|
console.error(`[SETUP] cleanAllQueues() completed.`);
|
||||||
|
|
||||||
// The integration setup is now the single source of truth for preparing the test DB.
|
// The integration setup is now the single source of truth for preparing the test DB.
|
||||||
// It runs the same seed script that `npm run db:reset:test` used.
|
// It runs the same seed script that `npm run db:reset:test` used.
|
||||||
|
|||||||
Reference in New Issue
Block a user