"""Owner's manual extractor for maintenance schedule extraction."""

import io
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Callable, Optional

from PIL import Image

from app.engines import create_engine, OcrConfig
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
from app.table_extraction.detector import table_detector, DetectedTable
from app.table_extraction.parser import table_parser, ParsedScheduleRow
from app.patterns.maintenance_patterns import maintenance_matcher

logger = logging.getLogger(__name__)


@dataclass
class ExtractedSchedule:
    """A single extracted maintenance schedule."""

    # Normalized service name (e.g. "Oil change").
    service: str
    # Service interval in miles; None when the manual gave no mileage.
    interval_miles: Optional[int]
    # Service interval in months; None when the manual gave no time span.
    interval_months: Optional[int]
    # Free-form detail text, possibly including a fluid specification.
    details: Optional[str]
    # Extraction confidence in [0, 1].
    confidence: float
    # Optional sub-categories of the service.
    subtypes: list[str] = field(default_factory=list)


@dataclass
class VehicleInfo:
    """Vehicle information extracted from manual."""

    make: Optional[str]
    model: Optional[str]
    year: Optional[int]


@dataclass
class ManualExtractionResult:
    """Complete result of manual extraction."""

    success: bool
    vehicle_info: Optional[VehicleInfo]
    maintenance_schedules: list[ExtractedSchedule]
    # Lightweight summaries of detected tables: {"page": int, "rows": int}.
    raw_tables: list[dict]
    processing_time_ms: int
    total_pages: int
    pages_processed: int
    error: Optional[str] = None


