/**
 * @ai-summary Core processing service for the email-to-record pipeline
 * @ai-context Orchestrates sender validation, OCR extraction, record classification,
 * vehicle association, status tracking, retry logic, and reply emails
 */

import { Pool } from 'pg';
import pool from '../../../core/config/database';
import { logger } from '../../../core/logging/logger';
import { EmailIngestionRepository } from '../data/email-ingestion.repository';
import { ResendInboundClient, type ParsedEmailAttachment } from '../external/resend-inbound.client';
import { UserProfileRepository } from '../../user-profile/data/user-profile.repository';
import { VehiclesRepository } from '../../vehicles/data/vehicles.repository';
import { NotificationsRepository } from '../../notifications/data/notifications.repository';
import { TemplateService } from '../../notifications/domain/template.service';
import { EmailService } from '../../notifications/domain/email.service';
import { ocrService } from '../../ocr/domain/ocr.service';
import type { ReceiptExtractionResponse } from '../../ocr/domain/ocr.types';
import type {
  ResendWebhookEvent,
  EmailProcessingResult,
  ExtractedReceiptData,
  EmailRecordType,
} from './email-ingestion.types';

/** Supported attachment MIME types */
const SUPPORTED_ATTACHMENT_TYPES = new Set([
  'application/pdf',
  'image/png',
  'image/jpeg',
  'image/heic',
  'image/heif',
]);

/** Image types that work with receipt-specific OCR */
const OCR_RECEIPT_IMAGE_TYPES = new Set([
  'image/jpeg',
  'image/png',
  'image/heic',
  'image/heif',
]);

const MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024; // 10MB
const MAX_RETRY_COUNT = 3;

/** Extracted-field keys whose presence marks an OCR result as a fuel receipt */
const FUEL_SPECIFIC_FIELDS: readonly string[] = ['gallons', 'price_per_gallon', 'fuel_type'];

/** Extracted-field keys whose presence marks an OCR result as a maintenance receipt */
const MAINTENANCE_SPECIFIC_FIELDS: readonly string[] = ['category', 'shop_name', 'description'];

/** An OCR response paired with the record type it was classified as */
type ClassifiedReceipt = { response: ReceiptExtractionResponse; recordType: EmailRecordType };

/** The subset of a user profile the pipeline needs after sender validation */
type ResolvedSender = { auth0Sub: string; email: string; displayName: string | null };

/** Reduce an unknown thrown value to a loggable message string. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** True when `response` is non-null and carries at least one of the given extracted fields. */
function hasAnyField(response: ReceiptExtractionResponse | null, keys: readonly string[]): boolean {
  if (!response) return false;
  const fields = response.extractedFields;
  return keys.some(key => Boolean(fields[key]));
}

export class EmailIngestionService {
  private readonly repository: EmailIngestionRepository;
  private readonly resendClient: ResendInboundClient;
  private readonly userProfileRepository: UserProfileRepository;
  private readonly vehiclesRepository: VehiclesRepository;
  private readonly notificationsRepository: NotificationsRepository;
  private readonly templateService: TemplateService;
  private readonly emailService: EmailService;

  /**
   * @param dbPool Optional pg pool handed to every repository; defaults to the
   *   shared application pool.
   */
  constructor(dbPool?: Pool) {
    const p = dbPool || pool;
    this.repository = new EmailIngestionRepository(p);
    this.resendClient = new ResendInboundClient();
    this.userProfileRepository = new UserProfileRepository(p);
    this.vehiclesRepository = new VehiclesRepository(p);
    this.notificationsRepository = new NotificationsRepository(p);
    this.templateService = new TemplateService();
    this.emailService = new EmailService();
  }

  // ========================
  // Main Processing Pipeline
  // ========================

