fix: Implement distributed lock in Redis for cron jobs
Some checks failed
Deploy to Staging / Build Images (push) Failing after 30s
Deploy to Staging / Deploy to Staging (push) Has been skipped
Deploy to Staging / Verify Staging (push) Has been skipped
Deploy to Staging / Notify Staging Ready (push) Has been skipped
Deploy to Staging / Notify Staging Failure (push) Successful in 6s
Some checks failed
Deploy to Staging / Build Images (push) Failing after 30s
Deploy to Staging / Deploy to Staging (push) Has been skipped
Deploy to Staging / Verify Staging (push) Has been skipped
Deploy to Staging / Notify Staging Ready (push) Has been skipped
Deploy to Staging / Notify Staging Failure (push) Successful in 6s
This commit is contained in:
@@ -3,9 +3,45 @@
|
|||||||
## Configuration (`src/core/config/`)
|
## Configuration (`src/core/config/`)
|
||||||
- `config-loader.ts` — Load and validate environment variables
|
- `config-loader.ts` — Load and validate environment variables
|
||||||
- `database.ts` — PostgreSQL connection pool
|
- `database.ts` — PostgreSQL connection pool
|
||||||
- `redis.ts` — Redis client and cache helpers
|
- `redis.ts` — Redis client, cache helpers, and distributed locking
|
||||||
- `user-context.ts` — User context utilities
|
- `user-context.ts` — User context utilities
|
||||||
|
|
||||||
|
### Distributed Lock Service
|
||||||
|
|
||||||
|
The `DistributedLockService` in `redis.ts` provides Redis-based distributed locking for preventing duplicate operations across multiple containers (blue-green deployments).
|
||||||
|
|
||||||
|
**All scheduled jobs MUST use distributed locking** to prevent duplicate execution when multiple backend containers are running.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { lockService } from '../core/config/redis';
|
||||||
|
import { v4 as uuidv4 } from 'uuid';
|
||||||
|
|
||||||
|
// Acquire lock (returns false if already held)
|
||||||
|
const lockKey = 'job:my-scheduled-task';
|
||||||
|
const lockValue = uuidv4(); // Unique identifier for this execution
|
||||||
|
const ttlSeconds = 300; // Auto-release after 5 minutes
|
||||||
|
|
||||||
|
const acquired = await lockService.acquireLock(lockKey, ttlSeconds, lockValue);
|
||||||
|
if (!acquired) {
|
||||||
|
// Another container is already running this job
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Do work...
|
||||||
|
} finally {
|
||||||
|
// Always release the lock
|
||||||
|
await lockService.releaseLock(lockKey, lockValue);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**API:**
|
||||||
|
| Method | Description |
|
||||||
|
|--------|-------------|
|
||||||
|
| `acquireLock(key, ttlSeconds, lockValue)` | Acquire lock atomically (SET NX EX) |
|
||||||
|
| `releaseLock(key, lockValue)` | Release only if we hold it (Lua script) |
|
||||||
|
| `isLocked(key)` | Check if lock exists |
|
||||||
|
|
||||||
## Plugins (`src/core/plugins/`)
|
## Plugins (`src/core/plugins/`)
|
||||||
- `auth.plugin.ts` — Auth0 JWT via JWKS (@fastify/jwt, get-jwks)
|
- `auth.plugin.ts` — Auth0 JWT via JWKS (@fastify/jwt, get-jwks)
|
||||||
- `error.plugin.ts` — Error handling
|
- `error.plugin.ts` — Error handling
|
||||||
|
|||||||
@@ -82,3 +82,75 @@ export class CacheService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const cacheService = new CacheService();
|
export const cacheService = new CacheService();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Distributed lock service for preventing concurrent operations across containers
|
||||||
|
*/
|
||||||
|
export class DistributedLockService {
|
||||||
|
private prefix = 'mvp:lock:';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to acquire a lock with the given key
|
||||||
|
* @param key Lock identifier
|
||||||
|
* @param ttlSeconds Time-to-live in seconds (auto-release)
|
||||||
|
* @param lockValue Unique identifier for this lock holder
|
||||||
|
* @returns true if lock acquired, false if already held
|
||||||
|
*/
|
||||||
|
async acquireLock(key: string, ttlSeconds: number, lockValue: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
// SET NX (only if not exists) with EX (expiry)
|
||||||
|
const result = await redis.set(
|
||||||
|
this.prefix + key,
|
||||||
|
lockValue,
|
||||||
|
'EX',
|
||||||
|
ttlSeconds,
|
||||||
|
'NX'
|
||||||
|
);
|
||||||
|
return result === 'OK';
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Lock acquisition error', { key, error });
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Releases a lock only if we hold it (compare lockValue)
|
||||||
|
* @param key Lock identifier
|
||||||
|
* @param lockValue The value used when acquiring the lock
|
||||||
|
* @returns true if lock was released, false if we didn't hold it
|
||||||
|
*/
|
||||||
|
async releaseLock(key: string, lockValue: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
// Lua script to atomically check and delete
|
||||||
|
const script = `
|
||||||
|
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||||
|
return redis.call("del", KEYS[1])
|
||||||
|
else
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
`;
|
||||||
|
const result = await redis.eval(script, 1, this.prefix + key, lockValue);
|
||||||
|
return result === 1;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Lock release error', { key, error });
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if a lock is currently held
|
||||||
|
* @param key Lock identifier
|
||||||
|
* @returns true if lock exists
|
||||||
|
*/
|
||||||
|
async isLocked(key: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
const exists = await redis.exists(this.prefix + key);
|
||||||
|
return exists === 1;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Lock check error', { key, error });
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const lockService = new DistributedLockService();
|
||||||
|
|||||||
92
backend/src/core/scheduler/README.md
Normal file
92
backend/src/core/scheduler/README.md
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
# Scheduler Module
|
||||||
|
|
||||||
|
Centralized cron job scheduler using `node-cron` for background tasks.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
The scheduler runs periodic background jobs. In blue-green deployments, **multiple backend containers may run simultaneously**, so all jobs MUST use distributed locking to prevent duplicate execution.
|
||||||
|
|
||||||
|
## Registered Jobs
|
||||||
|
|
||||||
|
| Job | Schedule | Description |
|
||||||
|
|-----|----------|-------------|
|
||||||
|
| Notification processing | 8 AM daily | Process scheduled notifications |
|
||||||
|
| Account purge | 2 AM daily | GDPR compliance - purge deleted accounts |
|
||||||
|
| Backup check | Every minute | Check for due scheduled backups |
|
||||||
|
| Retention cleanup | 4 AM daily | Clean up old backups (also runs after each backup) |
|
||||||
|
|
||||||
|
## Distributed Locking Requirement
|
||||||
|
|
||||||
|
**All scheduled jobs MUST use the `lockService`** from `core/config/redis.ts` to prevent duplicate execution when multiple containers are running.
|
||||||
|
|
||||||
|
### Pattern for New Jobs
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { v4 as uuidv4 } from 'uuid';
|
||||||
|
import { lockService } from '../../core/config/redis';
|
||||||
|
import { logger } from '../../core/logging/logger';
|
||||||
|
|
||||||
|
export async function processMyJob(): Promise<void> {
|
||||||
|
const lockKey = 'job:my-job-name';
|
||||||
|
const lockValue = uuidv4();
|
||||||
|
const lockTtlSeconds = 300; // 5 minutes - adjust based on expected job duration
|
||||||
|
|
||||||
|
// Try to acquire lock
|
||||||
|
const acquired = await lockService.acquireLock(lockKey, lockTtlSeconds, lockValue);
|
||||||
|
if (!acquired) {
|
||||||
|
logger.debug('Job already running in another container, skipping');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
logger.info('Starting my job');
|
||||||
|
// Do work...
|
||||||
|
logger.info('My job completed');
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('My job failed', { error });
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
// Always release the lock
|
||||||
|
await lockService.releaseLock(lockKey, lockValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Lock Key Conventions
|
||||||
|
|
||||||
|
Use descriptive, namespaced lock keys:
|
||||||
|
|
||||||
|
| Pattern | Example | Use Case |
|
||||||
|
|---------|---------|----------|
|
||||||
|
| `job:{name}` | `job:notification-processor` | Global jobs (run once) |
|
||||||
|
| `{feature}:{name}:{id}` | `backup:schedule:uuid-here` | Per-entity jobs |
|
||||||
|
|
||||||
|
### Lock TTL Guidelines
|
||||||
|
|
||||||
|
Set TTL longer than the expected job duration, but short enough to recover from crashes:
|
||||||
|
|
||||||
|
| Job Duration | Recommended TTL |
|
||||||
|
|--------------|-----------------|
|
||||||
|
| < 10 seconds | 60 seconds |
|
||||||
|
| < 1 minute | 5 minutes |
|
||||||
|
| < 5 minutes | 15 minutes |
|
||||||
|
| Long-running | 30 minutes + heartbeat |
|
||||||
|
|
||||||
|
## Adding New Jobs
|
||||||
|
|
||||||
|
1. Create job file in the feature's `jobs/` directory
|
||||||
|
2. Implement distributed locking (see pattern above)
|
||||||
|
3. Register in `core/scheduler/index.ts`
|
||||||
|
4. Update this README with the new job
|
||||||
|
|
||||||
|
## Blue-Green Deployment Behavior
|
||||||
|
|
||||||
|
When both blue and green containers are running:
|
||||||
|
|
||||||
|
1. Both schedulers trigger at the same time
|
||||||
|
2. Both attempt to acquire the lock
|
||||||
|
3. Only one succeeds (atomic Redis operation)
|
||||||
|
4. The other skips the job execution
|
||||||
|
5. Lock is released when job completes
|
||||||
|
|
||||||
|
This ensures exactly-once execution regardless of how many containers are running.
|
||||||
@@ -111,7 +111,23 @@ Backups are stored in `/app/data/backups/` (mapped to `./data/backups/` on host)
|
|||||||
|
|
||||||
Jobs are registered in `backend/src/core/scheduler/index.ts`:
|
Jobs are registered in `backend/src/core/scheduler/index.ts`:
|
||||||
- Backup check: Every minute
|
- Backup check: Every minute
|
||||||
- Retention cleanup: Daily at 4 AM
|
- Retention cleanup: Daily at 4 AM (also runs after each scheduled backup)
|
||||||
|
|
||||||
|
### Distributed Locking
|
||||||
|
|
||||||
|
Scheduled backups use Redis distributed locking to prevent duplicate backups when multiple backend containers are running (blue-green deployments).
|
||||||
|
|
||||||
|
**Lock behavior:**
|
||||||
|
- Lock key: `backup:schedule:{schedule_id}`
|
||||||
|
- Lock TTL: 5 minutes (auto-release if container crashes)
|
||||||
|
- Only one container creates the backup; others skip
|
||||||
|
|
||||||
|
**Retention cleanup:**
|
||||||
|
- Runs immediately after each successful scheduled backup
|
||||||
|
- Deletes backups exceeding the schedule's retention count
|
||||||
|
- Also runs globally at 4 AM daily as a safety net
|
||||||
|
|
||||||
|
See `backend/src/core/scheduler/README.md` for the distributed locking pattern.
|
||||||
|
|
||||||
### Admin Routes
|
### Admin Routes
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,16 @@
|
|||||||
/**
|
/**
|
||||||
* @ai-summary Job for processing scheduled backups
|
* @ai-summary Job for processing scheduled backups
|
||||||
* @ai-context Runs every minute to check for due scheduled backups
|
* @ai-context Runs every minute to check for due scheduled backups
|
||||||
|
* @ai-context Uses distributed locking to prevent duplicate backups in blue-green deployments
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { Pool } from 'pg';
|
import { Pool } from 'pg';
|
||||||
|
import { v4 as uuidv4 } from 'uuid';
|
||||||
import { logger } from '../../../core/logging/logger';
|
import { logger } from '../../../core/logging/logger';
|
||||||
|
import { lockService } from '../../../core/config/redis';
|
||||||
import { BackupRepository } from '../data/backup.repository';
|
import { BackupRepository } from '../data/backup.repository';
|
||||||
import { BackupService } from '../domain/backup.service';
|
import { BackupService } from '../domain/backup.service';
|
||||||
|
import { BackupRetentionService } from '../domain/backup-retention.service';
|
||||||
import { ScheduledBackupJobResult, BackupFrequency } from '../domain/backup.types';
|
import { ScheduledBackupJobResult, BackupFrequency } from '../domain/backup.types';
|
||||||
|
|
||||||
let pool: Pool | null = null;
|
let pool: Pool | null = null;
|
||||||
@@ -18,8 +22,12 @@ export function setBackupJobPool(dbPool: Pool): void {
|
|||||||
pool = dbPool;
|
pool = dbPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Lock TTL: 5 minutes (backup should complete well within this)
|
||||||
|
const BACKUP_LOCK_TTL_SECONDS = 300;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes all scheduled backups that are due
|
* Processes all scheduled backups that are due
|
||||||
|
* Uses distributed locking to prevent duplicate backups across containers
|
||||||
*/
|
*/
|
||||||
export async function processScheduledBackups(): Promise<ScheduledBackupJobResult> {
|
export async function processScheduledBackups(): Promise<ScheduledBackupJobResult> {
|
||||||
if (!pool) {
|
if (!pool) {
|
||||||
@@ -28,6 +36,7 @@ export async function processScheduledBackups(): Promise<ScheduledBackupJobResul
|
|||||||
|
|
||||||
const repository = new BackupRepository(pool);
|
const repository = new BackupRepository(pool);
|
||||||
const backupService = new BackupService(pool);
|
const backupService = new BackupService(pool);
|
||||||
|
const retentionService = new BackupRetentionService(pool);
|
||||||
|
|
||||||
const result: ScheduledBackupJobResult = {
|
const result: ScheduledBackupJobResult = {
|
||||||
processed: 0,
|
processed: 0,
|
||||||
@@ -48,6 +57,21 @@ export async function processScheduledBackups(): Promise<ScheduledBackupJobResul
|
|||||||
logger.info('Processing scheduled backups', { count: dueSchedules.length });
|
logger.info('Processing scheduled backups', { count: dueSchedules.length });
|
||||||
|
|
||||||
for (const schedule of dueSchedules) {
|
for (const schedule of dueSchedules) {
|
||||||
|
// Generate unique lock value for this execution
|
||||||
|
const lockKey = `backup:schedule:${schedule.id}`;
|
||||||
|
const lockValue = uuidv4();
|
||||||
|
|
||||||
|
// Try to acquire lock for this schedule
|
||||||
|
const lockAcquired = await lockService.acquireLock(lockKey, BACKUP_LOCK_TTL_SECONDS, lockValue);
|
||||||
|
|
||||||
|
if (!lockAcquired) {
|
||||||
|
logger.debug('Backup already in progress for schedule, skipping', {
|
||||||
|
scheduleId: schedule.id,
|
||||||
|
scheduleName: schedule.name,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
result.processed++;
|
result.processed++;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -71,6 +95,28 @@ export async function processScheduledBackups(): Promise<ScheduledBackupJobResul
|
|||||||
scheduleId: schedule.id,
|
scheduleId: schedule.id,
|
||||||
backupId: backupResult.backupId,
|
backupId: backupResult.backupId,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Run retention cleanup for this schedule immediately after successful backup
|
||||||
|
try {
|
||||||
|
const retentionResult = await retentionService.cleanupScheduleBackups(
|
||||||
|
schedule.id,
|
||||||
|
schedule.name,
|
||||||
|
schedule.retentionCount
|
||||||
|
);
|
||||||
|
if (retentionResult.deletedCount > 0) {
|
||||||
|
logger.info('Retention cleanup completed after backup', {
|
||||||
|
scheduleId: schedule.id,
|
||||||
|
deletedCount: retentionResult.deletedCount,
|
||||||
|
freedBytes: retentionResult.freedBytes,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (retentionError) {
|
||||||
|
logger.error('Retention cleanup failed after backup', {
|
||||||
|
scheduleId: schedule.id,
|
||||||
|
error: retentionError instanceof Error ? retentionError.message : String(retentionError),
|
||||||
|
});
|
||||||
|
// Don't fail the overall backup for retention errors
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
result.failed++;
|
result.failed++;
|
||||||
result.errors.push({
|
result.errors.push({
|
||||||
@@ -103,6 +149,9 @@ export async function processScheduledBackups(): Promise<ScheduledBackupJobResul
|
|||||||
} catch {
|
} catch {
|
||||||
// Ignore error updating next run
|
// Ignore error updating next run
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
// Always release the lock
|
||||||
|
await lockService.releaseLock(lockKey, lockValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -30,10 +30,10 @@ You are a senior software engineer specialising in NodeJS, Typescript, front en
|
|||||||
*** CHANGES TO IMPLEMENT ***
|
*** CHANGES TO IMPLEMENT ***
|
||||||
- Research this code base and ask iterative questions to compile a complete plan.
|
- Research this code base and ask iterative questions to compile a complete plan.
|
||||||
- We will pair troubleshoot this. Tell me what logs and things to run and I will
|
- We will pair troubleshoot this. Tell me what logs and things to run and I will
|
||||||
- There is current a Dark / Light theme option for this application
|
- There is a backup system built into the admin settings.
|
||||||
- There is logic somewhere in the code that detects the operating systems' theme and uses that. Remove this.
|
- Schedules have been configured but it doesn't appear to be removing old backups
|
||||||
- Default to the light theme for everyone.
|
- The hourly schedule is set to retain 8 backups but it has over 9 saved.
|
||||||
- Retain the functionality for the user to pick dark theme and save that preference.
|
- It is also creating multiple backups per run.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user