class ManualExtractor:
    """Extract maintenance schedules from owner's manuals.

    Processing pipeline:
    1. Analyze PDF structure
    2. Find maintenance section pages
    3. Extract text (native) or OCR (scanned)
    4. Detect tables
    5. Parse schedules
    6. Normalize and deduplicate
    """

    # Maximum pages to process for performance
    MAX_PAGES_TO_PROCESS = 50

    # Minimum confidence to include schedule
    MIN_CONFIDENCE = 0.5

    # Known vehicle makes, used for title/first-page parsing. Hoisted to a
    # class constant so it is not rebuilt on every call.
    KNOWN_MAKES = (
        "Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
        "Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
        "Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
        "Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
        "Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
        "Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
        "Toyota", "Volkswagen", "Volvo",
    )

    def extract(
        self,
        pdf_bytes: bytes,
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> ManualExtractionResult:
        """Extract maintenance schedules from an owner's manual PDF.

        Args:
            pdf_bytes: Raw PDF bytes
            progress_callback: Optional callback for progress updates
                (percent, message)

        Returns:
            ManualExtractionResult with extracted data. On any failure the
            result has success=False and the exception text in ``error``.
        """
        start_time = time.time()

        def update_progress(percent: int, message: str) -> None:
            if progress_callback:
                progress_callback(percent, message)
            logger.info("Progress %d%%: %s", percent, message)

        try:
            update_progress(5, "Analyzing PDF structure")

            # Get PDF info
            pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
            logger.info(
                "PDF: %d pages, has_text=%s, is_scanned=%s",
                pdf_info.total_pages,
                pdf_info.has_text_layer,
                pdf_info.is_scanned,
            )

            update_progress(10, "Finding maintenance sections")
            maintenance_pages = self._select_pages(pdf_bytes, pdf_info)

            update_progress(15, "Extracting page content")

            # Extract content from pages
            all_schedules: list[ParsedScheduleRow] = []
            all_tables: list[dict] = []
            pages_processed = 0

            for i, page_num in enumerate(maintenance_pages):
                # Map page index onto the 15%-75% progress window.
                page_progress = 15 + int((i / len(maintenance_pages)) * 60)
                update_progress(page_progress, f"Processing page {page_num + 1}")

                page_content = pdf_preprocessor.extract_text_from_page(
                    pdf_bytes, page_num
                )
                pages_processed += 1

                # Process based on content type
                if page_content.has_text:
                    # Native PDF - use text directly
                    schedules, tables = self._process_text_page(
                        page_content.text_content, page_num
                    )
                elif page_content.image_bytes:
                    # Scanned PDF - OCR required
                    schedules, tables = self._process_scanned_page(
                        page_content.image_bytes, page_num
                    )
                else:
                    # Neither text nor image available for this page.
                    continue

                all_schedules.extend(schedules)
                all_tables.extend(tables)

            update_progress(75, "Normalizing results")

            # Deduplicate and normalize schedules
            normalized_schedules = self._normalize_schedules(all_schedules)

            update_progress(85, "Extracting vehicle information")

            # Try to extract vehicle info from metadata / first page
            vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)

            update_progress(95, "Finalizing results")

            processing_time_ms = int((time.time() - start_time) * 1000)
            logger.info(
                "Extraction complete: %d schedules from %d pages in %dms",
                len(normalized_schedules),
                pages_processed,
                processing_time_ms,
            )

            update_progress(100, "Complete")

            return ManualExtractionResult(
                success=True,
                vehicle_info=vehicle_info,
                maintenance_schedules=normalized_schedules,
                raw_tables=[
                    {"page": t.get("page", 0), "rows": t.get("rows", 0)}
                    for t in all_tables
                ],
                processing_time_ms=processing_time_ms,
                total_pages=pdf_info.total_pages,
                pages_processed=pages_processed,
            )

        except Exception as e:
            # Top-level boundary: log with traceback and return a failure
            # result instead of propagating.
            logger.error(f"Manual extraction failed: {e}", exc_info=True)
            processing_time_ms = int((time.time() - start_time) * 1000)
            return ManualExtractionResult(
                success=False,
                vehicle_info=None,
                maintenance_schedules=[],
                raw_tables=[],
                processing_time_ms=processing_time_ms,
                total_pages=0,
                pages_processed=0,
                error=str(e),
            )

    def _select_pages(self, pdf_bytes: bytes, pdf_info: PdfInfo) -> list[int]:
        """Choose which pages to process (capped at MAX_PAGES_TO_PROCESS).

        Uses the detected maintenance section when available, expanding it
        by 2 pages before and 4 after each hit; otherwise falls back to the
        first N pages of the document.
        """
        maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)

        if not maintenance_pages:
            # If no specific pages found, process the first N pages.
            logger.info(
                "No specific maintenance section found, processing first %d pages",
                min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages),
            )
            return list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))

        # Include pages before and after detected maintenance pages, since
        # schedules often continue onto neighbouring pages.
        expanded_pages: set[int] = set()
        for page in maintenance_pages:
            for offset in range(-2, 5):  # Include 2 before, 4 after
                new_page = page + offset
                if 0 <= new_page < pdf_info.total_pages:
                    expanded_pages.add(new_page)

        selected = sorted(expanded_pages)[: self.MAX_PAGES_TO_PROCESS]
        logger.info(
            "Processing %d pages around maintenance section", len(selected)
        )
        return selected

    def _process_text_page(
        self, text: str, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a native PDF page with text.

        Returns (parsed schedule rows, table summaries) found on the page.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []

        # Detect tables in text
        detected_tables = table_detector.detect_tables_in_text(text, page_number)

        for table in detected_tables:
            if table.is_maintenance_table and table.header_row:
                # Parse table
                parsed = table_parser.parse_table(
                    table.header_row,
                    table.raw_content,
                )
                schedules.extend(parsed)
                tables.append({
                    "page": page_number,
                    "rows": len(table.raw_content),
                    "is_maintenance": True,
                })

        # Also try to extract from unstructured text
        text_schedules = table_parser.parse_text_block(text)
        schedules.extend(text_schedules)

        return schedules, tables

    def _process_scanned_page(
        self, image_bytes: bytes, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a scanned PDF page with OCR.

        OCR failure is non-fatal: the page is logged and skipped, returning
        whatever was gathered before the failure.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []

        # Detect tables in image
        detected_tables = table_detector.detect_tables_in_image(
            image_bytes, page_number
        )

        # OCR the full page
        try:
            engine = create_engine()
            ocr_result = engine.recognize(image_bytes, OcrConfig())
            ocr_text = ocr_result.text

            # Mark tables as maintenance if page contains maintenance keywords
            for table in detected_tables:
                table.is_maintenance_table = table_detector.is_maintenance_table(
                    table, ocr_text
                )

            # Try to extract from OCR text
            text_tables = table_detector.detect_tables_in_text(
                ocr_text, page_number
            )
            for table in text_tables:
                if table.is_maintenance_table and table.header_row:
                    parsed = table_parser.parse_table(
                        table.header_row,
                        table.raw_content,
                    )
                    schedules.extend(parsed)
                    tables.append({
                        "page": page_number,
                        "rows": len(table.raw_content),
                        "is_maintenance": True,
                    })

            # Also try unstructured text
            text_schedules = table_parser.parse_text_block(ocr_text)
            schedules.extend(text_schedules)

        except Exception as e:
            # Best-effort: a bad page should not abort the whole extraction.
            logger.warning(f"OCR failed for page {page_number}: {e}")

        return schedules, tables

    def _normalize_schedules(
        self, schedules: list[ParsedScheduleRow]
    ) -> list[ExtractedSchedule]:
        """Normalize and deduplicate extracted schedules.

        Rows below MIN_CONFIDENCE are dropped; the rest are grouped by
        normalized service name, and missing interval/detail fields of the
        highest-confidence row are backfilled from lower-confidence
        duplicates.
        """
        # Group by normalized service name
        by_service: dict[str, list[ParsedScheduleRow]] = {}
        for schedule in schedules:
            if schedule.confidence < self.MIN_CONFIDENCE:
                continue

            key = schedule.normalized_service or schedule.service.lower()
            by_service.setdefault(key, []).append(schedule)

        # Merge duplicates, keeping highest confidence
        results: list[ExtractedSchedule] = []
        for items in by_service.values():
            # Sort by confidence
            items.sort(key=lambda x: x.confidence, reverse=True)
            best = items[0]

            # Merge interval info from other items if missing. Use `is None`
            # rather than truthiness so a legitimate 0 is not overwritten.
            miles = best.interval_miles
            months = best.interval_months
            details = best.details
            fluid_spec = best.fluid_spec

            for item in items[1:]:
                if miles is None and item.interval_miles is not None:
                    miles = item.interval_miles
                if months is None and item.interval_months is not None:
                    months = item.interval_months
                if not details and item.details:
                    details = item.details
                if not fluid_spec and item.fluid_spec:
                    fluid_spec = item.fluid_spec

            # Build details string
            detail_parts = []
            if details:
                detail_parts.append(details)
            if fluid_spec:
                detail_parts.append(f"Use {fluid_spec}")

            results.append(
                ExtractedSchedule(
                    service=best.normalized_service or best.service,
                    interval_miles=miles,
                    interval_months=months,
                    details=" - ".join(detail_parts) if detail_parts else None,
                    confidence=best.confidence,
                    subtypes=best.subtypes,
                )
            )

        # Sort by confidence
        results.sort(key=lambda x: x.confidence, reverse=True)
        return results

    def _extract_vehicle_info(
        self, pdf_bytes: bytes, pdf_info: PdfInfo
    ) -> Optional[VehicleInfo]:
        """Extract vehicle make/model/year from manual.

        Tries the PDF title metadata first, then the first page's text
        (OCRing it if the page is image-only). Returns None if nothing
        could be determined.
        """
        # Check metadata first
        if pdf_info.title:
            info = self._parse_vehicle_from_title(pdf_info.title)
            if info:
                return info

        # Try first page
        try:
            first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
            text = first_page.text_content

            if not text and first_page.image_bytes:
                # OCR first page
                engine = create_engine()
                ocr_result = engine.recognize(first_page.image_bytes, OcrConfig())
                text = ocr_result.text

            if text:
                return self._parse_vehicle_from_text(text)

        except Exception as e:
            logger.warning(f"Failed to extract vehicle info: {e}")

        return None

    def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from document title.

        Handles common patterns like "2024 Honda Civic Owner's Manual".
        Make matching is word-bounded so e.g. "Ram" does not match inside
        "Programme".
        """
        # Year: first 19xx/20xx number found.
        year_match = re.search(r"(20\d{2}|19\d{2})", title)
        year = int(year_match.group(1)) if year_match else None

        make = None
        model = None
        for m in self.KNOWN_MAKES:
            # Word-boundary match avoids false positives on substrings.
            make_match = re.search(
                rf"\b{re.escape(m)}\b", title, re.IGNORECASE
            )
            if make_match:
                make = m
                # First word after the make is likely the model.
                after = title[make_match.end():].strip()
                model_match = re.match(r"^(\w+)", after)
                if model_match:
                    model = model_match.group(1)
                break

        if year or make:
            return VehicleInfo(make=make, model=model, year=year)

        return None

    def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from page text."""
        return self._parse_vehicle_from_title(text[:500])  # Use first 500 chars


# Singleton instance
manual_extractor = ManualExtractor()