# NOTE(review): removed non-source paste artifacts that preceded the module
# docstring (CI status banner and repository file-stat lines).
"""Owner's manual extractor for maintenance schedule extraction."""
|
|
import io
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Callable, Optional
|
|
|
|
from PIL import Image
|
|
|
|
from app.engines import create_engine, OcrConfig
|
|
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
|
|
from app.table_extraction.detector import table_detector, DetectedTable
|
|
from app.table_extraction.parser import table_parser, ParsedScheduleRow
|
|
from app.patterns.maintenance_patterns import maintenance_matcher
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ExtractedSchedule:
    """A single extracted maintenance schedule."""

    # Service name; the normalized form when available, otherwise the raw text.
    service: str
    # Recommended interval in miles; None when the manual gives no mileage.
    interval_miles: Optional[int]
    # Recommended interval in months; None when the manual gives no time span.
    interval_months: Optional[int]
    # Free-text notes (may include a "Use <fluid_spec>" suffix); None if absent.
    details: Optional[str]
    # Parser-reported confidence score; higher means more trustworthy.
    confidence: float
    # Subtype labels carried over from the parsed row; empty by default.
    subtypes: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class VehicleInfo:
    """Vehicle information extracted from manual."""

    # Manufacturer name (e.g. "Honda"); None when not detected.
    make: Optional[str]
    # Model name (first word found after the make); None when not detected.
    model: Optional[str]
    # Model year parsed as a 19xx/20xx number; None when not detected.
    year: Optional[int]
|
|
|
|
|
|
@dataclass
class ManualExtractionResult:
    """Complete result of manual extraction."""

    # False when extraction raised; see `error` for the message.
    success: bool
    # Detected make/model/year, or None when nothing could be parsed.
    vehicle_info: Optional[VehicleInfo]
    # Deduplicated schedules, sorted by descending confidence.
    maintenance_schedules: list[ExtractedSchedule]
    # Lightweight per-table metadata dicts: {"page": int, "rows": int}.
    raw_tables: list[dict]
    # Wall-clock duration of the whole extraction, in milliseconds.
    processing_time_ms: int
    # Page count of the source PDF (0 when extraction failed early).
    total_pages: int
    # Number of pages actually examined (capped by MAX_PAGES_TO_PROCESS).
    pages_processed: int
    # Exception text when success is False; None otherwise.
    error: Optional[str] = None
|
|
|
|
|
|
class ManualExtractor:
    """Extract maintenance schedules from owner's manuals.

    Processing pipeline:
    1. Analyze PDF structure
    2. Find maintenance section pages
    3. Extract text (native) or OCR (scanned)
    4. Detect tables
    5. Parse schedules
    6. Normalize and deduplicate
    """

    # Maximum pages to process for performance; pages beyond this cap are
    # silently skipped even when the maintenance section is larger.
    MAX_PAGES_TO_PROCESS = 50

    # Minimum parser confidence a row needs to be included in the results.
    MIN_CONFIDENCE = 0.5
