/**
 * @ai-summary Data access layer for email ingestion queue and pending vehicle associations
 * @ai-context Provides CRUD operations with standard mapRow() snake_case -> camelCase conversion
 */

import { Pool } from 'pg';
import pool from '../../../core/config/database';
import { logger } from '../../../core/logging/logger';
import type {
  EmailIngestionQueueRecord,
  EmailIngestionStatus,
  EmailProcessingResult,
  PendingVehicleAssociation,
  PendingAssociationStatus,
  EmailRecordType,
  ExtractedReceiptData,
} from '../domain/email-ingestion.types';

/**
 * Repository for the `email_ingestion_queue` and `pending_vehicle_associations`
 * tables.
 *
 * Every public method runs a single parameterized query, maps the resulting
 * snake_case rows to camelCase domain objects, and on failure logs the error
 * with context before rethrowing it unchanged.
 */
export class EmailIngestionRepository {
  /**
   * @param db Connection pool used for all queries; defaults to the shared
   *           application pool.
   */
  constructor(private readonly db: Pool = pool) {}

  // ========================
  // Row Mappers
  // ========================

  /** Converts a raw `email_ingestion_queue` row into its camelCase record. */
  private mapQueueRow(row: any): EmailIngestionQueueRecord {
    return {
      id: row.id,
      emailId: row.email_id,
      senderEmail: row.sender_email,
      userId: row.user_id,
      receivedAt: row.received_at,
      subject: row.subject,
      status: row.status,
      processingResult: row.processing_result,
      errorMessage: row.error_message,
      retryCount: row.retry_count,
      createdAt: row.created_at,
      updatedAt: row.updated_at,
    };
  }

  /** Converts a raw `pending_vehicle_associations` row into its camelCase record. */
  private mapPendingAssociationRow(row: any): PendingVehicleAssociation {
    return {
      id: row.id,
      userId: row.user_id,
      recordType: row.record_type,
      extractedData: row.extracted_data,
      documentId: row.document_id,
      status: row.status,
      createdAt: row.created_at,
      resolvedAt: row.resolved_at,
    };
  }

  // ========================
  // Queue Operations
  // ========================

  /**
   * Inserts a new queue entry with status `'pending'`.
   *
   * @param entry Fields of the incoming email to enqueue.
   * @returns The inserted row, mapped to a queue record.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async insertQueueEntry(entry: {
    emailId: string;
    senderEmail: string;
    userId: string;
    receivedAt: string;
    subject: string | null;
  }): Promise<EmailIngestionQueueRecord> {
    try {
      const res = await this.db.query(
        `INSERT INTO email_ingestion_queue (email_id, sender_email, user_id, received_at, subject, status) VALUES ($1, $2, $3, $4, $5, 'pending') RETURNING *`,
        [
          entry.emailId,
          entry.senderEmail,
          entry.userId,
          entry.receivedAt,
          entry.subject,
        ]
      );
      return this.mapQueueRow(res.rows[0]);
    } catch (error) {
      logger.error('Error inserting queue entry', { error, emailId: entry.emailId });
      throw error;
    }
  }

  /**
   * Sets the status of the queue entry identified by `emailId`, optionally
   * updating further columns in the same statement.
   *
   * Only keys of `updates` whose value is not `undefined` are written.
   *
   * @param emailId Value matched against the `email_id` column.
   * @param status  New status to store.
   * @param updates Optional additional column values.
   * @returns The updated record, or `null` when no row matched `emailId`.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async updateQueueStatus(
    emailId: string,
    status: EmailIngestionStatus,
    updates?: {
      processingResult?: EmailProcessingResult;
      errorMessage?: string;
      retryCount?: number;
      userId?: string;
    }
  ): Promise<EmailIngestionQueueRecord | null> {
    try {
      // $1 (email_id) and $2 (status) are always bound; optional columns take
      // consecutive placeholders starting at $3.
      const fields: string[] = ['status = $2'];
      const params: any[] = [emailId, status];
      let paramIndex = 3;

      if (updates?.processingResult !== undefined) {
        fields.push(`processing_result = $${paramIndex++}`);
        // The processing result is sent as a JSON string.
        params.push(JSON.stringify(updates.processingResult));
      }
      if (updates?.errorMessage !== undefined) {
        fields.push(`error_message = $${paramIndex++}`);
        params.push(updates.errorMessage);
      }
      if (updates?.retryCount !== undefined) {
        fields.push(`retry_count = $${paramIndex++}`);
        params.push(updates.retryCount);
      }
      if (updates?.userId !== undefined) {
        fields.push(`user_id = $${paramIndex++}`);
        params.push(updates.userId);
      }

      const res = await this.db.query(
        `UPDATE email_ingestion_queue SET ${fields.join(', ')} WHERE email_id = $1 RETURNING *`,
        params
      );
      return res.rows[0] ? this.mapQueueRow(res.rows[0]) : null;
    } catch (error) {
      logger.error('Error updating queue status', { error, emailId, status });
      throw error;
    }
  }

  /**
   * Fetches the queue entry whose `email_id` equals `emailId`.
   *
   * @returns The mapped record, or `null` when no row matched.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async getQueueEntry(emailId: string): Promise<EmailIngestionQueueRecord | null> {
    try {
      const res = await this.db.query(
        `SELECT * FROM email_ingestion_queue WHERE email_id = $1`,
        [emailId]
      );
      return res.rows[0] ? this.mapQueueRow(res.rows[0]) : null;
    } catch (error) {
      logger.error('Error fetching queue entry', { error, emailId });
      throw error;
    }
  }

  /** Alias for {@link getQueueEntry}. */
  async findByEmailId(emailId: string): Promise<EmailIngestionQueueRecord | null> {
    return this.getQueueEntry(emailId);
  }

