feat: rewrite ManualExtractor to use Gemini engine (refs #134)

Replace traditional OCR pipeline (table_detector, table_parser,
maintenance_patterns) with GeminiEngine for semantic PDF extraction.
Map Gemini serviceName values to 27 maintenance subtypes via
ServiceMapper fuzzy matching. Add 8 unit tests covering normal
extraction, unusual names, empty response, and error handling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Eric Gullickson
2026-02-11 10:24:11 -06:00
parent 3705e63fde
commit 57ed04d955
2 changed files with 329 additions and 309 deletions

View File

@@ -1,17 +1,11 @@
"""Owner's manual extractor for maintenance schedule extraction."""
import io
"""Owner's manual extractor for maintenance schedule extraction via Gemini."""
import logging
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
from PIL import Image
from app.engines import create_engine, OcrConfig
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
from app.table_extraction.detector import table_detector, DetectedTable
from app.table_extraction.parser import table_parser, ParsedScheduleRow
from app.patterns.maintenance_patterns import maintenance_matcher
from app.engines.gemini_engine import GeminiEngine, GeminiEngineError
from app.patterns.service_mapping import service_mapper
logger = logging.getLogger(__name__)
@@ -52,30 +46,26 @@ class ManualExtractionResult:
class ManualExtractor:
"""Extract maintenance schedules from owner's manuals.
"""Extract maintenance schedules from owner's manuals using Gemini.
Processing pipeline:
1. Analyze PDF structure
2. Find maintenance section pages
3. Extract text (native) or OCR (scanned)
4. Detect tables
5. Parse schedules
6. Normalize and deduplicate
1. Send entire PDF to Gemini for semantic extraction
2. Map extracted service names to system maintenance subtypes via fuzzy matching
3. Return structured results
"""
# Maximum pages to process for performance
MAX_PAGES_TO_PROCESS = 50
# Default confidence for Gemini-extracted items without a subtype match
DEFAULT_CONFIDENCE = 0.85
# Minimum confidence to include schedule
MIN_CONFIDENCE = 0.5
def __init__(self) -> None:
self._engine = GeminiEngine()
def extract(
self,
pdf_bytes: bytes,
progress_callback: Optional[Callable[[int, str], None]] = None,
) -> ManualExtractionResult:
"""
Extract maintenance schedules from an owner's manual PDF.
"""Extract maintenance schedules from an owner's manual PDF.
Args:
pdf_bytes: Raw PDF bytes
@@ -92,97 +82,69 @@ class ManualExtractor:
logger.info(f"Progress {percent}%: {message}")
try:
update_progress(5, "Analyzing PDF structure")
update_progress(5, "Sending PDF to Gemini for analysis")
# Get PDF info
pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
logger.info(
f"PDF: {pdf_info.total_pages} pages, "
f"has_text={pdf_info.has_text_layer}, "
f"is_scanned={pdf_info.is_scanned}"
)
gemini_result = self._engine.extract_maintenance(pdf_bytes)
update_progress(10, "Finding maintenance sections")
update_progress(50, "Mapping service names to maintenance subtypes")
# Find pages likely to contain maintenance schedules
maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)
schedules: list[ExtractedSchedule] = []
for item in gemini_result.items:
mapping = service_mapper.map_service_fuzzy(item.service_name)
if not maintenance_pages:
# If no specific pages found, process first N pages
maintenance_pages = list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))
logger.info("No specific maintenance section found, processing all pages")
else:
# Include pages before and after detected maintenance pages
expanded_pages: set[int] = set()
for page in maintenance_pages:
for offset in range(-2, 5): # Include 2 before, 4 after
new_page = page + offset
if 0 <= new_page < pdf_info.total_pages:
expanded_pages.add(new_page)
maintenance_pages = sorted(expanded_pages)[:self.MAX_PAGES_TO_PROCESS]
logger.info(f"Processing {len(maintenance_pages)} pages around maintenance section")
update_progress(15, "Extracting page content")
# Extract content from pages
all_schedules: list[ParsedScheduleRow] = []
all_tables: list[dict] = []
pages_processed = 0
for i, page_num in enumerate(maintenance_pages):
page_progress = 15 + int((i / len(maintenance_pages)) * 60)
update_progress(page_progress, f"Processing page {page_num + 1}")
# Extract page content
page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num)
pages_processed += 1
# Process based on content type
if page_content.has_text:
# Native PDF - use text directly
schedules, tables = self._process_text_page(
page_content.text_content, page_num
)
elif page_content.image_bytes:
# Scanned PDF - OCR required
schedules, tables = self._process_scanned_page(
page_content.image_bytes, page_num
)
if mapping:
subtypes = mapping.subtypes
confidence = mapping.confidence
service_name = mapping.normalized_name
else:
continue
subtypes = []
confidence = self.DEFAULT_CONFIDENCE
service_name = item.service_name
all_schedules.extend(schedules)
all_tables.extend(tables)
schedules.append(
ExtractedSchedule(
service=service_name,
interval_miles=item.interval_miles,
interval_months=item.interval_months,
details=item.details,
confidence=confidence,
subtypes=subtypes,
)
)
update_progress(75, "Normalizing results")
# Deduplicate and normalize schedules
normalized_schedules = self._normalize_schedules(all_schedules)
update_progress(85, "Extracting vehicle information")
# Try to extract vehicle info from first few pages
vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)
update_progress(95, "Finalizing results")
update_progress(90, "Finalizing results")
processing_time_ms = int((time.time() - start_time) * 1000)
logger.info(
f"Extraction complete: {len(normalized_schedules)} schedules from "
f"{pages_processed} pages in {processing_time_ms}ms"
f"Extraction complete: {len(schedules)} schedules in {processing_time_ms}ms"
)
update_progress(100, "Complete")
return ManualExtractionResult(
success=True,
vehicle_info=vehicle_info,
maintenance_schedules=normalized_schedules,
raw_tables=[{"page": t.get("page", 0), "rows": t.get("rows", 0)} for t in all_tables],
vehicle_info=None,
maintenance_schedules=schedules,
raw_tables=[],
processing_time_ms=processing_time_ms,
total_pages=pdf_info.total_pages,
pages_processed=pages_processed,
total_pages=0,
pages_processed=0,
)
except GeminiEngineError as e:
logger.error(f"Gemini extraction failed: {e}", exc_info=True)
processing_time_ms = int((time.time() - start_time) * 1000)
return ManualExtractionResult(
success=False,
vehicle_info=None,
maintenance_schedules=[],
raw_tables=[],
processing_time_ms=processing_time_ms,
total_pages=0,
pages_processed=0,
error=str(e),
)
except Exception as e:
@@ -200,220 +162,6 @@ class ManualExtractor:
error=str(e),
)
def _process_text_page(
    self, text: str, page_number: int
) -> tuple[list[ParsedScheduleRow], list[dict]]:
    """Extract maintenance rows from a page that has a native text layer.

    Runs table detection over the raw text, parses any maintenance tables
    found, and additionally scans the unstructured text for schedule-like
    lines. Returns (parsed schedule rows, table metadata dicts).
    """
    found_rows: list[ParsedScheduleRow] = []
    table_meta: list[dict] = []

    for candidate in table_detector.detect_tables_in_text(text, page_number):
        # Only maintenance tables with a recognized header are parseable.
        if not (candidate.is_maintenance_table and candidate.header_row):
            continue
        found_rows.extend(
            table_parser.parse_table(candidate.header_row, candidate.raw_content)
        )
        table_meta.append({
            "page": page_number,
            "rows": len(candidate.raw_content),
            "is_maintenance": True,
        })

    # Schedules also appear outside tables; scan the free text as well.
    found_rows.extend(table_parser.parse_text_block(text))
    return found_rows, table_meta
def _process_scanned_page(
    self, image_bytes: bytes, page_number: int
) -> tuple[list[ParsedScheduleRow], list[dict]]:
    """Extract maintenance rows from a scanned page via OCR.

    Detects tables in the page image, OCRs the full page, re-detects
    tables in the OCR text, and parses maintenance tables plus free text.
    OCR failures are logged and yield empty results (best-effort).
    """
    found_rows: list[ParsedScheduleRow] = []
    table_meta: list[dict] = []

    image_tables = table_detector.detect_tables_in_image(image_bytes, page_number)

    try:
        ocr_text = create_engine().recognize(image_bytes, OcrConfig()).text

        # Use the OCR text to decide whether image-detected tables look
        # like maintenance tables (keyword check on the page text).
        for img_table in image_tables:
            img_table.is_maintenance_table = table_detector.is_maintenance_table(
                img_table, ocr_text
            )

        # Parse tables found in the OCR text itself.
        for candidate in table_detector.detect_tables_in_text(ocr_text, page_number):
            if not (candidate.is_maintenance_table and candidate.header_row):
                continue
            found_rows.extend(
                table_parser.parse_table(candidate.header_row, candidate.raw_content)
            )
            table_meta.append({
                "page": page_number,
                "rows": len(candidate.raw_content),
                "is_maintenance": True,
            })

        # Unstructured OCR text may also contain schedule lines.
        found_rows.extend(table_parser.parse_text_block(ocr_text))
    except Exception as e:
        logger.warning(f"OCR failed for page {page_number}: {e}")

    return found_rows, table_meta
def _normalize_schedules(
    self, schedules: list[ParsedScheduleRow]
) -> list[ExtractedSchedule]:
    """Deduplicate parsed rows by service and merge their interval data.

    Rows below MIN_CONFIDENCE are discarded. Rows are grouped by their
    normalized service name (falling back to the lowercased raw name);
    within each group the highest-confidence row wins, and any missing
    interval/detail/fluid fields are backfilled from lower-confidence
    duplicates. Results are returned sorted by confidence, descending.
    """
    grouped: dict[str, list[ParsedScheduleRow]] = {}
    for row in schedules:
        if row.confidence < self.MIN_CONFIDENCE:
            continue
        group_key = row.normalized_service or row.service.lower()
        grouped.setdefault(group_key, []).append(row)

    merged: list[ExtractedSchedule] = []
    for rows in grouped.values():
        # Highest confidence first; that row is the canonical one.
        rows.sort(key=lambda r: r.confidence, reverse=True)
        best, *rest = rows

        miles = best.interval_miles
        months = best.interval_months
        details = best.details
        fluid_spec = best.fluid_spec

        # Backfill fields the best row is missing from its duplicates.
        for other in rest:
            if not miles and other.interval_miles:
                miles = other.interval_miles
            if not months and other.interval_months:
                months = other.interval_months
            if not details and other.details:
                details = other.details
            if not fluid_spec and other.fluid_spec:
                fluid_spec = other.fluid_spec

        # Combine free-form details and fluid spec into one string.
        detail_parts = []
        if details:
            detail_parts.append(details)
        if fluid_spec:
            detail_parts.append(f"Use {fluid_spec}")

        merged.append(
            ExtractedSchedule(
                service=best.normalized_service or best.service,
                interval_miles=miles,
                interval_months=months,
                details=" - ".join(detail_parts) if detail_parts else None,
                confidence=best.confidence,
                subtypes=best.subtypes,
            )
        )

    merged.sort(key=lambda r: r.confidence, reverse=True)
    return merged
def _extract_vehicle_info(
    self, pdf_bytes: bytes, pdf_info: PdfInfo
) -> Optional[VehicleInfo]:
    """Best-effort extraction of vehicle make/model/year from the manual.

    Checks the PDF title metadata first, then falls back to the first
    page's text, OCR-ing it when the page has no text layer. Returns
    None when nothing is found or page access fails.
    """
    # Cheapest source first: the document title metadata.
    if pdf_info.title:
        from_title = self._parse_vehicle_from_title(pdf_info.title)
        if from_title:
            return from_title

    try:
        first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
        text = first_page.text_content
        if not text and first_page.image_bytes:
            # Page one has no text layer — OCR its rendered image.
            text = create_engine().recognize(first_page.image_bytes, OcrConfig()).text
        if text:
            return self._parse_vehicle_from_text(text)
    except Exception as e:
        logger.warning(f"Failed to extract vehicle info: {e}")

    return None
def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]:
"""Parse vehicle info from document title."""
import re
# Common patterns: "2024 Honda Civic Owner's Manual"
year_match = re.search(r"(20\d{2}|19\d{2})", title)
year = int(year_match.group(1)) if year_match else None
# Common makes
makes = [
"Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
"Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
"Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
"Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
"Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
"Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
"Toyota", "Volkswagen", "Volvo",
]
make = None
model = None
for m in makes:
if m.lower() in title.lower():
make = m
# Try to find model after make
idx = title.lower().find(m.lower())
after = title[idx + len(m):].strip()
# First word after make is likely model
model_match = re.match(r"^(\w+)", after)
if model_match:
model = model_match.group(1)
break
if year or make:
return VehicleInfo(make=make, model=model, year=year)
return None
def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]:
    """Parse vehicle info from page text via the title parser."""
    # Only the opening of the page is useful; cap the scan at 500 chars.
    leading_excerpt = text[:500]
    return self._parse_vehicle_from_title(leading_excerpt)
# Module-level singleton shared by importers; constructing ManualExtractor
# also instantiates its GeminiEngine (see __init__).
manual_extractor = ManualExtractor()