|
|
|
|
    def extract(
        self,
        pdf_bytes: bytes,
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> ManualExtractionResult:
        """
        Extract maintenance schedules from an owner's manual PDF.

        Never raises: any failure is caught, logged, and reported via a
        result with ``success=False`` and ``error`` set.

        Args:
            pdf_bytes: Raw PDF bytes
            progress_callback: Optional callback for progress updates (percent, message)

        Returns:
            ManualExtractionResult with extracted data
        """
        start_time = time.time()

        def update_progress(percent: int, message: str) -> None:
            # Forward progress to the caller (if any) and always log it.
            if progress_callback:
                progress_callback(percent, message)
            logger.info(f"Progress {percent}%: {message}")

        try:
            update_progress(5, "Analyzing PDF structure")

            # Get PDF info
            pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
            logger.info(
                f"PDF: {pdf_info.total_pages} pages, "
                f"has_text={pdf_info.has_text_layer}, "
                f"is_scanned={pdf_info.is_scanned}"
            )

            update_progress(10, "Finding maintenance sections")

            # Find pages likely to contain maintenance schedules
            maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)

            if not maintenance_pages:
                # If no specific pages found, process first N pages
                maintenance_pages = list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))
                logger.info("No specific maintenance section found, processing all pages")
            else:
                # Include pages before and after detected maintenance pages,
                # since schedules often spill across section boundaries.
                expanded_pages: set[int] = set()
                for page in maintenance_pages:
                    for offset in range(-2, 5):  # Include 2 before, 4 after
                        new_page = page + offset
                        if 0 <= new_page < pdf_info.total_pages:
                            expanded_pages.add(new_page)
                maintenance_pages = sorted(expanded_pages)[:self.MAX_PAGES_TO_PROCESS]
                logger.info(f"Processing {len(maintenance_pages)} pages around maintenance section")

            update_progress(15, "Extracting page content")

            # Extract content from pages
            all_schedules: list[ParsedScheduleRow] = []
            all_tables: list[dict] = []
            pages_processed = 0

            for i, page_num in enumerate(maintenance_pages):
                # Page loop spans the 15%..75% band of the progress bar.
                page_progress = 15 + int((i / len(maintenance_pages)) * 60)
                update_progress(page_progress, f"Processing page {page_num + 1}")

                # Extract page content
                page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num)
                pages_processed += 1

                # Process based on content type: prefer the native text layer;
                # fall back to OCR on the rendered image; skip empty pages.
                if page_content.has_text:
                    # Native PDF - use text directly
                    schedules, tables = self._process_text_page(
                        page_content.text_content, page_num
                    )
                elif page_content.image_bytes:
                    # Scanned PDF - OCR required
                    schedules, tables = self._process_scanned_page(
                        page_content.image_bytes, page_num
                    )
                else:
                    continue

                all_schedules.extend(schedules)
                all_tables.extend(tables)

            update_progress(75, "Normalizing results")

            # Deduplicate and normalize schedules
            normalized_schedules = self._normalize_schedules(all_schedules)

            update_progress(85, "Extracting vehicle information")

            # Try to extract vehicle info from metadata / first page.
            vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)

            update_progress(95, "Finalizing results")

            processing_time_ms = int((time.time() - start_time) * 1000)

            logger.info(
                f"Extraction complete: {len(normalized_schedules)} schedules from "
                f"{pages_processed} pages in {processing_time_ms}ms"
            )

            update_progress(100, "Complete")

            return ManualExtractionResult(
                success=True,
                vehicle_info=vehicle_info,
                maintenance_schedules=normalized_schedules,
                # Strip table dicts down to the two keys callers need.
                raw_tables=[{"page": t.get("page", 0), "rows": t.get("rows", 0)} for t in all_tables],
                processing_time_ms=processing_time_ms,
                total_pages=pdf_info.total_pages,
                pages_processed=pages_processed,
            )

        except Exception as e:
            # Top-level boundary: convert any failure into a structured result.
            logger.error(f"Manual extraction failed: {e}", exc_info=True)
            processing_time_ms = int((time.time() - start_time) * 1000)

            return ManualExtractionResult(
                success=False,
                vehicle_info=None,
                maintenance_schedules=[],
                raw_tables=[],
                processing_time_ms=processing_time_ms,
                total_pages=0,
                pages_processed=0,
                error=str(e),
            )
