Files
motovaultpro/ocr/app/extractors/manual_extractor.py
Eric Gullickson b9fe222f12
Some checks failed
Deploy to Staging / Build Images (pull_request) Failing after 4m14s
Deploy to Staging / Deploy to Staging (pull_request) Has been skipped
Deploy to Staging / Verify Staging (pull_request) Has been skipped
Deploy to Staging / Notify Staging Ready (pull_request) Has been skipped
Deploy to Staging / Notify Staging Failure (pull_request) Successful in 8s
fix: Build errors and tesseract removal
2026-02-07 12:12:04 -06:00

420 lines
15 KiB
Python

"""Owner's manual extractor for maintenance schedule extraction."""
import io
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Callable, Optional

from PIL import Image

from app.engines import create_engine, OcrConfig
from app.patterns.maintenance_patterns import maintenance_matcher
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
from app.table_extraction.detector import table_detector, DetectedTable
from app.table_extraction.parser import table_parser, ParsedScheduleRow
logger = logging.getLogger(__name__)
@dataclass
class ExtractedSchedule:
    """A single extracted maintenance schedule."""

    # Service name (the parser's normalized name when available).
    service: str
    # Recommended interval in miles; None when the manual did not state one.
    interval_miles: Optional[int]
    # Recommended interval in months; None when the manual did not state one.
    interval_months: Optional[int]
    # Free-text details, possibly including a fluid spec; None when absent.
    details: Optional[str]
    # Parser confidence score (higher is better; rows below the extractor's
    # MIN_CONFIDENCE threshold are dropped during normalization).
    confidence: float
    # Service subtypes reported by the parser; empty list by default.
    subtypes: list[str] = field(default_factory=list)
@dataclass
class VehicleInfo:
    """Vehicle information extracted from manual."""

    # Vehicle make, e.g. "Honda"; None when not detected.
    make: Optional[str]
    # Model name (first word found after the make); None when not detected.
    model: Optional[str]
    # Four-digit model year (1900-2099); None when not detected.
    year: Optional[int]
@dataclass
class ManualExtractionResult:
    """Complete result of manual extraction."""

    # True when extraction ran to completion without raising.
    success: bool
    # Parsed make/model/year, or None when unavailable or extraction failed.
    vehicle_info: Optional[VehicleInfo]
    # Normalized, deduplicated schedules (empty on failure).
    maintenance_schedules: list[ExtractedSchedule]
    # Lightweight summaries of detected tables: {"page": int, "rows": int}.
    raw_tables: list[dict]
    # Wall-clock processing time in milliseconds.
    processing_time_ms: int
    # Total pages in the source PDF (0 when extraction failed early).
    total_pages: int
    # Number of pages actually examined.
    pages_processed: int
    # Error message when success is False, else None.
    error: Optional[str] = None
