+ );
+}
+```
+
+## Troubleshooting
+
+### Connection Issues
+
+1. **Check JWT Token**: WebSocket requires a valid JWT token in cookies or query string
+2. **Check Server Logs**: Look for WebSocket connection errors in server logs
+3. **Check Browser Console**: WebSocket errors are logged to console
+4. **Verify Path**: WebSocket server is at `ws://localhost:3001/ws` (or `wss://` for HTTPS)
+
+### Not Receiving Notifications
+
+1. **Check Connection Status**: Use `` to verify connection
+2. **Verify Event Name**: Ensure you're listening to the correct event (`notification:deal`, etc.)
+3. **Check User ID**: Notifications are sent to specific users - verify JWT user_id matches
+
+### High Memory Usage
+
+1. **Connection Leaks**: Ensure components using `useWebSocket` are properly unmounting
+2. **Event Listeners**: `useEventBus` automatically cleans up, but verify no manual listeners remain
+3. **Check Stats**: Use `/api/admin/websocket/stats` to monitor connection count
+
+## Testing
+
+### Unit Tests
+
+```typescript
+import { renderHook } from '@testing-library/react';
+import { useWebSocket } from '../hooks/useWebSocket';
+
+describe('useWebSocket', () => {
+ it('should connect automatically', () => {
+ const { result } = renderHook(() => useWebSocket({ autoConnect: true }));
+ expect(result.current.isConnecting).toBe(true);
+ });
+});
+```
+
+### Integration Tests
+
+See [src/tests/integration/websocket.integration.test.ts](../src/tests/integration/websocket.integration.test.ts) for comprehensive integration tests.
+
+## Related Documentation
+
+- [ADR-022: Real-time Notification System](./adr/0022-real-time-notification-system.md)
+- [ADR-036: Event Bus and Pub/Sub Pattern](./adr/0036-event-bus-and-pub-sub-pattern.md)
+- [ADR-042: Email and Notification Architecture](./adr/0042-email-and-notification-architecture.md)
diff --git a/docs/adr/0022-real-time-notification-system.md b/docs/adr/0022-real-time-notification-system.md
index 5b76957..fafa14a 100644
--- a/docs/adr/0022-real-time-notification-system.md
+++ b/docs/adr/0022-real-time-notification-system.md
@@ -2,17 +2,374 @@
**Date**: 2025-12-12
-**Status**: Proposed
+**Status**: Accepted
+
+**Implemented**: 2026-01-19
## Context
A core feature is providing "Active Deal Alerts" to users. The current HTTP-based architecture is not suitable for pushing real-time updates to clients efficiently. Relying on traditional polling would be inefficient and slow.
+Users need to be notified immediately when:
+
+1. **New deals are found** on their watched items
+2. **System announcements** need to be broadcast
+3. **Background jobs complete** that affect their data
+
+Traditional approaches:
+
+- **HTTP Polling**: Inefficient, creates unnecessary load, delays up to polling interval
+- **Server-Sent Events (SSE)**: One-way only, no client-to-server messaging
+- **WebSockets**: Bi-directional, real-time, efficient
+
## Decision
-We will implement a real-time communication system using **WebSockets** (e.g., with the `ws` library or Socket.IO). This will involve an architecture for a notification service that listens for backend events (like a new deal from a background job) and pushes live updates to connected clients.
+We will implement a real-time communication system using **WebSockets** with the `ws` library. This will involve:
+
+1. **WebSocket Server**: Manages connections, authentication, and message routing
+2. **React Hook**: Provides easy integration for React components
+3. **Event Bus Integration**: Bridges WebSocket messages to in-app events
+4. **Background Job Integration**: Emits WebSocket notifications when deals are found
+
+### Design Principles
+
+- **JWT Authentication**: WebSocket connections authenticated via JWT tokens
+- **Type-Safe Messages**: Strongly-typed message formats prevent errors
+- **Auto-Reconnect**: Client automatically reconnects with exponential backoff
+- **Graceful Degradation**: Email + DB notifications remain for offline users
+- **Heartbeat Ping/Pong**: Detect and cleanup dead connections
+- **Singleton Service**: Single WebSocket service instance shared across app
+
+## Implementation Details
+
+### WebSocket Message Types
+
+Located in `src/types/websocket.ts`:
+
+```typescript
+export interface WebSocketMessage {
+ type: WebSocketMessageType;
+ data: T;
+ timestamp: string;
+}
+
+export type WebSocketMessageType =
+ | 'deal-notification'
+ | 'system-message'
+ | 'ping'
+ | 'pong'
+ | 'error'
+ | 'connection-established';
+
+// Deal notification payload
+export interface DealNotificationData {
+ notification_id?: string;
+ deals: DealInfo[];
+ user_id: string;
+ message: string;
+}
+
+// Type-safe message creators
+export const createWebSocketMessage = {
+ dealNotification: (data: DealNotificationData) => ({ ... }),
+ systemMessage: (data: SystemMessageData) => ({ ... }),
+ error: (data: ErrorMessageData) => ({ ... }),
+ // ...
+};
+```
+
+### WebSocket Server Service
+
+Located in `src/services/websocketService.server.ts`:
+
+```typescript
+export class WebSocketService {
+ private wss: WebSocketServer | null = null;
+ private clients: Map> = new Map();
+ private pingInterval: NodeJS.Timeout | null = null;
+
+ initialize(server: HTTPServer): void {
+ this.wss = new WebSocketServer({
+ server,
+ path: '/ws',
+ });
+
+ this.wss.on('connection', (ws, request) => {
+ this.handleConnection(ws, request);
+ });
+
+ this.startHeartbeat(); // Ping every 30s
+ }
+
+ // Authentication via JWT from query string or cookie
+ private extractToken(request: IncomingMessage): string | null {
+ // Extract from ?token=xxx or Cookie: accessToken=xxx
+ }
+
+ // Broadcast to specific user
+ broadcastDealNotification(userId: string, data: DealNotificationData): void {
+ const message = createWebSocketMessage.dealNotification(data);
+ this.broadcastToUser(userId, message);
+ }
+
+ // Broadcast to all users
+ broadcastToAll(data: SystemMessageData): void {
+ // Send to all connected clients
+ }
+
+ shutdown(): void {
+ // Gracefully close all connections
+ }
+}
+
+export const websocketService = new WebSocketService(globalLogger);
+```
+
+### Server Integration
+
+Located in `server.ts`:
+
+```typescript
+import { websocketService } from './src/services/websocketService.server';
+
+if (process.env.NODE_ENV !== 'test') {
+ const server = app.listen(PORT, () => {
+ logger.info(`Authentication server started on port ${PORT}`);
+ });
+
+ // Initialize WebSocket server (ADR-022)
+ websocketService.initialize(server);
+ logger.info('WebSocket server initialized for real-time notifications');
+
+ // Graceful shutdown
+ const handleShutdown = (signal: string) => {
+ websocketService.shutdown();
+ gracefulShutdown(signal);
+ };
+
+ process.on('SIGINT', () => handleShutdown('SIGINT'));
+ process.on('SIGTERM', () => handleShutdown('SIGTERM'));
+}
+```
+
+### React Client Hook
+
+Located in `src/hooks/useWebSocket.ts`:
+
+```typescript
+export function useWebSocket(options: UseWebSocketOptions = {}) {
+ const [state, setState] = useState({
+ isConnected: false,
+ isConnecting: false,
+ error: null,
+ });
+
+ const connect = useCallback(() => {
+ const url = getWebSocketUrl(); // wss://host/ws?token=xxx
+ const ws = new WebSocket(url);
+
+ ws.onmessage = (event) => {
+ const message = JSON.parse(event.data) as WebSocketMessage;
+
+ // Emit to event bus for cross-component communication
+ switch (message.type) {
+ case 'deal-notification':
+ eventBus.dispatch('notification:deal', message.data);
+ break;
+ case 'system-message':
+ eventBus.dispatch('notification:system', message.data);
+ break;
+ // ...
+ }
+ };
+
+ ws.onclose = () => {
+ // Auto-reconnect with exponential backoff
+ if (reconnectAttempts < maxReconnectAttempts) {
+ setTimeout(connect, reconnectDelay * Math.pow(2, reconnectAttempts));
+ reconnectAttempts++;
+ }
+ };
+ }, []);
+
+ useEffect(() => {
+ if (autoConnect) connect();
+ return () => disconnect();
+ }, [autoConnect, connect, disconnect]);
+
+ return { ...state, connect, disconnect, send };
+}
+```
+
+### Background Job Integration
+
+Located in `src/services/backgroundJobService.ts`:
+
+```typescript
+private async _processDealsForUser({ userProfile, deals }: UserDealGroup) {
+ // ... existing email notification logic ...
+
+ // Send real-time WebSocket notification (ADR-022)
+ const { websocketService } = await import('./websocketService.server');
+ websocketService.broadcastDealNotification(userProfile.user_id, {
+ user_id: userProfile.user_id,
+ deals: deals.map((deal) => ({
+ item_name: deal.item_name,
+ best_price_in_cents: deal.best_price_in_cents,
+ store_name: deal.store.name,
+ store_id: deal.store.store_id,
+ })),
+ message: `You have ${deals.length} new deal(s) on your watched items!`,
+ });
+}
+```
+
+### Usage in React Components
+
+```typescript
+import { useWebSocket } from '../hooks/useWebSocket';
+import { useEventBus } from '../hooks/useEventBus';
+import { useCallback } from 'react';
+
+function NotificationComponent() {
+ // Connect to WebSocket
+ const { isConnected, error } = useWebSocket({ autoConnect: true });
+
+ // Listen for deal notifications via event bus
+ const handleDealNotification = useCallback((data: DealNotificationData) => {
+ toast.success(`${data.deals.length} new deals found!`);
+ }, []);
+
+ useEventBus('notification:deal', handleDealNotification);
+
+ return (
+
+ {isConnected ? '🟢 Live' : '🔴 Offline'}
+
+ );
+}
+```
+
+## Architecture Diagram
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│ WebSocket Architecture │
+└─────────────────────────────────────────────────────────────┘
+
+Server Side:
+┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐
+│ Background Job │─────▶│ WebSocket │─────▶│ Connected │
+│ (Deal Checker) │ │ Service │ │ Clients │
+└──────────────────┘ └──────────────────┘ └─────────────────┘
+ │ ▲
+ │ │
+ ▼ │
+┌──────────────────┐ │
+│ Email Queue │ │
+│ (BullMQ) │ │
+└──────────────────┘ │
+ │ │
+ ▼ │
+┌──────────────────┐ ┌──────────────────┐
+│ DB Notification │ │ Express Server │
+│ Storage │ │ + WS Upgrade │
+└──────────────────┘ └──────────────────┘
+
+Client Side:
+┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐
+│ useWebSocket │◀────▶│ WebSocket │◀────▶│ Event Bus │
+│ Hook │ │ Connection │ │ Integration │
+└──────────────────┘ └──────────────────┘ └─────────────────┘
+ │
+ ▼
+┌──────────────────┐
+│ UI Components │
+│ (Notifications) │
+└──────────────────┘
+```
+
+## Security Considerations
+
+1. **Authentication**: JWT tokens required for WebSocket connections
+2. **User Isolation**: Messages routed only to authenticated user's connections
+3. **Rate Limiting**: Heartbeat ping/pong prevents connection flooding
+4. **Graceful Shutdown**: Notifies clients before server shutdown
+5. **Error Handling**: Failed WebSocket sends don't crash the server
## Consequences
-**Positive**: Enables a core, user-facing feature in a scalable and efficient manner. Significantly improves user engagement and experience.
-**Negative**: Introduces a new dependency (e.g., WebSocket library) and adds complexity to the backend and frontend architecture. Requires careful handling of connection management and scaling.
+### Positive
+
+- **Real-time Updates**: Users see deals immediately when found
+- **Better UX**: No page refresh needed, instant notifications
+- **Efficient**: Single persistent connection vs polling every N seconds
+- **Scalable**: Connection pooling per user, heartbeat cleanup
+- **Type-Safe**: TypeScript types prevent message format errors
+- **Resilient**: Auto-reconnect with exponential backoff
+- **Observable**: Connection stats available via `getConnectionStats()`
+- **Testable**: Comprehensive unit tests for message types and service
+
+### Negative
+
+- **Complexity**: WebSocket server adds new infrastructure component
+- **Memory**: Each connection consumes server memory
+- **Scaling**: Single-server implementation (multi-server requires Redis pub/sub)
+- **Browser Support**: Requires WebSocket-capable browsers (all modern browsers)
+- **Network**: Persistent connections require stable network
+
+### Mitigation
+
+- **Graceful Degradation**: Email + DB notifications remain for offline users
+- **Connection Limits**: Can add max connections per user if needed
+- **Monitoring**: Connection stats exposed for observability
+- **Future Scaling**: Can add Redis pub/sub for multi-instance deployments
+- **Heartbeat**: 30s ping/pong detects and cleans up dead connections
+
+## Testing Strategy
+
+### Unit Tests
+
+Located in `src/services/websocketService.server.test.ts`:
+
+```typescript
+describe('WebSocketService', () => {
+ it('should initialize without errors', () => { ... });
+ it('should handle broadcasting with no active connections', () => { ... });
+ it('should shutdown gracefully', () => { ... });
+});
+```
+
+Located in `src/types/websocket.test.ts`:
+
+```typescript
+describe('WebSocket Message Creators', () => {
+ it('should create valid deal notification messages', () => { ... });
+ it('should generate valid ISO timestamps', () => { ... });
+});
+```
+
+### Integration Tests
+
+Future work: Add integration tests that:
+
+- Connect WebSocket clients to test server
+- Verify authentication and message routing
+- Test reconnection logic
+- Validate message delivery
+
+## Key Files
+
+- `src/types/websocket.ts` - WebSocket message types and creators
+- `src/services/websocketService.server.ts` - WebSocket server service
+- `src/hooks/useWebSocket.ts` - React hook for WebSocket connections
+- `src/services/backgroundJobService.ts` - Integration point for deal notifications
+- `server.ts` - Express + WebSocket server initialization
+- `src/services/websocketService.server.test.ts` - Unit tests
+- `src/types/websocket.test.ts` - Message type tests
+
+## Related ADRs
+
+- [ADR-036](./0036-event-bus-and-pub-sub-pattern.md) - Event Bus Pattern (used by client hook)
+- [ADR-042](./0042-email-and-notification-architecture.md) - Email Notifications (fallback mechanism)
+- [ADR-006](./0006-background-job-processing-and-task-queues.md) - Background Jobs (triggers WebSocket notifications)
diff --git a/package-lock.json b/package-lock.json
index 109d805..ef02cf1 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -84,6 +84,7 @@
"@types/supertest": "^6.0.3",
"@types/swagger-jsdoc": "^6.0.4",
"@types/swagger-ui-express": "^4.1.8",
+ "@types/ws": "^8.18.1",
"@types/zxcvbn": "^4.4.5",
"@typescript-eslint/eslint-plugin": "^8.47.0",
"@typescript-eslint/parser": "^8.47.0",
@@ -6741,6 +6742,16 @@
"integrity": "sha512-zFDAD+tlpf2r4asuHEj0XH6pY6i0g5NeAHPn+15wk3BV6JA69eERFXC1gyGThDkVa1zCyKr5jox1+2LbV/AMLg==",
"license": "MIT"
},
+ "node_modules/@types/ws": {
+ "version": "8.18.1",
+ "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz",
+ "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==",
+ "dev": true,
+ "license": "MIT",
+ "dependencies": {
+ "@types/node": "*"
+ }
+ },
"node_modules/@types/zxcvbn": {
"version": "4.4.5",
"resolved": "https://registry.npmjs.org/@types/zxcvbn/-/zxcvbn-4.4.5.tgz",
diff --git a/package.json b/package.json
index b8bdd04..3c9bf89 100644
--- a/package.json
+++ b/package.json
@@ -104,6 +104,7 @@
"@types/supertest": "^6.0.3",
"@types/swagger-jsdoc": "^6.0.4",
"@types/swagger-ui-express": "^4.1.8",
+ "@types/ws": "^8.18.1",
"@types/zxcvbn": "^4.4.5",
"@typescript-eslint/eslint-plugin": "^8.47.0",
"@typescript-eslint/parser": "^8.47.0",
diff --git a/server.ts b/server.ts
index 5be8908..7120897 100644
--- a/server.ts
+++ b/server.ts
@@ -40,6 +40,7 @@ import reactionsRouter from './src/routes/reactions.routes';
import storeRouter from './src/routes/store.routes';
import { errorHandler } from './src/middleware/errorHandler';
import { backgroundJobService, startBackgroundJobs } from './src/services/backgroundJobService';
+import { websocketService } from './src/services/websocketService.server';
import type { UserProfile } from './src/types';
// API Documentation (ADR-018)
@@ -315,13 +316,17 @@ app.use(errorHandler);
// This prevents the server from trying to listen on a port during tests.
if (process.env.NODE_ENV !== 'test') {
const PORT = process.env.PORT || 3001;
- app.listen(PORT, () => {
+ const server = app.listen(PORT, () => {
logger.info(`Authentication server started on port ${PORT}`);
console.log('--- REGISTERED API ROUTES ---');
console.table(listEndpoints(app));
console.log('-----------------------------');
});
+ // Initialize WebSocket server (ADR-022)
+ websocketService.initialize(server);
+ logger.info('WebSocket server initialized for real-time notifications');
+
// Start the scheduled background jobs
startBackgroundJobs(
backgroundJobService,
@@ -332,8 +337,18 @@ if (process.env.NODE_ENV !== 'test') {
);
// --- Graceful Shutdown Handling ---
- process.on('SIGINT', () => gracefulShutdown('SIGINT'));
- process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
+ const handleShutdown = (signal: string) => {
+ logger.info(`${signal} received, starting graceful shutdown...`);
+
+ // Shutdown WebSocket server
+ websocketService.shutdown();
+
+ // Shutdown queues and workers
+ gracefulShutdown(signal);
+ };
+
+ process.on('SIGINT', () => handleShutdown('SIGINT'));
+ process.on('SIGTERM', () => handleShutdown('SIGTERM'));
}
// Export the app for integration testing
diff --git a/sql/initial_schema.sql b/sql/initial_schema.sql
index fab3106..46bec5a 100644
--- a/sql/initial_schema.sql
+++ b/sql/initial_schema.sql
@@ -458,7 +458,7 @@ CREATE TABLE IF NOT EXISTS public.user_submitted_prices (
user_submitted_price_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE,
master_item_id BIGINT NOT NULL REFERENCES public.master_grocery_items(master_grocery_item_id) ON DELETE CASCADE,
- store_id BIGINT NOT NULL REFERENCES public.stores(store_id) ON DELETE CASCADE,
+ store_location_id BIGINT NOT NULL REFERENCES public.store_locations(store_location_id) ON DELETE CASCADE,
price_in_cents INTEGER NOT NULL CHECK (price_in_cents > 0),
photo_url TEXT,
upvotes INTEGER DEFAULT 0 NOT NULL CHECK (upvotes >= 0),
@@ -472,6 +472,7 @@ COMMENT ON COLUMN public.user_submitted_prices.photo_url IS 'URL to user-submitt
COMMENT ON COLUMN public.user_submitted_prices.upvotes IS 'Community validation score indicating accuracy.';
CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_user_id ON public.user_submitted_prices(user_id);
CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_master_item_id ON public.user_submitted_prices(master_item_id);
+CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_store_location_id ON public.user_submitted_prices(store_location_id);
-- 22. Log flyer items that could not be automatically matched to a master item.
CREATE TABLE IF NOT EXISTS public.unmatched_flyer_items (
@@ -936,7 +937,7 @@ CREATE INDEX IF NOT EXISTS idx_user_follows_following_id ON public.user_follows(
CREATE TABLE IF NOT EXISTS public.receipts (
receipt_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE,
- store_id BIGINT REFERENCES public.stores(store_id) ON DELETE CASCADE,
+ store_location_id BIGINT REFERENCES public.store_locations(store_location_id) ON DELETE SET NULL,
receipt_image_url TEXT NOT NULL,
transaction_date TIMESTAMPTZ,
total_amount_cents INTEGER CHECK (total_amount_cents IS NULL OR total_amount_cents >= 0),
@@ -956,7 +957,7 @@ CREATE TABLE IF NOT EXISTS public.receipts (
-- CONSTRAINT receipts_receipt_image_url_check CHECK (receipt_image_url ~* '^https://?.*')
COMMENT ON TABLE public.receipts IS 'Stores uploaded user receipts for purchase tracking and analysis.';
CREATE INDEX IF NOT EXISTS idx_receipts_user_id ON public.receipts(user_id);
-CREATE INDEX IF NOT EXISTS idx_receipts_store_id ON public.receipts(store_id);
+CREATE INDEX IF NOT EXISTS idx_receipts_store_location_id ON public.receipts(store_location_id);
CREATE INDEX IF NOT EXISTS idx_receipts_status_retry ON public.receipts(status, retry_count) WHERE status IN ('pending', 'failed') AND retry_count < 3;
-- 53. Store individual line items extracted from a user receipt.
diff --git a/sql/master_schema_rollup.sql b/sql/master_schema_rollup.sql
index 58c51ff..77e60e0 100644
--- a/sql/master_schema_rollup.sql
+++ b/sql/master_schema_rollup.sql
@@ -475,7 +475,7 @@ CREATE TABLE IF NOT EXISTS public.user_submitted_prices (
user_submitted_price_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE,
master_item_id BIGINT NOT NULL REFERENCES public.master_grocery_items(master_grocery_item_id) ON DELETE CASCADE,
- store_id BIGINT NOT NULL REFERENCES public.stores(store_id) ON DELETE CASCADE,
+ store_location_id BIGINT NOT NULL REFERENCES public.store_locations(store_location_id) ON DELETE CASCADE,
price_in_cents INTEGER NOT NULL CHECK (price_in_cents > 0),
photo_url TEXT,
upvotes INTEGER DEFAULT 0 NOT NULL CHECK (upvotes >= 0),
@@ -489,6 +489,7 @@ COMMENT ON COLUMN public.user_submitted_prices.photo_url IS 'URL to user-submitt
COMMENT ON COLUMN public.user_submitted_prices.upvotes IS 'Community validation score indicating accuracy.';
CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_user_id ON public.user_submitted_prices(user_id);
CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_master_item_id ON public.user_submitted_prices(master_item_id);
+CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_store_location_id ON public.user_submitted_prices(store_location_id);
-- 22. Log flyer items that could not be automatically matched to a master item.
CREATE TABLE IF NOT EXISTS public.unmatched_flyer_items (
@@ -955,7 +956,7 @@ CREATE INDEX IF NOT EXISTS idx_user_follows_following_id ON public.user_follows(
CREATE TABLE IF NOT EXISTS public.receipts (
receipt_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE,
- store_id BIGINT REFERENCES public.stores(store_id) ON DELETE CASCADE,
+ store_location_id BIGINT REFERENCES public.store_locations(store_location_id) ON DELETE SET NULL,
receipt_image_url TEXT NOT NULL,
transaction_date TIMESTAMPTZ,
total_amount_cents INTEGER CHECK (total_amount_cents IS NULL OR total_amount_cents >= 0),
@@ -975,7 +976,7 @@ CREATE TABLE IF NOT EXISTS public.receipts (
-- CONSTRAINT receipts_receipt_image_url_check CHECK (receipt_image_url ~* '^https?://.*'),
COMMENT ON TABLE public.receipts IS 'Stores uploaded user receipts for purchase tracking and analysis.';
CREATE INDEX IF NOT EXISTS idx_receipts_user_id ON public.receipts(user_id);
-CREATE INDEX IF NOT EXISTS idx_receipts_store_id ON public.receipts(store_id);
+CREATE INDEX IF NOT EXISTS idx_receipts_store_location_id ON public.receipts(store_location_id);
CREATE INDEX IF NOT EXISTS idx_receipts_status_retry ON public.receipts(status, retry_count) WHERE status IN ('pending', 'failed') AND retry_count < 3;
-- 53. Store individual line items extracted from a user receipt.
diff --git a/sql/migrations/004_populate_flyer_locations.sql b/sql/migrations/004_populate_flyer_locations.sql
new file mode 100644
index 0000000..65a8cd0
--- /dev/null
+++ b/sql/migrations/004_populate_flyer_locations.sql
@@ -0,0 +1,44 @@
+-- Migration: Populate flyer_locations table with existing flyer→store relationships
+-- Purpose: The flyer_locations table was created in the initial schema but never populated.
+-- This migration populates it with data from the legacy flyer.store_id relationship.
+--
+-- Background: The schema correctly defines a many-to-many relationship between flyers
+-- and store_locations via the flyer_locations table, but all code was using
+-- the legacy flyer.store_id foreign key directly.
+
+-- Step 1: For each flyer with a store_id, link it to all locations of that store
+-- This assumes that if a flyer is associated with a store, it's valid at ALL locations of that store
+INSERT INTO public.flyer_locations (flyer_id, store_location_id)
+SELECT DISTINCT
+ f.flyer_id,
+ sl.store_location_id
+FROM public.flyers f
+JOIN public.store_locations sl ON f.store_id = sl.store_id
+WHERE f.store_id IS NOT NULL
+ON CONFLICT (flyer_id, store_location_id) DO NOTHING;
+
+-- Step 2: Add a comment documenting this migration
+COMMENT ON TABLE public.flyer_locations IS
+'A linking table associating a single flyer with multiple store locations where its deals are valid. Populated from legacy flyer.store_id relationships via migration 004.';
+
+-- Step 3: Verify the migration worked
+-- This should return the number of flyer_location entries created
+DO $$
+DECLARE
+ flyer_location_count INTEGER;
+ flyer_with_store_count INTEGER;
+BEGIN
+ SELECT COUNT(*) INTO flyer_location_count FROM public.flyer_locations;
+ SELECT COUNT(*) INTO flyer_with_store_count FROM public.flyers WHERE store_id IS NOT NULL;
+
+ RAISE NOTICE 'Migration 004 complete:';
+ RAISE NOTICE ' - Created % flyer_location entries', flyer_location_count;
+ RAISE NOTICE ' - Based on % flyers with store_id', flyer_with_store_count;
+
+ IF flyer_location_count = 0 AND flyer_with_store_count > 0 THEN
+ RAISE EXCEPTION 'Migration 004 failed: No flyer_locations created but flyers with store_id exist';
+ END IF;
+END $$;
+
+-- Note: The flyer.store_id column is kept for backward compatibility but should eventually be deprecated
+-- Future work: Add a migration to remove flyer.store_id once all code uses flyer_locations
diff --git a/sql/migrations/005_add_store_location_to_user_submitted_prices.sql b/sql/migrations/005_add_store_location_to_user_submitted_prices.sql
new file mode 100644
index 0000000..badc425
--- /dev/null
+++ b/sql/migrations/005_add_store_location_to_user_submitted_prices.sql
@@ -0,0 +1,59 @@
+-- Migration: Add store_location_id to user_submitted_prices table
+-- Purpose: Replace store_id with store_location_id for better geographic specificity.
+-- This allows prices to be specific to individual store locations rather than
+-- all locations of a store chain.
+
+-- Step 1: Add the new column (nullable initially for backward compatibility)
+ALTER TABLE public.user_submitted_prices
+ADD COLUMN store_location_id BIGINT REFERENCES public.store_locations(store_location_id) ON DELETE CASCADE;
+
+-- Step 2: Create index on the new column
+CREATE INDEX IF NOT EXISTS idx_user_submitted_prices_store_location_id
+ON public.user_submitted_prices(store_location_id);
+
+-- Step 3: Migrate existing data
+-- For each existing price with a store_id, link it to the first location of that store
+-- (or a random location if multiple exist)
+UPDATE public.user_submitted_prices usp
+SET store_location_id = sl.store_location_id
+FROM (
+ SELECT DISTINCT ON (store_id)
+ store_id,
+ store_location_id
+ FROM public.store_locations
+ ORDER BY store_id, store_location_id ASC
+) sl
+WHERE usp.store_id = sl.store_id
+AND usp.store_location_id IS NULL;
+
+-- Step 4: Make store_location_id NOT NULL (all existing data should now have values)
+ALTER TABLE public.user_submitted_prices
+ALTER COLUMN store_location_id SET NOT NULL;
+
+-- Step 5: Drop the old store_id column (no longer needed - store_location_id provides better specificity)
+ALTER TABLE public.user_submitted_prices DROP COLUMN store_id;
+
+-- Step 6: Update table comment
+COMMENT ON TABLE public.user_submitted_prices IS
+'Stores item prices submitted by users directly from physical stores. Uses store_location_id for geographic specificity (added in migration 005).';
+
+COMMENT ON COLUMN public.user_submitted_prices.store_location_id IS
+'The specific store location where this price was observed. Provides geographic specificity for price comparisons.';
+
+-- Step 7: Verify the migration
+DO $$
+DECLARE
+ rows_with_location INTEGER;
+ total_rows INTEGER;
+BEGIN
+ SELECT COUNT(*) INTO rows_with_location FROM public.user_submitted_prices WHERE store_location_id IS NOT NULL;
+ SELECT COUNT(*) INTO total_rows FROM public.user_submitted_prices;
+
+ RAISE NOTICE 'Migration 005 complete:';
+ RAISE NOTICE ' - % of % user_submitted_prices now have store_location_id', rows_with_location, total_rows;
+ RAISE NOTICE ' - store_id column has been removed - all prices use store_location_id';
+
+ IF total_rows > 0 AND rows_with_location != total_rows THEN
+ RAISE EXCEPTION 'Migration 005 failed: Not all prices have store_location_id';
+ END IF;
+END $$;
diff --git a/sql/migrations/006_add_store_location_to_receipts.sql b/sql/migrations/006_add_store_location_to_receipts.sql
new file mode 100644
index 0000000..70f7f4c
--- /dev/null
+++ b/sql/migrations/006_add_store_location_to_receipts.sql
@@ -0,0 +1,54 @@
+-- Migration: Add store_location_id to receipts table
+-- Purpose: Replace store_id with store_location_id for better geographic specificity.
+-- This allows receipts to be tied to specific store locations, enabling
+-- location-based shopping pattern analysis and better receipt matching.
+
+-- Step 1: Add the new column (nullable initially for backward compatibility)
+ALTER TABLE public.receipts
+ADD COLUMN store_location_id BIGINT REFERENCES public.store_locations(store_location_id) ON DELETE SET NULL;
+
+-- Step 2: Create index on the new column
+CREATE INDEX IF NOT EXISTS idx_receipts_store_location_id
+ON public.receipts(store_location_id);
+
+-- Step 3: Migrate existing data
+-- For each existing receipt with a store_id, link it to the first location of that store
+UPDATE public.receipts r
+SET store_location_id = sl.store_location_id
+FROM (
+ SELECT DISTINCT ON (store_id)
+ store_id,
+ store_location_id
+ FROM public.store_locations
+ ORDER BY store_id, store_location_id ASC
+) sl
+WHERE r.store_id = sl.store_id
+AND r.store_location_id IS NULL;
+
+-- Step 4: Drop the old store_id column (no longer needed - store_location_id provides better specificity)
+ALTER TABLE public.receipts DROP COLUMN store_id;
+
+-- Step 5: Update table comment
+COMMENT ON TABLE public.receipts IS
+'Stores uploaded user receipts for purchase tracking and analysis. Uses store_location_id for geographic specificity (added in migration 006).';
+
+COMMENT ON COLUMN public.receipts.store_location_id IS
+'The specific store location where this purchase was made. Provides geographic specificity for shopping pattern analysis.';
+
+-- Step 6: Verify the migration
+DO $$
+DECLARE
+ rows_with_location INTEGER;
+ total_rows INTEGER;
+BEGIN
+ SELECT COUNT(*) INTO rows_with_location FROM public.receipts WHERE store_location_id IS NOT NULL;
+ SELECT COUNT(*) INTO total_rows FROM public.receipts;
+
+ RAISE NOTICE 'Migration 006 complete:';
+ RAISE NOTICE ' - Total receipts: %', total_rows;
+ RAISE NOTICE ' - Receipts with store_location_id: %', rows_with_location;
+ RAISE NOTICE ' - store_id column has been removed - all receipts use store_location_id';
+ RAISE NOTICE ' - Note: store_location_id may be NULL if receipt not yet matched to a store';
+END $$;
+
+-- Note: store_location_id is nullable because receipts may not have a matched store yet during processing.
diff --git a/src/components/NotificationBell.tsx b/src/components/NotificationBell.tsx
new file mode 100644
index 0000000..3e21250
--- /dev/null
+++ b/src/components/NotificationBell.tsx
@@ -0,0 +1,131 @@
+// src/components/NotificationBell.tsx
+
+/**
+ * Real-time notification bell component
+ * Displays WebSocket connection status and unread notification count
+ * Integrates with useWebSocket hook for real-time updates
+ */
+
+import { useState, useCallback } from 'react';
+import { Bell, Wifi, WifiOff } from 'lucide-react';
+import { useWebSocket } from '../hooks/useWebSocket';
+import { useEventBus } from '../hooks/useEventBus';
+import type { DealNotificationData } from '../types/websocket';
+
+interface NotificationBellProps {
+ /**
+ * Callback when bell is clicked
+ */
+ onClick?: () => void;
+
+ /**
+ * Whether to show the connection status indicator
+ * @default true
+ */
+ showConnectionStatus?: boolean;
+
+ /**
+ * Custom CSS classes for the bell container
+ */
+ className?: string;
+}
+
+export function NotificationBell({
+ onClick,
+ showConnectionStatus = true,
+ className = '',
+}: NotificationBellProps) {
+ const [unreadCount, setUnreadCount] = useState(0);
+ const { isConnected, error } = useWebSocket({ autoConnect: true });
+
+ // Handle incoming deal notifications
+ const handleDealNotification = useCallback((data?: DealNotificationData) => {
+ if (data) {
+ setUnreadCount((prev) => prev + 1);
+ }
+ }, []);
+
+ // Listen for deal notifications via event bus
+ useEventBus('notification:deal', handleDealNotification);
+
+ // Reset count when clicked
+ const handleClick = () => {
+ setUnreadCount(0);
+ onClick?.();
+ };
+
+ return (
+
+ {/* Notification Bell Button */}
+
+
+ {/* Connection Status Tooltip (shown on hover when disconnected) */}
+ {!isConnected && error && (
+
+
+
+ Live notifications unavailable
+
+
+ )}
+
+ );
+}
+
+/**
+ * Simple connection status indicator (no bell, just status)
+ */
+export function ConnectionStatus() {
+ const { isConnected, error } = useWebSocket({ autoConnect: true });
+
+ return (
+