"""Owner's manual extractor for maintenance schedule extraction via Gemini.""" import logging import time from dataclasses import dataclass, field from typing import Callable, Optional from app.engines.gemini_engine import GeminiEngine, GeminiEngineError from app.patterns.service_mapping import service_mapper logger = logging.getLogger(__name__) @dataclass class ExtractedSchedule: """A single extracted maintenance schedule.""" service: str interval_miles: Optional[int] interval_months: Optional[int] details: Optional[str] confidence: float subtypes: list[str] = field(default_factory=list) @dataclass class VehicleInfo: """Vehicle information extracted from manual.""" make: Optional[str] model: Optional[str] year: Optional[int] @dataclass class ManualExtractionResult: """Complete result of manual extraction.""" success: bool vehicle_info: Optional[VehicleInfo] maintenance_schedules: list[ExtractedSchedule] raw_tables: list[dict] processing_time_ms: int total_pages: int pages_processed: int error: Optional[str] = None class ManualExtractor: """Extract maintenance schedules from owner's manuals using Gemini. Processing pipeline: 1. Send entire PDF to Gemini for semantic extraction 2. Map extracted service names to system maintenance subtypes via fuzzy matching 3. Return structured results """ # Default confidence for Gemini-extracted items without a subtype match DEFAULT_CONFIDENCE = 0.85 def __init__(self) -> None: self._engine = GeminiEngine() def extract( self, pdf_bytes: bytes, progress_callback: Optional[Callable[[int, str], None]] = None, ) -> ManualExtractionResult: """Extract maintenance schedules from an owner's manual PDF. Args: pdf_bytes: Raw PDF bytes progress_callback: Optional callback for progress updates (percent, message) Returns: ManualExtractionResult with extracted data """ start_time = time.time() def update_progress(percent: int, message: str) -> None: if progress_callback: progress_callback(percent, message) logger.info(f"Progress {percent}%: {message}") try: update_progress(10, "Preparing extraction") update_progress(50, "Processing with Gemini") gemini_result = self._engine.extract_maintenance(pdf_bytes) update_progress(95, "Mapping results") schedules: list[ExtractedSchedule] = [] for item in gemini_result.items: mapping = service_mapper.map_service_fuzzy(item.service_name) if mapping: subtypes = mapping.subtypes confidence = mapping.confidence service_name = mapping.normalized_name else: subtypes = [] confidence = self.DEFAULT_CONFIDENCE service_name = item.service_name schedules.append( ExtractedSchedule( service=service_name, interval_miles=item.interval_miles, interval_months=item.interval_months, details=item.details, confidence=confidence, subtypes=subtypes, ) ) processing_time_ms = int((time.time() - start_time) * 1000) logger.info( f"Extraction complete: {len(schedules)} schedules in {processing_time_ms}ms" ) # Note: do NOT send 100% progress here. The caller sets status=COMPLETED # after this returns. Because this runs in a thread executor and the # progress callback uses run_coroutine_threadsafe (fire-and-forget), # a 100% update here races with complete_manual_job() and can overwrite # COMPLETED back to PROCESSING. return ManualExtractionResult( success=True, vehicle_info=None, maintenance_schedules=schedules, raw_tables=[], processing_time_ms=processing_time_ms, total_pages=0, pages_processed=0, ) except GeminiEngineError as e: logger.error(f"Gemini extraction failed: {e}", exc_info=True) processing_time_ms = int((time.time() - start_time) * 1000) return ManualExtractionResult( success=False, vehicle_info=None, maintenance_schedules=[], raw_tables=[], processing_time_ms=processing_time_ms, total_pages=0, pages_processed=0, error=str(e), ) except Exception as e: logger.error(f"Manual extraction failed: {e}", exc_info=True) processing_time_ms = int((time.time() - start_time) * 1000) return ManualExtractionResult( success=False, vehicle_info=None, maintenance_schedules=[], raw_tables=[], processing_time_ms=processing_time_ms, total_pages=0, pages_processed=0, error=str(e), ) # Singleton instance manual_extractor = ManualExtractor()