class ManualExtractor:
    """Extract maintenance schedules from owner's manuals.

    Processing pipeline:
        1. Analyze PDF structure
        2. Find maintenance section pages
        3. Extract text (native) or OCR (scanned)
        4. Detect tables
        5. Parse schedules
        6. Normalize and deduplicate
    """

    # Maximum pages to process for performance.
    MAX_PAGES_TO_PROCESS = 50
    # Minimum parser confidence for a schedule row to be kept.
    MIN_CONFIDENCE = 0.5
    # Known vehicle makes used when parsing titles/first pages. Matched with
    # word boundaries so short makes ("Ram", "Kia", "Mini") do not match
    # inside unrelated words ("program", "minimum").
    KNOWN_MAKES = [
        "Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
        "Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
        "Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
        "Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
        "Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
        "Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
        "Toyota", "Volkswagen", "Volvo",
    ]

    def extract(
        self,
        pdf_bytes: bytes,
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> "ManualExtractionResult":
        """
        Extract maintenance schedules from an owner's manual PDF.

        Args:
            pdf_bytes: Raw PDF bytes
            progress_callback: Optional callback for progress updates (percent, message)

        Returns:
            ManualExtractionResult with extracted data. This method never
            raises: unexpected errors are captured and reported via the
            result's success/error fields.
        """
        start_time = time.time()

        def update_progress(percent: int, message: str) -> None:
            # Forward progress to the caller (when provided) and log it.
            if progress_callback:
                progress_callback(percent, message)
            logger.info(f"Progress {percent}%: {message}")

        try:
            update_progress(5, "Analyzing PDF structure")
            pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
            logger.info(
                f"PDF: {pdf_info.total_pages} pages, "
                f"has_text={pdf_info.has_text_layer}, "
                f"is_scanned={pdf_info.is_scanned}"
            )

            update_progress(10, "Finding maintenance sections")
            maintenance_pages = self._select_pages(pdf_bytes, pdf_info)

            update_progress(15, "Extracting page content")
            all_schedules: list["ParsedScheduleRow"] = []
            all_tables: list[dict] = []
            pages_processed = 0
            for i, page_num in enumerate(maintenance_pages):
                # Map the page loop onto the 15-75% progress band.
                page_progress = 15 + int((i / len(maintenance_pages)) * 60)
                update_progress(page_progress, f"Processing page {page_num + 1}")
                page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num)
                pages_processed += 1
                if page_content.has_text:
                    # Native PDF page - use the embedded text layer directly.
                    schedules, tables = self._process_text_page(
                        page_content.text_content, page_num
                    )
                elif page_content.image_bytes:
                    # Scanned page - OCR required.
                    schedules, tables = self._process_scanned_page(
                        page_content.image_bytes, page_num
                    )
                else:
                    # Neither text nor an image: nothing to extract.
                    continue
                all_schedules.extend(schedules)
                all_tables.extend(tables)

            update_progress(75, "Normalizing results")
            normalized_schedules = self._normalize_schedules(all_schedules)

            update_progress(85, "Extracting vehicle information")
            vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)

            update_progress(95, "Finalizing results")
            processing_time_ms = int((time.time() - start_time) * 1000)
            logger.info(
                f"Extraction complete: {len(normalized_schedules)} schedules from "
                f"{pages_processed} pages in {processing_time_ms}ms"
            )
            update_progress(100, "Complete")
            return ManualExtractionResult(
                success=True,
                vehicle_info=vehicle_info,
                maintenance_schedules=normalized_schedules,
                raw_tables=[
                    {"page": t.get("page", 0), "rows": t.get("rows", 0)}
                    for t in all_tables
                ],
                processing_time_ms=processing_time_ms,
                total_pages=pdf_info.total_pages,
                pages_processed=pages_processed,
            )
        except Exception as e:
            logger.error(f"Manual extraction failed: {e}", exc_info=True)
            processing_time_ms = int((time.time() - start_time) * 1000)
            return ManualExtractionResult(
                success=False,
                vehicle_info=None,
                maintenance_schedules=[],
                raw_tables=[],
                processing_time_ms=processing_time_ms,
                total_pages=0,
                pages_processed=0,
                error=str(e),
            )

    def _select_pages(self, pdf_bytes: bytes, pdf_info: "PdfInfo") -> list[int]:
        """Choose which pages to process, capped at MAX_PAGES_TO_PROCESS.

        Uses the detected maintenance section when available (expanded by
        2 pages before and 4 after each hit); otherwise falls back to the
        first MAX_PAGES_TO_PROCESS pages of the document.
        """
        maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)
        if not maintenance_pages:
            logger.info("No specific maintenance section found, processing all pages")
            return list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))
        # Include context pages before and after the detected section.
        expanded_pages: set[int] = set()
        for page in maintenance_pages:
            for offset in range(-2, 5):  # Include 2 before, 4 after
                new_page = page + offset
                if 0 <= new_page < pdf_info.total_pages:
                    expanded_pages.add(new_page)
        pages = sorted(expanded_pages)[: self.MAX_PAGES_TO_PROCESS]
        logger.info(f"Processing {len(pages)} pages around maintenance section")
        return pages

    def _parse_maintenance_tables(
        self, detected_tables: list, page_number: int
    ) -> tuple[list["ParsedScheduleRow"], list[dict]]:
        """Parse schedule rows from tables flagged as maintenance tables.

        Returns (schedule rows, per-table summary dicts). Tables without a
        header row or not flagged as maintenance tables are skipped.
        """
        schedules: list["ParsedScheduleRow"] = []
        tables: list[dict] = []
        for table in detected_tables:
            if table.is_maintenance_table and table.header_row:
                schedules.extend(
                    table_parser.parse_table(table.header_row, table.raw_content)
                )
                tables.append({
                    "page": page_number,
                    "rows": len(table.raw_content),
                    "is_maintenance": True,
                })
        return schedules, tables

    def _process_text_page(
        self, text: str, page_number: int
    ) -> tuple[list["ParsedScheduleRow"], list[dict]]:
        """Process a native PDF page with text."""
        detected_tables = table_detector.detect_tables_in_text(text, page_number)
        schedules, tables = self._parse_maintenance_tables(detected_tables, page_number)
        # Also try to extract from unstructured text outside tables.
        schedules.extend(table_parser.parse_text_block(text))
        return schedules, tables

    def _process_scanned_page(
        self, image_bytes: bytes, page_number: int
    ) -> tuple[list["ParsedScheduleRow"], list[dict]]:
        """Process a scanned PDF page with OCR.

        Best-effort: a page whose OCR fails contributes no schedules
        instead of aborting the whole extraction.
        """
        schedules: list["ParsedScheduleRow"] = []
        tables: list[dict] = []
        detected_tables = table_detector.detect_tables_in_image(image_bytes, page_number)
        try:
            # OCR the full page once and reuse the text below.
            engine = create_engine()
            ocr_result = engine.recognize(image_bytes, OcrConfig())
            ocr_text = ocr_result.text
            # Flag image-detected tables using page-level keywords.
            # NOTE(review): these flags are not consumed below - parsing
            # works on tables re-detected from the OCR text instead.
            for table in detected_tables:
                table.is_maintenance_table = table_detector.is_maintenance_table(
                    table, ocr_text
                )
            text_tables = table_detector.detect_tables_in_text(ocr_text, page_number)
            schedules, tables = self._parse_maintenance_tables(text_tables, page_number)
            # Also try unstructured OCR text.
            schedules.extend(table_parser.parse_text_block(ocr_text))
        except Exception as e:
            logger.warning(f"OCR failed for page {page_number}: {e}")
        return schedules, tables

    def _normalize_schedules(
        self, schedules: list["ParsedScheduleRow"]
    ) -> list["ExtractedSchedule"]:
        """Normalize and deduplicate extracted schedules.

        Rows below MIN_CONFIDENCE are dropped. Rows for the same
        (normalized) service are merged: the highest-confidence row wins,
        and missing interval/detail fields are back-filled from the rest.
        Results are sorted by descending confidence.
        """
        # Group by normalized service name.
        by_service: dict[str, list["ParsedScheduleRow"]] = {}
        for schedule in schedules:
            if schedule.confidence < self.MIN_CONFIDENCE:
                continue
            key = schedule.normalized_service or schedule.service.lower()
            by_service.setdefault(key, []).append(schedule)

        results: list["ExtractedSchedule"] = []
        for items in by_service.values():
            # Highest confidence first; that row is the merge base.
            items.sort(key=lambda x: x.confidence, reverse=True)
            best = items[0]
            miles = best.interval_miles
            months = best.interval_months
            details = best.details
            fluid_spec = best.fluid_spec
            # Back-fill fields the best row is missing from lower-confidence rows.
            for item in items[1:]:
                if not miles and item.interval_miles:
                    miles = item.interval_miles
                if not months and item.interval_months:
                    months = item.interval_months
                if not details and item.details:
                    details = item.details
                if not fluid_spec and item.fluid_spec:
                    fluid_spec = item.fluid_spec
            # Fold the fluid spec into the details string.
            detail_parts = []
            if details:
                detail_parts.append(details)
            if fluid_spec:
                detail_parts.append(f"Use {fluid_spec}")
            results.append(
                ExtractedSchedule(
                    service=best.normalized_service or best.service,
                    interval_miles=miles,
                    interval_months=months,
                    details=" - ".join(detail_parts) if detail_parts else None,
                    confidence=best.confidence,
                    subtypes=best.subtypes,
                )
            )
        results.sort(key=lambda x: x.confidence, reverse=True)
        return results

    def _extract_vehicle_info(
        self, pdf_bytes: bytes, pdf_info: "PdfInfo"
    ) -> Optional["VehicleInfo"]:
        """Extract vehicle make/model/year from manual.

        Tries the PDF title metadata first, then the first page's text
        (OCR'd when there is no text layer). Returns None on failure.
        """
        if pdf_info.title:
            info = self._parse_vehicle_from_title(pdf_info.title)
            if info:
                return info
        try:
            first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
            text = first_page.text_content
            if not text and first_page.image_bytes:
                # No text layer on page one - OCR it.
                engine = create_engine()
                ocr_result = engine.recognize(first_page.image_bytes, OcrConfig())
                text = ocr_result.text
            if text:
                return self._parse_vehicle_from_text(text)
        except Exception as e:
            logger.warning(f"Failed to extract vehicle info: {e}")
        return None

    def _parse_vehicle_from_title(self, title: str) -> Optional["VehicleInfo"]:
        """Parse vehicle info from document title.

        Handles common patterns such as "2024 Honda Civic Owner's Manual".
        Returns None when neither a year nor a known make is found.
        """
        # Four-digit year, bounded so digits embedded in longer numbers
        # (e.g. part numbers) do not match.
        year_match = re.search(r"\b(20\d{2}|19\d{2})\b", title)
        year = int(year_match.group(1)) if year_match else None
        make = None
        model = None
        for candidate in self.KNOWN_MAKES:
            # Word-boundary, case-insensitive match: prevents false hits
            # such as "Ram" inside "program" or "Mini" inside "minimum".
            match = re.search(rf"\b{re.escape(candidate)}\b", title, re.IGNORECASE)
            if match:
                make = candidate
                # First word after the make is likely the model.
                after = title[match.end():].strip()
                model_match = re.match(r"(\w+)", after)
                if model_match:
                    model = model_match.group(1)
                break
        if year or make:
            return VehicleInfo(make=make, model=model, year=year)
        return None

    def _parse_vehicle_from_text(self, text: str) -> Optional["VehicleInfo"]:
        """Parse vehicle info from page text."""
        return self._parse_vehicle_from_title(text[:500])  # Use first 500 chars
# Shared module-level singleton instance (ManualExtractor holds no state,
# so a single instance is safe to reuse across callers).
manual_extractor = ManualExtractor()