feat: Owner's Manual OCR Pipeline (#71) #79

Merged
egullickson merged 1 commit from issue-71-manual-ocr-pipeline into main 2026-02-02 03:37:34 +00:00
20 changed files with 2904 additions and 14 deletions

View File

@@ -8,6 +8,13 @@ from app.extractors.receipt_extractor import (
ExtractedField,
)
from app.extractors.fuel_receipt import FuelReceiptExtractor, fuel_receipt_extractor
from app.extractors.manual_extractor import (
ManualExtractor,
manual_extractor,
ManualExtractionResult,
ExtractedSchedule,
VehicleInfo,
)
__all__ = [
"BaseExtractor",
@@ -20,4 +27,9 @@ __all__ = [
"ExtractedField",
"FuelReceiptExtractor",
"fuel_receipt_extractor",
"ManualExtractor",
"manual_extractor",
"ManualExtractionResult",
"ExtractedSchedule",
"VehicleInfo",
]

View File

@@ -0,0 +1,417 @@
"""Owner's manual extractor for maintenance schedule extraction."""
import io
import logging
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
import pytesseract
from PIL import Image
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
from app.table_extraction.detector import table_detector, DetectedTable
from app.table_extraction.parser import table_parser, ParsedScheduleRow
from app.patterns.maintenance_patterns import maintenance_matcher
logger = logging.getLogger(__name__)
@dataclass
class ExtractedSchedule:
    """A single extracted maintenance schedule."""

    # Service name (normalized when a mapping was found, e.g. "Engine Oil Change").
    service: str
    # Recommended interval in miles; None when the manual gives no mileage.
    interval_miles: Optional[int]
    # Recommended interval in months; None when the manual gives no time limit.
    interval_months: Optional[int]
    # Free-text notes (merged details / fluid spec), or None.
    details: Optional[str]
    # Extraction confidence in [0.0, 1.0].
    confidence: float
    # Maintenance subtypes this service maps to in the system taxonomy.
    subtypes: list[str] = field(default_factory=list)
@dataclass
class VehicleInfo:
    """Vehicle information extracted from manual."""

    # All fields optional: title/first-page parsing may recover only a subset.
    make: Optional[str]
    model: Optional[str]
    year: Optional[int]
@dataclass
class ManualExtractionResult:
    """Complete result of manual extraction."""

    # False when extraction raised; see `error` for the message.
    success: bool
    # Make/model/year parsed from the manual, if any.
    vehicle_info: Optional[VehicleInfo]
    # Normalized, deduplicated schedules (sorted by confidence).
    maintenance_schedules: list[ExtractedSchedule]
    # Lightweight summaries ({"page": ..., "rows": ...}) of detected tables.
    raw_tables: list[dict]
    # Wall-clock processing time in milliseconds.
    processing_time_ms: int
    # Total pages in the source PDF (0 on failure).
    total_pages: int
    # Number of pages actually scanned for schedules.
    pages_processed: int
    # Error message when success is False.
    error: Optional[str] = None
class ManualExtractor:
    """Extract maintenance schedules from owner's manuals.

    Processing pipeline:
    1. Analyze PDF structure
    2. Find maintenance section pages
    3. Extract text (native) or OCR (scanned)
    4. Detect tables
    5. Parse schedules
    6. Normalize and deduplicate
    """

    # Maximum pages to process for performance
    MAX_PAGES_TO_PROCESS = 50
    # Minimum confidence to include schedule
    MIN_CONFIDENCE = 0.5

    def extract(
        self,
        pdf_bytes: bytes,
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> ManualExtractionResult:
        """
        Extract maintenance schedules from an owner's manual PDF.

        Args:
            pdf_bytes: Raw PDF bytes
            progress_callback: Optional callback for progress updates (percent, message)

        Returns:
            ManualExtractionResult with extracted data; on any exception a
            failure result is returned (never raises).
        """
        start_time = time.time()

        def update_progress(percent: int, message: str) -> None:
            # Forward progress to the caller (if any) and always log it.
            if progress_callback:
                progress_callback(percent, message)
            logger.info(f"Progress {percent}%: {message}")

        try:
            update_progress(5, "Analyzing PDF structure")
            # Get PDF info
            pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
            logger.info(
                f"PDF: {pdf_info.total_pages} pages, "
                f"has_text={pdf_info.has_text_layer}, "
                f"is_scanned={pdf_info.is_scanned}"
            )
            update_progress(10, "Finding maintenance sections")
            # Find pages likely to contain maintenance schedules
            maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)
            if not maintenance_pages:
                # If no specific pages found, process first N pages
                # NOTE(review): only the first MAX_PAGES_TO_PROCESS pages are
                # scanned here, despite the "all pages" log wording below.
                maintenance_pages = list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))
                logger.info("No specific maintenance section found, processing all pages")
            else:
                # Include pages before and after detected maintenance pages
                expanded_pages: set[int] = set()
                for page in maintenance_pages:
                    for offset in range(-2, 5):  # Include 2 before, 4 after
                        new_page = page + offset
                        if 0 <= new_page < pdf_info.total_pages:
                            expanded_pages.add(new_page)
                maintenance_pages = sorted(expanded_pages)[:self.MAX_PAGES_TO_PROCESS]
                logger.info(f"Processing {len(maintenance_pages)} pages around maintenance section")
            update_progress(15, "Extracting page content")
            # Extract content from pages
            all_schedules: list[ParsedScheduleRow] = []
            all_tables: list[dict] = []
            pages_processed = 0
            for i, page_num in enumerate(maintenance_pages):
                # Page loop maps onto the 15%..75% progress band.
                page_progress = 15 + int((i / len(maintenance_pages)) * 60)
                update_progress(page_progress, f"Processing page {page_num + 1}")
                # Extract page content
                page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num)
                pages_processed += 1
                # Process based on content type
                if page_content.has_text:
                    # Native PDF - use text directly
                    schedules, tables = self._process_text_page(
                        page_content.text_content, page_num
                    )
                elif page_content.image_bytes:
                    # Scanned PDF - OCR required
                    schedules, tables = self._process_scanned_page(
                        page_content.image_bytes, page_num
                    )
                else:
                    # Neither text nor image available for this page; skip it.
                    continue
                all_schedules.extend(schedules)
                all_tables.extend(tables)
            update_progress(75, "Normalizing results")
            # Deduplicate and normalize schedules
            normalized_schedules = self._normalize_schedules(all_schedules)
            update_progress(85, "Extracting vehicle information")
            # Try to extract vehicle info from first few pages
            vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)
            update_progress(95, "Finalizing results")
            processing_time_ms = int((time.time() - start_time) * 1000)
            logger.info(
                f"Extraction complete: {len(normalized_schedules)} schedules from "
                f"{pages_processed} pages in {processing_time_ms}ms"
            )
            update_progress(100, "Complete")
            return ManualExtractionResult(
                success=True,
                vehicle_info=vehicle_info,
                maintenance_schedules=normalized_schedules,
                # Keep only lightweight summaries of detected tables.
                raw_tables=[{"page": t.get("page", 0), "rows": t.get("rows", 0)} for t in all_tables],
                processing_time_ms=processing_time_ms,
                total_pages=pdf_info.total_pages,
                pages_processed=pages_processed,
            )
        except Exception as e:
            # Catch-all boundary: convert any failure into a failed result so
            # callers (API handlers) never see an exception from here.
            logger.error(f"Manual extraction failed: {e}", exc_info=True)
            processing_time_ms = int((time.time() - start_time) * 1000)
            return ManualExtractionResult(
                success=False,
                vehicle_info=None,
                maintenance_schedules=[],
                raw_tables=[],
                processing_time_ms=processing_time_ms,
                total_pages=0,
                pages_processed=0,
                error=str(e),
            )

    def _process_text_page(
        self, text: str, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a native PDF page with text.

        Returns a (schedules, table summaries) tuple; both may be empty.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []
        # Detect tables in text
        detected_tables = table_detector.detect_tables_in_text(text, page_number)
        for table in detected_tables:
            # Only maintenance tables with a recognized header can be parsed.
            if table.is_maintenance_table and table.header_row:
                # Parse table
                parsed = table_parser.parse_table(
                    table.header_row,
                    table.raw_content,
                )
                schedules.extend(parsed)
                tables.append({
                    "page": page_number,
                    "rows": len(table.raw_content),
                    "is_maintenance": True,
                })
        # Also try to extract from unstructured text
        text_schedules = table_parser.parse_text_block(text)
        schedules.extend(text_schedules)
        return schedules, tables

    def _process_scanned_page(
        self, image_bytes: bytes, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a scanned PDF page with OCR.

        OCR failures are logged and swallowed (best-effort per page), so a bad
        page never aborts the whole extraction.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []
        # Detect tables in image
        detected_tables = table_detector.detect_tables_in_image(image_bytes, page_number)
        # OCR the full page
        try:
            image = Image.open(io.BytesIO(image_bytes))
            ocr_text = pytesseract.image_to_string(image)
            # Mark tables as maintenance if page contains maintenance keywords
            for table in detected_tables:
                table.is_maintenance_table = table_detector.is_maintenance_table(
                    table, ocr_text
                )
            # Try to extract from OCR text
            text_tables = table_detector.detect_tables_in_text(ocr_text, page_number)
            for table in text_tables:
                if table.is_maintenance_table and table.header_row:
                    parsed = table_parser.parse_table(
                        table.header_row,
                        table.raw_content,
                    )
                    schedules.extend(parsed)
                    tables.append({
                        "page": page_number,
                        "rows": len(table.raw_content),
                        "is_maintenance": True,
                    })
            # Also try unstructured text
            text_schedules = table_parser.parse_text_block(ocr_text)
            schedules.extend(text_schedules)
        except Exception as e:
            logger.warning(f"OCR failed for page {page_number}: {e}")
        return schedules, tables

    def _normalize_schedules(
        self, schedules: list[ParsedScheduleRow]
    ) -> list[ExtractedSchedule]:
        """Normalize and deduplicate extracted schedules.

        Rows below MIN_CONFIDENCE are dropped; duplicates (same normalized
        service name) are merged, keeping the highest-confidence row and
        back-filling missing interval/detail fields from the others.
        """
        # Group by normalized service name
        by_service: dict[str, list[ParsedScheduleRow]] = {}
        for schedule in schedules:
            if schedule.confidence < self.MIN_CONFIDENCE:
                continue
            key = schedule.normalized_service or schedule.service.lower()
            if key not in by_service:
                by_service[key] = []
            by_service[key].append(schedule)
        # Merge duplicates, keeping highest confidence
        results: list[ExtractedSchedule] = []
        for service_key, items in by_service.items():  # key unused beyond grouping
            # Sort by confidence
            items.sort(key=lambda x: x.confidence, reverse=True)
            best = items[0]
            # Merge interval info from other items if missing
            miles = best.interval_miles
            months = best.interval_months
            details = best.details
            fluid_spec = best.fluid_spec
            for item in items[1:]:
                if not miles and item.interval_miles:
                    miles = item.interval_miles
                if not months and item.interval_months:
                    months = item.interval_months
                if not details and item.details:
                    details = item.details
                if not fluid_spec and item.fluid_spec:
                    fluid_spec = item.fluid_spec
            # Build details string
            detail_parts = []
            if details:
                detail_parts.append(details)
            if fluid_spec:
                detail_parts.append(f"Use {fluid_spec}")
            results.append(
                ExtractedSchedule(
                    service=best.normalized_service or best.service,
                    interval_miles=miles,
                    interval_months=months,
                    details=" - ".join(detail_parts) if detail_parts else None,
                    confidence=best.confidence,
                    subtypes=best.subtypes,
                )
            )
        # Sort by confidence
        results.sort(key=lambda x: x.confidence, reverse=True)
        return results

    def _extract_vehicle_info(
        self, pdf_bytes: bytes, pdf_info: PdfInfo
    ) -> Optional[VehicleInfo]:
        """Extract vehicle make/model/year from manual.

        Tries PDF metadata title first, then the first page's text (OCR'd if
        the page has no text layer). Best-effort: failures return None.
        """
        # Check metadata first
        if pdf_info.title:
            info = self._parse_vehicle_from_title(pdf_info.title)
            if info:
                return info
        # Try first page
        try:
            first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
            text = first_page.text_content
            if not text and first_page.image_bytes:
                # OCR first page
                image = Image.open(io.BytesIO(first_page.image_bytes))
                text = pytesseract.image_to_string(image)
            if text:
                return self._parse_vehicle_from_text(text)
        except Exception as e:
            logger.warning(f"Failed to extract vehicle info: {e}")
        return None

    def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from document title.

        Returns a VehicleInfo when a year or a known make is found; the first
        word following the make is assumed to be the model — TODO confirm this
        holds for multi-word models (e.g. "Land Cruiser").
        """
        import re
        # Common patterns: "2024 Honda Civic Owner's Manual"
        year_match = re.search(r"(20\d{2}|19\d{2})", title)
        year = int(year_match.group(1)) if year_match else None
        # Common makes
        makes = [
            "Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
            "Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
            "Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
            "Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
            "Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
            "Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
            "Toyota", "Volkswagen", "Volvo",
        ]
        make = None
        model = None
        for m in makes:
            # Case-insensitive substring match against the title.
            if m.lower() in title.lower():
                make = m
                # Try to find model after make
                idx = title.lower().find(m.lower())
                after = title[idx + len(m):].strip()
                # First word after make is likely model
                model_match = re.match(r"^(\w+)", after)
                if model_match:
                    model = model_match.group(1)
                break
        if year or make:
            return VehicleInfo(make=make, model=model, year=year)
        return None

    def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from page text."""
        return self._parse_vehicle_from_title(text[:500])  # Use first 500 chars
# Singleton instance, re-exported through app.extractors for API handlers.
manual_extractor = ManualExtractor()

View File

@@ -56,6 +56,8 @@ async def root() -> dict:
"endpoints": [
"POST /extract - Synchronous OCR extraction",
"POST /extract/vin - VIN-specific extraction with validation",
"POST /extract/receipt - Receipt extraction (fuel, general)",
"POST /extract/manual - Owner's manual extraction (async)",
"POST /jobs - Submit async OCR job",
"GET /jobs/{job_id} - Get async job status",
],

View File

@@ -6,6 +6,10 @@ from .schemas import (
JobResponse,
JobStatus,
JobSubmitRequest,
ManualExtractionResponse,
ManualJobResponse,
ManualMaintenanceSchedule,
ManualVehicleInfo,
OcrResponse,
ReceiptExtractedField,
ReceiptExtractionResponse,
@@ -20,6 +24,10 @@ __all__ = [
"JobResponse",
"JobStatus",
"JobSubmitRequest",
"ManualExtractionResponse",
"ManualJobResponse",
"ManualMaintenanceSchedule",
"ManualVehicleInfo",
"OcrResponse",
"ReceiptExtractedField",
"ReceiptExtractionResponse",

View File

@@ -115,3 +115,57 @@ class ReceiptExtractionResponse(BaseModel):
error: Optional[str] = None
model_config = {"populate_by_name": True}
# Manual extraction models
class ManualVehicleInfo(BaseModel):
    """Vehicle information extracted from manual."""

    # All optional: extraction may recover any subset of make/model/year.
    make: Optional[str] = None
    model: Optional[str] = None
    year: Optional[int] = None
class ManualMaintenanceSchedule(BaseModel):
    """A single maintenance schedule entry.

    camelCase aliases are the wire format; populate_by_name also allows
    construction from the snake_case field names.
    """

    service: str
    # Interval in miles; None when the manual gives no mileage.
    interval_miles: Optional[int] = Field(default=None, alias="intervalMiles")
    # Interval in months; None when the manual gives no time limit.
    interval_months: Optional[int] = Field(default=None, alias="intervalMonths")
    details: Optional[str] = None
    # Extraction confidence, validated to [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0)
    # Maintenance subtypes mapped from the service name.
    subtypes: list[str] = Field(default_factory=list)
    model_config = {"populate_by_name": True}
class ManualExtractionResponse(BaseModel):
    """Response from manual extraction endpoint.

    Mirrors the extractor's ManualExtractionResult; camelCase aliases are the
    wire format, populate_by_name permits snake_case construction.
    """

    success: bool
    vehicle_info: Optional[ManualVehicleInfo] = Field(default=None, alias="vehicleInfo")
    maintenance_schedules: list[ManualMaintenanceSchedule] = Field(
        default_factory=list, alias="maintenanceSchedules"
    )
    # Lightweight table summaries ({"page": ..., "rows": ...}).
    raw_tables: list[dict] = Field(default_factory=list, alias="rawTables")
    processing_time_ms: int = Field(alias="processingTimeMs")
    total_pages: int = Field(alias="totalPages")
    pages_processed: int = Field(alias="pagesProcessed")
    # Error message when success is False.
    error: Optional[str] = None
    model_config = {"populate_by_name": True}
class ManualJobResponse(BaseModel):
    """Response for async manual extraction job."""

    job_id: str = Field(alias="jobId")
    status: JobStatus
    # Completion percentage, validated to 0..100 when present.
    progress: Optional[int] = Field(default=None, ge=0, le=100)
    estimated_seconds: Optional[int] = Field(default=None, alias="estimatedSeconds")
    # Populated only when the job has finished successfully.
    result: Optional[ManualExtractionResponse] = None
    error: Optional[str] = None
    model_config = {"populate_by_name": True}

View File

@@ -1,7 +1,9 @@
"""Pattern matching modules for receipt field extraction."""
"""Pattern matching modules for receipt and manual field extraction."""
from app.patterns.date_patterns import DatePatternMatcher, date_matcher
from app.patterns.currency_patterns import CurrencyPatternMatcher, currency_matcher
from app.patterns.fuel_patterns import FuelPatternMatcher, fuel_matcher
from app.patterns.maintenance_patterns import MaintenancePatternMatcher, maintenance_matcher
from app.patterns.service_mapping import ServiceMapper, service_mapper
__all__ = [
"DatePatternMatcher",
@@ -10,4 +12,8 @@ __all__ = [
"currency_matcher",
"FuelPatternMatcher",
"fuel_matcher",
"MaintenancePatternMatcher",
"maintenance_matcher",
"ServiceMapper",
"service_mapper",
]

View File

@@ -0,0 +1,335 @@
"""Maintenance schedule pattern matching for owner's manual extraction."""
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class MileageIntervalMatch:
    """Result of mileage interval pattern matching."""

    value: int  # Miles
    raw_match: str  # Exact text span that matched
    confidence: float  # Pattern-specific confidence in [0, 1]
    pattern_name: str  # Name of the pattern that produced the match


@dataclass
class TimeIntervalMatch:
    """Result of time interval pattern matching."""

    value: int  # Months
    raw_match: str  # Exact text span that matched
    confidence: float  # Pattern-specific confidence in [0, 1]
    pattern_name: str  # Name of the pattern that produced the match


@dataclass
class FluidSpecMatch:
    """Result of fluid specification pattern matching."""

    value: str  # e.g., "0W-20", "ATF-Z1", "DOT 4"
    fluid_type: str  # e.g., "oil", "transmission", "brake"
    raw_match: str  # Exact text span that matched
    confidence: float  # Pattern-specific confidence in [0, 1]


class MaintenancePatternMatcher:
    """Extract maintenance-specific data from owner's manual text.

    Patterns are ordered most-specific-first; each extractor returns the
    first *reasonable* value found, scanning every occurrence of a pattern
    (not just the first) so an out-of-range number earlier in the text does
    not mask a valid interval later on.
    """

    # Mileage interval patterns: (regex, pattern_name, confidence)
    MILEAGE_PATTERNS = [
        # "every 5,000 miles" or "every 5000 miles"
        (
            r"every\s+([\d,]+)\s*(?:miles?|mi\.?)",
            "every_miles",
            0.95,
        ),
        # "at 30,000 mi" or "at 30000 miles"
        (
            r"at\s+([\d,]+)\s*(?:miles?|mi\.?)",
            "at_miles",
            0.93,
        ),
        # "5,000 miles or" (interval before "or")
        (
            r"([\d,]+)\s*(?:miles?|mi\.?)\s*(?:or|/)",
            "miles_or",
            0.90,
        ),
        # "every 5,000-7,500 miles" (range - take lower)
        (
            r"every\s+([\d,]+)\s*[-]\s*[\d,]+\s*(?:miles?|mi\.?)",
            "miles_range",
            0.88,
        ),
        # "7,500 mi/12 months" (interval with slash)
        (
            r"([\d,]+)\s*(?:miles?|mi\.?)\s*/",
            "miles_slash",
            0.87,
        ),
        # Standalone "X,XXX miles" in table context
        (
            r"(?<![0-9])([\d,]+)\s*(?:miles?|mi\.?)(?![a-z])",
            "standalone_miles",
            0.75,
        ),
    ]

    # Time interval patterns: (regex, pattern_name, confidence)
    TIME_PATTERNS = [
        # "every 6 months"
        (
            r"every\s+(\d+)\s*months?",
            "every_months",
            0.95,
        ),
        # "6 months or" (interval before "or")
        (
            r"(\d+)\s*months?\s*(?:or|/)",
            "months_or",
            0.90,
        ),
        # "annually" -> 12 months
        (
            r"\bannually\b",
            "annually",
            0.95,
        ),
        # "semi-annually" or "semi-annual" -> 6 months
        (
            r"\bsemi-?annual(?:ly)?\b",
            "semi_annual",
            0.95,
        ),
        # "every year" -> 12 months
        (
            r"every\s+year",
            "every_year",
            0.93,
        ),
        # "every 2 years" -> 24 months
        (
            r"every\s+(\d+)\s*years?",
            "every_years",
            0.93,
        ),
        # "12 mo/7,500 mi" or "12 months/"
        (
            r"(\d+)\s*(?:mo(?:nths?)?\.?)\s*/",
            "months_slash",
            0.87,
        ),
        # Standalone "X months" in table context
        (
            r"(?<![0-9])(\d+)\s*months?(?![a-z])",
            "standalone_months",
            0.75,
        ),
    ]

    # Fluid specification patterns: (regex, fluid_type, confidence)
    FLUID_PATTERNS = [
        # Oil viscosity: 0W-20, 5W-30, 10W-40
        (
            r"\b(\d+W-\d+)\b",
            "oil",
            0.95,
        ),
        # Full synthetic variants
        (
            r"(full\s+synthetic\s+\d+W-\d+)",
            "oil",
            0.93,
        ),
        # Transmission fluid: ATF-Z1, ATF+4, Dexron VI
        (
            r"\b(ATF[- ]?\w+)\b",
            "transmission",
            0.90,
        ),
        (
            r"\b(Dexron\s*(?:VI|IV|III)?)\b",
            "transmission",
            0.90,
        ),
        (
            r"\b(Mercon\s*(?:V|LV|SP)?)\b",
            "transmission",
            0.90,
        ),
        # Brake fluid: DOT 3, DOT 4, DOT 5.1
        (
            r"\b(DOT\s*\d(?:\.\d)?)\b",
            "brake",
            0.95,
        ),
        # Coolant types
        (
            r"\b((?:Type\s+)?(?:2|II)\s+(?:coolant|antifreeze))\b",
            "coolant",
            0.88,
        ),
        (
            r"\b((?:50/50|pre-mixed)\s+(?:coolant|antifreeze))\b",
            "coolant",
            0.85,
        ),
        # Power steering fluid
        (
            r"\b(power\s+steering\s+fluid)\b",
            "power_steering",
            0.90,
        ),
    ]

    def extract_mileage_interval(self, text: str) -> Optional[MileageIntervalMatch]:
        """
        Extract mileage interval from text.

        Args:
            text: Text to search for mileage intervals

        Returns:
            MileageIntervalMatch or None if no interval found

        Scans all occurrences of each pattern (not just the first), so an
        out-of-range value such as "every 200 miles" no longer prevents a
        later valid interval from being returned.
        """
        text_lower = text.lower()
        for pattern, name, confidence in self.MILEAGE_PATTERNS:
            # Walk every occurrence; the old re.search() gave up on a
            # pattern as soon as its first hit was unreasonable.
            for match in re.finditer(pattern, text_lower, re.IGNORECASE):
                # Extract the number and remove commas
                mileage_str = match.group(1).replace(",", "")
                if not mileage_str:
                    # "[\d,]+" can match commas only; skip instead of crashing
                    continue
                mileage = int(mileage_str)
                if self._is_reasonable_mileage(mileage):
                    return MileageIntervalMatch(
                        value=mileage,
                        raw_match=match.group(0),
                        confidence=confidence,
                        pattern_name=name,
                    )
        return None

    def extract_time_interval(self, text: str) -> Optional[TimeIntervalMatch]:
        """
        Extract time interval from text.

        Args:
            text: Text to search for time intervals

        Returns:
            TimeIntervalMatch or None if no interval found

        Like extract_mileage_interval, scans every occurrence of each
        pattern so unreasonable values do not mask later valid ones.
        """
        text_lower = text.lower()
        for pattern, name, confidence in self.TIME_PATTERNS:
            for match in re.finditer(pattern, text_lower, re.IGNORECASE):
                # Handle special cases (patterns without a numeric group)
                if name == "annually" or name == "every_year":
                    months = 12
                elif name == "semi_annual":
                    months = 6
                elif name == "every_years":
                    years = int(match.group(1))
                    months = years * 12
                else:
                    months = int(match.group(1))
                if self._is_reasonable_months(months):
                    return TimeIntervalMatch(
                        value=months,
                        raw_match=match.group(0),
                        confidence=confidence,
                        pattern_name=name,
                    )
        return None

    def extract_fluid_spec(self, text: str) -> Optional[FluidSpecMatch]:
        """
        Extract fluid specification from text.

        Args:
            text: Text to search for fluid specs

        Returns:
            FluidSpecMatch or None if no spec found
        """
        for pattern, fluid_type, confidence in self.FLUID_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return FluidSpecMatch(
                    # Coolant specs keep their original casing ("pre-mixed");
                    # everything else is normalized to uppercase.
                    value=match.group(1).upper() if fluid_type != "coolant" else match.group(1),
                    fluid_type=fluid_type,
                    raw_match=match.group(0),
                    confidence=confidence,
                )
        return None

    def extract_all_fluid_specs(self, text: str) -> list[FluidSpecMatch]:
        """
        Extract all fluid specifications from text.

        Args:
            text: Text to search for fluid specs

        Returns:
            List of FluidSpecMatch objects, deduplicated by normalized value
        """
        results: list[FluidSpecMatch] = []
        seen_values: set[str] = set()
        for pattern, fluid_type, confidence in self.FLUID_PATTERNS:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                value = match.group(1).upper() if fluid_type != "coolant" else match.group(1)
                if value not in seen_values:
                    seen_values.add(value)
                    results.append(
                        FluidSpecMatch(
                            value=value,
                            fluid_type=fluid_type,
                            raw_match=match.group(0),
                            confidence=confidence,
                        )
                    )
        return results

    def extract_combined_interval(
        self, text: str
    ) -> tuple[Optional[MileageIntervalMatch], Optional[TimeIntervalMatch]]:
        """
        Extract both mileage and time intervals from a combined pattern.

        Many schedules use patterns like "every 5,000 miles or 6 months".

        Args:
            text: Text to search

        Returns:
            Tuple of (mileage_match, time_match)
        """
        mileage = self.extract_mileage_interval(text)
        time = self.extract_time_interval(text)
        return mileage, time

    def _is_reasonable_mileage(self, mileage: int) -> bool:
        """Check if mileage interval is reasonable for maintenance."""
        # Typical ranges: 1,000 to 100,000 miles
        return 500 <= mileage <= 150000

    def _is_reasonable_months(self, months: int) -> bool:
        """Check if month interval is reasonable for maintenance."""
        # Typical ranges: 1 to 120 months (10 years)
        return 1 <= months <= 120


# Singleton instance
maintenance_matcher = MaintenancePatternMatcher()

View File

@@ -0,0 +1,259 @@
"""Service name normalization and mapping to maintenance subtypes."""
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServiceMapping:
    """Mapping result from extracted text to maintenance subtypes."""

    normalized_name: str  # Standardized service name
    subtypes: list[str]  # Maintenance subtypes from the system
    category: str  # routine_maintenance, repair, performance_upgrade
    confidence: float  # Mapping confidence in [0, 1]
# Maintenance subtypes from the system (must match exactly).
# NOTE(review): presumably mirrors an enum/table owned elsewhere in the app;
# keep in sync with that source of truth.
ROUTINE_MAINTENANCE_SUBTYPES = [
    "Accelerator Pedal",
    "Air Filter Element",
    "Brakes and Traction Control",
    "Cabin Air Filter / Purifier",
    "Coolant",
    "Doors",
    "Drive Belt",
    "Engine Oil",
    "Evaporative Emissions System",
    "Exhaust System",
    "Fluid - A/T",
    "Fluid - Differential",
    "Fluid - M/T",
    "Fluid Filter - A/T",
    "Fluids",
    "Fuel Delivery and Air Induction",
    "Hood Shock / Support",
    "Neutral Safety Switch",
    "Parking Brake System",
    "Restraints and Safety Systems",
    "Shift Interlock A/T",
    "Spark Plug",
    "Steering and Suspension",
    "Tires",
    "Trunk / Liftgate Shock / Support",
    "Washer Fluid",
    "Wiper Blade",
]
class ServiceMapper:
"""Map extracted service names to maintenance subtypes."""
# Mapping from common service terms to system subtypes
# Keys are lowercase patterns, values are (normalized_name, subtypes, category, confidence)
SERVICE_MAPPINGS: dict[str, tuple[str, list[str], str, float]] = {
# Oil related
"engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
"oil change": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
"motor oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.93),
"oil and filter": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
"oil & filter": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
"change engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
"replace engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
# Air filter
"air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.90),
"engine air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
"air cleaner": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.88),
"air cleaner element": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.93),
"replace air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
# Cabin filter
"cabin air filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.95),
"cabin filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.93),
"a/c filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.88),
"hvac filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.88),
"interior air filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.90),
"dust and pollen filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.90),
# Tires
"tire rotation": ("Tire Rotation", ["Tires"], "routine_maintenance", 0.98),
"rotate tires": ("Tire Rotation", ["Tires"], "routine_maintenance", 0.95),
"tire inspection": ("Tire Inspection", ["Tires"], "routine_maintenance", 0.93),
"inspect tires": ("Tire Inspection", ["Tires"], "routine_maintenance", 0.93),
"check tire pressure": ("Tire Pressure Check", ["Tires"], "routine_maintenance", 0.90),
"tire pressure": ("Tire Pressure Check", ["Tires"], "routine_maintenance", 0.85),
# Brakes
"brake inspection": ("Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.95),
"inspect brakes": ("Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.93),
"brake fluid": ("Brake Fluid Service", ["Brakes and Traction Control"], "routine_maintenance", 0.93),
"brake pads": ("Brake Pad Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.90),
"parking brake": ("Parking Brake Inspection", ["Parking Brake System"], "routine_maintenance", 0.93),
# Coolant
"coolant": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.90),
"engine coolant": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.93),
"antifreeze": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.90),
"cooling system": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.88),
"radiator fluid": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.88),
"replace coolant": ("Coolant Replacement", ["Coolant"], "routine_maintenance", 0.95),
# Transmission
"transmission fluid": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
"automatic transmission fluid": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.95),
"atf": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.90),
"manual transmission fluid": ("Manual Transmission Fluid", ["Fluid - M/T"], "routine_maintenance", 0.95),
"cvt fluid": ("CVT Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
"transmission filter": ("Transmission Filter", ["Fluid Filter - A/T"], "routine_maintenance", 0.93),
# Differential
"differential fluid": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.95),
"rear differential": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.93),
"front differential": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.93),
"transfer case": ("Transfer Case Fluid", ["Fluid - Differential"], "routine_maintenance", 0.90),
# Spark plugs
"spark plug": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
"spark plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
"replace spark plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
"ignition plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.88),
# Drive belt
"drive belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.93),
"serpentine belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.93),
"accessory belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.90),
"timing belt": ("Timing Belt Service", ["Drive Belt"], "routine_maintenance", 0.90),
"v-belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.88),
# Wipers
"wiper blade": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.95),
"wiper blades": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.95),
"windshield wiper": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.93),
"replace wipers": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.93),
# Washer fluid
"washer fluid": ("Washer Fluid", ["Washer Fluid"], "routine_maintenance", 0.95),
"windshield washer": ("Washer Fluid", ["Washer Fluid"], "routine_maintenance", 0.90),
# Steering/Suspension
"steering": ("Steering Inspection", ["Steering and Suspension"], "routine_maintenance", 0.85),
"suspension": ("Suspension Inspection", ["Steering and Suspension"], "routine_maintenance", 0.85),
"power steering": ("Power Steering Fluid", ["Steering and Suspension"], "routine_maintenance", 0.90),
"power steering fluid": ("Power Steering Fluid", ["Steering and Suspension"], "routine_maintenance", 0.93),
# Exhaust
"exhaust": ("Exhaust System Inspection", ["Exhaust System"], "routine_maintenance", 0.88),
"exhaust system": ("Exhaust System Inspection", ["Exhaust System"], "routine_maintenance", 0.93),
# Fuel system
"fuel filter": ("Fuel Filter Replacement", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.93),
"fuel system": ("Fuel System Inspection", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.88),
"fuel injection": ("Fuel Injection Service", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.88),
# Emissions
"evaporative emissions": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.93),
"evap system": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.90),
"emissions": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.80),
# Safety systems
"seat belt": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.90),
"airbag": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.85),
"restraint": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.85),
# Miscellaneous
"battery": ("Battery Inspection", ["Fluids"], "routine_maintenance", 0.80),
"inspect battery": ("Battery Inspection", ["Fluids"], "routine_maintenance", 0.85),
"door hinges": ("Door Lubrication", ["Doors"], "routine_maintenance", 0.85),
"hood shock": ("Hood Shock Inspection", ["Hood Shock / Support"], "routine_maintenance", 0.90),
"trunk shock": ("Trunk Shock Inspection", ["Trunk / Liftgate Shock / Support"], "routine_maintenance", 0.90),
"liftgate": ("Liftgate Inspection", ["Trunk / Liftgate Shock / Support"], "routine_maintenance", 0.88),
}
# Pattern-based mappings for fuzzy matching
SERVICE_PATTERNS: list[tuple[str, str, list[str], str, float]] = [
# (regex_pattern, normalized_name, subtypes, category, confidence)
(r"oil\s+(?:and|&)\s+filter", "Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
(r"(?:change|replace)\s+(?:the\s+)?oil", "Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.93),
(r"(?:inspect|check)\s+(?:the\s+)?brakes?", "Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.90),
(r"(?:inspect|check)\s+(?:the\s+)?tires?", "Tire Inspection", ["Tires"], "routine_maintenance", 0.90),
(r"(?:rotate|rotation)\s+(?:the\s+)?tires?", "Tire Rotation", ["Tires"], "routine_maintenance", 0.95),
(r"(?:replace|change)\s+(?:the\s+)?(?:engine\s+)?air\s+filter", "Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
(r"(?:replace|change)\s+(?:the\s+)?cabin\s+(?:air\s+)?filter", "Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.95),
(r"(?:replace|change)\s+(?:the\s+)?spark\s+plugs?", "Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
(r"(?:replace|change)\s+(?:the\s+)?coolant", "Coolant Replacement", ["Coolant"], "routine_maintenance", 0.93),
(r"(?:flush|drain)\s+(?:the\s+)?coolant", "Coolant Flush", ["Coolant"], "routine_maintenance", 0.93),
(r"(?:replace|change)\s+(?:the\s+)?(?:a/?t|automatic\s+transmission)\s+fluid", "Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
(r"(?:inspect|check)\s+(?:the\s+)?(?:drive|serpentine|accessory)\s+belt", "Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.90),
]
def map_service(self, service_text: str) -> Optional[ServiceMapping]:
    """
    Map extracted service text to maintenance subtypes.

    Tries substring containment against SERVICE_MAPPINGS keys first, then
    falls back to the SERVICE_PATTERNS regexes; the first hit wins.

    Args:
        service_text: Service name or description from the manual

    Returns:
        ServiceMapping or None if no mapping found
    """
    needle = service_text.lower().strip()

    def _as_mapping(name: str, subtypes: list[str], category: str, conf: float) -> ServiceMapping:
        # Shared factory so both lookup strategies build results identically.
        return ServiceMapping(
            normalized_name=name,
            subtypes=subtypes,
            category=category,
            confidence=conf,
        )

    # Exact (substring) keyword lookup takes precedence.
    exact_hit = next(
        (entry for keyword, entry in self.SERVICE_MAPPINGS.items() if keyword in needle),
        None,
    )
    if exact_hit is not None:
        return _as_mapping(*exact_hit)

    # Regex patterns as a fallback for phrasing variations.
    for pattern, name, subtypes, category, conf in self.SERVICE_PATTERNS:
        if re.search(pattern, needle, re.IGNORECASE):
            return _as_mapping(name, subtypes, category, conf)
    return None
def map_service_fuzzy(self, service_text: str, threshold: float = 0.6) -> Optional[ServiceMapping]:
    """
    Map service text with fuzzy matching for typos and variations.

    Falls back to Jaccard word-overlap scoring against the exact-mapping
    keys when map_service() finds nothing; the winning entry's confidence
    is scaled down by the overlap score.

    Args:
        service_text: Service name or description
        threshold: Minimum similarity threshold (0.0-1.0)

    Returns:
        ServiceMapping or None
    """
    # Exact/pattern matching wins outright when it succeeds.
    exact = self.map_service(service_text)
    if exact is not None:
        return exact

    tokens = set(service_text.lower().split())
    best_match: Optional[ServiceMapping] = None
    best_score = 0.0
    for keyword, (name, subtypes, category, conf) in self.SERVICE_MAPPINGS.items():
        keyword_tokens = set(keyword.split())
        union = tokens | keyword_tokens
        if not union:
            continue
        score = len(tokens & keyword_tokens) / len(union)
        # Keep only strict improvements that also clear the threshold.
        if score < threshold or score <= best_score:
            continue
        best_score = score
        best_match = ServiceMapping(
            normalized_name=name,
            subtypes=subtypes,
            category=category,
            confidence=conf * score,  # Reduce confidence by match quality
        )
    return best_match
def get_all_service_keywords(self) -> list[str]:
    """Return all service keywords, plus generic header terms, for table header detection."""
    # Generic words that commonly appear in maintenance-table headers.
    header_terms = [
        "service", "maintenance", "item", "operation",
        "inspection", "replacement", "interval", "schedule",
    ]
    return list(self.SERVICE_MAPPINGS.keys()) + header_terms
# Module-level singleton shared by importers; ServiceMapper holds no mutable state.
service_mapper = ServiceMapper()

View File

@@ -5,6 +5,12 @@ from app.preprocessors.receipt_preprocessor import (
ReceiptPreprocessor,
receipt_preprocessor,
)
from app.preprocessors.pdf_preprocessor import (
PdfPreprocessor,
pdf_preprocessor,
PdfPageContent,
PdfInfo,
)
__all__ = [
"ImagePreprocessor",
@@ -13,4 +19,8 @@ __all__ = [
"vin_preprocessor",
"ReceiptPreprocessor",
"receipt_preprocessor",
"PdfPreprocessor",
"pdf_preprocessor",
"PdfPageContent",
"PdfInfo",
]

View File

@@ -0,0 +1,353 @@
"""PDF preprocessing for owner's manual extraction."""
import io
import logging
from dataclasses import dataclass, field
from typing import Iterator, Optional
import fitz # PyMuPDF
from PIL import Image
logger = logging.getLogger(__name__)
@dataclass
class PdfPageContent:
    """Content extracted from a single PDF page."""

    # Zero-indexed page number within the source document
    page_number: int
    # True when the extracted text met the preprocessor's minimum-length threshold
    has_text: bool
    # Extracted text layer (batch extraction APIs blank this for scanned pages)
    text_content: str
    # Rendered image for scanned pages; None when the text layer was usable
    image_bytes: Optional[bytes]
    # Page dimensions as reported by the PDF page rectangle
    # (presumably PDF points, 1/72 inch — confirm against PyMuPDF docs)
    width: int
    height: int
@dataclass
class PdfInfo:
    """Information about a PDF document."""

    # Total page count of the document
    total_pages: int
    # True if at least one sampled page had extractable text
    has_text_layer: bool
    # True if most sampled pages lack a text layer
    is_scanned: bool
    # Size of the original PDF payload in bytes
    file_size_bytes: int
    # Optional document metadata fields (may be absent from the PDF)
    title: Optional[str]
    author: Optional[str]
    # Full metadata dict as returned by the PDF library
    metadata: dict = field(default_factory=dict)
class PdfPreprocessor:
    """Preprocess PDFs for OCR extraction.

    Handles two scenarios:
    1. Native PDFs with a text layer - extract text directly
    2. Scanned PDFs - render pages to images for OCR

    Uses PyMuPDF (fitz) for both text extraction and image rendering.
    """

    # DPI for rendering scanned pages
    DEFAULT_DPI = 300
    # Minimum extracted-text length (chars) for a page to count as having text
    MIN_TEXT_LENGTH = 50
    # Maximum pages to sample for scan detection
    SAMPLE_PAGES = 10

    def get_pdf_info(self, pdf_bytes: bytes) -> PdfInfo:
        """
        Analyze PDF and return metadata.

        Samples up to SAMPLE_PAGES pages (beginning, middle, and end) and
        counts how many carry a usable text layer to classify the document
        as native vs. scanned.

        Args:
            pdf_bytes: Raw PDF bytes

        Returns:
            PdfInfo with document metadata
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            total_pages = len(doc)
            metadata = doc.metadata or {}
            # Choose sample pages: every page for short documents, otherwise
            # three each from the beginning, middle, and end.
            if total_pages <= self.SAMPLE_PAGES:
                sample_indices = list(range(total_pages))
            else:
                sample_indices = [
                    0, 1, 2,  # Beginning
                    total_pages // 2 - 1, total_pages // 2, total_pages // 2 + 1,  # Middle
                    total_pages - 3, total_pages - 2, total_pages - 1,  # End
                ]
                sample_indices = [i for i in sample_indices if 0 <= i < total_pages]
            text_pages = sum(
                1
                for page_idx in sample_indices
                if len(doc[page_idx].get_text().strip()) >= self.MIN_TEXT_LENGTH
            )
            # Scanned = less than half of the sampled pages have text.
            has_text_layer = text_pages > 0
            is_scanned = text_pages < len(sample_indices) / 2
            return PdfInfo(
                total_pages=total_pages,
                has_text_layer=has_text_layer,
                is_scanned=is_scanned,
                file_size_bytes=len(pdf_bytes),
                title=metadata.get("title"),
                author=metadata.get("author"),
                metadata=metadata,
            )
        finally:
            doc.close()

    def _build_page_content(
        self,
        page: "fitz.Page",
        page_number: int,
        dpi: int,
        force_render: bool = False,
        keep_short_text: bool = False,
    ) -> PdfPageContent:
        """
        Build a PdfPageContent for one page (shared by the extract_* APIs).

        Args:
            page: PyMuPDF page object
            page_number: Zero-indexed page number
            dpi: DPI used when the page must be rendered
            force_render: Render an image even when the page has text
            keep_short_text: Keep text shorter than MIN_TEXT_LENGTH instead
                of replacing it with ""

        Returns:
            PdfPageContent for the page
        """
        text = page.get_text().strip()
        has_text = len(text) >= self.MIN_TEXT_LENGTH
        rect = page.rect
        # Render to an image only when OCR will be needed (or forced).
        image_bytes = None
        if not has_text or force_render:
            image_bytes = self._render_page_to_image(page, dpi)
        return PdfPageContent(
            page_number=page_number,
            has_text=has_text,
            text_content=text if (has_text or keep_short_text) else "",
            image_bytes=image_bytes,
            width=int(rect.width),
            height=int(rect.height),
        )

    def extract_text_from_page(
        self, pdf_bytes: bytes, page_number: int
    ) -> PdfPageContent:
        """
        Extract content from a single PDF page.

        Args:
            pdf_bytes: Raw PDF bytes
            page_number: Zero-indexed page number

        Returns:
            PdfPageContent with text and/or image

        Raises:
            ValueError: If page_number is past the end of the document.
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            if page_number >= len(doc):
                raise ValueError(f"Page {page_number} does not exist (max: {len(doc) - 1})")
            # keep_short_text preserves whatever text the page had, even
            # below MIN_TEXT_LENGTH (the batch APIs blank short text instead).
            return self._build_page_content(
                doc[page_number], page_number, self.DEFAULT_DPI, keep_short_text=True
            )
        finally:
            doc.close()

    def extract_all_pages(
        self,
        pdf_bytes: bytes,
        dpi: int = DEFAULT_DPI,
        force_ocr: bool = False,
    ) -> Iterator[PdfPageContent]:
        """
        Extract content from all pages as a generator.

        Args:
            pdf_bytes: Raw PDF bytes
            dpi: DPI for rendering scanned pages
            force_ocr: If True, render all pages regardless of text layer

        Yields:
            PdfPageContent for each page
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            for page_number in range(len(doc)):
                yield self._build_page_content(
                    doc[page_number], page_number, dpi, force_render=force_ocr
                )
        finally:
            doc.close()

    def extract_page_range(
        self,
        pdf_bytes: bytes,
        start_page: int,
        end_page: int,
        dpi: int = DEFAULT_DPI,
    ) -> list[PdfPageContent]:
        """
        Extract content from a range of pages.

        Args:
            pdf_bytes: Raw PDF bytes
            start_page: First page (zero-indexed)
            end_page: Last page (exclusive; clamped to the document length)
            dpi: DPI for rendering

        Returns:
            List of PdfPageContent
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            end_page = min(end_page, len(doc))
            return [
                self._build_page_content(doc[page_number], page_number, dpi)
                for page_number in range(start_page, end_page)
            ]
        finally:
            doc.close()

    def find_maintenance_section(
        self, pdf_bytes: bytes, keywords: Optional[list[str]] = None
    ) -> list[int]:
        """
        Find pages likely containing maintenance schedules.

        Args:
            pdf_bytes: Raw PDF bytes
            keywords: Keywords to search for (defaults to common terms)

        Returns:
            List of page numbers likely containing maintenance info
        """
        if keywords is None:
            keywords = [
                "maintenance schedule",
                "maintenance interval",
                "service schedule",
                "service interval",
                "recommended maintenance",
                "scheduled maintenance",
                "routine maintenance",
                "periodic maintenance",
                "owner's maintenance",
                "maintenance requirements",
            ]
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            maintenance_pages = []
            for page_number in range(len(doc)):
                text = doc[page_number].get_text().lower()
                # One keyword hit is enough to flag the page.
                if any(keyword.lower() in text for keyword in keywords):
                    maintenance_pages.append(page_number)
            return maintenance_pages
        finally:
            doc.close()

    def _render_page_to_image(self, page: "fitz.Page", dpi: int) -> bytes:
        """
        Render a PDF page to PNG image bytes.

        Args:
            page: PyMuPDF page object
            dpi: Target DPI for rendering

        Returns:
            PNG image bytes
        """
        # Base PDF resolution is 72 DPI, so the zoom factor is dpi / 72.
        scale = dpi / 72.0
        matrix = fitz.Matrix(scale, scale)
        pixmap = page.get_pixmap(matrix=matrix)
        return pixmap.tobytes("png")

    def render_page_for_table_detection(
        self, pdf_bytes: bytes, page_number: int, dpi: int = 150
    ) -> bytes:
        """
        Render a page at lower DPI for table detection (faster).

        Args:
            pdf_bytes: Raw PDF bytes
            page_number: Page to render
            dpi: DPI for rendering (lower for faster processing)

        Returns:
            PNG image bytes

        Raises:
            ValueError: If page_number is past the end of the document.
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            if page_number >= len(doc):
                raise ValueError(f"Page {page_number} does not exist")
            return self._render_page_to_image(doc[page_number], dpi)
        finally:
            doc.close()
# Module-level singleton shared by importers; PdfPreprocessor holds no mutable state.
pdf_preprocessor = PdfPreprocessor()

View File

@@ -2,19 +2,24 @@
import logging
from typing import Optional
from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, Query, UploadFile
from app.extractors.vin_extractor import vin_extractor
from app.extractors.receipt_extractor import receipt_extractor
from app.extractors.manual_extractor import manual_extractor
from app.models import (
BoundingBox,
ManualExtractionResponse,
ManualJobResponse,
ManualMaintenanceSchedule,
ManualVehicleInfo,
OcrResponse,
ReceiptExtractedField,
ReceiptExtractionResponse,
VinAlternative,
VinExtractionResponse,
)
from app.services import ocr_service
from app.services import ocr_service, job_queue
logger = logging.getLogger(__name__)
@@ -23,6 +28,9 @@ router = APIRouter(prefix="/extract", tags=["extract"])
# Maximum file size for synchronous processing (10MB)
MAX_SYNC_SIZE = 10 * 1024 * 1024
# Maximum file size for manual/PDF processing (200MB)
MAX_MANUAL_SIZE = 200 * 1024 * 1024
@router.post("", response_model=OcrResponse)
async def extract_text(
@@ -257,3 +265,166 @@ async def extract_receipt(
processingTimeMs=result.processing_time_ms,
error=result.error,
)
@router.post("/manual", response_model=ManualJobResponse)
async def extract_manual(
background_tasks: BackgroundTasks,
file: UploadFile = File(..., description="Owner's manual PDF file"),
vehicle_id: Optional[str] = Form(None, description="Vehicle ID for context"),
) -> ManualJobResponse:
"""
Submit an async job to extract maintenance schedules from an owner's manual.
Supports PDF files up to 200MB. Processing is done asynchronously due to
the time required for large documents.
Pipeline:
1. Analyze PDF structure (text layer vs scanned)
2. Find maintenance schedule sections
3. Extract text or perform OCR on scanned pages
4. Detect and parse maintenance tables
5. Extract service intervals and fluid specifications
- **file**: Owner's manual PDF (max 200MB)
- **vehicle_id**: Optional vehicle ID for context
Returns immediately with job_id. Poll GET /jobs/{job_id} for status and results.
Response when completed:
- **vehicleInfo**: Detected make/model/year
- **maintenanceSchedules**: List of extracted maintenance items with intervals
- **rawTables**: Metadata about detected tables
- **processingTimeMs**: Total processing time
"""
# Validate file presence
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Validate file type
content_type = file.content_type or ""
if not content_type.startswith("application/pdf") and not file.filename.lower().endswith(".pdf"):
raise HTTPException(
status_code=400,
detail="File must be a PDF document",
)
# Read file content
content = await file.read()
file_size = len(content)
# Validate file size
if file_size > MAX_MANUAL_SIZE:
raise HTTPException(
status_code=413,
detail=f"File too large. Max: {MAX_MANUAL_SIZE // (1024*1024)}MB.",
)
if file_size == 0:
raise HTTPException(status_code=400, detail="Empty file provided")
logger.info(
f"Manual extraction: {file.filename}, "
f"size: {file_size} bytes, "
f"vehicle_id: {vehicle_id}"
)
# Estimate processing time based on file size
# Rough estimate: 1 second per MB for native PDFs, 3 seconds for scanned
estimated_seconds = max(30, (file_size // (1024 * 1024)) * 2)
# Submit job to queue
job_id = await job_queue.submit_manual_job(
file_bytes=content,
vehicle_id=vehicle_id,
)
# Schedule background processing
background_tasks.add_task(process_manual_job, job_id)
# Return initial status
return ManualJobResponse(
jobId=job_id,
status="pending",
progress=0,
estimatedSeconds=estimated_seconds,
)
async def process_manual_job(job_id: str) -> None:
    """Background task to process a manual extraction job.

    Loads the PDF bytes stored by the submit endpoint, runs the CPU-bound
    extraction in the default thread-pool executor, and records the result
    (or the failure) in the job queue. Never raises: all errors are
    captured and stored on the job.
    """
    import asyncio

    logger.info(f"Starting manual extraction job {job_id}")
    try:
        # Update status to processing
        await job_queue.update_manual_job_progress(job_id, 5, "Starting extraction")
        # Get job data
        file_bytes = await job_queue.get_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_manual_job(job_id, "Job data not found")
            return
        # get_running_loop() is the correct spelling inside a coroutine
        # (get_event_loop() is deprecated for this use).
        loop = asyncio.get_running_loop()

        def sync_progress_callback(percent: int, message: str) -> None:
            # Bridge progress updates from the worker thread back onto the
            # event loop. Fire-and-forget: the returned future is dropped.
            asyncio.run_coroutine_threadsafe(
                job_queue.update_manual_job_progress(job_id, percent, message),
                loop,
            )

        # Run extraction in the default thread pool (CPU-bound work).
        result = await loop.run_in_executor(
            None,
            lambda: manual_extractor.extract(
                pdf_bytes=file_bytes,
                progress_callback=sync_progress_callback,
            ),
        )
        if result.success:
            # Convert internal dataclasses to the API response model
            vehicle_info = None
            if result.vehicle_info:
                vehicle_info = ManualVehicleInfo(
                    make=result.vehicle_info.make,
                    model=result.vehicle_info.model,
                    year=result.vehicle_info.year,
                )
            schedules = [
                ManualMaintenanceSchedule(
                    service=s.service,
                    intervalMiles=s.interval_miles,
                    intervalMonths=s.interval_months,
                    details=s.details,
                    confidence=s.confidence,
                    subtypes=s.subtypes,
                )
                for s in result.maintenance_schedules
            ]
            response = ManualExtractionResponse(
                success=True,
                vehicleInfo=vehicle_info,
                maintenanceSchedules=schedules,
                rawTables=result.raw_tables,
                processingTimeMs=result.processing_time_ms,
                totalPages=result.total_pages,
                pagesProcessed=result.pages_processed,
            )
            await job_queue.complete_manual_job(job_id, response)
        else:
            await job_queue.fail_manual_job(job_id, result.error or "Extraction failed")
    except Exception as e:
        logger.error(f"Manual job {job_id} failed: {e}", exc_info=True)
        await job_queue.fail_manual_job(job_id, str(e))

View File

@@ -1,11 +1,11 @@
"""Async OCR job endpoints."""
import asyncio
import logging
from typing import Optional
from typing import Optional, Union
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
from app.models import JobResponse, JobSubmitRequest
from app.models import JobResponse, JobSubmitRequest, ManualJobResponse
from app.services import job_queue, ocr_service
logger = logging.getLogger(__name__)
@@ -73,12 +73,13 @@ async def submit_job(
)
@router.get("/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str) -> JobResponse:
@router.get("/{job_id}", response_model=Union[JobResponse, ManualJobResponse])
async def get_job_status(job_id: str) -> Union[JobResponse, ManualJobResponse]:
"""
Get the status of an async OCR job.
Poll this endpoint to check job progress and retrieve results.
Works for both regular OCR jobs and manual extraction jobs.
Returns:
- **pending**: Job is queued
@@ -86,15 +87,20 @@ async def get_job_status(job_id: str) -> JobResponse:
- **completed**: Job finished successfully (includes result)
- **failed**: Job failed (includes error message)
"""
# Try regular job first
result = await job_queue.get_job_status(job_id)
if result is not None:
return result
if result is None:
raise HTTPException(
status_code=404,
detail=f"Job {job_id} not found. Jobs expire after 1 hour.",
)
# Try manual job
manual_result = await job_queue.get_manual_job_status(job_id)
if manual_result is not None:
return manual_result
return result
raise HTTPException(
status_code=404,
detail=f"Job {job_id} not found. Jobs expire after 1-2 hours.",
)
async def process_job(job_id: str) -> None:

View File

@@ -3,23 +3,34 @@ import asyncio
import json
import logging
import uuid
from typing import Optional
from typing import Optional, TYPE_CHECKING
import redis.asyncio as redis
from app.config import settings
from app.models import JobResponse, JobStatus, OcrResponse
if TYPE_CHECKING:
from app.models import ManualExtractionResponse, ManualJobResponse
logger = logging.getLogger(__name__)
# Job TTL in seconds (1 hour)
JOB_TTL = 3600
# Manual job TTL (2 hours for larger files)
MANUAL_JOB_TTL = 7200
# Key prefixes
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"
# Manual job prefixes
MANUAL_JOB_PREFIX = "ocr:manual:job:"
MANUAL_JOB_DATA_PREFIX = "ocr:manual:job:data:"
MANUAL_JOB_RESULT_PREFIX = "ocr:manual:job:result:"
class JobQueue:
"""Manages async OCR jobs using Redis."""
@@ -228,6 +239,156 @@ class JobQueue:
except Exception as e:
logger.error(f"Callback failed for job {job_id}: {e}")
# Manual extraction job methods
async def submit_manual_job(
    self,
    file_bytes: bytes,
    vehicle_id: Optional[str] = None,
) -> str:
    """
    Submit a new manual extraction job.

    Stores job metadata (a Redis hash) and the raw PDF (a binary string)
    under separate keys, both expiring after MANUAL_JOB_TTL seconds.

    Args:
        file_bytes: Raw PDF bytes
        vehicle_id: Optional vehicle ID for context

    Returns:
        Job ID (a fresh UUID4 string)
    """
    r = await self.get_redis()
    job_id = str(uuid.uuid4())
    # Store job metadata
    job_meta = {
        "status": JobStatus.PENDING.value,
        "progress": 0,
        "progress_message": "",
        "vehicle_id": vehicle_id or "",  # empty string: Redis hashes cannot store None
        "job_type": "manual",
    }
    # Store file data separately (binary)
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    # Use pipeline for atomic operation
    # NOTE(review): in redis-py asyncio, pipeline command calls queue
    # synchronously and return the pipeline object; confirm that the
    # `await pipe.<cmd>(...)` form is supported by the pinned redis version.
    async with r.pipeline() as pipe:
        # Store metadata as hash
        await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
        await pipe.expire(meta_key, MANUAL_JOB_TTL)
        # Store binary data
        await pipe.set(data_key, file_bytes)
        await pipe.expire(data_key, MANUAL_JOB_TTL)
        await pipe.execute()
    logger.info(f"Manual job {job_id} submitted")
    return job_id
async def get_manual_job_status(self, job_id: str) -> Optional["ManualJobResponse"]:
    """
    Get the status of a manual extraction job.

    Args:
        job_id: Job ID to check

    Returns:
        ManualJobResponse or None if job doesn't exist
    """
    # Imported lazily; the module-level import is under TYPE_CHECKING only.
    from app.models import ManualJobResponse, ManualExtractionResponse

    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"
    # Get job metadata
    # NOTE(review): assumes the Redis client is configured with
    # decode_responses=True so hash fields come back as str — confirm.
    meta = await r.hgetall(meta_key)  # type: ignore
    if not meta:
        return None
    status = JobStatus(meta.get("status", JobStatus.PENDING.value))
    progress = int(meta.get("progress", 0))
    error = meta.get("error")
    # Get result if completed
    result = None
    if status == JobStatus.COMPLETED:
        result_json = await r.get(result_key)
        if result_json:
            result_dict = json.loads(result_json)
            result = ManualExtractionResponse(**result_dict)
    return ManualJobResponse(
        jobId=job_id,
        status=status,
        # Progress is only reported while processing; other states get None.
        progress=progress if status == JobStatus.PROCESSING else None,
        result=result,
        error=error if status == JobStatus.FAILED else None,
    )
async def update_manual_job_progress(
    self, job_id: str, progress: int, message: str = ""
) -> None:
    """Update manual job progress percentage and message."""
    # Any progress update also forces the job into the PROCESSING state.
    fields = {
        "status": JobStatus.PROCESSING.value,
        "progress": progress,
        "progress_message": message,
    }
    client = await self.get_redis()
    await client.hset(f"{MANUAL_JOB_PREFIX}{job_id}", mapping=fields)  # type: ignore
async def complete_manual_job(
    self, job_id: str, result: "ManualExtractionResponse"
) -> None:
    """Mark manual job as completed with result.

    Persists the serialized result (with camelCase aliases, matching the
    API response shape), flips the job hash to COMPLETED/100%, and deletes
    the buffered PDF bytes — all queued in a single pipeline.
    """
    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
    # Store result
    result_dict = result.model_dump(by_alias=True)
    result_json = json.dumps(result_dict)
    # NOTE(review): pipeline command calls in redis-py asyncio queue
    # synchronously and return the pipeline; confirm the `await pipe.*`
    # form works with the pinned redis version.
    async with r.pipeline() as pipe:
        # Update status
        await pipe.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.COMPLETED.value,
            "progress": 100,
        })
        # Store result
        await pipe.set(result_key, result_json)
        await pipe.expire(result_key, MANUAL_JOB_TTL)
        # Delete file data (no longer needed)
        await pipe.delete(data_key)
        await pipe.execute()
    logger.info(f"Manual job {job_id} completed")
async def fail_manual_job(self, job_id: str, error: str) -> None:
    """Mark manual job as failed with error message.

    Records the error on the job hash and frees the buffered PDF bytes;
    both operations are queued in one pipeline.
    """
    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
    # NOTE(review): see submit_manual_job — confirm awaiting pipeline
    # command calls is supported by the pinned redis version.
    async with r.pipeline() as pipe:
        await pipe.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.FAILED.value,
            "error": error,
        })
        # Delete file data
        await pipe.delete(data_key)
        await pipe.execute()
    logger.error(f"Manual job {job_id} failed: {error}")
# Module-level singleton: all routes share this queue (and its Redis connection).
job_queue = JobQueue()

View File

@@ -0,0 +1,12 @@
"""Table extraction components for maintenance schedule parsing."""
from app.table_extraction.detector import TableDetector, table_detector, DetectedTable
from app.table_extraction.parser import TableParser, table_parser, ParsedScheduleRow
__all__ = [
"TableDetector",
"table_detector",
"DetectedTable",
"TableParser",
"table_parser",
"ParsedScheduleRow",
]

View File

@@ -0,0 +1,322 @@
"""Table detection for maintenance schedule extraction."""
import io
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
import cv2
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
@dataclass
class DetectedTable:
    """A detected table in a document."""

    # Zero-indexed page the table was found on
    page_number: int
    # Bounding box in pixels for image-derived tables; text-derived tables
    # use x=0/width=0, y=start line index, height=row count.
    x: int
    y: int
    width: int
    height: int
    # Detection confidence in [0, 1]
    confidence: float
    # True once classified as a maintenance schedule table
    is_maintenance_table: bool
    # Header cells, when known (text-derived tables only)
    header_row: Optional[list[str]] = None
    # Data rows (header excluded)
    raw_content: list[list[str]] = field(default_factory=list)


class TableDetector:
    """Detect tables in document pages.

    Uses computer vision techniques to identify table regions:
    1. Line detection for bordered tables
    2. Text alignment analysis for borderless tables
    3. Header keyword matching for maintenance schedule identification
    """

    # Keywords indicating maintenance schedule table headers
    MAINTENANCE_HEADERS = [
        "service", "maintenance", "item", "operation",
        "miles", "mi", "km", "kilometers",
        "months", "mo", "interval",
        "check", "replace", "inspect", "change",
        "schedule", "frequency",
    ]
    # Keywords in content that indicate maintenance
    MAINTENANCE_CONTENT_KEYWORDS = [
        "oil", "filter", "brake", "tire", "coolant",
        "fluid", "spark plug", "belt", "hose",
        "inspect", "replace", "change", "check",
    ]

    def detect_tables_in_image(
        self, image_bytes: bytes, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect tables in an image using line detection.

        Args:
            image_bytes: PNG/JPEG image bytes
            page_number: Page number for the result

        Returns:
            List of DetectedTable objects
        """
        # Load image
        nparr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
        if img is None:
            logger.warning("Failed to decode image for table detection")
            return []
        # Apply threshold (inverted so ink becomes foreground)
        _, binary = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY_INV)
        # Detect horizontal lines
        horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
        horizontal_lines = cv2.morphologyEx(
            binary, cv2.MORPH_OPEN, horizontal_kernel, iterations=2
        )
        # Detect vertical lines
        vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
        vertical_lines = cv2.morphologyEx(
            binary, cv2.MORPH_OPEN, vertical_kernel, iterations=2
        )
        # Combine lines
        table_mask = cv2.add(horizontal_lines, vertical_lines)
        # Find contours of candidate table regions
        contours, _ = cv2.findContours(
            table_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        tables = []
        height, width = img.shape[:2]
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            # Filter by size (tables should be reasonably large)
            if w < width * 0.3 or h < height * 0.05:
                continue
            if w > width * 0.95 and h > height * 0.95:
                continue  # Skip full-page rectangles
            # Calculate confidence based on aspect ratio and size
            aspect_ratio = w / h if h > 0 else 0
            size_ratio = (w * h) / (width * height)
            # Tables typically have reasonable aspect ratios
            if 0.5 <= aspect_ratio <= 10 and 0.01 <= size_ratio <= 0.8:
                confidence = min(0.9, 0.5 + size_ratio + (1 - abs(aspect_ratio - 2) / 10))
                tables.append(
                    DetectedTable(
                        page_number=page_number,
                        x=x,
                        y=y,
                        width=w,
                        height=h,
                        confidence=confidence,
                        is_maintenance_table=False,  # Will be determined later
                    )
                )
        logger.debug(f"Detected {len(tables)} potential tables on page {page_number}")
        return tables

    def detect_tables_in_text(
        self, text: str, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect table-like structures in text using pattern analysis.

        Useful for native PDFs where text is available. Consecutive runs of
        at least three table-like lines are treated as candidate tables;
        shorter runs are discarded.

        Args:
            text: Extracted text content
            page_number: Page number

        Returns:
            List of DetectedTable with content populated
        """
        tables = []
        lines = text.split("\n")
        current_table_lines: list[str] = []
        in_table = False
        table_start_idx = 0
        for i, line in enumerate(lines):
            if self._is_table_row(line):
                if not in_table:
                    in_table = True
                    table_start_idx = i
                    current_table_lines = []
                current_table_lines.append(line)
            elif in_table:
                # End of a run: keep it only if it is long enough.
                if len(current_table_lines) >= 3:
                    table = self._process_text_table(
                        current_table_lines, page_number, table_start_idx
                    )
                    if table:
                        tables.append(table)
                # Always reset, even for short runs — otherwise stale rows
                # from an earlier short run would be merged into the next
                # table and carry its (wrong) start index.
                in_table = False
                current_table_lines = []
        # Handle table at end of text
        if in_table and len(current_table_lines) >= 3:
            table = self._process_text_table(
                current_table_lines, page_number, table_start_idx
            )
            if table:
                tables.append(table)
        return tables

    def is_maintenance_table(
        self, table: DetectedTable, full_text: Optional[str] = None
    ) -> bool:
        """
        Determine if a detected table is a maintenance schedule.

        Args:
            table: Detected table to analyze
            full_text: Optional surrounding text for context

        Returns:
            True if likely a maintenance schedule table
        """
        # Check header row for maintenance keywords (two hits suffice)
        if table.header_row:
            header_text = " ".join(table.header_row).lower()
            header_matches = sum(
                1 for kw in self.MAINTENANCE_HEADERS if kw in header_text
            )
            if header_matches >= 2:
                return True
        # Check content for maintenance keywords (three hits suffice)
        if table.raw_content:
            content_text = " ".join(
                " ".join(row) for row in table.raw_content
            ).lower()
            content_matches = sum(
                1 for kw in self.MAINTENANCE_CONTENT_KEYWORDS if kw in content_text
            )
            if content_matches >= 3:
                return True
        # Check surrounding text for schedule-section phrases
        if full_text:
            text_lower = full_text.lower()
            context_keywords = [
                "maintenance schedule",
                "service schedule",
                "maintenance interval",
                "recommended maintenance",
            ]
            if any(kw in text_lower for kw in context_keywords):
                return True
        return False

    def _is_table_row(self, line: str) -> bool:
        """Check if a line looks like a table row."""
        # Skip empty lines
        stripped = line.strip()
        if not stripped:
            return False
        # Check for multiple whitespace-separated columns
        parts = re.split(r"\s{2,}|\t", stripped)
        if len(parts) >= 2:
            # At least 2 columns with content
            non_empty = [p for p in parts if p.strip()]
            return len(non_empty) >= 2
        # Check for common table patterns
        # e.g., "Service Item 5,000 miles 6 months"
        if re.search(r"\d+[,.]?\d*\s*(miles?|mi\.?|km|months?|mo\.?)", stripped, re.I):
            return True
        return False

    def _process_text_table(
        self, lines: list[str], page_number: int, start_line: int
    ) -> Optional[DetectedTable]:
        """Process extracted text lines into a table structure."""
        if not lines:
            return None
        # Parse rows: split on runs of 2+ spaces or tabs
        rows = []
        for line in lines:
            parts = re.split(r"\s{2,}|\t", line.strip())
            cells = [p.strip() for p in parts if p.strip()]
            if cells:
                rows.append(cells)
        if len(rows) < 2:
            return None
        # First row is likely header
        header_row = rows[0]
        table = DetectedTable(
            page_number=page_number,
            x=0,  # Text tables don't have coordinates
            y=start_line,
            width=0,
            height=len(rows),
            confidence=0.7,
            is_maintenance_table=False,
            header_row=header_row,
            raw_content=rows[1:],
        )
        # Determine if it's a maintenance table; bump confidence if so
        table.is_maintenance_table = self.is_maintenance_table(table)
        if table.is_maintenance_table:
            table.confidence = 0.85
        return table

    def extract_table_text_from_region(
        self, image_bytes: bytes, table: DetectedTable
    ) -> list[list[str]]:
        """
        Extract text from a table region using OCR.

        Args:
            image_bytes: Full page image
            table: Detected table with coordinates

        Returns:
            2D list of cell contents
        """
        # This would use Tesseract on the cropped region
        # For now, return empty - actual OCR will be done in manual_extractor
        logger.debug(f"Table region: ({table.x}, {table.y}) {table.width}x{table.height}")
        return []


# Singleton instance
table_detector = TableDetector()

View File

@@ -0,0 +1,357 @@
"""Parse maintenance schedule tables into structured data."""
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from app.patterns.maintenance_patterns import maintenance_matcher
from app.patterns.service_mapping import service_mapper
logger = logging.getLogger(__name__)
@dataclass
class ParsedScheduleRow:
    """A parsed maintenance schedule row."""

    # Raw service text as it appeared in the table/line
    service: str
    # Canonical service name from the service mapper, if matched
    normalized_service: Optional[str]
    # Maintenance subtype names associated with the service
    subtypes: list[str]
    # Extracted interval values (either may be absent)
    interval_miles: Optional[int]
    interval_months: Optional[int]
    # Free-form notes column content, when present
    details: Optional[str]
    # Fluid specification string, when detected
    fluid_spec: Optional[str]
    # Combined confidence of the contributing matches, in [0, 1]
    confidence: float
    # Original cells/line the row was parsed from
    raw_row: list[str] = field(default_factory=list)
class TableParser:
    """Parse detected tables into maintenance schedules.

    Handles various table formats:
    - Service | Miles | Months | Notes
    - Service | Interval | Description
    - Miles/Months header with service rows
    """

    # Common column header patterns, keyed by logical column type.
    # _identify_columns checks these in insertion order per header cell and
    # the first matching pattern claims the column, so "service" wins ties.
    COLUMN_PATTERNS = {
        "service": [
            r"service", r"item", r"maintenance", r"operation",
            r"component", r"part", r"system", r"description",
        ],
        "miles": [
            r"miles?", r"mi\.?", r"mileage", r"odometer",
            r"km", r"kilometers?",
        ],
        "months": [
            r"months?", r"mo\.?", r"time", r"interval",
            r"years?", r"yr\.?",
        ],
        "details": [
            r"notes?", r"details?", r"remarks?", r"comments?",
            r"specification", r"specs?", r"procedure",
        ],
    }
def parse_table(
    self,
    header_row: list[str],
    data_rows: list[list[str]],
) -> list[ParsedScheduleRow]:
    """
    Parse a maintenance table into structured schedule rows.

    Args:
        header_row: Table header cells
        data_rows: Table data rows

    Returns:
        List of ParsedScheduleRow objects
    """
    column_types = self._identify_columns(header_row)
    # Without recognizable headers, fall back to positional parsing.
    if not column_types:
        logger.warning("Could not identify table columns")
        return self._parse_without_headers(data_rows)
    # Keep only the rows that parsed successfully.
    return [
        parsed
        for parsed in (self._parse_row(row, column_types) for row in data_rows)
        if parsed
    ]
def parse_text_block(self, text: str) -> list[ParsedScheduleRow]:
"""
Parse maintenance schedules from unstructured text.
Useful when table detection fails but text contains schedule info.
Args:
text: Text block that may contain maintenance schedules
Returns:
List of ParsedScheduleRow objects
"""
results = []
lines = text.split("\n")
for line in lines:
# Look for lines with service + interval pattern
service_match = service_mapper.map_service(line)
mileage_match = maintenance_matcher.extract_mileage_interval(line)
time_match = maintenance_matcher.extract_time_interval(line)
if service_match and (mileage_match or time_match):
# Extract fluid spec if present
fluid_match = maintenance_matcher.extract_fluid_spec(line)
results.append(
ParsedScheduleRow(
service=line.strip(),
normalized_service=service_match.normalized_name,
subtypes=service_match.subtypes,
interval_miles=mileage_match.value if mileage_match else None,
interval_months=time_match.value if time_match else None,
details=None,
fluid_spec=fluid_match.value if fluid_match else None,
confidence=min(
service_match.confidence,
mileage_match.confidence if mileage_match else 1.0,
time_match.confidence if time_match else 1.0,
),
raw_row=[line],
)
)
return results
def _identify_columns(
self, header_row: list[str]
) -> dict[int, str]:
"""
Identify column types from header row.
Args:
header_row: Table header cells
Returns:
Dict mapping column index to type
"""
column_types: dict[int, str] = {}
for i, header in enumerate(header_row):
header_lower = header.lower().strip()
for col_type, patterns in self.COLUMN_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, header_lower, re.IGNORECASE):
column_types[i] = col_type
break
if i in column_types:
break
# If no service column found, assume first column
if "service" not in column_types.values() and header_row:
for i, header in enumerate(header_row):
if i not in column_types:
column_types[i] = "service"
break
return column_types
def _parse_row(
self,
row: list[str],
column_types: dict[int, str],
) -> Optional[ParsedScheduleRow]:
"""
Parse a single data row using identified column types.
Args:
row: Table row cells
column_types: Column index to type mapping
Returns:
ParsedScheduleRow or None
"""
service = ""
interval_miles: Optional[int] = None
interval_months: Optional[int] = None
details: Optional[str] = None
fluid_spec: Optional[str] = None
# Extract values based on column types
for i, cell in enumerate(row):
cell_value = cell.strip()
if not cell_value:
continue
col_type = column_types.get(i)
if col_type == "service":
service = cell_value
elif col_type == "miles":
miles = self._extract_miles(cell_value)
if miles:
interval_miles = miles
elif col_type == "months":
months = self._extract_months(cell_value)
if months:
interval_months = months
elif col_type == "details":
details = cell_value
# Also check for fluid specs in details
fluid_match = maintenance_matcher.extract_fluid_spec(cell_value)
if fluid_match:
fluid_spec = fluid_match.value
# If no explicit miles/months columns, try to extract from service text
if not interval_miles and not interval_months:
mileage_match = maintenance_matcher.extract_mileage_interval(service)
time_match = maintenance_matcher.extract_time_interval(service)
if mileage_match:
interval_miles = mileage_match.value
if time_match:
interval_months = time_match.value
# Check for intervals in any cell
if not interval_miles:
for cell in row:
mileage_match = maintenance_matcher.extract_mileage_interval(cell)
if mileage_match:
interval_miles = mileage_match.value
break
if not interval_months:
for cell in row:
time_match = maintenance_matcher.extract_time_interval(cell)
if time_match:
interval_months = time_match.value
break
# Skip if no service identified
if not service:
return None
# Map service to normalized name and subtypes
service_match = service_mapper.map_service(service)
normalized_service = service_match.normalized_name if service_match else None
subtypes = service_match.subtypes if service_match else []
service_confidence = service_match.confidence if service_match else 0.5
# Calculate overall confidence
interval_confidence = 0.0
if interval_miles:
interval_confidence = max(interval_confidence, 0.8)
if interval_months:
interval_confidence = max(interval_confidence, 0.8)
confidence = (service_confidence + interval_confidence) / 2 if interval_confidence else service_confidence * 0.7
return ParsedScheduleRow(
service=service,
normalized_service=normalized_service,
subtypes=subtypes,
interval_miles=interval_miles,
interval_months=interval_months,
details=details,
fluid_spec=fluid_spec,
confidence=confidence,
raw_row=row,
)
def _parse_without_headers(
self, data_rows: list[list[str]]
) -> list[ParsedScheduleRow]:
"""
Parse table without clear headers by analyzing content.
Args:
data_rows: Table rows
Returns:
List of ParsedScheduleRow
"""
results = []
for row in data_rows:
if not row:
continue
# Join all cells and try to extract info
row_text = " ".join(row)
service_match = service_mapper.map_service(row_text)
mileage_match = maintenance_matcher.extract_mileage_interval(row_text)
time_match = maintenance_matcher.extract_time_interval(row_text)
fluid_match = maintenance_matcher.extract_fluid_spec(row_text)
if service_match:
results.append(
ParsedScheduleRow(
service=row[0] if row else row_text,
normalized_service=service_match.normalized_name,
subtypes=service_match.subtypes,
interval_miles=mileage_match.value if mileage_match else None,
interval_months=time_match.value if time_match else None,
details=None,
fluid_spec=fluid_match.value if fluid_match else None,
confidence=service_match.confidence * 0.8, # Reduce for no-header parsing
raw_row=row,
)
)
return results
def _extract_miles(self, text: str) -> Optional[int]:
"""Extract mileage value from cell text."""
# First try pattern matcher
match = maintenance_matcher.extract_mileage_interval(text)
if match:
return match.value
# Try simple number extraction
# Look for patterns like "5,000", "5000", "5K"
number_match = re.search(r"([\d,]+)(?:K)?", text.replace(" ", ""), re.IGNORECASE)
if number_match:
num_str = number_match.group(1).replace(",", "")
try:
value = int(num_str)
# Handle "5K" notation
if "K" in text.upper() and value < 1000:
value *= 1000
if 500 <= value <= 150000:
return value
except ValueError:
pass
return None
def _extract_months(self, text: str) -> Optional[int]:
"""Extract month interval from cell text."""
# First try pattern matcher
match = maintenance_matcher.extract_time_interval(text)
if match:
return match.value
# Try simple number extraction
number_match = re.search(r"(\d+)", text)
if number_match:
try:
value = int(number_match.group(1))
if 1 <= value <= 120:
return value
except ValueError:
pass
return None
# Singleton instance shared by all importers of this module.
table_parser = TableParser()

View File

@@ -16,6 +16,9 @@ numpy>=1.24.0
# OCR Engines
pytesseract>=0.3.10
# PDF Processing
PyMuPDF>=1.23.0
# Redis for job queue
redis>=5.0.0

View File

@@ -0,0 +1,164 @@
"""Tests for maintenance pattern matching."""
import pytest
from app.patterns.maintenance_patterns import maintenance_matcher
class TestMileageIntervalExtraction:
    """Behaviour of mileage-interval extraction from free text."""

    def test_every_miles_pattern(self) -> None:
        """A comma-separated 'every X miles' phrase is recognized."""
        extracted = maintenance_matcher.extract_mileage_interval("every 5,000 miles")
        assert extracted is not None
        assert extracted.value == 5000
        assert extracted.confidence >= 0.9

    def test_every_miles_no_comma(self) -> None:
        """The same phrase without a thousands separator also works."""
        extracted = maintenance_matcher.extract_mileage_interval("every 5000 miles")
        assert extracted is not None
        assert extracted.value == 5000

    def test_at_miles_pattern(self) -> None:
        """An 'at X mi' phrase yields the mileage."""
        extracted = maintenance_matcher.extract_mileage_interval("at 30,000 mi")
        assert extracted is not None
        assert extracted.value == 30000

    def test_miles_or_pattern(self) -> None:
        """In 'X miles or Y months', the mileage part is picked up."""
        extracted = maintenance_matcher.extract_mileage_interval("7,500 miles or 12 months")
        assert extracted is not None
        assert extracted.value == 7500

    def test_miles_slash_pattern(self) -> None:
        """Slash-combined 'X mi/Y months' still yields the mileage."""
        extracted = maintenance_matcher.extract_mileage_interval("5000 mi/6 months")
        assert extracted is not None
        assert extracted.value == 5000

    def test_no_mileage(self) -> None:
        """Text with no mileage produces no match."""
        extracted = maintenance_matcher.extract_mileage_interval("check brake fluid")
        assert extracted is None

    def test_unreasonable_mileage(self) -> None:
        """Implausibly small or large mileages are rejected."""
        extracted = maintenance_matcher.extract_mileage_interval("every 10 miles")
        assert extracted is None
        extracted = maintenance_matcher.extract_mileage_interval("every 1,000,000 miles")
        assert extracted is None
class TestTimeIntervalExtraction:
    """Behaviour of time-interval extraction (values normalized to months)."""

    def test_every_months_pattern(self) -> None:
        """An 'every X months' phrase yields the month count."""
        extracted = maintenance_matcher.extract_time_interval("every 6 months")
        assert extracted is not None
        assert extracted.value == 6
        assert extracted.confidence >= 0.9

    def test_months_or_pattern(self) -> None:
        """In 'X months or Y miles', the time part is picked up."""
        extracted = maintenance_matcher.extract_time_interval("12 months or 10,000 miles")
        assert extracted is not None
        assert extracted.value == 12

    def test_annually_pattern(self) -> None:
        """'annually' maps to 12 months."""
        extracted = maintenance_matcher.extract_time_interval("check annually")
        assert extracted is not None
        assert extracted.value == 12

    def test_semi_annual_pattern(self) -> None:
        """'semi-annually' maps to 6 months."""
        extracted = maintenance_matcher.extract_time_interval("inspect semi-annually")
        assert extracted is not None
        assert extracted.value == 6

    def test_every_years_pattern(self) -> None:
        """'every X years' is converted to months."""
        extracted = maintenance_matcher.extract_time_interval("replace every 2 years")
        assert extracted is not None
        assert extracted.value == 24

    def test_no_time_interval(self) -> None:
        """Text with no time interval produces no match."""
        extracted = maintenance_matcher.extract_time_interval("change oil filter")
        assert extracted is None
class TestFluidSpecExtraction:
    """Behaviour of fluid-specification extraction."""

    def test_oil_viscosity(self) -> None:
        """Viscosity grades like 0W-20 / 5W-30 are recognized as oil."""
        spec = maintenance_matcher.extract_fluid_spec("Use 0W-20 oil")
        assert spec is not None
        assert spec.value == "0W-20"
        assert spec.fluid_type == "oil"
        spec = maintenance_matcher.extract_fluid_spec("5W-30 synthetic")
        assert spec is not None
        assert spec.value == "5W-30"

    def test_transmission_fluid(self) -> None:
        """ATF and Dexron specs are classified as transmission fluid."""
        spec = maintenance_matcher.extract_fluid_spec("ATF-Z1 transmission fluid")
        assert spec is not None
        assert "ATF" in spec.value
        assert spec.fluid_type == "transmission"
        spec = maintenance_matcher.extract_fluid_spec("Dexron VI")
        assert spec is not None
        assert spec.fluid_type == "transmission"

    def test_brake_fluid(self) -> None:
        """DOT grades are classified as brake fluid."""
        spec = maintenance_matcher.extract_fluid_spec("DOT 4 brake fluid")
        assert spec is not None
        assert "DOT" in spec.value
        assert spec.fluid_type == "brake"

    def test_extract_all_fluid_specs(self) -> None:
        """Multiple specs in one sentence are all returned."""
        sentence = "Use 0W-20 oil and DOT 4 brake fluid"
        specs = maintenance_matcher.extract_all_fluid_specs(sentence)
        assert len(specs) >= 2
class TestCombinedInterval:
    """Behaviour of combined mileage/time interval extraction."""

    def test_mileage_and_time(self) -> None:
        """Both halves of a 'miles or months' phrase are extracted."""
        phrase = "every 5,000 miles or 6 months, whichever comes first"
        miles_part, time_part = maintenance_matcher.extract_combined_interval(phrase)
        assert miles_part is not None
        assert miles_part.value == 5000
        assert time_part is not None
        assert time_part.value == 6

    def test_only_mileage(self) -> None:
        """A mileage-only phrase leaves the time half empty."""
        phrase = "replace every 30,000 miles"
        miles_part, time_part = maintenance_matcher.extract_combined_interval(phrase)
        assert miles_part is not None
        assert miles_part.value == 30000
        assert time_part is None

    def test_only_time(self) -> None:
        """A time-only phrase leaves the mileage half empty."""
        phrase = "inspect annually"
        miles_part, time_part = maintenance_matcher.extract_combined_interval(phrase)
        assert miles_part is None
        assert time_part is not None
        assert time_part.value == 12

View File

@@ -0,0 +1,116 @@
"""Tests for service name mapping."""
import pytest
from app.patterns.service_mapping import service_mapper
class TestServiceMapping:
    """Behaviour of service-name → normalized-service/subtype mapping."""

    def test_engine_oil_mapping(self) -> None:
        """'engine oil' maps to the oil-change service."""
        mapping = service_mapper.map_service("engine oil")
        assert mapping is not None
        assert mapping.normalized_name == "Engine Oil Change"
        assert "Engine Oil" in mapping.subtypes
        assert mapping.category == "routine_maintenance"

    def test_oil_change_mapping(self) -> None:
        """'oil change' also resolves to the Engine Oil subtype."""
        mapping = service_mapper.map_service("oil change")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_air_filter_mapping(self) -> None:
        """'engine air filter' maps to the air-filter service."""
        mapping = service_mapper.map_service("engine air filter")
        assert mapping is not None
        assert mapping.normalized_name == "Air Filter Replacement"
        assert "Air Filter Element" in mapping.subtypes

    def test_cabin_filter_mapping(self) -> None:
        """'cabin air filter' maps to the cabin-filter subtype."""
        mapping = service_mapper.map_service("cabin air filter")
        assert mapping is not None
        assert "Cabin Air Filter / Purifier" in mapping.subtypes

    def test_tire_rotation_mapping(self) -> None:
        """'tire rotation' maps to Tires with high confidence."""
        mapping = service_mapper.map_service("tire rotation")
        assert mapping is not None
        assert "Tires" in mapping.subtypes
        assert mapping.confidence >= 0.95

    def test_brake_inspection_mapping(self) -> None:
        """'brake inspection' maps to the brakes subtype."""
        mapping = service_mapper.map_service("brake inspection")
        assert mapping is not None
        assert "Brakes and Traction Control" in mapping.subtypes

    def test_coolant_mapping(self) -> None:
        """'engine coolant' maps to the Coolant subtype."""
        mapping = service_mapper.map_service("engine coolant")
        assert mapping is not None
        assert "Coolant" in mapping.subtypes

    def test_transmission_fluid_mapping(self) -> None:
        """'automatic transmission fluid' maps to the A/T fluid subtype."""
        mapping = service_mapper.map_service("automatic transmission fluid")
        assert mapping is not None
        assert "Fluid - A/T" in mapping.subtypes

    def test_spark_plug_mapping(self) -> None:
        """'spark plugs' maps to the Spark Plug subtype."""
        mapping = service_mapper.map_service("spark plugs")
        assert mapping is not None
        assert "Spark Plug" in mapping.subtypes

    def test_wiper_blade_mapping(self) -> None:
        """'wiper blades' maps to the Wiper Blade subtype."""
        mapping = service_mapper.map_service("wiper blades")
        assert mapping is not None
        assert "Wiper Blade" in mapping.subtypes

    def test_unknown_service(self) -> None:
        """A nonsense service name yields no mapping."""
        mapping = service_mapper.map_service("quantum flux capacitor")
        assert mapping is None

    def test_case_insensitive(self) -> None:
        """Mapping ignores letter case."""
        mapping = service_mapper.map_service("ENGINE OIL")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_partial_match(self) -> None:
        """A known service embedded in a longer phrase still matches."""
        mapping = service_mapper.map_service("Replace engine oil and filter")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes
class TestFuzzyMapping:
    """Behaviour of fuzzy (similarity-based) service mapping."""

    def test_fuzzy_oil_change(self) -> None:
        """A paraphrased oil change still maps to Engine Oil."""
        mapping = service_mapper.map_service_fuzzy("change the engine oil")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_fuzzy_low_threshold(self) -> None:
        """A loose threshold lets a bare 'oil' match partially."""
        mapping = service_mapper.map_service_fuzzy("oil", threshold=0.3)
        assert mapping is not None  # Should match "engine oil" partially
class TestKeywords:
    """Behaviour of service keyword enumeration."""

    def test_get_keywords(self) -> None:
        """The keyword list is non-empty and contains core terms."""
        kw = service_mapper.get_all_service_keywords()
        assert len(kw) > 0
        assert "engine oil" in kw
        assert "service" in kw
        assert "maintenance" in kw

View File

@@ -0,0 +1,122 @@
"""Tests for table parsing."""
import pytest
from app.table_extraction.parser import table_parser
class TestTableParsing:
    """End-to-end parsing of maintenance tables and text blocks."""

    def test_parse_simple_table(self) -> None:
        """A Service/Miles/Months table parses one row per entry."""
        headers = ["Service", "Miles", "Months"]
        rows = [
            ["Engine Oil", "5,000", "6"],
            ["Air Filter", "30,000", "24"],
            ["Cabin Filter", "15,000", "12"],
        ]
        parsed = table_parser.parse_table(headers, rows)
        assert len(parsed) == 3
        # The oil-change row should carry both intervals.
        oil_row = next(p for p in parsed if "oil" in p.service.lower())
        assert oil_row.interval_miles == 5000
        assert oil_row.interval_months == 6

    def test_parse_table_with_notes(self) -> None:
        """A table with a combined Interval column and Notes still parses."""
        headers = ["Item", "Interval", "Notes"]
        rows = [
            ["Engine Oil", "5,000 miles or 6 months", "Use 0W-20"],
            ["Brake Fluid", "30,000 miles", "DOT 4"],
        ]
        parsed = table_parser.parse_table(headers, rows)
        assert len(parsed) == 2

    def test_parse_without_headers(self) -> None:
        """Headerless rows are parsed from their cell content."""
        rows = [
            ["Engine oil change", "5,000 miles", "6 months"],
            ["Tire rotation", "7,500 miles", ""],
        ]
        parsed = table_parser._parse_without_headers(rows)
        assert len(parsed) >= 1

    def test_parse_text_block(self) -> None:
        """Schedule lines in free text are recognized."""
        text = """
        Engine oil: replace every 5,000 miles or 6 months
        Air filter: replace every 30,000 miles
        Tire rotation: every 7,500 miles
        """
        parsed = table_parser.parse_text_block(text)
        assert len(parsed) >= 2
class TestColumnIdentification:
    """Classification of header cells into column roles."""

    def test_identify_service_column(self) -> None:
        """Service/Miles/Months headers are classified in order."""
        header_cells = ["Service Item", "Miles", "Months"]
        col_map = table_parser._identify_columns(header_cells)
        assert col_map.get(0) == "service"
        assert col_map.get(1) == "miles"
        assert col_map.get(2) == "months"

    def test_identify_maintenance_column(self) -> None:
        """A 'Maintenance' header counts as the service column."""
        header_cells = ["Maintenance", "Interval", "Notes"]
        col_map = table_parser._identify_columns(header_cells)
        assert col_map.get(0) == "service"

    def test_identify_details_column(self) -> None:
        """A 'Notes' header is classified as details."""
        header_cells = ["Item", "Miles", "Notes"]
        col_map = table_parser._identify_columns(header_cells)
        assert col_map.get(2) == "details"
class TestIntervalExtraction:
    """Extraction of mileage/month values from individual cells."""

    def test_extract_miles_with_comma(self) -> None:
        """A comma-separated number parses to an int."""
        miles = table_parser._extract_miles("5,000")
        assert miles == 5000

    def test_extract_miles_without_comma(self) -> None:
        """A plain number parses to an int."""
        miles = table_parser._extract_miles("5000")
        assert miles == 5000

    def test_extract_miles_with_unit(self) -> None:
        """A trailing 'miles' unit does not break extraction."""
        miles = table_parser._extract_miles("5,000 miles")
        assert miles == 5000

    def test_extract_miles_k_notation(self) -> None:
        """'5K' expands to 5000."""
        miles = table_parser._extract_miles("5K")
        assert miles == 5000

    def test_extract_months(self) -> None:
        """A bare month count parses to an int."""
        months = table_parser._extract_months("6")
        assert months == 6

    def test_extract_months_with_unit(self) -> None:
        """A trailing 'months' unit does not break extraction."""
        months = table_parser._extract_months("12 months")
        assert months == 12