diff --git a/backend/src/features/email-ingestion/api/email-ingestion.controller.ts b/backend/src/features/email-ingestion/api/email-ingestion.controller.ts index 73ac9d3..1a4247e 100644 --- a/backend/src/features/email-ingestion/api/email-ingestion.controller.ts +++ b/backend/src/features/email-ingestion/api/email-ingestion.controller.ts @@ -5,15 +5,20 @@ import { FastifyRequest, FastifyReply } from 'fastify'; import { ResendInboundClient } from '../external/resend-inbound.client'; -import { pool } from '../../../core/config/database'; +import { EmailIngestionRepository } from '../data/email-ingestion.repository'; +import { EmailIngestionService } from '../domain/email-ingestion.service'; import { logger } from '../../../core/logging/logger'; import type { ResendWebhookEvent } from '../domain/email-ingestion.types'; export class EmailIngestionController { private resendClient: ResendInboundClient; + private repository: EmailIngestionRepository; + private service: EmailIngestionService; constructor() { this.resendClient = new ResendInboundClient(); + this.repository = new EmailIngestionRepository(); + this.service = new EmailIngestionService(); } async handleInboundWebhook(request: FastifyRequest, reply: FastifyReply): Promise { @@ -44,30 +49,20 @@ export class EmailIngestionController { const senderEmail = event.data.from; // Idempotency check: reject if email_id already exists in queue - const existing = await pool.query( - 'SELECT id FROM email_ingestion_queue WHERE email_id = $1', - [emailId] - ); - - if (existing.rows.length > 0) { + const existing = await this.repository.findByEmailId(emailId); + if (existing) { logger.info('Duplicate email webhook received, skipping', { emailId }); return reply.status(200).send({ received: true, duplicate: true }); } - // Insert queue record with status=pending - // user_id is set to sender_email initially; async processing resolves to Auth0 sub - await pool.query( - `INSERT INTO email_ingestion_queue - (email_id, sender_email, user_id, received_at, subject, status) - VALUES ($1, $2, $3, $4, $5, 'pending')`, - [ - emailId, - senderEmail, - senderEmail, - event.data.created_at || new Date().toISOString(), - event.data.subject, - ] - ); + // Insert queue record with status=pending via repository + await this.repository.insertQueueEntry({ + emailId, + senderEmail, + userId: senderEmail, // Resolved to auth0_sub during processing + receivedAt: event.data.created_at || new Date().toISOString(), + subject: event.data.subject, + }); logger.info('Inbound email queued for processing', { emailId, senderEmail }); @@ -76,7 +71,7 @@ export class EmailIngestionController { // Trigger async processing via setImmediate setImmediate(() => { - this.processEmailAsync(emailId, event).catch((error) => { + this.service.processEmail(emailId, event).catch((error) => { logger.error('Async email processing failed', { emailId, error: error instanceof Error ? error.message : String(error), @@ -91,41 +86,4 @@ export class EmailIngestionController { return reply.status(500).send({ error: 'Webhook processing failed' }); } } - - /** - * Async email processing stub - full implementation in later sub-issue. - * Will handle: sender validation, user lookup, OCR, record creation, notifications. - */ - private async processEmailAsync(emailId: string, event: ResendWebhookEvent): Promise { - try { - await pool.query( - "UPDATE email_ingestion_queue SET status = 'processing' WHERE email_id = $1", - [emailId] - ); - - logger.info('Async email processing started', { - emailId, - subject: event.data.subject, - attachmentCount: event.data.attachments?.length || 0, - }); - - // Full processing pipeline will be implemented in subsequent sub-issues: - // 1. Sender validation (lookup user by email) - // 2. Fetch and parse raw email via ResendInboundClient - // 3. OCR attachments via existing OCR service - // 4. Classify record type (fuel vs maintenance) - // 5. Create record or queue for vehicle association - // 6. Send notification emails - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.error('Email processing pipeline error', { emailId, error: errorMessage }); - - await pool.query( - "UPDATE email_ingestion_queue SET status = 'failed', error_message = $2 WHERE email_id = $1", - [emailId, errorMessage] - ).catch((dbError) => { - logger.error('Failed to update queue status to failed', { emailId, error: dbError }); - }); - } - } } diff --git a/backend/src/features/email-ingestion/data/email-ingestion.repository.ts b/backend/src/features/email-ingestion/data/email-ingestion.repository.ts new file mode 100644 index 0000000..4ea1bb0 --- /dev/null +++ b/backend/src/features/email-ingestion/data/email-ingestion.repository.ts @@ -0,0 +1,230 @@ +/** + * @ai-summary Data access layer for email ingestion queue and pending vehicle associations + * @ai-context Provides CRUD operations with standard mapRow() snake_case -> camelCase conversion + */ + +import { Pool } from 'pg'; +import pool from '../../../core/config/database'; +import { logger } from '../../../core/logging/logger'; +import type { + EmailIngestionQueueRecord, + EmailIngestionStatus, + EmailProcessingResult, + PendingVehicleAssociation, + PendingAssociationStatus, + EmailRecordType, + ExtractedReceiptData, +} from '../domain/email-ingestion.types'; + +export class EmailIngestionRepository { + constructor(private readonly db: Pool = pool) {} + + // ======================== + // Row Mappers + // ======================== + + private mapQueueRow(row: any): EmailIngestionQueueRecord { + return { + id: row.id, + emailId: row.email_id, + senderEmail: row.sender_email, + userId: row.user_id, + receivedAt: row.received_at, + subject: row.subject, + status: row.status, + processingResult: row.processing_result, + errorMessage: row.error_message, + retryCount: row.retry_count, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; + } + + private mapPendingAssociationRow(row: any): PendingVehicleAssociation { + return { + id: row.id, + userId: row.user_id, + recordType: row.record_type, + extractedData: row.extracted_data, + documentId: row.document_id, + status: row.status, + createdAt: row.created_at, + resolvedAt: row.resolved_at, + }; + } + + // ======================== + // Queue Operations + // ======================== + + async insertQueueEntry(entry: { + emailId: string; + senderEmail: string; + userId: string; + receivedAt: string; + subject: string | null; + }): Promise { + try { + const res = await this.db.query( + `INSERT INTO email_ingestion_queue + (email_id, sender_email, user_id, received_at, subject, status) + VALUES ($1, $2, $3, $4, $5, 'pending') + RETURNING *`, + [ + entry.emailId, + entry.senderEmail, + entry.userId, + entry.receivedAt, + entry.subject, + ] + ); + return this.mapQueueRow(res.rows[0]); + } catch (error) { + logger.error('Error inserting queue entry', { error, emailId: entry.emailId }); + throw error; + } + } + + async updateQueueStatus( + emailId: string, + status: EmailIngestionStatus, + updates?: { + processingResult?: EmailProcessingResult; + errorMessage?: string; + retryCount?: number; + userId?: string; + } + ): Promise { + try { + const fields: string[] = ['status = $2']; + const params: any[] = [emailId, status]; + let paramIndex = 3; + + if (updates?.processingResult !== undefined) { + fields.push(`processing_result = $${paramIndex++}`); + params.push(JSON.stringify(updates.processingResult)); + } + if (updates?.errorMessage !== undefined) { + fields.push(`error_message = $${paramIndex++}`); + params.push(updates.errorMessage); + } + if (updates?.retryCount !== undefined) { + fields.push(`retry_count = $${paramIndex++}`); + params.push(updates.retryCount); + } + if (updates?.userId !== undefined) { + fields.push(`user_id = $${paramIndex++}`); + params.push(updates.userId); + } + + const res = await this.db.query( + `UPDATE email_ingestion_queue + SET ${fields.join(', ')} + WHERE email_id = $1 + RETURNING *`, + params + ); + return res.rows[0] ? this.mapQueueRow(res.rows[0]) : null; + } catch (error) { + logger.error('Error updating queue status', { error, emailId, status }); + throw error; + } + } + + async getQueueEntry(emailId: string): Promise { + try { + const res = await this.db.query( + `SELECT * FROM email_ingestion_queue WHERE email_id = $1`, + [emailId] + ); + return res.rows[0] ? this.mapQueueRow(res.rows[0]) : null; + } catch (error) { + logger.error('Error fetching queue entry', { error, emailId }); + throw error; + } + } + + async findByEmailId(emailId: string): Promise { + return this.getQueueEntry(emailId); + } + + async getRetryableEntries(maxRetries: number = 3): Promise { + try { + const res = await this.db.query( + `SELECT * FROM email_ingestion_queue + WHERE status = 'failed' + AND retry_count < $1 + ORDER BY created_at ASC`, + [maxRetries] + ); + return res.rows.map(row => this.mapQueueRow(row)); + } catch (error) { + logger.error('Error fetching retryable entries', { error }); + throw error; + } + } + + // ======================== + // Pending Association Operations + // ======================== + + async insertPendingAssociation(association: { + userId: string; + recordType: EmailRecordType; + extractedData: ExtractedReceiptData; + documentId: string | null; + }): Promise { + try { + const res = await this.db.query( + `INSERT INTO pending_vehicle_associations + (user_id, record_type, extracted_data, document_id, status) + VALUES ($1, $2, $3, $4, 'pending') + RETURNING *`, + [ + association.userId, + association.recordType, + JSON.stringify(association.extractedData), + association.documentId, + ] + ); + return this.mapPendingAssociationRow(res.rows[0]); + } catch (error) { + logger.error('Error inserting pending association', { error, userId: association.userId }); + throw error; + } + } + + async getPendingAssociations(userId: string): Promise { + try { + const res = await this.db.query( + `SELECT * FROM pending_vehicle_associations + WHERE user_id = $1 AND status = 'pending' + ORDER BY created_at DESC`, + [userId] + ); + return res.rows.map(row => this.mapPendingAssociationRow(row)); + } catch (error) { + logger.error('Error fetching pending associations', { error, userId }); + throw error; + } + } + + async resolvePendingAssociation( + associationId: string, + status: PendingAssociationStatus = 'resolved' + ): Promise { + try { + const res = await this.db.query( + `UPDATE pending_vehicle_associations + SET status = $2, resolved_at = CURRENT_TIMESTAMP + WHERE id = $1 + RETURNING *`, + [associationId, status] + ); + return res.rows[0] ? this.mapPendingAssociationRow(res.rows[0]) : null; + } catch (error) { + logger.error('Error resolving pending association', { error, associationId }); + throw error; + } + } +} diff --git a/backend/src/features/email-ingestion/domain/email-ingestion.service.ts b/backend/src/features/email-ingestion/domain/email-ingestion.service.ts new file mode 100644 index 0000000..05e4ed9 --- /dev/null +++ b/backend/src/features/email-ingestion/domain/email-ingestion.service.ts @@ -0,0 +1,669 @@ +/** + * @ai-summary Core processing service for the email-to-record pipeline + * @ai-context Orchestrates sender validation, OCR extraction, record classification, + * vehicle association, status tracking, retry logic, and reply emails + */ + +import { Pool } from 'pg'; +import pool from '../../../core/config/database'; +import { logger } from '../../../core/logging/logger'; +import { EmailIngestionRepository } from '../data/email-ingestion.repository'; +import { ResendInboundClient, type ParsedEmailAttachment } from '../external/resend-inbound.client'; +import { UserProfileRepository } from '../../user-profile/data/user-profile.repository'; +import { VehiclesRepository } from '../../vehicles/data/vehicles.repository'; +import { NotificationsRepository } from '../../notifications/data/notifications.repository'; +import { TemplateService } from '../../notifications/domain/template.service'; +import { EmailService } from '../../notifications/domain/email.service'; +import { ocrService } from '../../ocr/domain/ocr.service'; +import type { ReceiptExtractionResponse } from '../../ocr/domain/ocr.types'; +import type { + ResendWebhookEvent, + EmailProcessingResult, + ExtractedReceiptData, + EmailRecordType, +} from './email-ingestion.types'; + +/** Supported attachment MIME types */ +const SUPPORTED_ATTACHMENT_TYPES = new Set([ + 'application/pdf', + 'image/png', + 'image/jpeg', + 'image/heic', + 'image/heif', +]); + +/** Image types that work with receipt-specific OCR */ +const OCR_RECEIPT_IMAGE_TYPES = new Set([ + 'image/jpeg', + 'image/png', + 'image/heic', + 'image/heif', +]); + +const MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024; // 10MB +const MAX_RETRY_COUNT = 3; + +export class EmailIngestionService { + private repository: EmailIngestionRepository; + private resendClient: ResendInboundClient; + private userProfileRepository: UserProfileRepository; + private vehiclesRepository: VehiclesRepository; + private notificationsRepository: NotificationsRepository; + private templateService: TemplateService; + private emailService: EmailService; + + constructor(dbPool?: Pool) { + const p = dbPool || pool; + this.repository = new EmailIngestionRepository(p); + this.resendClient = new ResendInboundClient(); + this.userProfileRepository = new UserProfileRepository(p); + this.vehiclesRepository = new VehiclesRepository(p); + this.notificationsRepository = new NotificationsRepository(p); + this.templateService = new TemplateService(); + this.emailService = new EmailService(); + } + + // ======================== + // Main Processing Pipeline + // ======================== + + /** + * Process an inbound email through the full pipeline. + * Called asynchronously after webhook receipt is acknowledged. + */ + async processEmail(emailId: string, event: ResendWebhookEvent): Promise { + const senderEmail = event.data.from; + const subject = event.data.subject; + + try { + // 1. Mark as processing + await this.repository.updateQueueStatus(emailId, 'processing'); + + // 2. Validate sender + const userProfile = await this.validateSender(senderEmail); + if (!userProfile) { + await this.handleUnregisteredSender(emailId, senderEmail, subject); + return; + } + + const userId = userProfile.auth0Sub; + const userName = userProfile.displayName || userProfile.email; + + // Update queue with resolved user_id + await this.repository.updateQueueStatus(emailId, 'processing', { userId }); + + // 3. Get attachments (from webhook data or by fetching raw email) + const attachments = await this.getAttachments(emailId, event); + + // 4. Filter valid attachments + const validAttachments = this.filterAttachments(attachments); + if (validAttachments.length === 0) { + await this.handleNoValidAttachments(emailId, senderEmail, userName, subject); + return; + } + + // 5. Process first valid image attachment through OCR + const ocrResult = await this.processAttachmentsWithOcr(userId, validAttachments); + if (!ocrResult) { + await this.handleOcrFailure(emailId, senderEmail, userName, subject, 'No receipt data could be extracted from attachments'); + return; + } + + // 6. Build extracted data from OCR result + const extractedData = this.mapOcrToExtractedData(ocrResult.response); + const recordType = ocrResult.recordType; + + // 7. Handle vehicle association + const processingResult = await this.handleVehicleAssociation( + userId, userName, senderEmail, recordType, extractedData + ); + + // 8. Mark as completed + await this.repository.updateQueueStatus(emailId, 'completed', { + processingResult, + }); + + // 9. Send confirmation email + await this.sendConfirmationEmail(senderEmail, userName, processingResult); + + logger.info('Email processing completed successfully', { + emailId, + userId, + recordType, + vehicleId: processingResult.vehicleId, + pendingAssociationId: processingResult.pendingAssociationId, + }); + } catch (error) { + await this.handleProcessingError(emailId, senderEmail, subject, error); + } + } + + // ======================== + // Sender Validation + // ======================== + + private async validateSender(senderEmail: string): Promise<{ + auth0Sub: string; + email: string; + displayName: string | null; + } | null> { + // Case-insensitive lookup by lowercasing the sender email + const profile = await this.userProfileRepository.getByEmail(senderEmail.toLowerCase()); + if (profile) { + return { + auth0Sub: profile.auth0Sub, + email: profile.email, + displayName: profile.displayName ?? null, + }; + } + + // Try original case as fallback + if (senderEmail !== senderEmail.toLowerCase()) { + const fallback = await this.userProfileRepository.getByEmail(senderEmail); + if (fallback) { + return { + auth0Sub: fallback.auth0Sub, + email: fallback.email, + displayName: fallback.displayName ?? null, + }; + } + } + + return null; + } + + // ======================== + // Attachment Handling + // ======================== + + /** + * Get attachments from webhook data or by fetching the raw email + */ + private async getAttachments( + emailId: string, + event: ResendWebhookEvent + ): Promise { + // If webhook includes attachments with content, use those + if (event.data.attachments && event.data.attachments.length > 0) { + return event.data.attachments.map(att => ({ + filename: att.filename, + contentType: att.content_type, + content: Buffer.from(att.content, 'base64'), + size: Buffer.from(att.content, 'base64').length, + })); + } + + // Otherwise fetch and parse the raw email + try { + const { downloadUrl } = await this.resendClient.getEmail(emailId); + const rawEmail = await this.resendClient.downloadRawEmail(downloadUrl); + const parsed = await this.resendClient.parseEmail(rawEmail); + return parsed.attachments; + } catch (error) { + logger.warn('Failed to fetch raw email for attachments', { + emailId, + error: error instanceof Error ? error.message : String(error), + }); + return []; + } + } + + /** + * Filter attachments by supported type and size + */ + private filterAttachments(attachments: ParsedEmailAttachment[]): ParsedEmailAttachment[] { + return attachments.filter(att => { + if (!SUPPORTED_ATTACHMENT_TYPES.has(att.contentType)) { + logger.info('Skipping unsupported attachment type', { + filename: att.filename, + contentType: att.contentType, + }); + return false; + } + + if (att.size > MAX_ATTACHMENT_SIZE) { + logger.info('Skipping oversized attachment', { + filename: att.filename, + size: att.size, + maxSize: MAX_ATTACHMENT_SIZE, + }); + return false; + } + + return true; + }); + } + + // ======================== + // OCR Processing + // ======================== + + /** + * Process attachments through OCR, trying fuel then maintenance receipt extraction. + * Returns the first successful result. + */ + private async processAttachmentsWithOcr( + userId: string, + attachments: ParsedEmailAttachment[] + ): Promise<{ response: ReceiptExtractionResponse; recordType: EmailRecordType } | null> { + // Process only image attachments that the receipt OCR supports + const imageAttachments = attachments.filter(att => OCR_RECEIPT_IMAGE_TYPES.has(att.contentType)); + + for (const attachment of imageAttachments) { + const result = await this.classifyAndExtract(userId, attachment); + if (result) return result; + } + + return null; + } + + /** + * Try both fuel and maintenance OCR extractors, return the better result + */ + private async classifyAndExtract( + userId: string, + attachment: ParsedEmailAttachment + ): Promise<{ response: ReceiptExtractionResponse; recordType: EmailRecordType } | null> { + let fuelResult: ReceiptExtractionResponse | null = null; + let maintenanceResult: ReceiptExtractionResponse | null = null; + + // Try fuel receipt extraction + try { + fuelResult = await ocrService.extractReceipt(userId, { + fileBuffer: attachment.content, + contentType: attachment.contentType, + receiptType: 'fuel', + }); + } catch (error) { + logger.info('Fuel receipt extraction failed, trying maintenance', { + filename: attachment.filename, + error: error instanceof Error ? error.message : String(error), + }); + } + + // Try maintenance receipt extraction + try { + maintenanceResult = await ocrService.extractMaintenanceReceipt(userId, { + fileBuffer: attachment.content, + contentType: attachment.contentType, + }); + } catch (error) { + logger.info('Maintenance receipt extraction failed', { + filename: attachment.filename, + error: error instanceof Error ? error.message : String(error), + }); + } + + // Compare results and pick the best one + return this.selectBestResult(fuelResult, maintenanceResult); + } + + /** + * Select the better OCR result based on extracted field count and success + */ + private selectBestResult( + fuelResult: ReceiptExtractionResponse | null, + maintenanceResult: ReceiptExtractionResponse | null + ): { response: ReceiptExtractionResponse; recordType: EmailRecordType } | null { + const fuelFieldCount = fuelResult?.success + ? Object.keys(fuelResult.extractedFields).length + : 0; + const maintenanceFieldCount = maintenanceResult?.success + ? Object.keys(maintenanceResult.extractedFields).length + : 0; + + if (fuelFieldCount === 0 && maintenanceFieldCount === 0) { + return null; + } + + // Check for fuel-specific fields to improve classification + const hasFuelFields = fuelResult?.extractedFields['gallons'] || + fuelResult?.extractedFields['price_per_gallon'] || + fuelResult?.extractedFields['fuel_type']; + + const hasMaintenanceFields = maintenanceResult?.extractedFields['category'] || + maintenanceResult?.extractedFields['shop_name'] || + maintenanceResult?.extractedFields['description']; + + // Prefer the result with domain-specific fields + if (hasFuelFields && !hasMaintenanceFields) { + return { response: fuelResult!, recordType: 'fuel_log' }; + } + if (hasMaintenanceFields && !hasFuelFields) { + return { response: maintenanceResult!, recordType: 'maintenance_record' }; + } + + // Fall back to field count comparison + if (fuelFieldCount >= maintenanceFieldCount && fuelResult?.success) { + return { response: fuelResult, recordType: 'fuel_log' }; + } + if (maintenanceResult?.success) { + return { response: maintenanceResult, recordType: 'maintenance_record' }; + } + + return null; + } + + /** + * Map OCR extracted fields to our ExtractedReceiptData format + */ + private mapOcrToExtractedData(response: ReceiptExtractionResponse): ExtractedReceiptData { + const fields = response.extractedFields; + const getFieldValue = (key: string): string | null => + fields[key]?.value || null; + const getFieldNumber = (key: string): number | null => { + const val = fields[key]?.value; + if (!val) return null; + const num = parseFloat(val); + return isNaN(num) ? null : num; + }; + + return { + vendor: getFieldValue('vendor') || getFieldValue('shop_name'), + date: getFieldValue('date'), + total: getFieldNumber('total'), + odometerReading: getFieldNumber('odometer') || getFieldNumber('odometer_reading'), + gallons: getFieldNumber('gallons'), + pricePerGallon: getFieldNumber('price_per_gallon'), + fuelType: getFieldValue('fuel_type'), + category: getFieldValue('category'), + subtypes: fields['subtypes']?.value ? fields['subtypes'].value.split(',').map(s => s.trim()) : null, + shopName: getFieldValue('shop_name'), + description: getFieldValue('description'), + }; + } + + // ======================== + // Vehicle Association + // ======================== + + /** + * Handle vehicle association based on user's vehicle count. + * Single vehicle: auto-associate. Multiple: create pending association. + */ + private async handleVehicleAssociation( + userId: string, + userName: string, + userEmail: string, + recordType: EmailRecordType, + extractedData: ExtractedReceiptData + ): Promise { + const vehicles = await this.vehiclesRepository.findByUserId(userId); + + if (vehicles.length === 1) { + // Auto-associate with the single vehicle + const vehicleId = vehicles[0].id; + + // Create in-app notification + await this.notificationsRepository.insertUserNotification({ + userId, + notificationType: 'email_ingestion', + title: 'Receipt Processed', + message: `Your emailed ${recordType === 'fuel_log' ? 'fuel' : 'maintenance'} receipt has been processed and associated with your vehicle.`, + referenceType: recordType, + vehicleId, + }); + + return { + recordType, + vehicleId, + recordId: null, // Record creation handled by later sub-issue + documentId: null, + pendingAssociationId: null, + extractedData, + }; + } + + // Multiple vehicles (or none): create pending association + const pendingAssociation = await this.repository.insertPendingAssociation({ + userId, + recordType, + extractedData, + documentId: null, + }); + + // Create in-app notification for vehicle selection + await this.notificationsRepository.insertUserNotification({ + userId, + notificationType: 'email_ingestion', + title: 'Receipt Pending Vehicle Selection', + message: `Your emailed receipt has been processed. Please select which vehicle this ${recordType === 'fuel_log' ? 'fuel' : 'maintenance'} receipt belongs to.`, + referenceType: 'pending_vehicle_association', + referenceId: pendingAssociation.id, + }); + + // Send pending vehicle email + await this.sendPendingVehicleEmail(userEmail, userName, recordType, extractedData); + + return { + recordType, + vehicleId: null, + recordId: null, + documentId: null, + pendingAssociationId: pendingAssociation.id, + extractedData, + }; + } + + // ======================== + // Error Handling & Retries + // ======================== + + private async handleProcessingError( + emailId: string, + senderEmail: string, + subject: string | null, + error: unknown + ): Promise { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.error('Email processing pipeline error', { emailId, error: errorMessage }); + + // Get current queue entry for retry count + const queueEntry = await this.repository.getQueueEntry(emailId); + const currentRetryCount = queueEntry?.retryCount || 0; + const newRetryCount = currentRetryCount + 1; + + if (newRetryCount < MAX_RETRY_COUNT) { + // Mark for retry + await this.repository.updateQueueStatus(emailId, 'failed', { + errorMessage, + retryCount: newRetryCount, + }); + + logger.info('Email queued for retry', { + emailId, + retryCount: newRetryCount, + maxRetries: MAX_RETRY_COUNT, + }); + } else { + // Max retries exceeded - permanently failed + await this.repository.updateQueueStatus(emailId, 'failed', { + errorMessage: `Max retries (${MAX_RETRY_COUNT}) exceeded. Last error: ${errorMessage}`, + retryCount: newRetryCount, + }); + + // Send failure email to user + await this.sendFailureEmail(senderEmail, subject, errorMessage).catch(emailErr => { + logger.error('Failed to send failure notification email', { + emailId, + error: emailErr instanceof Error ? emailErr.message : String(emailErr), + }); + }); + } + } + + private async handleUnregisteredSender( + emailId: string, + senderEmail: string, + subject: string | null + ): Promise { + logger.info('Unregistered sender rejected', { emailId, senderEmail }); + + await this.repository.updateQueueStatus(emailId, 'failed', { + errorMessage: 'Sender email is not registered with MotoVaultPro', + }); + + // Send rejection email + await this.sendFailureEmail( + senderEmail, + subject, + 'This email address is not registered with MotoVaultPro. Please send receipts from the email address associated with your account.' + ).catch(error => { + logger.error('Failed to send unregistered sender notification', { + emailId, + error: error instanceof Error ? error.message : String(error), + }); + }); + } + + private async handleNoValidAttachments( + emailId: string, + senderEmail: string, + _userName: string, + subject: string | null + ): Promise { + logger.info('No valid attachments found', { emailId }); + + await this.repository.updateQueueStatus(emailId, 'failed', { + errorMessage: 'No valid attachments found. Supported types: PDF, PNG, JPG, JPEG, HEIC (max 10MB each)', + }); + + await this.sendFailureEmail( + senderEmail, + subject, + 'No valid attachments were found in your email. Please attach receipt images (PNG, JPG, JPEG, HEIC) or PDFs under 10MB.' + ).catch(error => { + logger.error('Failed to send no-attachments notification', { + emailId, + error: error instanceof Error ? error.message : String(error), + }); + }); + } + + private async handleOcrFailure( + emailId: string, + senderEmail: string, + _userName: string, + subject: string | null, + reason: string + ): Promise { + logger.info('OCR extraction failed for all attachments', { emailId, reason }); + + await this.repository.updateQueueStatus(emailId, 'failed', { + errorMessage: reason, + }); + + await this.sendFailureEmail(senderEmail, subject, reason).catch(error => { + logger.error('Failed to send OCR failure notification', { + emailId, + error: error instanceof Error ? error.message : String(error), + }); + }); + } + + // ======================== + // Email Replies + // ======================== + + private async sendConfirmationEmail( + recipientEmail: string, + userName: string, + result: EmailProcessingResult + ): Promise { + try { + const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_processed'); + if (!template || !template.isActive) { + logger.warn('receipt_processed template not found or inactive'); + return; + } + + const recordTypeDisplay = result.recordType === 'fuel_log' ? 'Fuel Log' : 'Maintenance Record'; + const variables = { + userName, + vehicleName: result.vehicleId ? 'your vehicle' : 'Pending Selection', + recordType: recordTypeDisplay, + receiptDate: result.extractedData.date || 'N/A', + amount: result.extractedData.total?.toFixed(2) || 'N/A', + }; + + const renderedSubject = this.templateService.render(template.subject, variables); + const renderedHtml = this.templateService.renderEmailHtml(template.body, variables); + + await this.emailService.send(recipientEmail, renderedSubject, renderedHtml); + + logger.info('Confirmation email sent', { recipientEmail }); + } catch (error) { + logger.error('Failed to send confirmation email', { + recipientEmail, + error: error instanceof Error ? error.message : String(error), + }); + } + } + + private async sendFailureEmail( + recipientEmail: string, + emailSubject: string | null, + errorReason: string + ): Promise { + try { + const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_failed'); + if (!template || !template.isActive) { + logger.warn('receipt_failed template not found or inactive'); + return; + } + + const variables = { + userName: 'MotoVaultPro User', + emailSubject: emailSubject || '(no subject)', + errorReason, + }; + + const renderedSubject = this.templateService.render(template.subject, variables); + const renderedHtml = this.templateService.renderEmailHtml(template.body, variables); + + await this.emailService.send(recipientEmail, renderedSubject, renderedHtml); + + logger.info('Failure email sent', { recipientEmail }); + } catch (error) { + logger.error('Failed to send failure email', { + recipientEmail, + error: error instanceof Error ? error.message : String(error), + }); + } + } + + private async sendPendingVehicleEmail( + recipientEmail: string, + userName: string, + recordType: EmailRecordType, + extractedData: ExtractedReceiptData + ): Promise { + try { + const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_pending_vehicle'); + if (!template || !template.isActive) { + logger.warn('receipt_pending_vehicle template not found or inactive'); + return; + } + + const recordTypeDisplay = recordType === 'fuel_log' ? 'Fuel Log' : 'Maintenance Record'; + const variables = { + userName, + recordType: recordTypeDisplay, + receiptDate: extractedData.date || 'N/A', + amount: extractedData.total?.toFixed(2) || 'N/A', + }; + + const renderedSubject = this.templateService.render(template.subject, variables); + const renderedHtml = this.templateService.renderEmailHtml(template.body, variables); + + await this.emailService.send(recipientEmail, renderedSubject, renderedHtml); + + logger.info('Pending vehicle email sent', { recipientEmail }); + } catch (error) { + logger.error('Failed to send pending vehicle email', { + recipientEmail, + error: error instanceof Error ? error.message : String(error), + }); + } + } +} diff --git a/backend/src/features/email-ingestion/index.ts b/backend/src/features/email-ingestion/index.ts index 64057bd..e2b1a03 100644 --- a/backend/src/features/email-ingestion/index.ts +++ b/backend/src/features/email-ingestion/index.ts @@ -1,14 +1,19 @@ /** * @ai-summary Email ingestion feature barrel export - * @ai-context Exports webhook routes for Resend inbound email processing + * @ai-context Exports webhook routes, services, and types for Resend inbound email processing */ export { emailIngestionWebhookRoutes } from './api/email-ingestion.routes'; +export { EmailIngestionService } from './domain/email-ingestion.service'; +export { EmailIngestionRepository } from './data/email-ingestion.repository'; export { ResendInboundClient } from './external/resend-inbound.client'; export type { ParsedEmailResult, ParsedEmailAttachment } from './external/resend-inbound.client'; export type { EmailIngestionQueueRecord, EmailIngestionStatus, + EmailProcessingResult, + ExtractedReceiptData, + PendingVehicleAssociation, ResendWebhookEvent, ResendWebhookEventData, } from './domain/email-ingestion.types'; diff --git a/backend/src/features/notifications/domain/notifications.types.ts b/backend/src/features/notifications/domain/notifications.types.ts index 3f419c8..55eef3f 100644 --- a/backend/src/features/notifications/domain/notifications.types.ts +++ b/backend/src/features/notifications/domain/notifications.types.ts @@ -11,7 +11,10 @@ export type TemplateKey = | 'maintenance_overdue' | 'document_expiring' | 'document_expired' - | 'subscription_tier_change'; + | 'subscription_tier_change' + | 'receipt_processed' + | 'receipt_failed' + | 'receipt_pending_vehicle'; // Email template API response type (camelCase for frontend) export interface EmailTemplate { @@ -86,7 +89,10 @@ export const TemplateKeySchema = z.enum([ 'maintenance_overdue', 'document_expiring', 'document_expired', - 'subscription_tier_change' + 'subscription_tier_change', + 'receipt_processed', + 'receipt_failed', + 'receipt_pending_vehicle' ]); export const UpdateEmailTemplateSchema = z.object({