  /**
   * Process an inbound email through the full pipeline.
   * Called asynchronously after webhook receipt is acknowledged.
   *
   * Any error thrown by a pipeline step is routed to `handleProcessingError`,
   * which records the failure and retry count. An error thrown by that handler
   * itself (e.g. the queue lookup failing) still rejects the returned promise.
   *
   * @param emailId Identifier of the queued inbound email.
   * @param event Webhook payload; `data.from`, `data.subject` and
   *   `data.attachments` are read.
   */
  async processEmail(emailId: string, event: ResendWebhookEvent): Promise<void> {
    const senderEmail = event.data.from;
    const subject = event.data.subject;

    try {
      // 1. Mark as processing
      await this.repository.updateQueueStatus(emailId, 'processing');

      // 2. Validate sender
      const userProfile = await this.validateSender(senderEmail);
      if (!userProfile) {
        await this.handleUnregisteredSender(emailId, senderEmail, subject);
        return;
      }

      const userId = userProfile.auth0Sub;
      const userName = userProfile.displayName || userProfile.email;

      // Update queue with resolved user_id
      await this.repository.updateQueueStatus(emailId, 'processing', { userId });

      // 3. Get attachments (from webhook data or by fetching raw email)
      const attachments = await this.getAttachments(emailId, event);

      // 4. Filter valid attachments
      const validAttachments = this.filterAttachments(attachments);
      if (validAttachments.length === 0) {
        await this.handleNoValidAttachments(emailId, senderEmail, userName, subject);
        return;
      }

      // 5. Process first valid image attachment through OCR
      const ocrResult = await this.processAttachmentsWithOcr(userId, validAttachments);
      if (!ocrResult) {
        await this.handleOcrFailure(
          emailId,
          senderEmail,
          userName,
          subject,
          'No receipt data could be extracted from attachments'
        );
        return;
      }

      // 6. Build extracted data from OCR result
      const extractedData = this.mapOcrToExtractedData(ocrResult.response);
      const recordType = ocrResult.recordType;

      // 7. Handle vehicle association
      const processingResult = await this.handleVehicleAssociation(
        userId,
        userName,
        senderEmail,
        recordType,
        extractedData
      );

      // 8. Mark as completed
      await this.repository.updateQueueStatus(emailId, 'completed', {
        processingResult,
      });

      // 9. Send confirmation email
      await this.sendConfirmationEmail(senderEmail, userName, processingResult);

      logger.info('Email processing completed successfully', {
        emailId,
        userId,
        recordType,
        vehicleId: processingResult.vehicleId,
        pendingAssociationId: processingResult.pendingAssociationId,
      });
    } catch (error) {
      await this.handleProcessingError(emailId, senderEmail, subject, error);
    }
  }

  // ========================
  // Sender Validation
  // ========================

  /**
   * Resolve the sender address to a user profile.
   *
   * Looks up the lowercased address first, then retries with the original
   * casing when it differs.
   *
   * @returns The matching profile's identifying fields, or `null` when neither
   *   lookup finds a profile.
   */
  private async validateSender(senderEmail: string): Promise<ResolvedSender | null> {
    const lowered = senderEmail.toLowerCase();

    // Case-insensitive lookup by lowercasing the sender email
    const profile = await this.userProfileRepository.getByEmail(lowered);
    if (profile) {
      return {
        auth0Sub: profile.auth0Sub,
        email: profile.email,
        displayName: profile.displayName ?? null,
      };
    }

    // Try original case as fallback
    if (senderEmail !== lowered) {
      const fallback = await this.userProfileRepository.getByEmail(senderEmail);
      if (fallback) {
        return {
          auth0Sub: fallback.auth0Sub,
          email: fallback.email,
          displayName: fallback.displayName ?? null,
        };
      }
    }

    return null;
  }

  // ========================
  // Attachment Handling
  // ========================

  /**
   * Get attachments from webhook data or by fetching the raw email.
   *
   * Webhook attachments are used only when every one of them carries string
   * content; otherwise the raw email is downloaded and parsed. A failure of
   * that download/parse is logged and yields an empty list rather than throwing.
   */
  private async getAttachments(
    emailId: string,
    event: ResendWebhookEvent
  ): Promise<ParsedEmailAttachment[]> {
    const inline = event.data.attachments;

    // If webhook includes attachments with content, use those. Decoding an
    // attachment whose content is missing would throw, so require it on all.
    if (inline && inline.length > 0 && inline.every(att => typeof att.content === 'string')) {
      return inline.map(att => {
        // Decode once; the decoded length is the attachment size in bytes.
        const content = Buffer.from(att.content, 'base64');
        return {
          filename: att.filename,
          contentType: att.content_type,
          content,
          size: content.length,
        };
      });
    }

    // Otherwise fetch and parse the raw email
    try {
      const { downloadUrl } = await this.resendClient.getEmail(emailId);
      const rawEmail = await this.resendClient.downloadRawEmail(downloadUrl);
      const parsed = await this.resendClient.parseEmail(rawEmail);
      return parsed.attachments;
    } catch (error) {
      logger.warn('Failed to fetch raw email for attachments', {
        emailId,
        error: describeError(error),
      });
      return [];
    }
  }