|
|
|
|
def _process_text_page(
|
|
self, text: str, page_number: int
|
|
) -> tuple[list[ParsedScheduleRow], list[dict]]:
|
|
"""Process a native PDF page with text."""
|
|
schedules: list[ParsedScheduleRow] = []
|
|
tables: list[dict] = []
|
|
|
|
# Detect tables in text
|
|
detected_tables = table_detector.detect_tables_in_text(text, page_number)
|
|
|
|
for table in detected_tables:
|
|
if table.is_maintenance_table and table.header_row:
|
|
# Parse table
|
|
parsed = table_parser.parse_table(
|
|
table.header_row,
|
|
table.raw_content,
|
|
)
|
|
schedules.extend(parsed)
|
|
|
|
tables.append({
|
|
"page": page_number,
|
|
"rows": len(table.raw_content),
|
|
"is_maintenance": True,
|
|
})
|
|
|
|
# Also try to extract from unstructured text
|
|
text_schedules = table_parser.parse_text_block(text)
|
|
schedules.extend(text_schedules)
|
|
|
|
return schedules, tables
|
|
|
|
    def _process_scanned_page(
        self, image_bytes: bytes, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a scanned PDF page with OCR.

        Args:
            image_bytes: Rendered page image bytes.
            page_number: Zero-based page index, recorded in table metadata.

        Returns:
            Tuple of (parsed schedule rows, per-table metadata dicts). Both
            are empty when OCR fails; the failure is logged, not raised.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []

        # Detect tables in image
        detected_tables = table_detector.detect_tables_in_image(image_bytes, page_number)

        # OCR the full page
        try:
            engine = create_engine()
            ocr_result = engine.recognize(image_bytes, OcrConfig())
            ocr_text = ocr_result.text

            # Mark tables as maintenance if page contains maintenance keywords
            for table in detected_tables:
                table.is_maintenance_table = table_detector.is_maintenance_table(
                    table, ocr_text
                )

            # NOTE(review): the image-detected tables flagged above are never
            # parsed or appended to `tables` here — only tables re-detected
            # from the OCR text below contribute to the return value. Confirm
            # whether the image detections are consumed elsewhere or dead work.

            # Try to extract from OCR text
            text_tables = table_detector.detect_tables_in_text(ocr_text, page_number)

            for table in text_tables:
                if table.is_maintenance_table and table.header_row:
                    parsed = table_parser.parse_table(
                        table.header_row,
                        table.raw_content,
                    )
                    schedules.extend(parsed)

                    tables.append({
                        "page": page_number,
                        "rows": len(table.raw_content),
                        "is_maintenance": True,
                    })

            # Also try unstructured text
            text_schedules = table_parser.parse_text_block(ocr_text)
            schedules.extend(text_schedules)

        except Exception as e:
            # OCR failures are per-page; the caller continues with other pages.
            logger.warning(f"OCR failed for page {page_number}: {e}")

        return schedules, tables
|
|
|
|
def _normalize_schedules(
|
|
self, schedules: list[ParsedScheduleRow]
|
|
) -> list[ExtractedSchedule]:
|
|
"""Normalize and deduplicate extracted schedules."""
|
|
# Group by normalized service name
|
|
by_service: dict[str, list[ParsedScheduleRow]] = {}
|
|
|
|
for schedule in schedules:
|
|
if schedule.confidence < self.MIN_CONFIDENCE:
|
|
continue
|
|
|
|
key = schedule.normalized_service or schedule.service.lower()
|
|
if key not in by_service:
|
|
by_service[key] = []
|
|
by_service[key].append(schedule)
|
|
|
|
# Merge duplicates, keeping highest confidence
|
|
results: list[ExtractedSchedule] = []
|
|
|
|
for service_key, items in by_service.items():
|
|
# Sort by confidence
|
|
items.sort(key=lambda x: x.confidence, reverse=True)
|
|
best = items[0]
|
|
|
|
# Merge interval info from other items if missing
|
|
miles = best.interval_miles
|
|
months = best.interval_months
|
|
details = best.details
|
|
fluid_spec = best.fluid_spec
|
|
|
|
for item in items[1:]:
|
|
if not miles and item.interval_miles:
|
|
miles = item.interval_miles
|
|
if not months and item.interval_months:
|
|
months = item.interval_months
|
|
if not details and item.details:
|
|
details = item.details
|
|
if not fluid_spec and item.fluid_spec:
|
|
fluid_spec = item.fluid_spec
|
|
|
|
# Build details string
|
|
detail_parts = []
|
|
if details:
|
|
detail_parts.append(details)
|
|
if fluid_spec:
|
|
detail_parts.append(f"Use {fluid_spec}")
|
|
|
|
results.append(
|
|
ExtractedSchedule(
|
|
service=best.normalized_service or best.service,
|
|
interval_miles=miles,
|
|
interval_months=months,
|
|
details=" - ".join(detail_parts) if detail_parts else None,
|
|
confidence=best.confidence,
|
|
subtypes=best.subtypes,
|
|
)
|
|
)
|
|
|
|
# Sort by confidence
|
|
results.sort(key=lambda x: x.confidence, reverse=True)
|
|
|
|
return results
|
|
|
|
def _extract_vehicle_info(
|
|
self, pdf_bytes: bytes, pdf_info: PdfInfo
|
|
) -> Optional[VehicleInfo]:
|
|
"""Extract vehicle make/model/year from manual."""
|
|
# Check metadata first
|
|
if pdf_info.title:
|
|
info = self._parse_vehicle_from_title(pdf_info.title)
|
|
if info:
|
|
return info
|
|
|
|
# Try first page
|
|
try:
|
|
first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
|
|
text = first_page.text_content
|
|
|
|
if not text and first_page.image_bytes:
|
|
# OCR first page
|
|
engine = create_engine()
|
|
ocr_result = engine.recognize(first_page.image_bytes, OcrConfig())
|
|
text = ocr_result.text
|
|
|
|
if text:
|
|
return self._parse_vehicle_from_text(text)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract vehicle info: {e}")
|
|
|
|
return None
|
|
|
|
def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]:
|
|
"""Parse vehicle info from document title."""
|
|
import re
|
|
|
|
# Common patterns: "2024 Honda Civic Owner's Manual"
|
|
year_match = re.search(r"(20\d{2}|19\d{2})", title)
|
|
year = int(year_match.group(1)) if year_match else None
|
|
|
|
# Common makes
|
|
makes = [
|
|
"Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
|
|
"Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
|
|
"Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
|
|
"Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
|
|
"Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
|
|
"Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
|
|
"Toyota", "Volkswagen", "Volvo",
|
|
]
|
|
|
|
make = None
|
|
model = None
|
|
|
|
for m in makes:
|
|
if m.lower() in title.lower():
|
|
make = m
|
|
# Try to find model after make
|
|
idx = title.lower().find(m.lower())
|
|
after = title[idx + len(m):].strip()
|
|
# First word after make is likely model
|
|
model_match = re.match(r"^(\w+)", after)
|
|
if model_match:
|
|
model = model_match.group(1)
|
|
break
|
|
|
|
if year or make:
|
|
return VehicleInfo(make=make, model=model, year=year)
|
|
|
|
return None
|
|
|
|
def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]:
|
|
"""Parse vehicle info from page text."""
|
|
return self._parse_vehicle_from_title(text[:500]) # Use first 500 chars
|
|
|
|
|
|
# Singleton instance — shared module-level extractor. The class keeps no
# per-call state (no __init__, class-level constants only), so importing
# this instance is preferred over constructing ManualExtractor repeatedly.
manual_extractor = ManualExtractor()
|