From 664ad291be48c5f2f73f81b418fb277d00fe90c9 Mon Sep 17 00:00:00 2001 From: Torben Sorensen Date: Fri, 9 Jan 2026 03:41:57 -0800 Subject: [PATCH] integration test fixes - claude for the win? try 3 --- .claude/settings.local.json | 3 +- .gitea/workflows/deploy-to-test.yml | 17 ++ ...ckground-job-processing-and-task-queues.md | 81 ++++++- ...hing-strategy-for-read-heavy-operations.md | 106 +++++++- src/routes/admin.routes.ts | 39 +++ src/routes/ai.routes.ts | 10 +- src/services/cacheService.server.ts | 226 ++++++++++++++++++ src/services/db/flyer.db.ts | 118 +++++---- .../flyerPersistenceService.server.ts | 12 +- src/tests/setup/integration-global-setup.ts | 42 +++- 10 files changed, 591 insertions(+), 63 deletions(-) create mode 100644 src/services/cacheService.server.ts diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b940d086..05e7f879 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -56,7 +56,8 @@ "mcp__memory__delete_entities", "mcp__sequential-thinking__sequentialthinking", "mcp__filesystem__list_directory", - "mcp__filesystem__read_multiple_files" + "mcp__filesystem__read_multiple_files", + "mcp__filesystem__directory_tree" ] } } diff --git a/.gitea/workflows/deploy-to-test.yml b/.gitea/workflows/deploy-to-test.yml index d8245ee6..c8aaf25c 100644 --- a/.gitea/workflows/deploy-to-test.yml +++ b/.gitea/workflows/deploy-to-test.yml @@ -96,6 +96,23 @@ jobs: # It prevents the accumulation of duplicate processes from previous test runs. node -e "const exec = require('child_process').execSync; try { const list = JSON.parse(exec('pm2 jlist').toString()); list.forEach(p => { if (p.name && p.name.endsWith('-test')) { console.log('Deleting test process: ' + p.name + ' (' + p.pm2_env.pm_id + ')'); try { exec('pm2 delete ' + p.pm2_env.pm_id); } catch(e) { console.error('Failed to delete ' + p.pm2_env.pm_id, e.message); } } }); console.log('✅ Test process cleanup complete.'); } catch (e) { if (e.stdout.toString().includes('No process found')) { console.log('No PM2 processes running, cleanup not needed.'); } else { console.error('Error cleaning up test processes:', e.message); } }" || true + - name: Flush Redis Before Tests + # CRITICAL: Clear all Redis data to remove stale BullMQ jobs from previous test runs. + # This prevents old jobs with outdated error messages from polluting test results. + env: + REDIS_PASSWORD: ${{ secrets.REDIS_PASSWORD_TEST }} + run: | + echo "--- Flushing Redis database to remove stale jobs ---" + if [ -z "$REDIS_PASSWORD" ]; then + echo "⚠️ REDIS_PASSWORD_TEST not set, attempting flush without password..." + redis-cli FLUSHDB || echo "Redis flush failed (no password)" + else + redis-cli -a "$REDIS_PASSWORD" FLUSHDB 2>/dev/null && echo "✅ Redis database flushed successfully." || echo "⚠️ Redis flush failed" + fi + # Verify the flush worked by checking key count + KEY_COUNT=$(redis-cli -a "$REDIS_PASSWORD" DBSIZE 2>/dev/null | grep -oE '[0-9]+' || echo "unknown") + echo "Redis key count after flush: $KEY_COUNT" + - name: Run All Tests and Generate Merged Coverage Report # This single step runs both unit and integration tests, then merges their # coverage data into a single report. It combines the environment variables diff --git a/docs/adr/0006-background-job-processing-and-task-queues.md b/docs/adr/0006-background-job-processing-and-task-queues.md index bdcd1603..99834d93 100644 --- a/docs/adr/0006-background-job-processing-and-task-queues.md +++ b/docs/adr/0006-background-job-processing-and-task-queues.md @@ -2,7 +2,7 @@ **Date**: 2025-12-12 -**Status**: Proposed +**Status**: Accepted ## Context @@ -16,3 +16,82 @@ We will implement a dedicated background job processing system using a task queu **Positive**: Decouples the API from heavy processing, allows for retries on failure, and enables scaling the processing workers independently. Increases application reliability and resilience. **Negative**: Introduces a new dependency (Redis) into the infrastructure. Requires refactoring of the flyer processing logic to work within a job queue structure. + +## Implementation Details + +### Queue Infrastructure + +The implementation uses **BullMQ v5.65.1** with **ioredis v5.8.2** for Redis connectivity. Six distinct queues handle different job types: + +| Queue Name | Purpose | Retry Attempts | Backoff Strategy | +| ---------------------------- | --------------------------- | -------------- | ---------------------- | +| `flyer-processing` | OCR/AI processing of flyers | 3 | Exponential (5s base) | +| `email-sending` | Email delivery | 5 | Exponential (10s base) | +| `analytics-reporting` | Daily report generation | 2 | Exponential (60s base) | +| `weekly-analytics-reporting` | Weekly report generation | 2 | Exponential (1h base) | +| `file-cleanup` | Temporary file cleanup | 3 | Exponential (30s base) | +| `token-cleanup` | Expired token removal | 2 | Exponential (1h base) | + +### Key Files + +- `src/services/queues.server.ts` - Queue definitions and configuration +- `src/services/workers.server.ts` - Worker implementations with configurable concurrency +- `src/services/redis.server.ts` - Redis connection management +- `src/services/queueService.server.ts` - Queue lifecycle and graceful shutdown +- `src/services/flyerProcessingService.server.ts` - 5-stage flyer processing pipeline +- `src/types/job-data.ts` - TypeScript interfaces for all job data types + +### API Design + +Endpoints for long-running tasks return **202 Accepted** immediately with a job ID: + +```text +POST /api/ai/upload-and-process → 202 { jobId: "..." } +GET /api/ai/jobs/:jobId/status → { state: "...", progress: ... } +``` + +### Worker Configuration + +Workers are configured via environment variables: + +- `WORKER_CONCURRENCY` - Flyer processing parallelism (default: 1) +- `EMAIL_WORKER_CONCURRENCY` - Email worker parallelism (default: 10) +- `ANALYTICS_WORKER_CONCURRENCY` - Analytics worker parallelism (default: 1) +- `CLEANUP_WORKER_CONCURRENCY` - Cleanup worker parallelism (default: 10) + +### Monitoring + +- **Bull Board UI** available at `/api/admin/jobs` for admin users +- Worker status endpoint: `GET /api/admin/workers/status` +- Queue status endpoint: `GET /api/admin/queues/status` + +### Graceful Shutdown + +Both API and worker processes implement graceful shutdown with a 30-second timeout, ensuring in-flight jobs complete before process termination. + +## Compliance Notes + +### Deprecated Synchronous Endpoints + +The following endpoints process flyers synchronously and are **deprecated**: + +- `POST /api/ai/upload-legacy` - For integration testing only +- `POST /api/ai/flyers/process` - Legacy workflow, should migrate to queue-based approach + +New integrations MUST use `POST /api/ai/upload-and-process` for queue-based processing. + +### Email Handling + +- **Bulk emails** (deal notifications): Enqueued via `emailQueue` +- **Transactional emails** (password reset): Sent synchronously for immediate user feedback + +## Future Enhancements + +Potential improvements for consideration: + +1. **Dead Letter Queue (DLQ)**: Move permanently failed jobs to a dedicated queue for analysis +2. **Job Priority Levels**: Allow priority-based processing for different job types +3. **Real-time Progress**: WebSocket/SSE for live job progress updates to clients +4. **Per-Queue Rate Limiting**: Throttle job processing based on external API limits +5. **Job Dependencies**: Support for jobs that depend on completion of other jobs +6. **Prometheus Metrics**: Export queue metrics for observability dashboards diff --git a/docs/adr/0009-caching-strategy-for-read-heavy-operations.md b/docs/adr/0009-caching-strategy-for-read-heavy-operations.md index 90dcf30f..eee196e5 100644 --- a/docs/adr/0009-caching-strategy-for-read-heavy-operations.md +++ b/docs/adr/0009-caching-strategy-for-read-heavy-operations.md @@ -2,7 +2,7 @@ **Date**: 2025-12-12 -**Status**: Proposed +**Status**: Accepted ## Context @@ -20,3 +20,107 @@ We will implement a multi-layered caching strategy using an in-memory data store **Positive**: Directly addresses application performance and scalability. Reduces database load and improves API response times for common requests. **Negative**: Introduces Redis as a dependency if not already used. Adds complexity to the data-fetching logic and requires careful management of cache invalidation to prevent stale data. + +## Implementation Details + +### Cache Service + +A centralized cache service (`src/services/cacheService.server.ts`) provides reusable caching functionality: + +- **`getOrSet(key, fetcher, options)`**: Cache-aside pattern implementation +- **`get(key)`**: Retrieve cached value +- **`set(key, value, ttl)`**: Store value with TTL +- **`del(key)`**: Delete specific key +- **`invalidatePattern(pattern)`**: Delete keys matching a pattern + +All cache operations are fail-safe - cache failures do not break the application. + +### TTL Configuration + +Different data types use different TTL values based on volatility: + +| Data Type | TTL | Rationale | +| ------------------- | --------- | -------------------------------------- | +| Brands/Stores | 1 hour | Rarely changes, safe to cache longer | +| Flyer lists | 5 minutes | Changes when new flyers are added | +| Individual flyers | 10 minutes| Stable once created | +| Flyer items | 10 minutes| Stable once created | +| Statistics | 5 minutes | Can be slightly stale | +| Frequent sales | 15 minutes| Aggregated data, updated periodically | +| Categories | 1 hour | Rarely changes | + +### Cache Key Strategy + +Cache keys follow a consistent prefix pattern for pattern-based invalidation: + +- `cache:brands` - All brands list +- `cache:flyers:{limit}:{offset}` - Paginated flyer lists +- `cache:flyer:{id}` - Individual flyer data +- `cache:flyer-items:{flyerId}` - Items for a specific flyer +- `cache:stats:*` - Statistics data +- `geocode:{address}` - Geocoding results (30-day TTL) + +### Cached Endpoints + +The following repository methods implement server-side caching: + +| Method | Cache Key Pattern | TTL | +| ------ | ----------------- | --- | +| `FlyerRepository.getAllBrands()` | `cache:brands` | 1 hour | +| `FlyerRepository.getFlyers()` | `cache:flyers:{limit}:{offset}` | 5 minutes | +| `FlyerRepository.getFlyerItems()` | `cache:flyer-items:{flyerId}` | 10 minutes | + +### Cache Invalidation + +**Event-based invalidation** is triggered on write operations: + +- **Flyer creation** (`FlyerPersistenceService.saveFlyer`): Invalidates all `cache:flyers*` keys +- **Flyer deletion** (`FlyerRepository.deleteFlyer`): Invalidates specific flyer and flyer items cache, plus flyer lists + +**Manual invalidation** via admin endpoints: + +- `POST /api/admin/system/clear-cache` - Clears all application cache (flyers, brands, stats) +- `POST /api/admin/system/clear-geocode-cache` - Clears geocoding cache + +### Client-Side Caching + +TanStack React Query provides client-side caching with configurable stale times: + +| Query Type | Stale Time | +| ----------------- | ----------- | +| Categories | 1 hour | +| Master Items | 10 minutes | +| Flyer Items | 5 minutes | +| Flyers | 2 minutes | +| Shopping Lists | 1 minute | +| Activity Log | 30 seconds | + +### Multi-Layer Cache Architecture + +```text +Client Request + ↓ +[TanStack React Query] ← Client-side cache (staleTime-based) + ↓ +[Express API] + ↓ +[CacheService.getOrSet()] ← Server-side Redis cache (TTL-based) + ↓ +[PostgreSQL Database] +``` + +## Key Files + +- `src/services/cacheService.server.ts` - Centralized cache service +- `src/services/db/flyer.db.ts` - Repository with caching for brands, flyers, flyer items +- `src/services/flyerPersistenceService.server.ts` - Cache invalidation on flyer creation +- `src/routes/admin.routes.ts` - Admin cache management endpoints +- `src/config/queryClient.ts` - Client-side query cache configuration + +## Future Enhancements + +1. **Recipe caching**: Add caching to expensive recipe queries (by-sale-percentage, etc.) +2. **Cache warming**: Pre-populate cache on startup for frequently accessed static data +3. **Cache metrics**: Add hit/miss rate monitoring for observability +4. **Conditional caching**: Skip cache for authenticated user-specific data +5. **Cache compression**: Compress large cached payloads to reduce Redis memory usage diff --git a/src/routes/admin.routes.ts b/src/routes/admin.routes.ts index ddf52c22..3327eb84 100644 --- a/src/routes/admin.routes.ts +++ b/src/routes/admin.routes.ts @@ -8,6 +8,7 @@ import { z } from 'zod'; import * as db from '../services/db/index.db'; import type { UserProfile } from '../types'; import { geocodingService } from '../services/geocodingService.server'; +import { cacheService } from '../services/cacheService.server'; import { requireFileUpload } from '../middleware/fileUpload.middleware'; // This was a duplicate, fixed. import { createUploadMiddleware, @@ -635,6 +636,44 @@ router.post( }, ); +/** + * POST /api/admin/system/clear-cache - Clears the application data cache. + * Clears cached flyers, brands, and stats data from Redis. + * Requires admin privileges. + */ +router.post( + '/system/clear-cache', + adminTriggerLimiter, + validateRequest(emptySchema), + async (req: Request, res: Response, next: NextFunction) => { + const userProfile = req.user as UserProfile; + req.log.info( + `[Admin] Manual cache clear received from user: ${userProfile.user.user_id}`, + ); + + try { + const [flyersDeleted, brandsDeleted, statsDeleted] = await Promise.all([ + cacheService.invalidateFlyers(req.log), + cacheService.invalidateBrands(req.log), + cacheService.invalidateStats(req.log), + ]); + + const totalDeleted = flyersDeleted + brandsDeleted + statsDeleted; + res.status(200).json({ + message: `Successfully cleared the application cache. ${totalDeleted} keys were removed.`, + details: { + flyers: flyersDeleted, + brands: brandsDeleted, + stats: statsDeleted, + }, + }); + } catch (error) { + req.log.error({ error }, '[Admin] Failed to clear application cache.'); + next(error); + } + }, +); + /* Catches errors from multer (e.g., file size, file filter) */ router.use(handleMulterError); diff --git a/src/routes/ai.routes.ts b/src/routes/ai.routes.ts index 640b69e9..da3a61ee 100644 --- a/src/routes/ai.routes.ts +++ b/src/routes/ai.routes.ts @@ -234,6 +234,9 @@ router.post( * POST /api/ai/upload-legacy - Process a flyer upload from a legacy client. * This is an authenticated route that processes the flyer synchronously. * This is used for integration testing the legacy upload flow. + * + * @deprecated Use POST /api/ai/upload-and-process instead for async queue-based processing (ADR-0006). + * This synchronous endpoint is retained only for integration testing purposes. */ router.post( '/upload-legacy', @@ -282,9 +285,12 @@ router.get( ); /** - * This endpoint saves the processed flyer data to the database. It is the final step - * in the flyer upload workflow after the AI has extracted the data. + * POST /api/ai/flyers/process - Saves the processed flyer data to the database. + * This is the final step in the flyer upload workflow after the AI has extracted the data. * It uses `optionalAuth` to handle submissions from both anonymous and authenticated users. + * + * @deprecated Use POST /api/ai/upload-and-process instead for async queue-based processing (ADR-0006). + * This synchronous endpoint processes flyers inline and should be migrated to the queue-based approach. */ router.post( '/flyers/process', diff --git a/src/services/cacheService.server.ts b/src/services/cacheService.server.ts new file mode 100644 index 00000000..17441ed0 --- /dev/null +++ b/src/services/cacheService.server.ts @@ -0,0 +1,226 @@ +// src/services/cacheService.server.ts +/** + * @file Centralized caching service implementing the Cache-Aside pattern. + * This service provides a reusable wrapper around Redis for caching read-heavy operations. + * See ADR-009 for the caching strategy documentation. + */ +import type { Logger } from 'pino'; +import { connection as redis } from './redis.server'; +import { logger as globalLogger } from './logger.server'; + +/** + * TTL values in seconds for different cache types. + * These can be tuned based on data volatility and freshness requirements. + */ +export const CACHE_TTL = { + /** Brand/store list - rarely changes, safe to cache for 1 hour */ + BRANDS: 60 * 60, + /** Flyer list - changes when new flyers are added, cache for 5 minutes */ + FLYERS: 5 * 60, + /** Individual flyer data - cache for 10 minutes */ + FLYER: 10 * 60, + /** Flyer items - cache for 10 minutes */ + FLYER_ITEMS: 10 * 60, + /** Statistics - can be slightly stale, cache for 5 minutes */ + STATS: 5 * 60, + /** Most frequent sales - aggregated data, cache for 15 minutes */ + FREQUENT_SALES: 15 * 60, + /** Categories - rarely changes, cache for 1 hour */ + CATEGORIES: 60 * 60, +} as const; + +/** + * Cache key prefixes for different data types. + * Using consistent prefixes allows for pattern-based invalidation. + */ +export const CACHE_PREFIX = { + BRANDS: 'cache:brands', + FLYERS: 'cache:flyers', + FLYER: 'cache:flyer', + FLYER_ITEMS: 'cache:flyer-items', + STATS: 'cache:stats', + FREQUENT_SALES: 'cache:frequent-sales', + CATEGORIES: 'cache:categories', +} as const; + +export interface CacheOptions { + /** Time-to-live in seconds */ + ttl: number; + /** Optional logger for this operation */ + logger?: Logger; +} + +/** + * Centralized cache service implementing the Cache-Aside pattern. + * All cache operations are fail-safe - cache failures do not break the application. + */ +class CacheService { + /** + * Retrieves a value from cache. + * @param key The cache key + * @param logger Optional logger for this operation + * @returns The cached value or null if not found/error + */ + async get(key: string, logger: Logger = globalLogger): Promise { + try { + const cached = await redis.get(key); + if (cached) { + logger.debug({ cacheKey: key }, 'Cache hit'); + return JSON.parse(cached) as T; + } + logger.debug({ cacheKey: key }, 'Cache miss'); + return null; + } catch (error) { + logger.warn({ err: error, cacheKey: key }, 'Redis GET failed, proceeding without cache'); + return null; + } + } + + /** + * Stores a value in cache with TTL. + * @param key The cache key + * @param value The value to cache (will be JSON stringified) + * @param ttl Time-to-live in seconds + * @param logger Optional logger for this operation + */ + async set(key: string, value: T, ttl: number, logger: Logger = globalLogger): Promise { + try { + await redis.set(key, JSON.stringify(value), 'EX', ttl); + logger.debug({ cacheKey: key, ttl }, 'Value cached'); + } catch (error) { + logger.warn({ err: error, cacheKey: key }, 'Redis SET failed, value not cached'); + } + } + + /** + * Deletes a specific key from cache. + * @param key The cache key to delete + * @param logger Optional logger for this operation + */ + async del(key: string, logger: Logger = globalLogger): Promise { + try { + await redis.del(key); + logger.debug({ cacheKey: key }, 'Cache key deleted'); + } catch (error) { + logger.warn({ err: error, cacheKey: key }, 'Redis DEL failed'); + } + } + + /** + * Invalidates all cache keys matching a pattern. + * Uses SCAN for safe iteration over large key sets. + * @param pattern The pattern to match (e.g., 'cache:flyers*') + * @param logger Optional logger for this operation + * @returns The number of keys deleted + */ + async invalidatePattern(pattern: string, logger: Logger = globalLogger): Promise { + let cursor = '0'; + let totalDeleted = 0; + + try { + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100); + cursor = nextCursor; + if (keys.length > 0) { + const deletedCount = await redis.del(...keys); + totalDeleted += deletedCount; + } + } while (cursor !== '0'); + + logger.info({ pattern, totalDeleted }, 'Cache invalidation completed'); + return totalDeleted; + } catch (error) { + logger.error({ err: error, pattern }, 'Cache invalidation failed'); + throw error; + } + } + + /** + * Implements the Cache-Aside pattern: try cache first, fall back to fetcher, cache result. + * This is the primary method for adding caching to existing repository methods. + * + * @param key The cache key + * @param fetcher Function that retrieves data from the source (e.g., database) + * @param options Cache options including TTL + * @returns The data (from cache or fetcher) + * + * @example + * ```typescript + * const brands = await cacheService.getOrSet( + * CACHE_PREFIX.BRANDS, + * () => this.db.query('SELECT * FROM stores'), + * { ttl: CACHE_TTL.BRANDS, logger } + * ); + * ``` + */ + async getOrSet( + key: string, + fetcher: () => Promise, + options: CacheOptions, + ): Promise { + const logger = options.logger ?? globalLogger; + + // Try to get from cache first + const cached = await this.get(key, logger); + if (cached !== null) { + return cached; + } + + // Cache miss - fetch from source + const data = await fetcher(); + + // Cache the result (fire-and-forget, don't await) + this.set(key, data, options.ttl, logger).catch(() => { + // Error already logged in set() + }); + + return data; + } + + // --- Convenience methods for specific cache types --- + + /** + * Invalidates all brand-related cache entries. + */ + async invalidateBrands(logger: Logger = globalLogger): Promise { + return this.invalidatePattern(`${CACHE_PREFIX.BRANDS}*`, logger); + } + + /** + * Invalidates all flyer-related cache entries. + */ + async invalidateFlyers(logger: Logger = globalLogger): Promise { + const patterns = [ + `${CACHE_PREFIX.FLYERS}*`, + `${CACHE_PREFIX.FLYER}*`, + `${CACHE_PREFIX.FLYER_ITEMS}*`, + ]; + + let total = 0; + for (const pattern of patterns) { + total += await this.invalidatePattern(pattern, logger); + } + return total; + } + + /** + * Invalidates cache for a specific flyer and its items. + */ + async invalidateFlyer(flyerId: number, logger: Logger = globalLogger): Promise { + await Promise.all([ + this.del(`${CACHE_PREFIX.FLYER}:${flyerId}`, logger), + this.del(`${CACHE_PREFIX.FLYER_ITEMS}:${flyerId}`, logger), + // Also invalidate the flyers list since it may contain this flyer + this.invalidatePattern(`${CACHE_PREFIX.FLYERS}*`, logger), + ]); + } + + /** + * Invalidates all statistics cache entries. + */ + async invalidateStats(logger: Logger = globalLogger): Promise { + return this.invalidatePattern(`${CACHE_PREFIX.STATS}*`, logger); + } +} + +export const cacheService = new CacheService(); diff --git a/src/services/db/flyer.db.ts b/src/services/db/flyer.db.ts index bcf3d1dc..7994a592 100644 --- a/src/services/db/flyer.db.ts +++ b/src/services/db/flyer.db.ts @@ -3,6 +3,7 @@ import type { Pool, PoolClient } from 'pg'; import { getPool, withTransaction } from './connection.db'; import type { Logger } from 'pino'; import { UniqueConstraintError, NotFoundError, handleDbError } from './errors.db'; +import { cacheService, CACHE_TTL, CACHE_PREFIX } from '../cacheService.server'; import type { Flyer, FlyerItem, @@ -229,22 +230,31 @@ export class FlyerRepository { /** * Retrieves all distinct brands from the stores table. + * Uses cache-aside pattern with 1-hour TTL (brands rarely change). * @returns A promise that resolves to an array of Brand objects. */ async getAllBrands(logger: Logger): Promise { - try { - const query = ` - SELECT s.store_id as brand_id, s.name, s.logo_url, s.created_at, s.updated_at - FROM public.stores s - ORDER BY s.name; - `; - const res = await this.db.query(query); - return res.rows; - } catch (error) { - handleDbError(error, logger, 'Database error in getAllBrands', {}, { - defaultMessage: 'Failed to retrieve brands from database.', - }); - } + const cacheKey = CACHE_PREFIX.BRANDS; + + return cacheService.getOrSet( + cacheKey, + async () => { + try { + const query = ` + SELECT s.store_id as brand_id, s.name, s.logo_url, s.created_at, s.updated_at + FROM public.stores s + ORDER BY s.name; + `; + const res = await this.db.query(query); + return res.rows; + } catch (error) { + handleDbError(error, logger, 'Database error in getAllBrands', {}, { + defaultMessage: 'Failed to retrieve brands from database.', + }); + } + }, + { ttl: CACHE_TTL.BRANDS, logger }, + ); } /** @@ -262,49 +272,67 @@ export class FlyerRepository { /** * Retrieves all flyers from the database, ordered by creation date. + * Uses cache-aside pattern with 5-minute TTL. * @param limit The maximum number of flyers to return. * @param offset The number of flyers to skip. * @returns A promise that resolves to an array of Flyer objects. */ async getFlyers(logger: Logger, limit: number = 20, offset: number = 0): Promise { - try { - const query = ` - SELECT - f.*, - json_build_object( - 'store_id', s.store_id, - 'name', s.name, - 'logo_url', s.logo_url - ) as store - FROM public.flyers f - JOIN public.stores s ON f.store_id = s.store_id - ORDER BY f.created_at DESC LIMIT $1 OFFSET $2`; - const res = await this.db.query(query, [limit, offset]); - return res.rows; - } catch (error) { - handleDbError(error, logger, 'Database error in getFlyers', { limit, offset }, { - defaultMessage: 'Failed to retrieve flyers from database.', - }); - } + const cacheKey = `${CACHE_PREFIX.FLYERS}:${limit}:${offset}`; + + return cacheService.getOrSet( + cacheKey, + async () => { + try { + const query = ` + SELECT + f.*, + json_build_object( + 'store_id', s.store_id, + 'name', s.name, + 'logo_url', s.logo_url + ) as store + FROM public.flyers f + JOIN public.stores s ON f.store_id = s.store_id + ORDER BY f.created_at DESC LIMIT $1 OFFSET $2`; + const res = await this.db.query(query, [limit, offset]); + return res.rows; + } catch (error) { + handleDbError(error, logger, 'Database error in getFlyers', { limit, offset }, { + defaultMessage: 'Failed to retrieve flyers from database.', + }); + } + }, + { ttl: CACHE_TTL.FLYERS, logger }, + ); } /** * Retrieves all items for a specific flyer. + * Uses cache-aside pattern with 10-minute TTL. * @param flyerId The ID of the flyer. * @returns A promise that resolves to an array of FlyerItem objects. */ async getFlyerItems(flyerId: number, logger: Logger): Promise { - try { - const res = await this.db.query( - 'SELECT * FROM public.flyer_items WHERE flyer_id = $1 ORDER BY flyer_item_id ASC', - [flyerId], - ); - return res.rows; - } catch (error) { - handleDbError(error, logger, 'Database error in getFlyerItems', { flyerId }, { - defaultMessage: 'Failed to retrieve flyer items from database.', - }); - } + const cacheKey = `${CACHE_PREFIX.FLYER_ITEMS}:${flyerId}`; + + return cacheService.getOrSet( + cacheKey, + async () => { + try { + const res = await this.db.query( + 'SELECT * FROM public.flyer_items WHERE flyer_id = $1 ORDER BY flyer_item_id ASC', + [flyerId], + ); + return res.rows; + } catch (error) { + handleDbError(error, logger, 'Database error in getFlyerItems', { flyerId }, { + defaultMessage: 'Failed to retrieve flyer items from database.', + }); + } + }, + { ttl: CACHE_TTL.FLYER_ITEMS, logger }, + ); } /** @@ -399,6 +427,7 @@ export class FlyerRepository { /** * Deletes a flyer and all its associated items in a transaction. * This should typically be an admin-only action. + * Invalidates related cache entries after successful deletion. * @param flyerId The ID of the flyer to delete. */ async deleteFlyer(flyerId: number, logger: Logger): Promise { @@ -413,6 +442,9 @@ export class FlyerRepository { } logger.info(`Successfully deleted flyer with ID: ${flyerId}`); }); + + // Invalidate cache after successful deletion + await cacheService.invalidateFlyer(flyerId, logger); } catch (error) { handleDbError(error, logger, 'Database transaction error in deleteFlyer', { flyerId }, { defaultMessage: 'Failed to delete flyer.', diff --git a/src/services/flyerPersistenceService.server.ts b/src/services/flyerPersistenceService.server.ts index f86ee449..44072fe2 100644 --- a/src/services/flyerPersistenceService.server.ts +++ b/src/services/flyerPersistenceService.server.ts @@ -4,12 +4,13 @@ import { withTransaction } from './db/connection.db'; import { createFlyerAndItems } from './db/flyer.db'; import { AdminRepository } from './db/admin.db'; import { GamificationRepository } from './db/gamification.db'; +import { cacheService } from './cacheService.server'; import type { FlyerInsert, FlyerItemInsert, Flyer } from '../types'; export class FlyerPersistenceService { /** * Saves the flyer and its items to the database within a transaction. - * Also logs the activity. + * Also logs the activity and invalidates related cache entries. */ async saveFlyer( flyerData: FlyerInsert, @@ -17,7 +18,7 @@ export class FlyerPersistenceService { userId: string | undefined, logger: Logger, ): Promise { - return withTransaction(async (client) => { + const flyer = await withTransaction(async (client) => { const { flyer, items } = await createFlyerAndItems(flyerData, itemsForDb, logger, client); logger.info( @@ -43,5 +44,12 @@ export class FlyerPersistenceService { } return flyer; }); + + // Invalidate flyer list cache after successful creation (fire-and-forget) + cacheService.invalidateFlyers(logger).catch(() => { + // Error already logged in invalidateFlyers + }); + + return flyer; } } \ No newline at end of file diff --git a/src/tests/setup/integration-global-setup.ts b/src/tests/setup/integration-global-setup.ts index e9db94bd..49622ccb 100644 --- a/src/tests/setup/integration-global-setup.ts +++ b/src/tests/setup/integration-global-setup.ts @@ -14,22 +14,34 @@ let globalPool: ReturnType | null = null; * This is critical because old jobs with outdated error messages can pollute test results. */ async function cleanAllQueues() { - console.log(`[PID:${process.pid}] Cleaning all BullMQ queues...`); - const { flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue } = await import('../../services/queues.server'); + // Use console.error for visibility in CI logs (stderr is often more reliable) + console.error(`[PID:${process.pid}] [QUEUE CLEANUP] Starting BullMQ queue cleanup...`); - const queues = [flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue]; + try { + const { flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue } = await import('../../services/queues.server'); + console.error(`[QUEUE CLEANUP] Successfully imported queue modules`); - for (const queue of queues) { - try { - // obliterate() removes ALL data associated with the queue from Redis - await queue.obliterate({ force: true }); - console.log(` ✅ Cleaned queue: ${queue.name}`); - } catch (error) { - // Log but don't fail - the queue might not exist yet - console.log(` ⚠️ Could not clean queue ${queue.name}: ${error instanceof Error ? error.message : 'Unknown error'}`); + const queues = [flyerQueue, cleanupQueue, emailQueue, analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue]; + + for (const queue of queues) { + try { + // Log queue state before cleanup + const jobCounts = await queue.getJobCounts(); + console.error(`[QUEUE CLEANUP] Queue "${queue.name}" before cleanup: ${JSON.stringify(jobCounts)}`); + + // obliterate() removes ALL data associated with the queue from Redis + await queue.obliterate({ force: true }); + console.error(` ✅ [QUEUE CLEANUP] Cleaned queue: ${queue.name}`); + } catch (error) { + // Log but don't fail - the queue might not exist yet + console.error(` ⚠️ [QUEUE CLEANUP] Could not clean queue ${queue.name}: ${error instanceof Error ? error.message : 'Unknown error'}`); + } } + console.error(`✅ [PID:${process.pid}] [QUEUE CLEANUP] All queues cleaned successfully.`); + } catch (error) { + console.error(`❌ [PID:${process.pid}] [QUEUE CLEANUP] CRITICAL ERROR during queue cleanup:`, error); + // Don't throw - we want the tests to continue even if cleanup fails } - console.log(`✅ [PID:${process.pid}] All queues cleaned.`); } export async function setup() { @@ -38,11 +50,15 @@ export async function setup() { // Fix: Set the FRONTEND_URL globally for the test server instance process.env.FRONTEND_URL = 'https://example.com'; - console.log(`\n--- [PID:${process.pid}] Running Integration Test GLOBAL Setup ---`); + console.error(`\n--- [PID:${process.pid}] Running Integration Test GLOBAL Setup ---`); + console.error(`[SETUP] REDIS_URL: ${process.env.REDIS_URL}`); + console.error(`[SETUP] REDIS_PASSWORD is set: ${!!process.env.REDIS_PASSWORD}`); // CRITICAL: Clean all queues BEFORE running any tests to remove stale jobs // from previous test runs that may have outdated error messages. + console.error(`[SETUP] About to call cleanAllQueues()...`); await cleanAllQueues(); + console.error(`[SETUP] cleanAllQueues() completed.`); // The integration setup is now the single source of truth for preparing the test DB. // It runs the same seed script that `npm run db:reset:test` used.