  /**
   * Filter attachments by supported type and size.
   * Each rejected attachment is logged with the reason it was skipped.
   */
  private filterAttachments(attachments: ParsedEmailAttachment[]): ParsedEmailAttachment[] {
    return attachments.filter(att => {
      if (!SUPPORTED_ATTACHMENT_TYPES.has(att.contentType)) {
        logger.info('Skipping unsupported attachment type', {
          filename: att.filename,
          contentType: att.contentType,
        });
        return false;
      }

      if (att.size > MAX_ATTACHMENT_SIZE) {
        logger.info('Skipping oversized attachment', {
          filename: att.filename,
          size: att.size,
          maxSize: MAX_ATTACHMENT_SIZE,
        });
        return false;
      }

      return true;
    });
  }

  // ========================
  // OCR Processing
  // ========================

  /**
   * Process attachments through OCR, trying fuel then maintenance receipt extraction.
   * Returns the first successful result.
   *
   * Only attachments whose type is in `OCR_RECEIPT_IMAGE_TYPES` are attempted,
   * so PDFs that passed `filterAttachments` are not sent to OCR here.
   * Attachments are tried sequentially and the loop stops at the first hit.
   */
  private async processAttachmentsWithOcr(
    userId: string,
    attachments: ParsedEmailAttachment[]
  ): Promise<ClassifiedReceipt | null> {
    // Process only image attachments that the receipt OCR supports
    const imageAttachments = attachments.filter(att => OCR_RECEIPT_IMAGE_TYPES.has(att.contentType));

    for (const attachment of imageAttachments) {
      const result = await this.classifyAndExtract(userId, attachment);
      if (result) return result;
    }

    return null;
  }

  /**
   * Try both fuel and maintenance OCR extractors, return the better result.
   *
   * A thrown error from either extractor is logged and treated as "no result"
   * for that extractor; the other extractor still runs.
   */
  private async classifyAndExtract(
    userId: string,
    attachment: ParsedEmailAttachment
  ): Promise<ClassifiedReceipt | null> {
    let fuelResult: ReceiptExtractionResponse | null = null;
    let maintenanceResult: ReceiptExtractionResponse | null = null;

    // Try fuel receipt extraction
    try {
      fuelResult = await ocrService.extractReceipt(userId, {
        fileBuffer: attachment.content,
        contentType: attachment.contentType,
        receiptType: 'fuel',
      });
    } catch (error) {
      logger.info('Fuel receipt extraction failed, trying maintenance', {
        filename: attachment.filename,
        error: describeError(error),
      });
    }

    // Try maintenance receipt extraction
    try {
      maintenanceResult = await ocrService.extractMaintenanceReceipt(userId, {
        fileBuffer: attachment.content,
        contentType: attachment.contentType,
      });
    } catch (error) {
      logger.info('Maintenance receipt extraction failed', {
        filename: attachment.filename,
        error: describeError(error),
      });
    }

    // Compare results and pick the best one
    return this.selectBestResult(fuelResult, maintenanceResult);
  }

  /**
   * Select the better OCR result based on extracted field count and success.
   *
   * Results that are null or not flagged `success` are ignored entirely, so an
   * unsuccessful response can never be returned. Among the remaining results:
   * 1. if only one carries its domain-specific fields, that one wins;
   * 2. otherwise the one with more extracted fields wins, fuel winning ties.
   *
   * @returns The chosen response and its record type, or `null` when no
   *   successful result has any extracted field.
   */
  private selectBestResult(
    fuelResult: ReceiptExtractionResponse | null,
    maintenanceResult: ReceiptExtractionResponse | null
  ): ClassifiedReceipt | null {
    // Gate on success up front so every later check sees only usable results.
    const fuel = fuelResult?.success ? fuelResult : null;
    const maintenance = maintenanceResult?.success ? maintenanceResult : null;

    const fuelFieldCount = fuel ? Object.keys(fuel.extractedFields).length : 0;
    const maintenanceFieldCount = maintenance ? Object.keys(maintenance.extractedFields).length : 0;

    if (fuelFieldCount === 0 && maintenanceFieldCount === 0) {
      return null;
    }

    // Check for domain-specific fields to improve classification
    const hasFuelFields = hasAnyField(fuel, FUEL_SPECIFIC_FIELDS);
    const hasMaintenanceFields = hasAnyField(maintenance, MAINTENANCE_SPECIFIC_FIELDS);

    // Prefer the result with domain-specific fields
    if (fuel && hasFuelFields && !hasMaintenanceFields) {
      return { response: fuel, recordType: 'fuel_log' };
    }
    if (maintenance && hasMaintenanceFields && !hasFuelFields) {
      return { response: maintenance, recordType: 'maintenance_record' };
    }

    // Fall back to field count comparison
    if (fuel && fuelFieldCount >= maintenanceFieldCount) {
      return { response: fuel, recordType: 'fuel_log' };
    }
    if (maintenance) {
      return { response: maintenance, recordType: 'maintenance_record' };
    }

    return null;
  }

