From d8ea0c7297798825582119bd8884f48b3b342939 Mon Sep 17 00:00:00 2001 From: Eric Gullickson <16152721+ericgullickson@users.noreply.github.com> Date: Thu, 1 Jan 2026 11:02:54 -0600 Subject: [PATCH] fix: Implement distributed lock in Redis for cron jobs --- backend/src/core/README.md | 38 +++++++- backend/src/core/config/redis.ts | 72 +++++++++++++++ backend/src/core/scheduler/README.md | 92 +++++++++++++++++++ backend/src/features/backup/README.md | 18 +++- .../backup/jobs/backup-scheduled.job.ts | 49 ++++++++++ docs/PROMPTS.md | 8 +- 6 files changed, 271 insertions(+), 6 deletions(-) create mode 100644 backend/src/core/scheduler/README.md diff --git a/backend/src/core/README.md b/backend/src/core/README.md index 37af905..d0204b9 100644 --- a/backend/src/core/README.md +++ b/backend/src/core/README.md @@ -3,9 +3,45 @@ ## Configuration (`src/core/config/`) - `config-loader.ts` — Load and validate environment variables - `database.ts` — PostgreSQL connection pool -- `redis.ts` — Redis client and cache helpers +- `redis.ts` — Redis client, cache helpers, and distributed locking - `user-context.ts` — User context utilities +### Distributed Lock Service + +The `DistributedLockService` in `redis.ts` provides Redis-based distributed locking for preventing duplicate operations across multiple containers (blue-green deployments). + +**All scheduled jobs MUST use distributed locking** to prevent duplicate execution when multiple backend containers are running. + +```typescript +import { lockService } from '../core/config/redis'; +import { v4 as uuidv4 } from 'uuid'; + +// Acquire lock (returns false if already held) +const lockKey = 'job:my-scheduled-task'; +const lockValue = uuidv4(); // Unique identifier for this execution +const ttlSeconds = 300; // Auto-release after 5 minutes + +const acquired = await lockService.acquireLock(lockKey, ttlSeconds, lockValue); +if (!acquired) { + // Another container is already running this job + return; +} + +try { + // Do work... 
+} finally { + // Always release the lock + await lockService.releaseLock(lockKey, lockValue); +} +``` + +**API:** +| Method | Description | +|--------|-------------| +| `acquireLock(key, ttlSeconds, lockValue)` | Acquire lock atomically (SET NX EX) | +| `releaseLock(key, lockValue)` | Release only if we hold it (Lua script) | +| `isLocked(key)` | Check if lock exists | + ## Plugins (`src/core/plugins/`) - `auth.plugin.ts` — Auth0 JWT via JWKS (@fastify/jwt, get-jwks) - `error.plugin.ts` — Error handling diff --git a/backend/src/core/config/redis.ts b/backend/src/core/config/redis.ts index d4e1374..dca9c48 100644 --- a/backend/src/core/config/redis.ts +++ b/backend/src/core/config/redis.ts @@ -82,3 +82,75 @@ export class CacheService { } export const cacheService = new CacheService(); + +/** + * Distributed lock service for preventing concurrent operations across containers + */ +export class DistributedLockService { + private prefix = 'mvp:lock:'; + + /** + * Attempts to acquire a lock with the given key + * @param key Lock identifier + * @param ttlSeconds Time-to-live in seconds (auto-release) + * @param lockValue Unique identifier for this lock holder + * @returns true if lock acquired, false if already held + */ + async acquireLock(key: string, ttlSeconds: number, lockValue: string): Promise { + try { + // SET NX (only if not exists) with EX (expiry) + const result = await redis.set( + this.prefix + key, + lockValue, + 'EX', + ttlSeconds, + 'NX' + ); + return result === 'OK'; + } catch (error) { + logger.error('Lock acquisition error', { key, error }); + return false; + } + } + + /** + * Releases a lock only if we hold it (compare lockValue) + * @param key Lock identifier + * @param lockValue The value used when acquiring the lock + * @returns true if lock was released, false if we didn't hold it + */ + async releaseLock(key: string, lockValue: string): Promise { + try { + // Lua script to atomically check and delete + const script = ` + if redis.call("get", 
KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + `; + const result = await redis.eval(script, 1, this.prefix + key, lockValue); + return result === 1; + } catch (error) { + logger.error('Lock release error', { key, error }); + return false; + } + } + + /** + * Checks if a lock is currently held + * @param key Lock identifier + * @returns true if lock exists + */ + async isLocked(key: string): Promise { + try { + const exists = await redis.exists(this.prefix + key); + return exists === 1; + } catch (error) { + logger.error('Lock check error', { key, error }); + return false; + } + } +} + +export const lockService = new DistributedLockService(); diff --git a/backend/src/core/scheduler/README.md b/backend/src/core/scheduler/README.md new file mode 100644 index 0000000..49ab75b --- /dev/null +++ b/backend/src/core/scheduler/README.md @@ -0,0 +1,92 @@ +# Scheduler Module + +Centralized cron job scheduler using `node-cron` for background tasks. + +## Overview + +The scheduler runs periodic background jobs. In blue-green deployments, **multiple backend containers may run simultaneously**, so all jobs MUST use distributed locking to prevent duplicate execution. + +## Registered Jobs + +| Job | Schedule | Description | +|-----|----------|-------------| +| Notification processing | 8 AM daily | Process scheduled notifications | +| Account purge | 2 AM daily | GDPR compliance - purge deleted accounts | +| Backup check | Every minute | Check for due scheduled backups | +| Retention cleanup | 4 AM daily | Clean up old backups (also runs after each backup) | + +## Distributed Locking Requirement + +**All scheduled jobs MUST use the `lockService`** from `core/config/redis.ts` to prevent duplicate execution when multiple containers are running. 
+ +### Pattern for New Jobs + +```typescript +import { v4 as uuidv4 } from 'uuid'; +import { lockService } from '../../core/config/redis'; +import { logger } from '../../core/logging/logger'; + +export async function processMyJob(): Promise<void> { + const lockKey = 'job:my-job-name'; + const lockValue = uuidv4(); + const lockTtlSeconds = 300; // 5 minutes - adjust based on expected job duration + + // Try to acquire lock + const acquired = await lockService.acquireLock(lockKey, lockTtlSeconds, lockValue); + if (!acquired) { + logger.debug('Job already running in another container, skipping'); + return; + } + + try { + logger.info('Starting my job'); + // Do work... + logger.info('My job completed'); + } catch (error) { + logger.error('My job failed', { error }); + throw error; + } finally { + // Always release the lock + await lockService.releaseLock(lockKey, lockValue); + } +} +``` + +### Lock Key Conventions + +Use descriptive, namespaced lock keys: + +| Pattern | Example | Use Case | +|---------|---------|----------| +| `job:{name}` | `job:notification-processor` | Global jobs (run once) | +| `{feature}:{entity}:{id}` | `backup:schedule:uuid-here` | Per-entity jobs | + +### Lock TTL Guidelines + +Set TTL longer than the expected job duration, but short enough to recover from crashes: + +| Job Duration | Recommended TTL | +|--------------|-----------------| +| < 10 seconds | 60 seconds | +| < 1 minute | 5 minutes | +| < 5 minutes | 15 minutes | +| Long-running | 30 minutes + heartbeat | + +## Adding New Jobs + +1. Create job file in the feature's `jobs/` directory +2. Implement distributed locking (see pattern above) +3. Register in `core/scheduler/index.ts` +4. Update this README with the new job + +## Blue-Green Deployment Behavior + +When both blue and green containers are running: + +1. Both schedulers trigger at the same time +2. Both attempt to acquire the lock +3. Only one succeeds (atomic Redis operation) +4. The other skips the job execution +5. 
Lock is released when job completes + +This ensures that at most one container executes a given job at a time, regardless of how many containers are running. diff --git a/backend/src/features/backup/README.md b/backend/src/features/backup/README.md index 72c1d6e..e3146b2 100644 --- a/backend/src/features/backup/README.md +++ b/backend/src/features/backup/README.md @@ -111,7 +111,23 @@ Backups are stored in `/app/data/backups/` (mapped to `./data/backups/` on host) Jobs are registered in `backend/src/core/scheduler/index.ts`: - Backup check: Every minute -- Retention cleanup: Daily at 4 AM +- Retention cleanup: Daily at 4 AM (also runs after each scheduled backup) + +### Distributed Locking + +Scheduled backups use Redis distributed locking to prevent duplicate backups when multiple backend containers are running (blue-green deployments). + +**Lock behavior:** +- Lock key: `backup:schedule:{schedule_id}` +- Lock TTL: 5 minutes (auto-release if container crashes) +- Only one container creates the backup; others skip + +**Retention cleanup:** +- Runs immediately after each successful scheduled backup +- Deletes backups exceeding the schedule's retention count +- Also runs globally at 4 AM daily as a safety net + +See `backend/src/core/scheduler/README.md` for the distributed locking pattern. 
### Admin Routes diff --git a/backend/src/features/backup/jobs/backup-scheduled.job.ts b/backend/src/features/backup/jobs/backup-scheduled.job.ts index 505b686..1a9d4fe 100644 --- a/backend/src/features/backup/jobs/backup-scheduled.job.ts +++ b/backend/src/features/backup/jobs/backup-scheduled.job.ts @@ -1,12 +1,16 @@ /** * @ai-summary Job for processing scheduled backups * @ai-context Runs every minute to check for due scheduled backups + * @ai-context Uses distributed locking to prevent duplicate backups in blue-green deployments */ import { Pool } from 'pg'; +import { v4 as uuidv4 } from 'uuid'; import { logger } from '../../../core/logging/logger'; +import { lockService } from '../../../core/config/redis'; import { BackupRepository } from '../data/backup.repository'; import { BackupService } from '../domain/backup.service'; +import { BackupRetentionService } from '../domain/backup-retention.service'; import { ScheduledBackupJobResult, BackupFrequency } from '../domain/backup.types'; let pool: Pool | null = null; @@ -18,8 +22,12 @@ export function setBackupJobPool(dbPool: Pool): void { pool = dbPool; } +// Lock TTL: 5 minutes (backup should complete well within this) +const BACKUP_LOCK_TTL_SECONDS = 300; + /** * Processes all scheduled backups that are due + * Uses distributed locking to prevent duplicate backups across containers */ export async function processScheduledBackups(): Promise { if (!pool) { @@ -28,6 +36,7 @@ export async function processScheduledBackups(): Promise 0) { + logger.info('Retention cleanup completed after backup', { + scheduleId: schedule.id, + deletedCount: retentionResult.deletedCount, + freedBytes: retentionResult.freedBytes, + }); + } + } catch (retentionError) { + logger.error('Retention cleanup failed after backup', { + scheduleId: schedule.id, + error: retentionError instanceof Error ? 
retentionError.message : String(retentionError), + }); + // Don't fail the overall backup for retention errors + } } else { result.failed++; result.errors.push({ @@ -103,6 +149,9 @@ export async function processScheduledBackups(): Promise