  /**
   * Lists entries with status `'failed'` whose `retry_count` is strictly below
   * `maxRetries`, ordered by `created_at` ascending.
   *
   * @param maxRetries Exclusive upper bound on `retry_count` (default 3).
   * @throws Rethrows any error raised by the query after logging it.
   */
  async getRetryableEntries(maxRetries: number = 3): Promise<EmailIngestionQueueRecord[]> {
    try {
      const res = await this.db.query(
        `SELECT * FROM email_ingestion_queue WHERE status = 'failed' AND retry_count < $1 ORDER BY created_at ASC`,
        [maxRetries]
      );
      return res.rows.map(row => this.mapQueueRow(row));
    } catch (error) {
      logger.error('Error fetching retryable entries', { error });
      throw error;
    }
  }

  // ========================
  // Pending Association Operations
  // ========================

  /**
   * Inserts a pending vehicle association with status `'pending'`.
   *
   * @param association Owner, record type, extracted data (sent as a JSON
   *                    string) and optional document id.
   * @returns The inserted row, mapped to a pending association.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async insertPendingAssociation(association: {
    userId: string;
    recordType: EmailRecordType;
    extractedData: ExtractedReceiptData;
    documentId: string | null;
  }): Promise<PendingVehicleAssociation> {
    try {
      const res = await this.db.query(
        `INSERT INTO pending_vehicle_associations (user_id, record_type, extracted_data, document_id, status) VALUES ($1, $2, $3, $4, 'pending') RETURNING *`,
        [
          association.userId,
          association.recordType,
          JSON.stringify(association.extractedData),
          association.documentId,
        ]
      );
      return this.mapPendingAssociationRow(res.rows[0]);
    } catch (error) {
      logger.error('Error inserting pending association', { error, userId: association.userId });
      throw error;
    }
  }

  /**
   * Fetches a pending association by its `id`, regardless of status.
   *
   * @returns The mapped association, or `null` when no row matched.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async getPendingAssociationById(associationId: string): Promise<PendingVehicleAssociation | null> {
    try {
      const res = await this.db.query(
        `SELECT * FROM pending_vehicle_associations WHERE id = $1`,
        [associationId]
      );
      return res.rows[0] ? this.mapPendingAssociationRow(res.rows[0]) : null;
    } catch (error) {
      logger.error('Error fetching pending association by id', { error, associationId });
      throw error;
    }
  }

  /**
   * Counts the associations of `userId` that still have status `'pending'`.
   *
   * @returns The count, or 0 when the query yields no row.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async getPendingAssociationCount(userId: string): Promise<number> {
    try {
      const res = await this.db.query(
        `SELECT COUNT(*)::int AS count FROM pending_vehicle_associations WHERE user_id = $1 AND status = 'pending'`,
        [userId]
      );
      return res.rows[0]?.count ?? 0;
    } catch (error) {
      logger.error('Error counting pending associations', { error, userId });
      throw error;
    }
  }

  /**
   * Lists the associations of `userId` with status `'pending'`, ordered by
   * `created_at` descending.
   *
   * @throws Rethrows any error raised by the query after logging it.
   */
  async getPendingAssociations(userId: string): Promise<PendingVehicleAssociation[]> {
    try {
      const res = await this.db.query(
        `SELECT * FROM pending_vehicle_associations WHERE user_id = $1 AND status = 'pending' ORDER BY created_at DESC`,
        [userId]
      );
      return res.rows.map(row => this.mapPendingAssociationRow(row));
    } catch (error) {
      logger.error('Error fetching pending associations', { error, userId });
      throw error;
    }
  }

  /**
   * Sets the status of an association and stamps `resolved_at` with the
   * database's `CURRENT_TIMESTAMP`.
   *
   * @param associationId Value matched against the `id` column.
   * @param status        Status to store (default `'resolved'`).
   * @returns The updated association, or `null` when no row matched.
   * @throws Rethrows any error raised by the query after logging it.
   */
  async resolvePendingAssociation(
    associationId: string,
    status: PendingAssociationStatus = 'resolved'
  ): Promise<PendingVehicleAssociation | null> {
    try {
      const res = await this.db.query(
        `UPDATE pending_vehicle_associations SET status = $2, resolved_at = CURRENT_TIMESTAMP WHERE id = $1 RETURNING *`,
        [associationId, status]
      );
      return res.rows[0] ? this.mapPendingAssociationRow(res.rows[0]) : null;
    } catch (error) {
      logger.error('Error resolving pending association', { error, associationId });
      throw error;
    }
  }
}