  /**
   * Map OCR extracted fields to our ExtractedReceiptData format.
   *
   * Missing or empty field values become `null`. Numeric fields are parsed
   * with `parseFloat`; unparseable values become `null`.
   */
  private mapOcrToExtractedData(response: ReceiptExtractionResponse): ExtractedReceiptData {
    const fields = response.extractedFields;

    const getFieldValue = (key: string): string | null => fields[key]?.value || null;

    const getFieldNumber = (key: string): number | null => {
      const val = fields[key]?.value;
      if (!val) return null;
      const num = parseFloat(val);
      return isNaN(num) ? null : num;
    };

    const rawSubtypes = fields['subtypes']?.value;

    return {
      vendor: getFieldValue('vendor') ?? getFieldValue('shop_name'),
      date: getFieldValue('date'),
      total: getFieldNumber('total'),
      // `??` rather than `||` so a parsed reading of 0 is kept instead of discarded
      odometerReading: getFieldNumber('odometer') ?? getFieldNumber('odometer_reading'),
      gallons: getFieldNumber('gallons'),
      pricePerGallon: getFieldNumber('price_per_gallon'),
      fuelType: getFieldValue('fuel_type'),
      category: getFieldValue('category'),
      subtypes: rawSubtypes ? rawSubtypes.split(',').map(s => s.trim()) : null,
      shopName: getFieldValue('shop_name'),
      description: getFieldValue('description'),
    };
  }

  // ========================
  // Vehicle Association
  // ========================

  /**
   * Handle vehicle association based on user's vehicle count.
   * Single vehicle: auto-associate. Multiple: create pending association.
   *
   * A user with zero vehicles takes the same pending-association path as a
   * user with several. Both paths insert an in-app notification; the pending
   * path additionally sends the pending-vehicle email.
   *
   * @returns A processing result with either `vehicleId` or
   *   `pendingAssociationId` set; `recordId` and `documentId` are always null here.
   */
  private async handleVehicleAssociation(
    userId: string,
    userName: string,
    userEmail: string,
    recordType: EmailRecordType,
    extractedData: ExtractedReceiptData
  ): Promise<EmailProcessingResult> {
    const vehicles = await this.vehiclesRepository.findByUserId(userId);
    const receiptKind = recordType === 'fuel_log' ? 'fuel' : 'maintenance';

    if (vehicles.length === 1) {
      // Auto-associate with the single vehicle
      const vehicleId = vehicles[0].id;

      // Create in-app notification
      await this.notificationsRepository.insertUserNotification({
        userId,
        notificationType: 'email_ingestion',
        title: 'Receipt Processed',
        message: `Your emailed ${receiptKind} receipt has been processed and associated with your vehicle.`,
        referenceType: recordType,
        vehicleId,
      });

      return {
        recordType,
        vehicleId,
        recordId: null, // Record creation handled by later sub-issue
        documentId: null,
        pendingAssociationId: null,
        extractedData,
      };
    }

    // Multiple vehicles (or none): create pending association
    const pendingAssociation = await this.repository.insertPendingAssociation({
      userId,
      recordType,
      extractedData,
      documentId: null,
    });

    // Create in-app notification for vehicle selection
    await this.notificationsRepository.insertUserNotification({
      userId,
      notificationType: 'email_ingestion',
      title: 'Receipt Pending Vehicle Selection',
      message: `Your emailed receipt has been processed. Please select which vehicle this ${receiptKind} receipt belongs to.`,
      referenceType: 'pending_vehicle_association',
      referenceId: pendingAssociation.id,
    });

    // Send pending vehicle email
    await this.sendPendingVehicleEmail(userEmail, userName, recordType, extractedData);

    return {
      recordType,
      vehicleId: null,
      recordId: null,
      documentId: null,
      pendingAssociationId: pendingAssociation.id,
      extractedData,
    };
  }

