From 57ed04d955e002baad1a6196e6fd75bb03fdc019 Mon Sep 17 00:00:00 2001 From: Eric Gullickson <16152721+ericgullickson@users.noreply.github.com> Date: Wed, 11 Feb 2026 10:24:11 -0600 Subject: [PATCH] feat: rewrite ManualExtractor to use Gemini engine (refs #134) Replace traditional OCR pipeline (table_detector, table_parser, maintenance_patterns) with GeminiEngine for semantic PDF extraction. Map Gemini serviceName values to 27 maintenance subtypes via ServiceMapper fuzzy matching. Add 8 unit tests covering normal extraction, unusual names, empty response, and error handling. Co-Authored-By: Claude Opus 4.6 --- ocr/app/extractors/manual_extractor.py | 366 ++++--------------------- ocr/tests/test_manual_extractor.py | 272 ++++++++++++++++++ 2 files changed, 329 insertions(+), 309 deletions(-) create mode 100644 ocr/tests/test_manual_extractor.py diff --git a/ocr/app/extractors/manual_extractor.py b/ocr/app/extractors/manual_extractor.py index ad5f159..174828e 100644 --- a/ocr/app/extractors/manual_extractor.py +++ b/ocr/app/extractors/manual_extractor.py @@ -1,17 +1,11 @@ -"""Owner's manual extractor for maintenance schedule extraction.""" -import io +"""Owner's manual extractor for maintenance schedule extraction via Gemini.""" import logging import time from dataclasses import dataclass, field from typing import Callable, Optional -from PIL import Image - -from app.engines import create_engine, OcrConfig -from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo -from app.table_extraction.detector import table_detector, DetectedTable -from app.table_extraction.parser import table_parser, ParsedScheduleRow -from app.patterns.maintenance_patterns import maintenance_matcher +from app.engines.gemini_engine import GeminiEngine, GeminiEngineError +from app.patterns.service_mapping import service_mapper logger = logging.getLogger(__name__) @@ -52,30 +46,26 @@ class ManualExtractionResult: class ManualExtractor: - """Extract maintenance schedules from owner's manuals. + """Extract maintenance schedules from owner's manuals using Gemini. Processing pipeline: - 1. Analyze PDF structure - 2. Find maintenance section pages - 3. Extract text (native) or OCR (scanned) - 4. Detect tables - 5. Parse schedules - 6. Normalize and deduplicate + 1. Send entire PDF to Gemini for semantic extraction + 2. Map extracted service names to system maintenance subtypes via fuzzy matching + 3. Return structured results """ - # Maximum pages to process for performance - MAX_PAGES_TO_PROCESS = 50 + # Default confidence for Gemini-extracted items without a subtype match + DEFAULT_CONFIDENCE = 0.85 - # Minimum confidence to include schedule - MIN_CONFIDENCE = 0.5 + def __init__(self) -> None: + self._engine = GeminiEngine() def extract( self, pdf_bytes: bytes, progress_callback: Optional[Callable[[int, str], None]] = None, ) -> ManualExtractionResult: - """ - Extract maintenance schedules from an owner's manual PDF. + """Extract maintenance schedules from an owner's manual PDF. Args: pdf_bytes: Raw PDF bytes @@ -92,97 +82,69 @@ class ManualExtractor: logger.info(f"Progress {percent}%: {message}") try: - update_progress(5, "Analyzing PDF structure") + update_progress(5, "Sending PDF to Gemini for analysis") - # Get PDF info - pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes) - logger.info( - f"PDF: {pdf_info.total_pages} pages, " - f"has_text={pdf_info.has_text_layer}, " - f"is_scanned={pdf_info.is_scanned}" - ) + gemini_result = self._engine.extract_maintenance(pdf_bytes) - update_progress(10, "Finding maintenance sections") + update_progress(50, "Mapping service names to maintenance subtypes") - # Find pages likely to contain maintenance schedules - maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes) + schedules: list[ExtractedSchedule] = [] + for item in gemini_result.items: + mapping = service_mapper.map_service_fuzzy(item.service_name) - if not maintenance_pages: - # If no specific pages found, process first N pages - maintenance_pages = list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages))) - logger.info("No specific maintenance section found, processing all pages") - else: - # Include pages before and after detected maintenance pages - expanded_pages: set[int] = set() - for page in maintenance_pages: - for offset in range(-2, 5): # Include 2 before, 4 after - new_page = page + offset - if 0 <= new_page < pdf_info.total_pages: - expanded_pages.add(new_page) - maintenance_pages = sorted(expanded_pages)[:self.MAX_PAGES_TO_PROCESS] - logger.info(f"Processing {len(maintenance_pages)} pages around maintenance section") - - update_progress(15, "Extracting page content") - - # Extract content from pages - all_schedules: list[ParsedScheduleRow] = [] - all_tables: list[dict] = [] - pages_processed = 0 - - for i, page_num in enumerate(maintenance_pages): - page_progress = 15 + int((i / len(maintenance_pages)) * 60) - update_progress(page_progress, f"Processing page {page_num + 1}") - - # Extract page content - page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num) - pages_processed += 1 - - # Process based on content type - if page_content.has_text: - # Native PDF - use text directly - schedules, tables = self._process_text_page( - page_content.text_content, page_num - ) - elif page_content.image_bytes: - # Scanned PDF - OCR required - schedules, tables = self._process_scanned_page( - page_content.image_bytes, page_num - ) + if mapping: + subtypes = mapping.subtypes + confidence = mapping.confidence + service_name = mapping.normalized_name else: - continue + subtypes = [] + confidence = self.DEFAULT_CONFIDENCE + service_name = item.service_name - all_schedules.extend(schedules) - all_tables.extend(tables) + schedules.append( + ExtractedSchedule( + service=service_name, + interval_miles=item.interval_miles, + interval_months=item.interval_months, + details=item.details, + confidence=confidence, + subtypes=subtypes, + ) + ) - update_progress(75, "Normalizing results") - - # Deduplicate and normalize schedules - normalized_schedules = self._normalize_schedules(all_schedules) - - update_progress(85, "Extracting vehicle information") - - # Try to extract vehicle info from first few pages - vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info) - - update_progress(95, "Finalizing results") + update_progress(90, "Finalizing results") processing_time_ms = int((time.time() - start_time) * 1000) logger.info( - f"Extraction complete: {len(normalized_schedules)} schedules from " - f"{pages_processed} pages in {processing_time_ms}ms" + f"Extraction complete: {len(schedules)} schedules in {processing_time_ms}ms" ) update_progress(100, "Complete") return ManualExtractionResult( success=True, - vehicle_info=vehicle_info, - maintenance_schedules=normalized_schedules, - raw_tables=[{"page": t.get("page", 0), "rows": t.get("rows", 0)} for t in all_tables], + vehicle_info=None, + maintenance_schedules=schedules, + raw_tables=[], processing_time_ms=processing_time_ms, - total_pages=pdf_info.total_pages, - pages_processed=pages_processed, + total_pages=0, + pages_processed=0, + ) + + except GeminiEngineError as e: + logger.error(f"Gemini extraction failed: {e}", exc_info=True) + processing_time_ms = int((time.time() - start_time) * 1000) + + return ManualExtractionResult( + success=False, + vehicle_info=None, + maintenance_schedules=[], + raw_tables=[], + processing_time_ms=processing_time_ms, + total_pages=0, + pages_processed=0, + error=str(e), ) except Exception as e: @@ -200,220 +162,6 @@ class ManualExtractor: error=str(e), ) - def _process_text_page( - self, text: str, page_number: int - ) -> tuple[list[ParsedScheduleRow], list[dict]]: - """Process a native PDF page with text.""" - schedules: list[ParsedScheduleRow] = [] - tables: list[dict] = [] - - # Detect tables in text - detected_tables = table_detector.detect_tables_in_text(text, page_number) - - for table in detected_tables: - if table.is_maintenance_table and table.header_row: - # Parse table - parsed = table_parser.parse_table( - table.header_row, - table.raw_content, - ) - schedules.extend(parsed) - - tables.append({ - "page": page_number, - "rows": len(table.raw_content), - "is_maintenance": True, - }) - - # Also try to extract from unstructured text - text_schedules = table_parser.parse_text_block(text) - schedules.extend(text_schedules) - - return schedules, tables - - def _process_scanned_page( - self, image_bytes: bytes, page_number: int - ) -> tuple[list[ParsedScheduleRow], list[dict]]: - """Process a scanned PDF page with OCR.""" - schedules: list[ParsedScheduleRow] = [] - tables: list[dict] = [] - - # Detect tables in image - detected_tables = table_detector.detect_tables_in_image(image_bytes, page_number) - - # OCR the full page - try: - engine = create_engine() - ocr_result = engine.recognize(image_bytes, OcrConfig()) - ocr_text = ocr_result.text - - # Mark tables as maintenance if page contains maintenance keywords - for table in detected_tables: - table.is_maintenance_table = table_detector.is_maintenance_table( - table, ocr_text - ) - - # Try to extract from OCR text - text_tables = table_detector.detect_tables_in_text(ocr_text, page_number) - - for table in text_tables: - if table.is_maintenance_table and table.header_row: - parsed = table_parser.parse_table( - table.header_row, - table.raw_content, - ) - schedules.extend(parsed) - - tables.append({ - "page": page_number, - "rows": len(table.raw_content), - "is_maintenance": True, - }) - - # Also try unstructured text - text_schedules = table_parser.parse_text_block(ocr_text) - schedules.extend(text_schedules) - - except Exception as e: - logger.warning(f"OCR failed for page {page_number}: {e}") - - return schedules, tables - - def _normalize_schedules( - self, schedules: list[ParsedScheduleRow] - ) -> list[ExtractedSchedule]: - """Normalize and deduplicate extracted schedules.""" - # Group by normalized service name - by_service: dict[str, list[ParsedScheduleRow]] = {} - - for schedule in schedules: - if schedule.confidence < self.MIN_CONFIDENCE: - continue - - key = schedule.normalized_service or schedule.service.lower() - if key not in by_service: - by_service[key] = [] - by_service[key].append(schedule) - - # Merge duplicates, keeping highest confidence - results: list[ExtractedSchedule] = [] - - for service_key, items in by_service.items(): - # Sort by confidence - items.sort(key=lambda x: x.confidence, reverse=True) - best = items[0] - - # Merge interval info from other items if missing - miles = best.interval_miles - months = best.interval_months - details = best.details - fluid_spec = best.fluid_spec - - for item in items[1:]: - if not miles and item.interval_miles: - miles = item.interval_miles - if not months and item.interval_months: - months = item.interval_months - if not details and item.details: - details = item.details - if not fluid_spec and item.fluid_spec: - fluid_spec = item.fluid_spec - - # Build details string - detail_parts = [] - if details: - detail_parts.append(details) - if fluid_spec: - detail_parts.append(f"Use {fluid_spec}") - - results.append( - ExtractedSchedule( - service=best.normalized_service or best.service, - interval_miles=miles, - interval_months=months, - details=" - ".join(detail_parts) if detail_parts else None, - confidence=best.confidence, - subtypes=best.subtypes, - ) - ) - - # Sort by confidence - results.sort(key=lambda x: x.confidence, reverse=True) - - return results - - def _extract_vehicle_info( - self, pdf_bytes: bytes, pdf_info: PdfInfo - ) -> Optional[VehicleInfo]: - """Extract vehicle make/model/year from manual.""" - # Check metadata first - if pdf_info.title: - info = self._parse_vehicle_from_title(pdf_info.title) - if info: - return info - - # Try first page - try: - first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0) - text = first_page.text_content - - if not text and first_page.image_bytes: - # OCR first page - engine = create_engine() - ocr_result = engine.recognize(first_page.image_bytes, OcrConfig()) - text = ocr_result.text - - if text: - return self._parse_vehicle_from_text(text) - - except Exception as e: - logger.warning(f"Failed to extract vehicle info: {e}") - - return None - - def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]: - """Parse vehicle info from document title.""" - import re - - # Common patterns: "2024 Honda Civic Owner's Manual" - year_match = re.search(r"(20\d{2}|19\d{2})", title) - year = int(year_match.group(1)) if year_match else None - - # Common makes - makes = [ - "Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac", - "Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford", - "Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar", - "Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln", - "Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi", - "Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla", - "Toyota", "Volkswagen", "Volvo", - ] - - make = None - model = None - - for m in makes: - if m.lower() in title.lower(): - make = m - # Try to find model after make - idx = title.lower().find(m.lower()) - after = title[idx + len(m):].strip() - # First word after make is likely model - model_match = re.match(r"^(\w+)", after) - if model_match: - model = model_match.group(1) - break - - if year or make: - return VehicleInfo(make=make, model=model, year=year) - - return None - - def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]: - """Parse vehicle info from page text.""" - return self._parse_vehicle_from_title(text[:500]) # Use first 500 chars - # Singleton instance manual_extractor = ManualExtractor() diff --git a/ocr/tests/test_manual_extractor.py b/ocr/tests/test_manual_extractor.py new file mode 100644 index 0000000..38481b2 --- /dev/null +++ b/ocr/tests/test_manual_extractor.py @@ -0,0 +1,272 @@ +"""Tests for ManualExtractor Gemini-based maintenance schedule extraction. + +Covers: normal extraction with subtype mapping, unusual service names, +empty Gemini response, and Gemini call failure. +All GeminiEngine calls are mocked. +""" +from unittest.mock import MagicMock, patch + +import pytest + +from app.engines.gemini_engine import ( + GeminiProcessingError, + MaintenanceExtractionResult, + MaintenanceItem, +) +from app.extractors.manual_extractor import ( + ExtractedSchedule, + ManualExtractionResult, + ManualExtractor, +) + + +# --- Helpers --- + + +def _make_pdf_bytes(size: int = 1024) -> bytes: + """Create fake PDF bytes of a given size.""" + header = b"%PDF-1.4 fake" + return header + b"\x00" * max(0, size - len(header)) + + +def _make_gemini_result(items: list[MaintenanceItem]) -> MaintenanceExtractionResult: + """Create a mock Gemini extraction result.""" + return MaintenanceExtractionResult(items=items, model="gemini-2.5-flash") + + +# --- Successful extraction --- + + +class TestNormalExtraction: + """Verify normal PDF extraction returns mapped schedules with subtypes.""" + + def test_pdf_with_maintenance_schedule_returns_mapped_items(self): + """Normal: PDF with maintenance schedule returns extracted items with subtypes.""" + items = [ + MaintenanceItem( + service_name="Engine Oil Change", + interval_miles=5000, + interval_months=6, + details="Use 0W-20 full synthetic oil", + ), + MaintenanceItem( + service_name="Tire Rotation", + interval_miles=5000, + interval_months=6, + details=None, + ), + MaintenanceItem( + service_name="Cabin Filter", + interval_miles=15000, + interval_months=12, + details=None, + ), + ] + + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result(items) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is True + assert result.error is None + assert len(result.maintenance_schedules) == 3 + + # Oil change should map to Engine Oil subtype + oil = result.maintenance_schedules[0] + assert oil.service == "Engine Oil Change" + assert oil.interval_miles == 5000 + assert oil.interval_months == 6 + assert oil.details == "Use 0W-20 full synthetic oil" + assert "Engine Oil" in oil.subtypes + assert oil.confidence > 0.0 + + # Tire rotation should map to Tires subtype + tire = result.maintenance_schedules[1] + assert tire.service == "Tire Rotation" + assert "Tires" in tire.subtypes + + # Cabin filter should map to Cabin Air Filter / Purifier + cabin = result.maintenance_schedules[2] + assert "Cabin Air Filter / Purifier" in cabin.subtypes + + def test_progress_callbacks_fire_at_intervals(self): + """Progress callbacks fire at appropriate intervals during processing.""" + items = [ + MaintenanceItem(service_name="Oil Change", interval_miles=5000), + ] + + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result(items) + + progress_calls: list[tuple[int, str]] = [] + + def track_progress(percent: int, message: str) -> None: + progress_calls.append((percent, message)) + + extractor.extract(_make_pdf_bytes(), progress_callback=track_progress) + + # Should have progress calls at 5, 50, 90, 100 + percents = [p for p, _ in progress_calls] + assert 5 in percents + assert 50 in percents + assert 90 in percents + assert 100 in percents + # Percents should be non-decreasing + assert percents == sorted(percents) + + +# --- Unusual service names --- + + +class TestUnusualServiceNames: + """Verify that unusual service names still map to closest subtype.""" + + def test_unusual_names_fuzzy_match_to_subtypes(self): + """Edge: PDF with unusual service names still maps to closest subtype.""" + items = [ + MaintenanceItem( + service_name="Replace engine air cleaner element", + interval_miles=30000, + ), + MaintenanceItem( + service_name="Inspect drive belt for cracks", + interval_miles=60000, + ), + ] + + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result(items) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is True + assert len(result.maintenance_schedules) == 2 + + # "air cleaner element" should fuzzy match to Air Filter Element + air_filter = result.maintenance_schedules[0] + assert "Air Filter Element" in air_filter.subtypes + + # "drive belt" should match to Drive Belt + belt = result.maintenance_schedules[1] + assert "Drive Belt" in belt.subtypes + + def test_unmapped_service_uses_gemini_name_directly(self): + """Edge: Service name with no match uses Gemini name and default confidence.""" + items = [ + MaintenanceItem( + service_name="Recalibrate Quantum Flux Capacitor", + interval_miles=100000, + ), + ] + + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result(items) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is True + assert len(result.maintenance_schedules) == 1 + + item = result.maintenance_schedules[0] + assert item.service == "Recalibrate Quantum Flux Capacitor" + assert item.subtypes == [] + assert item.confidence == ManualExtractor.DEFAULT_CONFIDENCE + + +# --- Empty response --- + + +class TestEmptyResponse: + """Verify handling of empty Gemini responses.""" + + def test_empty_gemini_response_returns_empty_schedules(self): + """Edge: Empty Gemini response returns empty schedules list.""" + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result([]) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is True + assert result.maintenance_schedules == [] + assert result.error is None + assert result.processing_time_ms >= 0 + + +# --- Error handling --- + + +class TestErrorHandling: + """Verify error handling when Gemini calls fail.""" + + def test_gemini_failure_returns_error_result(self): + """Error: Gemini call failure returns ManualExtractionResult with error.""" + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.side_effect = GeminiProcessingError( + "Gemini maintenance extraction failed: API quota exceeded" + ) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is False + assert result.maintenance_schedules == [] + assert result.error is not None + assert "quota exceeded" in result.error.lower() + + def test_unexpected_exception_returns_error_result(self): + """Error: Unexpected exception is caught and returned as error.""" + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.side_effect = RuntimeError( + "Unexpected failure" + ) + + result = extractor.extract(_make_pdf_bytes()) + + assert result.success is False + assert result.error is not None + assert "Unexpected failure" in result.error + + +# --- Job queue integration --- + + +class TestJobQueueIntegration: + """Verify the extractor works within the existing job queue flow.""" + + def test_extract_returns_all_required_fields(self): + """The result contains all fields needed by process_manual_job in extract.py.""" + items = [ + MaintenanceItem(service_name="Oil Change", interval_miles=5000), + ] + + extractor = ManualExtractor() + extractor._engine = MagicMock() + extractor._engine.extract_maintenance.return_value = _make_gemini_result(items) + + result = extractor.extract(_make_pdf_bytes()) + + # All fields used by process_manual_job must be present + assert hasattr(result, "success") + assert hasattr(result, "vehicle_info") + assert hasattr(result, "maintenance_schedules") + assert hasattr(result, "raw_tables") + assert hasattr(result, "processing_time_ms") + assert hasattr(result, "total_pages") + assert hasattr(result, "pages_processed") + assert hasattr(result, "error") + + # Schedules have required fields + schedule = result.maintenance_schedules[0] + assert hasattr(schedule, "service") + assert hasattr(schedule, "interval_miles") + assert hasattr(schedule, "interval_months") + assert hasattr(schedule, "details") + assert hasattr(schedule, "confidence") + assert hasattr(schedule, "subtypes")