  // ========================
  // Error Handling & Retries
  // ========================

  /**
   * Record a pipeline failure and decide between retry and permanent failure.
   *
   * The stored retry count is incremented. Below `MAX_RETRY_COUNT` the entry is
   * written back with the new count; at or above it the entry is written with a
   * "max retries exceeded" message and a failure email is sent. Both branches
   * write status `'failed'`.
   * NOTE(review): whatever re-runs entries below the limit is not visible in
   * this file — confirm it keys off `retryCount`.
   */
  private async handleProcessingError(
    emailId: string,
    senderEmail: string,
    subject: string | null,
    error: unknown
  ): Promise<void> {
    const errorMessage = describeError(error);
    logger.error('Email processing pipeline error', { emailId, error: errorMessage });

    // Get current queue entry for retry count
    const queueEntry = await this.repository.getQueueEntry(emailId);
    const currentRetryCount = queueEntry?.retryCount ?? 0;
    const newRetryCount = currentRetryCount + 1;

    if (newRetryCount < MAX_RETRY_COUNT) {
      // Mark for retry
      await this.repository.updateQueueStatus(emailId, 'failed', {
        errorMessage,
        retryCount: newRetryCount,
      });

      logger.info('Email queued for retry', {
        emailId,
        retryCount: newRetryCount,
        maxRetries: MAX_RETRY_COUNT,
      });
    } else {
      // Max retries exceeded - permanently failed
      await this.repository.updateQueueStatus(emailId, 'failed', {
        errorMessage: `Max retries (${MAX_RETRY_COUNT}) exceeded. Last error: ${errorMessage}`,
        retryCount: newRetryCount,
      });

      // Send failure email to user
      await this.sendFailureEmail(senderEmail, subject, errorMessage).catch(emailErr => {
        logger.error('Failed to send failure notification email', {
          emailId,
          error: describeError(emailErr),
        });
      });
    }
  }

  /**
   * Mark the email as failed because the sender has no profile, and send a
   * rejection email to the sender. A failure to send is logged, not thrown.
   */
  private async handleUnregisteredSender(
    emailId: string,
    senderEmail: string,
    subject: string | null
  ): Promise<void> {
    logger.info('Unregistered sender rejected', { emailId, senderEmail });

    await this.repository.updateQueueStatus(emailId, 'failed', {
      errorMessage: 'Sender email is not registered with MotoVaultPro',
    });

    // Send rejection email
    await this.sendFailureEmail(
      senderEmail,
      subject,
      'This email address is not registered with MotoVaultPro. Please send receipts from the email address associated with your account.'
    ).catch(error => {
      logger.error('Failed to send unregistered sender notification', {
        emailId,
        error: describeError(error),
      });
    });
  }

  /**
   * Mark the email as failed because no attachment passed filtering, and tell
   * the sender which attachment types and sizes are accepted.
   *
   * @param _userName Accepted for signature parity with the caller; unused.
   */
  private async handleNoValidAttachments(
    emailId: string,
    senderEmail: string,
    _userName: string,
    subject: string | null
  ): Promise<void> {
    logger.info('No valid attachments found', { emailId });

    await this.repository.updateQueueStatus(emailId, 'failed', {
      errorMessage: 'No valid attachments found. Supported types: PDF, PNG, JPG, JPEG, HEIC (max 10MB each)',
    });

    await this.sendFailureEmail(
      senderEmail,
      subject,
      'No valid attachments were found in your email. Please attach receipt images (PNG, JPG, JPEG, HEIC) or PDFs under 10MB.'
    ).catch(error => {
      logger.error('Failed to send no-attachments notification', {
        emailId,
        error: describeError(error),
      });
    });
  }

  /**
   * Mark the email as failed because OCR produced no usable result, and email
   * the given reason to the sender.
   *
   * @param _userName Accepted for signature parity with the caller; unused.
   * @param reason Stored as the queue error message and sent to the user verbatim.
   */
  private async handleOcrFailure(
    emailId: string,
    senderEmail: string,
    _userName: string,
    subject: string | null,
    reason: string
  ): Promise<void> {
    logger.info('OCR extraction failed for all attachments', { emailId, reason });

    await this.repository.updateQueueStatus(emailId, 'failed', {
      errorMessage: reason,
    });

    await this.sendFailureEmail(senderEmail, subject, reason).catch(error => {
      logger.error('Failed to send OCR failure notification', {
        emailId,
        error: describeError(error),
      });
    });
  }

  // ========================
  // Email Replies
  // ========================

  /**
   * Send the `receipt_processed` email summarising the processing result.
   *
   * Best-effort: a missing or inactive template is logged and skipped, and any
   * error while rendering or sending is logged instead of thrown.
   */
  private async sendConfirmationEmail(
    recipientEmail: string,
    userName: string,
    result: EmailProcessingResult
  ): Promise<void> {
    try {
      const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_processed');
      if (!template || !template.isActive) {
        logger.warn('receipt_processed template not found or inactive');
        return;
      }

      const recordTypeDisplay = result.recordType === 'fuel_log' ? 'Fuel Log' : 'Maintenance Record';

      const variables = {
        userName,
        vehicleName: result.vehicleId ? 'your vehicle' : 'Pending Selection',
        recordType: recordTypeDisplay,
        receiptDate: result.extractedData.date || 'N/A',
        amount: result.extractedData.total?.toFixed(2) || 'N/A',
      };

      const renderedSubject = this.templateService.render(template.subject, variables);
      const renderedHtml = this.templateService.renderEmailHtml(template.body, variables);

      await this.emailService.send(recipientEmail, renderedSubject, renderedHtml);

      logger.info('Confirmation email sent', { recipientEmail });
    } catch (error) {
      logger.error('Failed to send confirmation email', {
        recipientEmail,
        error: describeError(error),
      });
    }
  }

  /**
   * Send the `receipt_failed` email explaining why processing failed.
   *
   * Best-effort: a missing or inactive template is logged and skipped, and any
   * error while rendering or sending is logged instead of thrown. The greeting
   * name is always the generic 'MotoVaultPro User'.
   *
   * @param emailSubject Subject of the original email; `null` renders as '(no subject)'.
   */
  private async sendFailureEmail(
    recipientEmail: string,
    emailSubject: string | null,
    errorReason: string
  ): Promise<void> {
    try {
      const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_failed');
      if (!template || !template.isActive) {
        logger.warn('receipt_failed template not found or inactive');
        return;
      }

      const variables = {
        userName: 'MotoVaultPro User',
        emailSubject: emailSubject || '(no subject)',
        errorReason,
      };

      const renderedSubject = this.templateService.render(template.subject, variables);
      const renderedHtml = this.templateService.renderEmailHtml(template.body, variables);

      await this.emailService.send(recipientEmail, renderedSubject, renderedHtml);

      logger.info('Failure email sent', { recipientEmail });
    } catch (error) {
      logger.error('Failed to send failure email', {
        recipientEmail,
        error: describeError(error),
      });
    }
  }

  /**
   * Send the `receipt_pending_vehicle` email asking the user to pick a vehicle.
   *
   * Best-effort: a missing or inactive template is logged and skipped, and any
   * error while rendering or sending is logged instead of thrown.
   */
  private async sendPendingVehicleEmail(
    recipientEmail: string,
    userName: string,
    recordType: EmailRecordType,
    extractedData: ExtractedReceiptData
  ): Promise<void> {
    try {
      const template = await this.notificationsRepository.getEmailTemplateByKey('receipt_pending_vehicle');
      if (!template || !template.isActive) {
        logger.warn('receipt_pending_vehicle template not found or inactive');
        return;
      }

      const recordTypeDisplay = recordType === 'fuel_log' ? 'Fuel Log' : 'Maintenance Record';

      const variables = {
        userName,
        recordType: recordTypeDisplay,
        receiptDate: extractedData.date || 'N/A',
        amount: extractedData.total?.toFixed(2) || 'N/A',
      };

      const renderedSubject = this.templateService.render(template.subject, variables);
      const renderedHtml = this.templateService.renderEmailHtml(template.body, variables);

      await this.emailService.send(recipientEmail, renderedSubject, renderedHtml);

      logger.info('Pending vehicle email sent', { recipientEmail });
    } catch (error) {
      logger.error('Failed to send pending vehicle email', {
        recipientEmail,
        error: describeError(error),
      });
    }
  }
}