Merge pull request 'feat: Owner's Manual OCR Pipeline (#71)' (#79) from issue-71-manual-ocr-pipeline into main
All checks were successful
Deploy to Staging / Build Images (push) Successful in 31s
Deploy to Staging / Deploy to Staging (push) Successful in 31s
Deploy to Staging / Verify Staging (push) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (push) Successful in 8s
Deploy to Staging / Notify Staging Failure (push) Has been skipped
All checks were successful
Deploy to Staging / Build Images (push) Successful in 31s
Deploy to Staging / Deploy to Staging (push) Successful in 31s
Deploy to Staging / Verify Staging (push) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (push) Successful in 8s
Deploy to Staging / Notify Staging Failure (push) Has been skipped
Reviewed-on: #79
This commit was merged in pull request #79.
This commit is contained in:
@@ -8,6 +8,13 @@ from app.extractors.receipt_extractor import (
|
||||
ExtractedField,
|
||||
)
|
||||
from app.extractors.fuel_receipt import FuelReceiptExtractor, fuel_receipt_extractor
|
||||
from app.extractors.manual_extractor import (
|
||||
ManualExtractor,
|
||||
manual_extractor,
|
||||
ManualExtractionResult,
|
||||
ExtractedSchedule,
|
||||
VehicleInfo,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"BaseExtractor",
|
||||
@@ -20,4 +27,9 @@ __all__ = [
|
||||
"ExtractedField",
|
||||
"FuelReceiptExtractor",
|
||||
"fuel_receipt_extractor",
|
||||
"ManualExtractor",
|
||||
"manual_extractor",
|
||||
"ManualExtractionResult",
|
||||
"ExtractedSchedule",
|
||||
"VehicleInfo",
|
||||
]
|
||||
|
||||
417
ocr/app/extractors/manual_extractor.py
Normal file
417
ocr/app/extractors/manual_extractor.py
Normal file
@@ -0,0 +1,417 @@
|
||||
"""Owner's manual extractor for maintenance schedule extraction."""
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable, Optional
|
||||
|
||||
import pytesseract
|
||||
from PIL import Image
|
||||
|
||||
from app.preprocessors.pdf_preprocessor import pdf_preprocessor, PdfInfo
|
||||
from app.table_extraction.detector import table_detector, DetectedTable
|
||||
from app.table_extraction.parser import table_parser, ParsedScheduleRow
|
||||
from app.patterns.maintenance_patterns import maintenance_matcher
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class ExtractedSchedule:
    """One maintenance schedule entry recovered from an owner's manual.

    Interval fields are ``None`` whenever the source text did not state
    the corresponding interval explicitly.
    """

    service: str  # Human-readable service name (normalized where possible)
    interval_miles: Optional[int]  # Mileage interval, if stated
    interval_months: Optional[int]  # Time interval in months, if stated
    details: Optional[str]  # Free-form extra details (e.g. fluid spec)
    confidence: float  # Extraction confidence in [0, 1]
    subtypes: list[str] = field(default_factory=list)  # System maintenance subtypes
|
||||
|
||||
|
||||
@dataclass
class VehicleInfo:
    """Make/model/year recovered from the manual; each field may be None
    when that piece of information could not be identified."""

    make: Optional[str]
    model: Optional[str]
    year: Optional[int]
|
||||
|
||||
|
||||
@dataclass
class ManualExtractionResult:
    """Complete result of manual extraction.

    On failure (``success`` is False) the collection fields are empty,
    the page counts are zero, and ``error`` carries the exception message.
    """

    success: bool
    # Best-effort make/model/year; None when nothing could be identified.
    vehicle_info: Optional[VehicleInfo]
    maintenance_schedules: list[ExtractedSchedule]
    # Table summaries: [{"page": int, "rows": int}, ...].
    raw_tables: list[dict]
    processing_time_ms: int
    total_pages: int
    # Number of pages actually examined (capped by MAX_PAGES_TO_PROCESS).
    pages_processed: int
    error: Optional[str] = None
|
||||
|
||||
|
||||
class ManualExtractor:
    """Extract maintenance schedules from owner's manuals.

    Processing pipeline:
    1. Analyze PDF structure
    2. Find maintenance section pages
    3. Extract text (native) or OCR (scanned)
    4. Detect tables
    5. Parse schedules
    6. Normalize and deduplicate
    """

    # Maximum pages to process for performance
    MAX_PAGES_TO_PROCESS = 50

    # Minimum confidence to include schedule
    MIN_CONFIDENCE = 0.5

    def extract(
        self,
        pdf_bytes: bytes,
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> ManualExtractionResult:
        """
        Extract maintenance schedules from an owner's manual PDF.

        Never raises: any exception is caught, logged, and reported via a
        failure result (``success=False``, ``error`` set).

        Args:
            pdf_bytes: Raw PDF bytes
            progress_callback: Optional callback for progress updates (percent, message)

        Returns:
            ManualExtractionResult with extracted data
        """
        start_time = time.time()

        # Forward progress to the caller (if any) and mirror it to the log.
        def update_progress(percent: int, message: str) -> None:
            if progress_callback:
                progress_callback(percent, message)
            logger.info(f"Progress {percent}%: {message}")

        try:
            update_progress(5, "Analyzing PDF structure")

            # Get PDF info
            pdf_info = pdf_preprocessor.get_pdf_info(pdf_bytes)
            logger.info(
                f"PDF: {pdf_info.total_pages} pages, "
                f"has_text={pdf_info.has_text_layer}, "
                f"is_scanned={pdf_info.is_scanned}"
            )

            update_progress(10, "Finding maintenance sections")

            # Find pages likely to contain maintenance schedules
            maintenance_pages = pdf_preprocessor.find_maintenance_section(pdf_bytes)

            if not maintenance_pages:
                # If no specific pages found, process first N pages
                maintenance_pages = list(range(min(self.MAX_PAGES_TO_PROCESS, pdf_info.total_pages)))
                logger.info("No specific maintenance section found, processing all pages")
            else:
                # Include pages before and after detected maintenance pages
                # (schedules often spill onto adjacent pages).
                expanded_pages: set[int] = set()
                for page in maintenance_pages:
                    for offset in range(-2, 5):  # Include 2 before, 4 after
                        new_page = page + offset
                        if 0 <= new_page < pdf_info.total_pages:
                            expanded_pages.add(new_page)
                maintenance_pages = sorted(expanded_pages)[:self.MAX_PAGES_TO_PROCESS]
                logger.info(f"Processing {len(maintenance_pages)} pages around maintenance section")

            update_progress(15, "Extracting page content")

            # Extract content from pages
            all_schedules: list[ParsedScheduleRow] = []
            all_tables: list[dict] = []
            pages_processed = 0

            for i, page_num in enumerate(maintenance_pages):
                # Page loop occupies the 15%..75% band of the progress bar.
                page_progress = 15 + int((i / len(maintenance_pages)) * 60)
                update_progress(page_progress, f"Processing page {page_num + 1}")

                # Extract page content
                page_content = pdf_preprocessor.extract_text_from_page(pdf_bytes, page_num)
                pages_processed += 1

                # Process based on content type
                if page_content.has_text:
                    # Native PDF - use text directly
                    schedules, tables = self._process_text_page(
                        page_content.text_content, page_num
                    )
                elif page_content.image_bytes:
                    # Scanned PDF - OCR required
                    schedules, tables = self._process_scanned_page(
                        page_content.image_bytes, page_num
                    )
                else:
                    # Page with neither text nor an image rendering; skip it.
                    continue

                all_schedules.extend(schedules)
                all_tables.extend(tables)

            update_progress(75, "Normalizing results")

            # Deduplicate and normalize schedules
            normalized_schedules = self._normalize_schedules(all_schedules)

            update_progress(85, "Extracting vehicle information")

            # Try to extract vehicle info from first few pages
            vehicle_info = self._extract_vehicle_info(pdf_bytes, pdf_info)

            update_progress(95, "Finalizing results")

            processing_time_ms = int((time.time() - start_time) * 1000)

            logger.info(
                f"Extraction complete: {len(normalized_schedules)} schedules from "
                f"{pages_processed} pages in {processing_time_ms}ms"
            )

            update_progress(100, "Complete")

            return ManualExtractionResult(
                success=True,
                vehicle_info=vehicle_info,
                maintenance_schedules=normalized_schedules,
                # Only page/row counts are surfaced; raw table contents stay internal.
                raw_tables=[{"page": t.get("page", 0), "rows": t.get("rows", 0)} for t in all_tables],
                processing_time_ms=processing_time_ms,
                total_pages=pdf_info.total_pages,
                pages_processed=pages_processed,
            )

        except Exception as e:
            # Top-level boundary: report failure in the result instead of raising.
            logger.error(f"Manual extraction failed: {e}", exc_info=True)
            processing_time_ms = int((time.time() - start_time) * 1000)

            return ManualExtractionResult(
                success=False,
                vehicle_info=None,
                maintenance_schedules=[],
                raw_tables=[],
                processing_time_ms=processing_time_ms,
                total_pages=0,
                pages_processed=0,
                error=str(e),
            )

    def _process_text_page(
        self, text: str, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a native PDF page with text.

        Returns (parsed schedule rows, table summary dicts) for this page.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []

        # Detect tables in text
        detected_tables = table_detector.detect_tables_in_text(text, page_number)

        for table in detected_tables:
            if table.is_maintenance_table and table.header_row:
                # Parse table
                parsed = table_parser.parse_table(
                    table.header_row,
                    table.raw_content,
                )
                schedules.extend(parsed)

                tables.append({
                    "page": page_number,
                    "rows": len(table.raw_content),
                    "is_maintenance": True,
                })

        # Also try to extract from unstructured text
        text_schedules = table_parser.parse_text_block(text)
        schedules.extend(text_schedules)

        return schedules, tables

    def _process_scanned_page(
        self, image_bytes: bytes, page_number: int
    ) -> tuple[list[ParsedScheduleRow], list[dict]]:
        """Process a scanned PDF page with OCR.

        OCR failures are logged and swallowed, yielding empty results for
        the page rather than aborting the whole extraction.
        """
        schedules: list[ParsedScheduleRow] = []
        tables: list[dict] = []

        # Detect tables in image
        detected_tables = table_detector.detect_tables_in_image(image_bytes, page_number)

        # OCR the full page
        try:
            image = Image.open(io.BytesIO(image_bytes))
            ocr_text = pytesseract.image_to_string(image)

            # Mark tables as maintenance if page contains maintenance keywords
            # NOTE(review): these image-detected tables are classified here but
            # never parsed or appended below — only the text-detected tables
            # are used. Confirm the image detection is intentionally advisory.
            for table in detected_tables:
                table.is_maintenance_table = table_detector.is_maintenance_table(
                    table, ocr_text
                )

            # Try to extract from OCR text
            text_tables = table_detector.detect_tables_in_text(ocr_text, page_number)

            for table in text_tables:
                if table.is_maintenance_table and table.header_row:
                    parsed = table_parser.parse_table(
                        table.header_row,
                        table.raw_content,
                    )
                    schedules.extend(parsed)

                    tables.append({
                        "page": page_number,
                        "rows": len(table.raw_content),
                        "is_maintenance": True,
                    })

            # Also try unstructured text
            text_schedules = table_parser.parse_text_block(ocr_text)
            schedules.extend(text_schedules)

        except Exception as e:
            logger.warning(f"OCR failed for page {page_number}: {e}")

        return schedules, tables

    def _normalize_schedules(
        self, schedules: list[ParsedScheduleRow]
    ) -> list[ExtractedSchedule]:
        """Normalize and deduplicate extracted schedules.

        Rows below MIN_CONFIDENCE are dropped; remaining rows are grouped
        by normalized service name, merged (best-confidence row wins, gaps
        filled from lower-confidence duplicates), and sorted by confidence.
        """
        # Group by normalized service name
        by_service: dict[str, list[ParsedScheduleRow]] = {}

        for schedule in schedules:
            if schedule.confidence < self.MIN_CONFIDENCE:
                continue

            key = schedule.normalized_service or schedule.service.lower()
            if key not in by_service:
                by_service[key] = []
            by_service[key].append(schedule)

        # Merge duplicates, keeping highest confidence
        results: list[ExtractedSchedule] = []

        for service_key, items in by_service.items():
            # Sort by confidence
            items.sort(key=lambda x: x.confidence, reverse=True)
            best = items[0]

            # Merge interval info from other items if missing
            miles = best.interval_miles
            months = best.interval_months
            details = best.details
            fluid_spec = best.fluid_spec

            for item in items[1:]:
                if not miles and item.interval_miles:
                    miles = item.interval_miles
                if not months and item.interval_months:
                    months = item.interval_months
                if not details and item.details:
                    details = item.details
                if not fluid_spec and item.fluid_spec:
                    fluid_spec = item.fluid_spec

            # Build details string
            detail_parts = []
            if details:
                detail_parts.append(details)
            if fluid_spec:
                detail_parts.append(f"Use {fluid_spec}")

            results.append(
                ExtractedSchedule(
                    service=best.normalized_service or best.service,
                    interval_miles=miles,
                    interval_months=months,
                    details=" - ".join(detail_parts) if detail_parts else None,
                    confidence=best.confidence,
                    subtypes=best.subtypes,
                )
            )

        # Sort by confidence
        results.sort(key=lambda x: x.confidence, reverse=True)

        return results

    def _extract_vehicle_info(
        self, pdf_bytes: bytes, pdf_info: PdfInfo
    ) -> Optional[VehicleInfo]:
        """Extract vehicle make/model/year from manual.

        Tries the PDF title metadata first, then the first page's text
        (OCR'd if needed). Returns None when nothing could be identified.
        """
        # Check metadata first
        if pdf_info.title:
            info = self._parse_vehicle_from_title(pdf_info.title)
            if info:
                return info

        # Try first page
        try:
            first_page = pdf_preprocessor.extract_text_from_page(pdf_bytes, 0)
            text = first_page.text_content

            if not text and first_page.image_bytes:
                # OCR first page
                image = Image.open(io.BytesIO(first_page.image_bytes))
                text = pytesseract.image_to_string(image)

            if text:
                return self._parse_vehicle_from_text(text)

        except Exception as e:
            # Vehicle info is best-effort; failures only lose metadata.
            logger.warning(f"Failed to extract vehicle info: {e}")

        return None

    def _parse_vehicle_from_title(self, title: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from document title.

        Matches a 4-digit year (19xx/20xx) and the first known make found
        in the title; the word following the make is taken as the model.
        Returns a VehicleInfo if at least a year or a make was found.
        """
        import re

        # Common patterns: "2024 Honda Civic Owner's Manual"
        year_match = re.search(r"(20\d{2}|19\d{2})", title)
        year = int(year_match.group(1)) if year_match else None

        # Common makes
        makes = [
            "Acura", "Alfa Romeo", "Audi", "BMW", "Buick", "Cadillac",
            "Chevrolet", "Chrysler", "Dodge", "Ferrari", "Fiat", "Ford",
            "Genesis", "GMC", "Honda", "Hyundai", "Infiniti", "Jaguar",
            "Jeep", "Kia", "Lamborghini", "Land Rover", "Lexus", "Lincoln",
            "Maserati", "Mazda", "McLaren", "Mercedes", "Mini", "Mitsubishi",
            "Nissan", "Porsche", "Ram", "Rolls-Royce", "Subaru", "Tesla",
            "Toyota", "Volkswagen", "Volvo",
        ]

        make = None
        model = None

        for m in makes:
            if m.lower() in title.lower():
                make = m
                # Try to find model after make
                idx = title.lower().find(m.lower())
                after = title[idx + len(m):].strip()
                # First word after make is likely model
                model_match = re.match(r"^(\w+)", after)
                if model_match:
                    model = model_match.group(1)
                break

        if year or make:
            return VehicleInfo(make=make, model=model, year=year)

        return None

    def _parse_vehicle_from_text(self, text: str) -> Optional[VehicleInfo]:
        """Parse vehicle info from page text."""
        return self._parse_vehicle_from_title(text[:500])  # Use first 500 chars
|
||||
|
||||
|
||||
# Singleton instance
# Module-level shared instance; ManualExtractor holds no per-call state
# (only class-level constants), so reuse across callers is safe.
manual_extractor = ManualExtractor()
|
||||
@@ -56,6 +56,8 @@ async def root() -> dict:
|
||||
"endpoints": [
|
||||
"POST /extract - Synchronous OCR extraction",
|
||||
"POST /extract/vin - VIN-specific extraction with validation",
|
||||
"POST /extract/receipt - Receipt extraction (fuel, general)",
|
||||
"POST /extract/manual - Owner's manual extraction (async)",
|
||||
"POST /jobs - Submit async OCR job",
|
||||
"GET /jobs/{job_id} - Get async job status",
|
||||
],
|
||||
|
||||
@@ -6,6 +6,10 @@ from .schemas import (
|
||||
JobResponse,
|
||||
JobStatus,
|
||||
JobSubmitRequest,
|
||||
ManualExtractionResponse,
|
||||
ManualJobResponse,
|
||||
ManualMaintenanceSchedule,
|
||||
ManualVehicleInfo,
|
||||
OcrResponse,
|
||||
ReceiptExtractedField,
|
||||
ReceiptExtractionResponse,
|
||||
@@ -20,6 +24,10 @@ __all__ = [
|
||||
"JobResponse",
|
||||
"JobStatus",
|
||||
"JobSubmitRequest",
|
||||
"ManualExtractionResponse",
|
||||
"ManualJobResponse",
|
||||
"ManualMaintenanceSchedule",
|
||||
"ManualVehicleInfo",
|
||||
"OcrResponse",
|
||||
"ReceiptExtractedField",
|
||||
"ReceiptExtractionResponse",
|
||||
|
||||
@@ -115,3 +115,57 @@ class ReceiptExtractionResponse(BaseModel):
|
||||
error: Optional[str] = None
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
# Manual extraction models
|
||||
|
||||
|
||||
class ManualVehicleInfo(BaseModel):
    """Vehicle information extracted from manual."""

    # All fields optional: extraction may recover only some of make/model/year.
    make: Optional[str] = None
    model: Optional[str] = None
    year: Optional[int] = None
|
||||
|
||||
|
||||
class ManualMaintenanceSchedule(BaseModel):
    """A single maintenance schedule entry."""

    service: str
    # camelCase aliases match the JSON shape used by API consumers.
    interval_miles: Optional[int] = Field(default=None, alias="intervalMiles")
    interval_months: Optional[int] = Field(default=None, alias="intervalMonths")
    details: Optional[str] = None
    # Extraction confidence, validated to lie in [0, 1].
    confidence: float = Field(ge=0.0, le=1.0)
    subtypes: list[str] = Field(default_factory=list)

    # Accept both the python field names and the camelCase aliases on input.
    model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class ManualExtractionResponse(BaseModel):
    """Response from manual extraction endpoint."""

    success: bool
    # camelCase aliases match the JSON shape used by API consumers.
    vehicle_info: Optional[ManualVehicleInfo] = Field(default=None, alias="vehicleInfo")
    maintenance_schedules: list[ManualMaintenanceSchedule] = Field(
        default_factory=list, alias="maintenanceSchedules"
    )
    # Table summaries only (page/row counts), not raw table contents.
    raw_tables: list[dict] = Field(default_factory=list, alias="rawTables")
    processing_time_ms: int = Field(alias="processingTimeMs")
    total_pages: int = Field(alias="totalPages")
    pages_processed: int = Field(alias="pagesProcessed")
    # Set when success is False.
    error: Optional[str] = None

    # Accept both the python field names and the camelCase aliases on input.
    model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class ManualJobResponse(BaseModel):
    """Response for async manual extraction job."""

    job_id: str = Field(alias="jobId")
    # Lifecycle state of the job (values per the JobStatus enum defined elsewhere).
    status: JobStatus
    # Percent complete, constrained to 0-100 when reported.
    progress: Optional[int] = Field(default=None, ge=0, le=100)
    estimated_seconds: Optional[int] = Field(default=None, alias="estimatedSeconds")
    # Populated only once the job finished; None while pending/running.
    result: Optional[ManualExtractionResponse] = None
    error: Optional[str] = None

    # Accept both the python field names and the camelCase aliases on input.
    model_config = {"populate_by_name": True}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
"""Pattern matching modules for receipt field extraction."""
|
||||
"""Pattern matching modules for receipt and manual field extraction."""
|
||||
from app.patterns.date_patterns import DatePatternMatcher, date_matcher
|
||||
from app.patterns.currency_patterns import CurrencyPatternMatcher, currency_matcher
|
||||
from app.patterns.fuel_patterns import FuelPatternMatcher, fuel_matcher
|
||||
from app.patterns.maintenance_patterns import MaintenancePatternMatcher, maintenance_matcher
|
||||
from app.patterns.service_mapping import ServiceMapper, service_mapper
|
||||
|
||||
__all__ = [
|
||||
"DatePatternMatcher",
|
||||
@@ -10,4 +12,8 @@ __all__ = [
|
||||
"currency_matcher",
|
||||
"FuelPatternMatcher",
|
||||
"fuel_matcher",
|
||||
"MaintenancePatternMatcher",
|
||||
"maintenance_matcher",
|
||||
"ServiceMapper",
|
||||
"service_mapper",
|
||||
]
|
||||
|
||||
335
ocr/app/patterns/maintenance_patterns.py
Normal file
335
ocr/app/patterns/maintenance_patterns.py
Normal file
@@ -0,0 +1,335 @@
|
||||
"""Maintenance schedule pattern matching for owner's manual extraction."""
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class MileageIntervalMatch:
    """Result of mileage interval pattern matching."""

    value: int  # Miles
    raw_match: str  # The exact substring that matched (lowercased input)
    confidence: float  # Pattern-specific confidence in [0, 1]
    pattern_name: str  # Which pattern in MILEAGE_PATTERNS fired


@dataclass
class TimeIntervalMatch:
    """Result of time interval pattern matching."""

    value: int  # Months
    raw_match: str  # The exact substring that matched (lowercased input)
    confidence: float  # Pattern-specific confidence in [0, 1]
    pattern_name: str  # Which pattern in TIME_PATTERNS fired


@dataclass
class FluidSpecMatch:
    """Result of fluid specification pattern matching."""

    value: str  # e.g., "0W-20", "ATF-Z1", "DOT 4"
    fluid_type: str  # e.g., "oil", "transmission", "brake"
    raw_match: str
    confidence: float


class MaintenancePatternMatcher:
    """Extract maintenance-specific data from owner's manual text.

    Pattern tables are ordered from most to least specific; extraction
    methods try each in order and return the first plausible match.
    Each entry is (regex, pattern_name, confidence).
    """

    # Mileage interval patterns
    MILEAGE_PATTERNS = [
        # "every 5,000 miles" or "every 5000 miles"
        (
            r"every\s+([\d,]+)\s*(?:miles?|mi\.?)",
            "every_miles",
            0.95,
        ),
        # "at 30,000 mi" or "at 30000 miles"
        (
            r"at\s+([\d,]+)\s*(?:miles?|mi\.?)",
            "at_miles",
            0.93,
        ),
        # "5,000 miles or" (interval before "or")
        (
            r"([\d,]+)\s*(?:miles?|mi\.?)\s*(?:or|/)",
            "miles_or",
            0.90,
        ),
        # "every 5,000-7,500 miles" (range - take lower)
        (
            r"every\s+([\d,]+)\s*[-–]\s*[\d,]+\s*(?:miles?|mi\.?)",
            "miles_range",
            0.88,
        ),
        # "7,500 mi/12 months" (interval with slash)
        (
            r"([\d,]+)\s*(?:miles?|mi\.?)\s*/",
            "miles_slash",
            0.87,
        ),
        # Standalone "X,XXX miles" in table context
        (
            r"(?<![0-9])([\d,]+)\s*(?:miles?|mi\.?)(?![a-z])",
            "standalone_miles",
            0.75,
        ),
    ]

    # Time interval patterns
    TIME_PATTERNS = [
        # "every 6 months"
        (
            r"every\s+(\d+)\s*months?",
            "every_months",
            0.95,
        ),
        # "6 months or" (interval before "or")
        (
            r"(\d+)\s*months?\s*(?:or|/)",
            "months_or",
            0.90,
        ),
        # "annually" -> 12 months
        (
            r"\bannually\b",
            "annually",
            0.95,
        ),
        # "semi-annually" or "semi-annual" -> 6 months
        (
            r"\bsemi-?annual(?:ly)?\b",
            "semi_annual",
            0.95,
        ),
        # "every year" -> 12 months
        (
            r"every\s+year",
            "every_year",
            0.93,
        ),
        # "every 2 years" -> 24 months
        (
            r"every\s+(\d+)\s*years?",
            "every_years",
            0.93,
        ),
        # "12 mo/7,500 mi" or "12 months/"
        (
            r"(\d+)\s*(?:mo(?:nths?)?\.?)\s*/",
            "months_slash",
            0.87,
        ),
        # Standalone "X months" in table context
        (
            r"(?<![0-9])(\d+)\s*months?(?![a-z])",
            "standalone_months",
            0.75,
        ),
    ]

    # Fluid specification patterns
    FLUID_PATTERNS = [
        # Oil viscosity: 0W-20, 5W-30, 10W-40
        (
            r"\b(\d+W-\d+)\b",
            "oil",
            0.95,
        ),
        # Full synthetic variants
        (
            r"(full\s+synthetic\s+\d+W-\d+)",
            "oil",
            0.93,
        ),
        # Transmission fluid: ATF-Z1, ATF+4, Dexron VI
        (
            r"\b(ATF[- ]?\w+)\b",
            "transmission",
            0.90,
        ),
        (
            r"\b(Dexron\s*(?:VI|IV|III)?)\b",
            "transmission",
            0.90,
        ),
        (
            r"\b(Mercon\s*(?:V|LV|SP)?)\b",
            "transmission",
            0.90,
        ),
        # Brake fluid: DOT 3, DOT 4, DOT 5.1
        (
            r"\b(DOT\s*\d(?:\.\d)?)\b",
            "brake",
            0.95,
        ),
        # Coolant types
        (
            r"\b((?:Type\s+)?(?:2|II)\s+(?:coolant|antifreeze))\b",
            "coolant",
            0.88,
        ),
        (
            r"\b((?:50/50|pre-mixed)\s+(?:coolant|antifreeze))\b",
            "coolant",
            0.85,
        ),
        # Power steering fluid
        (
            r"\b(power\s+steering\s+fluid)\b",
            "power_steering",
            0.90,
        ),
    ]

    def extract_mileage_interval(self, text: str) -> Optional[MileageIntervalMatch]:
        """
        Extract mileage interval from text.

        Args:
            text: Text to search for mileage intervals

        Returns:
            MileageIntervalMatch or None if no interval found
        """
        text_lower = text.lower()

        for pattern, name, confidence in self.MILEAGE_PATTERNS:
            match = re.search(pattern, text_lower, re.IGNORECASE)
            if match:
                # Extract the number and remove commas
                mileage_str = match.group(1).replace(",", "")
                # BUGFIX: "[\d,]+" can capture a run of commas with no digits
                # (e.g. OCR noise like "at ,, miles"); int("") would raise
                # ValueError. Skip the degenerate capture and try the next
                # pattern instead.
                if not mileage_str:
                    continue
                mileage = int(mileage_str)

                # Implausible values (OCR noise, page numbers) fall through
                # to the next, less specific pattern.
                if self._is_reasonable_mileage(mileage):
                    return MileageIntervalMatch(
                        value=mileage,
                        raw_match=match.group(0),
                        confidence=confidence,
                        pattern_name=name,
                    )

        return None

    def extract_time_interval(self, text: str) -> Optional[TimeIntervalMatch]:
        """
        Extract time interval from text.

        Args:
            text: Text to search for time intervals

        Returns:
            TimeIntervalMatch or None if no interval found
        """
        text_lower = text.lower()

        for pattern, name, confidence in self.TIME_PATTERNS:
            match = re.search(pattern, text_lower, re.IGNORECASE)
            if match:
                # Handle special cases: word-based patterns carry no capture
                # group, so map them to fixed month counts.
                if name == "annually":
                    months = 12
                elif name == "semi_annual":
                    months = 6
                elif name == "every_year":
                    months = 12
                elif name == "every_years":
                    years = int(match.group(1))
                    months = years * 12
                else:
                    months = int(match.group(1))

                if self._is_reasonable_months(months):
                    return TimeIntervalMatch(
                        value=months,
                        raw_match=match.group(0),
                        confidence=confidence,
                        pattern_name=name,
                    )

        return None

    def extract_fluid_spec(self, text: str) -> Optional[FluidSpecMatch]:
        """
        Extract fluid specification from text.

        Args:
            text: Text to search for fluid specs

        Returns:
            FluidSpecMatch or None if no spec found
        """
        for pattern, fluid_type, confidence in self.FLUID_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                # Specs are uppercased for consistency ("0w-20" -> "0W-20"),
                # except coolant descriptions which read as phrases.
                return FluidSpecMatch(
                    value=match.group(1).upper() if fluid_type != "coolant" else match.group(1),
                    fluid_type=fluid_type,
                    raw_match=match.group(0),
                    confidence=confidence,
                )

        return None

    def extract_all_fluid_specs(self, text: str) -> list[FluidSpecMatch]:
        """
        Extract all fluid specifications from text.

        Args:
            text: Text to search for fluid specs

        Returns:
            List of FluidSpecMatch objects (deduplicated by value)
        """
        results = []
        seen_values: set[str] = set()

        for pattern, fluid_type, confidence in self.FLUID_PATTERNS:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                value = match.group(1).upper() if fluid_type != "coolant" else match.group(1)
                if value not in seen_values:
                    seen_values.add(value)
                    results.append(
                        FluidSpecMatch(
                            value=value,
                            fluid_type=fluid_type,
                            raw_match=match.group(0),
                            confidence=confidence,
                        )
                    )

        return results

    def extract_combined_interval(
        self, text: str
    ) -> tuple[Optional[MileageIntervalMatch], Optional[TimeIntervalMatch]]:
        """
        Extract both mileage and time intervals from a combined pattern.

        Many schedules use patterns like "every 5,000 miles or 6 months".

        Args:
            text: Text to search

        Returns:
            Tuple of (mileage_match, time_match)
        """
        mileage = self.extract_mileage_interval(text)
        time = self.extract_time_interval(text)
        return mileage, time

    def _is_reasonable_mileage(self, mileage: int) -> bool:
        """Check if mileage interval is reasonable for maintenance."""
        # Typical ranges: 1,000 to 100,000 miles
        return 500 <= mileage <= 150000

    def _is_reasonable_months(self, months: int) -> bool:
        """Check if month interval is reasonable for maintenance."""
        # Typical ranges: 1 to 120 months (10 years)
        return 1 <= months <= 120
|
||||
|
||||
|
||||
# Singleton instance
# The matcher is stateless (class-level pattern tables only), so one shared
# instance serves all importers.
maintenance_matcher = MaintenancePatternMatcher()
|
||||
259
ocr/app/patterns/service_mapping.py
Normal file
259
ocr/app/patterns/service_mapping.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Service name normalization and mapping to maintenance subtypes."""
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class ServiceMapping:
    """Outcome of mapping an extracted service phrase onto the system's
    maintenance vocabulary."""

    normalized_name: str  # Standardized service name
    subtypes: list[str]  # Maintenance subtypes from the system
    category: str  # routine_maintenance, repair, performance_upgrade
    confidence: float  # Mapping confidence in [0, 1]
|
||||
|
||||
|
||||
# Maintenance subtypes from the system (must match exactly)
# NOTE(review): these literals appear to mirror a subtype list defined in the
# main application — keep casing/spacing in sync with that source; do not
# edit entries here independently.
ROUTINE_MAINTENANCE_SUBTYPES = [
    "Accelerator Pedal",
    "Air Filter Element",
    "Brakes and Traction Control",
    "Cabin Air Filter / Purifier",
    "Coolant",
    "Doors",
    "Drive Belt",
    "Engine Oil",
    "Evaporative Emissions System",
    "Exhaust System",
    "Fluid - A/T",
    "Fluid - Differential",
    "Fluid - M/T",
    "Fluid Filter - A/T",
    "Fluids",
    "Fuel Delivery and Air Induction",
    "Hood Shock / Support",
    "Neutral Safety Switch",
    "Parking Brake System",
    "Restraints and Safety Systems",
    "Shift Interlock A/T",
    "Spark Plug",
    "Steering and Suspension",
    "Tires",
    "Trunk / Liftgate Shock / Support",
    "Washer Fluid",
    "Wiper Blade",
]
|
||||
|
||||
|
||||
class ServiceMapper:
    """Map extracted service names to maintenance subtypes.

    Matching strategy (in order):
    1. Substring match against SERVICE_MAPPINGS, longest key first.
    2. Regex match against SERVICE_PATTERNS for verb phrasings.
    3. (map_service_fuzzy only) word-overlap scoring against SERVICE_MAPPINGS.
    """

    # Mapping from common service terms to system subtypes
    # Keys are lowercase patterns, values are (normalized_name, subtypes, category, confidence)
    SERVICE_MAPPINGS: dict[str, tuple[str, list[str], str, float]] = {
        # Oil related
        "engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        "oil change": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        "motor oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.93),
        "oil and filter": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        "oil & filter": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        "change engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        "replace engine oil": ("Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        # Air filter
        "air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.90),
        "engine air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
        "air cleaner": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.88),
        "air cleaner element": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.93),
        "replace air filter": ("Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
        # Cabin filter
        "cabin air filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.95),
        "cabin filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.93),
        "a/c filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.88),
        "hvac filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.88),
        "interior air filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.90),
        "dust and pollen filter": ("Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.90),
        # Tires
        "tire rotation": ("Tire Rotation", ["Tires"], "routine_maintenance", 0.98),
        "rotate tires": ("Tire Rotation", ["Tires"], "routine_maintenance", 0.95),
        "tire inspection": ("Tire Inspection", ["Tires"], "routine_maintenance", 0.93),
        "inspect tires": ("Tire Inspection", ["Tires"], "routine_maintenance", 0.93),
        "check tire pressure": ("Tire Pressure Check", ["Tires"], "routine_maintenance", 0.90),
        "tire pressure": ("Tire Pressure Check", ["Tires"], "routine_maintenance", 0.85),
        # Brakes
        "brake inspection": ("Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.95),
        "inspect brakes": ("Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.93),
        "brake fluid": ("Brake Fluid Service", ["Brakes and Traction Control"], "routine_maintenance", 0.93),
        "brake pads": ("Brake Pad Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.90),
        "parking brake": ("Parking Brake Inspection", ["Parking Brake System"], "routine_maintenance", 0.93),
        # Coolant
        "coolant": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.90),
        "engine coolant": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.93),
        "antifreeze": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.90),
        "cooling system": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.88),
        "radiator fluid": ("Coolant Service", ["Coolant"], "routine_maintenance", 0.88),
        "replace coolant": ("Coolant Replacement", ["Coolant"], "routine_maintenance", 0.95),
        # Transmission
        "transmission fluid": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
        "automatic transmission fluid": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.95),
        "atf": ("Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.90),
        "manual transmission fluid": ("Manual Transmission Fluid", ["Fluid - M/T"], "routine_maintenance", 0.95),
        "cvt fluid": ("CVT Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
        "transmission filter": ("Transmission Filter", ["Fluid Filter - A/T"], "routine_maintenance", 0.93),
        # Differential
        "differential fluid": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.95),
        "rear differential": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.93),
        "front differential": ("Differential Fluid Service", ["Fluid - Differential"], "routine_maintenance", 0.93),
        "transfer case": ("Transfer Case Fluid", ["Fluid - Differential"], "routine_maintenance", 0.90),
        # Spark plugs
        "spark plug": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
        "spark plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
        "replace spark plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
        "ignition plugs": ("Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.88),
        # Drive belt
        "drive belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.93),
        "serpentine belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.93),
        "accessory belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.90),
        "timing belt": ("Timing Belt Service", ["Drive Belt"], "routine_maintenance", 0.90),
        "v-belt": ("Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.88),
        # Wipers
        "wiper blade": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.95),
        "wiper blades": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.95),
        "windshield wiper": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.93),
        "replace wipers": ("Wiper Blade Replacement", ["Wiper Blade"], "routine_maintenance", 0.93),
        # Washer fluid
        "washer fluid": ("Washer Fluid", ["Washer Fluid"], "routine_maintenance", 0.95),
        "windshield washer": ("Washer Fluid", ["Washer Fluid"], "routine_maintenance", 0.90),
        # Steering/Suspension
        "steering": ("Steering Inspection", ["Steering and Suspension"], "routine_maintenance", 0.85),
        "suspension": ("Suspension Inspection", ["Steering and Suspension"], "routine_maintenance", 0.85),
        "power steering": ("Power Steering Fluid", ["Steering and Suspension"], "routine_maintenance", 0.90),
        "power steering fluid": ("Power Steering Fluid", ["Steering and Suspension"], "routine_maintenance", 0.93),
        # Exhaust
        "exhaust": ("Exhaust System Inspection", ["Exhaust System"], "routine_maintenance", 0.88),
        "exhaust system": ("Exhaust System Inspection", ["Exhaust System"], "routine_maintenance", 0.93),
        # Fuel system
        "fuel filter": ("Fuel Filter Replacement", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.93),
        "fuel system": ("Fuel System Inspection", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.88),
        "fuel injection": ("Fuel Injection Service", ["Fuel Delivery and Air Induction"], "routine_maintenance", 0.88),
        # Emissions
        "evaporative emissions": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.93),
        "evap system": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.90),
        "emissions": ("Evaporative Emissions Inspection", ["Evaporative Emissions System"], "routine_maintenance", 0.80),
        # Safety systems
        "seat belt": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.90),
        "airbag": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.85),
        "restraint": ("Safety Systems Inspection", ["Restraints and Safety Systems"], "routine_maintenance", 0.85),
        # Miscellaneous
        "battery": ("Battery Inspection", ["Fluids"], "routine_maintenance", 0.80),
        "inspect battery": ("Battery Inspection", ["Fluids"], "routine_maintenance", 0.85),
        "door hinges": ("Door Lubrication", ["Doors"], "routine_maintenance", 0.85),
        "hood shock": ("Hood Shock Inspection", ["Hood Shock / Support"], "routine_maintenance", 0.90),
        "trunk shock": ("Trunk Shock Inspection", ["Trunk / Liftgate Shock / Support"], "routine_maintenance", 0.90),
        "liftgate": ("Liftgate Inspection", ["Trunk / Liftgate Shock / Support"], "routine_maintenance", 0.88),
    }

    # Pattern-based mappings for fuzzy matching
    SERVICE_PATTERNS: list[tuple[str, str, list[str], str, float]] = [
        # (regex_pattern, normalized_name, subtypes, category, confidence)
        (r"oil\s+(?:and|&)\s+filter", "Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.95),
        (r"(?:change|replace)\s+(?:the\s+)?oil", "Engine Oil Change", ["Engine Oil"], "routine_maintenance", 0.93),
        (r"(?:inspect|check)\s+(?:the\s+)?brakes?", "Brake Inspection", ["Brakes and Traction Control"], "routine_maintenance", 0.90),
        (r"(?:inspect|check)\s+(?:the\s+)?tires?", "Tire Inspection", ["Tires"], "routine_maintenance", 0.90),
        (r"(?:rotate|rotation)\s+(?:the\s+)?tires?", "Tire Rotation", ["Tires"], "routine_maintenance", 0.95),
        (r"(?:replace|change)\s+(?:the\s+)?(?:engine\s+)?air\s+filter", "Air Filter Replacement", ["Air Filter Element"], "routine_maintenance", 0.95),
        (r"(?:replace|change)\s+(?:the\s+)?cabin\s+(?:air\s+)?filter", "Cabin Air Filter Replacement", ["Cabin Air Filter / Purifier"], "routine_maintenance", 0.95),
        (r"(?:replace|change)\s+(?:the\s+)?spark\s+plugs?", "Spark Plug Replacement", ["Spark Plug"], "routine_maintenance", 0.95),
        (r"(?:replace|change)\s+(?:the\s+)?coolant", "Coolant Replacement", ["Coolant"], "routine_maintenance", 0.93),
        (r"(?:flush|drain)\s+(?:the\s+)?coolant", "Coolant Flush", ["Coolant"], "routine_maintenance", 0.93),
        (r"(?:replace|change)\s+(?:the\s+)?(?:a/?t|automatic\s+transmission)\s+fluid", "Transmission Fluid Service", ["Fluid - A/T"], "routine_maintenance", 0.93),
        (r"(?:inspect|check)\s+(?:the\s+)?(?:drive|serpentine|accessory)\s+belt", "Drive Belt Inspection", ["Drive Belt"], "routine_maintenance", 0.90),
    ]

    def map_service(self, service_text: str) -> Optional[ServiceMapping]:
        """
        Map extracted service text to maintenance subtypes.

        Substring keys are tried longest-first so that the most specific
        mapping wins (e.g. "replace coolant" beats "coolant").

        Args:
            service_text: Service name or description from the manual

        Returns:
            ServiceMapping or None if no mapping found
        """
        normalized_text = service_text.lower().strip()

        # Try exact (substring) mapping first. Iterate keys longest-first:
        # several generic keys ("coolant", "transmission fluid") were inserted
        # before their more specific variants ("replace coolant", "automatic
        # transmission fluid"), so plain dict-order iteration would let the
        # generic entry permanently shadow the specific one.
        for key in sorted(self.SERVICE_MAPPINGS, key=len, reverse=True):
            if key in normalized_text:
                name, subtypes, category, conf = self.SERVICE_MAPPINGS[key]
                return ServiceMapping(
                    normalized_name=name,
                    subtypes=subtypes,
                    category=category,
                    confidence=conf,
                )

        # Fall back to regex patterns for verb phrasings ("change the oil").
        for pattern, name, subtypes, category, conf in self.SERVICE_PATTERNS:
            if re.search(pattern, normalized_text, re.IGNORECASE):
                return ServiceMapping(
                    normalized_name=name,
                    subtypes=subtypes,
                    category=category,
                    confidence=conf,
                )

        return None

    def map_service_fuzzy(self, service_text: str, threshold: float = 0.6) -> Optional[ServiceMapping]:
        """
        Map service text with fuzzy matching for typos and variations.

        Args:
            service_text: Service name or description
            threshold: Minimum similarity threshold (0.0-1.0)

        Returns:
            ServiceMapping or None
        """
        # First try exact matching
        result = self.map_service(service_text)
        if result:
            return result

        # Fall back to word overlap (Jaccard) matching
        words = set(service_text.lower().split())

        best_match: Optional[ServiceMapping] = None
        best_score = 0.0

        for key, (name, subtypes, category, conf) in self.SERVICE_MAPPINGS.items():
            key_words = set(key.split())
            overlap = len(words & key_words)
            total = len(words | key_words)

            if total > 0:
                score = overlap / total
                if score > best_score and score >= threshold:
                    best_score = score
                    best_match = ServiceMapping(
                        normalized_name=name,
                        subtypes=subtypes,
                        category=category,
                        confidence=conf * score,  # Reduce confidence by match quality
                    )

        return best_match

    def get_all_service_keywords(self) -> list[str]:
        """Get all service keywords for table header detection."""
        keywords = list(self.SERVICE_MAPPINGS.keys())
        # Add common header terms
        keywords.extend([
            "service", "maintenance", "item", "operation",
            "inspection", "replacement", "interval", "schedule",
        ])
        return keywords
|
||||
|
||||
|
||||
# Singleton instance — module-level mapper shared by importers of this module.
service_mapper = ServiceMapper()
|
||||
@@ -5,6 +5,12 @@ from app.preprocessors.receipt_preprocessor import (
|
||||
ReceiptPreprocessor,
|
||||
receipt_preprocessor,
|
||||
)
|
||||
from app.preprocessors.pdf_preprocessor import (
|
||||
PdfPreprocessor,
|
||||
pdf_preprocessor,
|
||||
PdfPageContent,
|
||||
PdfInfo,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"ImagePreprocessor",
|
||||
@@ -13,4 +19,8 @@ __all__ = [
|
||||
"vin_preprocessor",
|
||||
"ReceiptPreprocessor",
|
||||
"receipt_preprocessor",
|
||||
"PdfPreprocessor",
|
||||
"pdf_preprocessor",
|
||||
"PdfPageContent",
|
||||
"PdfInfo",
|
||||
]
|
||||
|
||||
353
ocr/app/preprocessors/pdf_preprocessor.py
Normal file
353
ocr/app/preprocessors/pdf_preprocessor.py
Normal file
@@ -0,0 +1,353 @@
|
||||
"""PDF preprocessing for owner's manual extraction."""
|
||||
import io
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterator, Optional
|
||||
|
||||
import fitz # PyMuPDF
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class PdfPageContent:
    """Content extracted from a single PDF page."""

    page_number: int  # Zero-indexed page number
    has_text: bool  # True when the page's text layer met the minimum length
    text_content: str  # Extracted text (may be empty for scanned pages)
    image_bytes: Optional[bytes]  # Rendered image for scanned pages
    width: int  # Page width, from page.rect
    height: int  # Page height, from page.rect
||||
|
||||
|
||||
@dataclass
class PdfInfo:
    """Information about a PDF document."""

    total_pages: int  # Number of pages in the document
    has_text_layer: bool  # True when at least one sampled page had text
    is_scanned: bool  # True if most pages lack text layer
    file_size_bytes: int  # Size of the raw PDF input
    title: Optional[str]  # Title from PDF metadata, if present
    author: Optional[str]  # Author from PDF metadata, if present
    metadata: dict = field(default_factory=dict)  # Full raw metadata dict
|
||||
|
||||
|
||||
class PdfPreprocessor:
    """Preprocess PDFs for OCR extraction.

    Handles two scenarios:
    1. Native PDFs with text layer - extract text directly
    2. Scanned PDFs - render pages to images for OCR

    Uses PyMuPDF (fitz) for both text extraction and image rendering.
    """

    # DPI for rendering scanned pages
    DEFAULT_DPI = 300

    # Minimum text length to consider a page has text
    MIN_TEXT_LENGTH = 50

    # Maximum pages to sample for scan detection
    SAMPLE_PAGES = 10

    def get_pdf_info(self, pdf_bytes: bytes) -> PdfInfo:
        """
        Analyze PDF and return metadata.

        Args:
            pdf_bytes: Raw PDF bytes

        Returns:
            PdfInfo with document metadata
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            total_pages = len(doc)
            metadata = doc.metadata or {}

            # Sample pages to determine if scanned
            text_pages = 0

            # Sample from beginning, middle, and end
            if total_pages <= self.SAMPLE_PAGES:
                sample_indices = list(range(total_pages))
            else:
                sample_indices = [
                    0, 1, 2,  # Beginning
                    total_pages // 2 - 1, total_pages // 2, total_pages // 2 + 1,  # Middle
                    total_pages - 3, total_pages - 2, total_pages - 1,  # End
                ]
                sample_indices = [i for i in sample_indices if 0 <= i < total_pages]

            for page_idx in sample_indices:
                page = doc[page_idx]
                text = page.get_text().strip()
                if len(text) >= self.MIN_TEXT_LENGTH:
                    text_pages += 1

            # Consider it a scanned PDF if less than half of sampled pages have text
            has_text_layer = text_pages > 0
            is_scanned = text_pages < len(sample_indices) / 2

            return PdfInfo(
                total_pages=total_pages,
                has_text_layer=has_text_layer,
                is_scanned=is_scanned,
                file_size_bytes=len(pdf_bytes),
                title=metadata.get("title"),
                author=metadata.get("author"),
                metadata=metadata,
            )

        finally:
            doc.close()

    def _page_content(
        self,
        page: "fitz.Page",
        page_number: int,
        dpi: int,
        *,
        force_render: bool = False,
        blank_short_text: bool = True,
    ) -> PdfPageContent:
        """Build a PdfPageContent for one page (shared by the extract_* methods).

        Args:
            page: Open PyMuPDF page
            page_number: Zero-indexed page number to record
            dpi: DPI to use if the page must be rendered
            force_render: Render an image even when the page has text
            blank_short_text: Report "" instead of sub-threshold text

        Returns:
            PdfPageContent for the page
        """
        text = page.get_text().strip()
        has_text = len(text) >= self.MIN_TEXT_LENGTH

        rect = page.rect

        # Render to image when the text layer is unusable (or caller insists).
        image_bytes = None
        if not has_text or force_render:
            image_bytes = self._render_page_to_image(page, dpi)

        return PdfPageContent(
            page_number=page_number,
            has_text=has_text,
            text_content=text if (has_text or not blank_short_text) else "",
            image_bytes=image_bytes,
            width=int(rect.width),
            height=int(rect.height),
        )

    def extract_text_from_page(
        self, pdf_bytes: bytes, page_number: int
    ) -> PdfPageContent:
        """
        Extract content from a single PDF page.

        Args:
            pdf_bytes: Raw PDF bytes
            page_number: Zero-indexed page number

        Returns:
            PdfPageContent with text and/or image

        Raises:
            ValueError: If page_number is out of range (negative values are
                rejected rather than silently indexing from the end).
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            if not 0 <= page_number < len(doc):
                raise ValueError(f"Page {page_number} does not exist (max: {len(doc) - 1})")

            # Keep even sub-threshold text here (blank_short_text=False), since
            # the caller asked specifically for this page's content.
            return self._page_content(
                doc[page_number],
                page_number,
                self.DEFAULT_DPI,
                blank_short_text=False,
            )

        finally:
            doc.close()

    def extract_all_pages(
        self,
        pdf_bytes: bytes,
        dpi: int = DEFAULT_DPI,
        force_ocr: bool = False,
    ) -> Iterator[PdfPageContent]:
        """
        Extract content from all pages as a generator.

        Args:
            pdf_bytes: Raw PDF bytes
            dpi: DPI for rendering scanned pages
            force_ocr: If True, render all pages regardless of text layer

        Yields:
            PdfPageContent for each page
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            for page_number in range(len(doc)):
                yield self._page_content(
                    doc[page_number], page_number, dpi, force_render=force_ocr
                )

        finally:
            doc.close()

    def extract_page_range(
        self,
        pdf_bytes: bytes,
        start_page: int,
        end_page: int,
        dpi: int = DEFAULT_DPI,
    ) -> list[PdfPageContent]:
        """
        Extract content from a range of pages.

        Args:
            pdf_bytes: Raw PDF bytes
            start_page: First page (zero-indexed)
            end_page: Last page (exclusive; clamped to the document length)
            dpi: DPI for rendering

        Returns:
            List of PdfPageContent
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            end_page = min(end_page, len(doc))
            return [
                self._page_content(doc[page_number], page_number, dpi)
                for page_number in range(start_page, end_page)
            ]

        finally:
            doc.close()

    def find_maintenance_section(
        self, pdf_bytes: bytes, keywords: Optional[list[str]] = None
    ) -> list[int]:
        """
        Find pages likely containing maintenance schedules.

        Args:
            pdf_bytes: Raw PDF bytes
            keywords: Keywords to search for (defaults to common terms)

        Returns:
            List of page numbers likely containing maintenance info
        """
        if keywords is None:
            keywords = [
                "maintenance schedule",
                "maintenance interval",
                "service schedule",
                "service interval",
                "recommended maintenance",
                "scheduled maintenance",
                "routine maintenance",
                "periodic maintenance",
                "owner's maintenance",
                "maintenance requirements",
            ]

        # Lowercase the keywords once rather than per page.
        lowered_keywords = [keyword.lower() for keyword in keywords]

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            maintenance_pages = []

            for page_number in range(len(doc)):
                page = doc[page_number]
                text = page.get_text().lower()

                if any(keyword in text for keyword in lowered_keywords):
                    maintenance_pages.append(page_number)

            return maintenance_pages

        finally:
            doc.close()

    def _render_page_to_image(self, page: "fitz.Page", dpi: int) -> bytes:
        """
        Render a PDF page to PNG image bytes.

        Args:
            page: PyMuPDF page object
            dpi: Target DPI for rendering

        Returns:
            PNG image bytes
        """
        # Calculate scale factor from DPI
        # Default PDF resolution is 72 DPI
        scale = dpi / 72.0
        matrix = fitz.Matrix(scale, scale)

        # Render page to pixmap
        pixmap = page.get_pixmap(matrix=matrix)

        # Convert to PNG bytes
        return pixmap.tobytes("png")

    def render_page_for_table_detection(
        self, pdf_bytes: bytes, page_number: int, dpi: int = 150
    ) -> bytes:
        """
        Render a page at lower DPI for table detection (faster).

        Args:
            pdf_bytes: Raw PDF bytes
            page_number: Page to render
            dpi: DPI for rendering (lower for faster processing)

        Returns:
            PNG image bytes

        Raises:
            ValueError: If page_number is out of range.
        """
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")

        try:
            if not 0 <= page_number < len(doc):
                raise ValueError(f"Page {page_number} does not exist")

            page = doc[page_number]
            return self._render_page_to_image(page, dpi)

        finally:
            doc.close()
|
||||
|
||||
|
||||
# Singleton instance — module-level preprocessor shared by importers of this module.
pdf_preprocessor = PdfPreprocessor()
|
||||
@@ -2,19 +2,24 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
|
||||
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, Query, UploadFile
|
||||
|
||||
from app.extractors.vin_extractor import vin_extractor
|
||||
from app.extractors.receipt_extractor import receipt_extractor
|
||||
from app.extractors.manual_extractor import manual_extractor
|
||||
from app.models import (
|
||||
BoundingBox,
|
||||
ManualExtractionResponse,
|
||||
ManualJobResponse,
|
||||
ManualMaintenanceSchedule,
|
||||
ManualVehicleInfo,
|
||||
OcrResponse,
|
||||
ReceiptExtractedField,
|
||||
ReceiptExtractionResponse,
|
||||
VinAlternative,
|
||||
VinExtractionResponse,
|
||||
)
|
||||
from app.services import ocr_service
|
||||
from app.services import ocr_service, job_queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -23,6 +28,9 @@ router = APIRouter(prefix="/extract", tags=["extract"])
|
||||
# Maximum file size for synchronous processing (10MB)
MAX_SYNC_SIZE = 10 * 1024 * 1024

# Maximum file size for manual/PDF processing (200MB); manuals are handled
# asynchronously via the job queue, so a much larger cap is acceptable.
MAX_MANUAL_SIZE = 200 * 1024 * 1024
|
||||
|
||||
|
||||
@router.post("", response_model=OcrResponse)
|
||||
async def extract_text(
|
||||
@@ -257,3 +265,166 @@ async def extract_receipt(
|
||||
processingTimeMs=result.processing_time_ms,
|
||||
error=result.error,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/manual", response_model=ManualJobResponse)
async def extract_manual(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Owner's manual PDF file"),
    vehicle_id: Optional[str] = Form(None, description="Vehicle ID for context"),
) -> ManualJobResponse:
    """
    Submit an async job to extract maintenance schedules from an owner's manual.

    Supports PDF files up to 200MB. Processing is done asynchronously due to
    the time required for large documents.

    Pipeline:
    1. Analyze PDF structure (text layer vs scanned)
    2. Find maintenance schedule sections
    3. Extract text or perform OCR on scanned pages
    4. Detect and parse maintenance tables
    5. Extract service intervals and fluid specifications

    - **file**: Owner's manual PDF (max 200MB)
    - **vehicle_id**: Optional vehicle ID for context

    Returns immediately with job_id. Poll GET /jobs/{job_id} for status and results.

    Response when completed:
    - **vehicleInfo**: Detected make/model/year
    - **maintenanceSchedules**: List of extracted maintenance items with intervals
    - **rawTables**: Metadata about detected tables
    - **processingTimeMs**: Total processing time

    Raises:
        HTTPException: 400 for a missing/empty/non-PDF upload, 413 when the
            file exceeds MAX_MANUAL_SIZE.
    """
    # Validate file presence
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # Validate file type: accept either a PDF content type or a .pdf filename,
    # since browsers do not always send a reliable content type.
    content_type = file.content_type or ""
    if not content_type.startswith("application/pdf") and not file.filename.lower().endswith(".pdf"):
        raise HTTPException(
            status_code=400,
            detail="File must be a PDF document",
        )

    # Read file content
    content = await file.read()
    file_size = len(content)

    # Validate file size
    if file_size > MAX_MANUAL_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_MANUAL_SIZE // (1024*1024)}MB.",
        )

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")

    logger.info(
        f"Manual extraction: {file.filename}, "
        f"size: {file_size} bytes, "
        f"vehicle_id: {vehicle_id}"
    )

    # Estimate processing time based on file size.
    # ~2 seconds per MB: a midpoint between native PDFs (~1 s/MB) and
    # scanned PDFs (~3 s/MB), with a 30-second floor for small files.
    estimated_seconds = max(30, (file_size // (1024 * 1024)) * 2)

    # Submit job to queue
    job_id = await job_queue.submit_manual_job(
        file_bytes=content,
        vehicle_id=vehicle_id,
    )

    # Schedule background processing
    background_tasks.add_task(process_manual_job, job_id)

    # Return initial status
    return ManualJobResponse(
        jobId=job_id,
        status="pending",
        progress=0,
        estimatedSeconds=estimated_seconds,
    )
|
||||
|
||||
|
||||
async def process_manual_job(job_id: str) -> None:
    """Background task to process a manual extraction job.

    Loads the job's PDF bytes from the queue, runs the CPU-bound extraction
    in a thread-pool executor (reporting progress back through the job queue),
    then stores the completed response or marks the job failed.

    Args:
        job_id: Identifier returned by job_queue.submit_manual_job.
    """
    import asyncio

    logger.info(f"Starting manual extraction job {job_id}")

    try:
        # Update status to processing
        await job_queue.update_manual_job_progress(job_id, 5, "Starting extraction")

        # Get job data
        file_bytes = await job_queue.get_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_manual_job(job_id, "Job data not found")
            return

        # Run extraction in thread pool (CPU-bound).
        # get_running_loop() is the supported way to grab the loop from inside
        # a coroutine (get_event_loop() is deprecated for this use).
        loop = asyncio.get_running_loop()

        def sync_progress_callback(percent: int, message: str) -> None:
            # The extractor runs in a worker thread, so progress updates must
            # be marshalled back onto the event loop thread-safely.
            asyncio.run_coroutine_threadsafe(
                job_queue.update_manual_job_progress(job_id, percent, message),
                loop,
            )

        result = await loop.run_in_executor(
            None,
            lambda: manual_extractor.extract(
                pdf_bytes=file_bytes,
                progress_callback=sync_progress_callback,
            ),
        )

        if result.success:
            # Convert extractor dataclasses to the API response models.
            vehicle_info = None
            if result.vehicle_info:
                vehicle_info = ManualVehicleInfo(
                    make=result.vehicle_info.make,
                    model=result.vehicle_info.model,
                    year=result.vehicle_info.year,
                )

            schedules = [
                ManualMaintenanceSchedule(
                    service=s.service,
                    intervalMiles=s.interval_miles,
                    intervalMonths=s.interval_months,
                    details=s.details,
                    confidence=s.confidence,
                    subtypes=s.subtypes,
                )
                for s in result.maintenance_schedules
            ]

            response = ManualExtractionResponse(
                success=True,
                vehicleInfo=vehicle_info,
                maintenanceSchedules=schedules,
                rawTables=result.raw_tables,
                processingTimeMs=result.processing_time_ms,
                totalPages=result.total_pages,
                pagesProcessed=result.pages_processed,
            )

            await job_queue.complete_manual_job(job_id, response)
        else:
            await job_queue.fail_manual_job(job_id, result.error or "Extraction failed")

    except Exception as e:
        # Record any unexpected failure on the job so pollers see a terminal state.
        logger.error(f"Manual job {job_id} failed: {e}", exc_info=True)
        await job_queue.fail_manual_job(job_id, str(e))
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
"""Async OCR job endpoints."""
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
|
||||
|
||||
from app.models import JobResponse, JobSubmitRequest
|
||||
from app.models import JobResponse, JobSubmitRequest, ManualJobResponse
|
||||
from app.services import job_queue, ocr_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -73,12 +73,13 @@ async def submit_job(
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{job_id}", response_model=Union[JobResponse, ManualJobResponse])
async def get_job_status(job_id: str) -> Union[JobResponse, ManualJobResponse]:
    """
    Get the status of an async OCR job.

    Poll this endpoint to check job progress and retrieve results.
    Works for both regular OCR jobs and manual extraction jobs.

    Returns:
    - **pending**: Job is queued
    - **processing**: Job is running (includes progress)
    - **completed**: Job finished successfully (includes result)
    - **failed**: Job failed (includes error message)

    Raises:
        HTTPException: 404 when the job is unknown or has expired.
    """
    # Try regular job first
    result = await job_queue.get_job_status(job_id)
    if result is not None:
        return result

    # Try manual job
    manual_result = await job_queue.get_manual_job_status(job_id)
    if manual_result is not None:
        return manual_result

    # Neither queue knows the id: it never existed or its TTL elapsed
    # (regular jobs expire after 1 hour, manual jobs after 2).
    raise HTTPException(
        status_code=404,
        detail=f"Job {job_id} not found. Jobs expire after 1-2 hours.",
    )
|
||||
|
||||
|
||||
async def process_job(job_id: str) -> None:
|
||||
|
||||
@@ -3,23 +3,34 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Optional
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
import redis.asyncio as redis
|
||||
|
||||
from app.config import settings
|
||||
from app.models import JobResponse, JobStatus, OcrResponse
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.models import ManualExtractionResponse, ManualJobResponse
|
||||
|
||||
logger = logging.getLogger(__name__)

# Time-to-live for regular OCR jobs: one hour.
JOB_TTL = 3600

# Manual-extraction jobs get two hours — owner's-manual PDFs are larger
# and take longer to process.
MANUAL_JOB_TTL = 7200

# Redis key prefixes for regular OCR jobs: metadata hash, uploaded file
# bytes, and serialized result, respectively.
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"

# Parallel key prefixes for manual-extraction jobs, namespaced separately
# so the two job families never collide.
MANUAL_JOB_PREFIX = "ocr:manual:job:"
MANUAL_JOB_DATA_PREFIX = "ocr:manual:job:data:"
MANUAL_JOB_RESULT_PREFIX = "ocr:manual:job:result:"
||||
class JobQueue:
|
||||
"""Manages async OCR jobs using Redis."""
|
||||
@@ -228,6 +239,156 @@ class JobQueue:
|
||||
except Exception as e:
|
||||
logger.error(f"Callback failed for job {job_id}: {e}")
|
||||
|
||||
# Manual extraction job methods

async def submit_manual_job(
    self,
    file_bytes: bytes,
    vehicle_id: Optional[str] = None,
) -> str:
    """Queue a new manual-extraction job and return its ID.

    Stores the job metadata (as a Redis hash) and the raw PDF bytes
    (as a separate binary key) atomically via a pipeline, both with the
    extended manual-job TTL.

    Args:
        file_bytes: Raw PDF bytes to process.
        vehicle_id: Optional vehicle ID for context.

    Returns:
        The newly generated job ID (UUID4 string).
    """
    r = await self.get_redis()
    job_id = str(uuid.uuid4())

    # Metadata hash — progress starts at 0, status pending.
    job_meta = {
        "status": JobStatus.PENDING.value,
        "progress": 0,
        "progress_message": "",
        "vehicle_id": vehicle_id or "",
        "job_type": "manual",
    }

    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"

    # Pipeline so metadata and file bytes land together.
    async with r.pipeline() as pipe:
        await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
        await pipe.expire(meta_key, MANUAL_JOB_TTL)

        # File payload is kept out of the hash since it is binary.
        await pipe.set(data_key, file_bytes)
        await pipe.expire(data_key, MANUAL_JOB_TTL)

        await pipe.execute()

    logger.info(f"Manual job {job_id} submitted")
    return job_id
||||
|
||||
async def get_manual_job_status(self, job_id: str) -> Optional["ManualJobResponse"]:
    """Fetch the current state of a manual-extraction job.

    Reads the metadata hash; when the job has completed, also loads and
    deserializes the stored result JSON.

    Args:
        job_id: Job ID to look up.

    Returns:
        A ManualJobResponse, or None when no job with that ID exists
        (e.g. it expired).
    """
    # Imported locally to avoid a module-level import cycle with app.models.
    from app.models import ManualJobResponse, ManualExtractionResponse

    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"

    # NOTE(review): assumes the redis client decodes responses to str
    # (meta keys/values compared as strings below) — confirm client config.
    meta = await r.hgetall(meta_key)  # type: ignore
    if not meta:
        return None

    status = JobStatus(meta.get("status", JobStatus.PENDING.value))
    progress = int(meta.get("progress", 0))
    error = meta.get("error")

    # Result payload only exists once the job completed.
    result = None
    if status == JobStatus.COMPLETED:
        result_json = await r.get(result_key)
        if result_json:
            result_dict = json.loads(result_json)
            result = ManualExtractionResponse(**result_dict)

    # Progress/error are surfaced only in the states where they apply.
    return ManualJobResponse(
        jobId=job_id,
        status=status,
        progress=progress if status == JobStatus.PROCESSING else None,
        result=result,
        error=error if status == JobStatus.FAILED else None,
    )
||||
|
||||
async def update_manual_job_progress(
    self, job_id: str, progress: int, message: str = ""
) -> None:
    """Record progress for a running manual job.

    Forces the status to PROCESSING and overwrites the progress
    percentage and human-readable progress message.
    """
    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"

    fields = {
        "status": JobStatus.PROCESSING.value,
        "progress": progress,
        "progress_message": message,
    }
    await r.hset(meta_key, mapping=fields)  # type: ignore
||||
|
||||
async def complete_manual_job(
    self, job_id: str, result: "ManualExtractionResponse"
) -> None:
    """Mark a manual job completed and persist its result.

    Serializes the result (camelCase aliases, matching the API schema),
    stores it under the result key, flips the job status to COMPLETED,
    and drops the uploaded file bytes — they are no longer needed.
    """
    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"

    # Serialize with field aliases so the stored JSON matches the
    # response schema returned to clients.
    result_json = json.dumps(result.model_dump(by_alias=True))

    async with r.pipeline() as pipe:
        # Status/progress update.
        await pipe.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.COMPLETED.value,
            "progress": 100,
        })

        # Result payload with the same TTL as the job metadata.
        await pipe.set(result_key, result_json)
        await pipe.expire(result_key, MANUAL_JOB_TTL)

        # Free the (potentially large) uploaded PDF immediately.
        await pipe.delete(data_key)

        await pipe.execute()

    logger.info(f"Manual job {job_id} completed")
||||
|
||||
async def fail_manual_job(self, job_id: str, error: str) -> None:
    """Mark a manual job failed and release its file data.

    Records the error message on the metadata hash and deletes the
    uploaded PDF bytes so they do not linger until TTL expiry.
    """
    r = await self.get_redis()
    meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
    data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"

    async with r.pipeline() as pipe:
        await pipe.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.FAILED.value,
            "error": error,
        })
        # Drop the raw upload — the job will never be processed.
        await pipe.delete(data_key)
        await pipe.execute()

    logger.error(f"Manual job {job_id} failed: {error}")


# Singleton instance
job_queue = JobQueue()
|
||||
|
||||
12
ocr/app/table_extraction/__init__.py
Normal file
12
ocr/app/table_extraction/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""Table extraction components for maintenance schedule parsing."""
|
||||
from app.table_extraction.detector import TableDetector, table_detector, DetectedTable
|
||||
from app.table_extraction.parser import TableParser, table_parser, ParsedScheduleRow
|
||||
|
||||
__all__ = [
|
||||
"TableDetector",
|
||||
"table_detector",
|
||||
"DetectedTable",
|
||||
"TableParser",
|
||||
"table_parser",
|
||||
"ParsedScheduleRow",
|
||||
]
|
||||
322
ocr/app/table_extraction/detector.py
Normal file
322
ocr/app/table_extraction/detector.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""Table detection for maintenance schedule extraction."""
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class DetectedTable:
    """A table region found in a document.

    For image-based detection, (x, y, width, height) are pixel
    coordinates of the bounding box; for text-based detection, y is the
    starting line index, height the row count, and x/width are 0.
    """

    page_number: int        # zero-based page index the table was found on
    x: int                  # bounding-box left (pixels) or 0 for text tables
    y: int                  # bounding-box top (pixels) or starting line index
    width: int              # bounding-box width; 0 for text tables
    height: int             # bounding-box height or number of rows
    confidence: float       # detection confidence, 0.0–1.0
    is_maintenance_table: bool  # True when classified as a maintenance schedule
    header_row: Optional[list[str]] = None  # header cells, when identified
    raw_content: list[list[str]] = field(default_factory=list)  # data rows as cell lists
||||
|
||||
|
||||
class TableDetector:
|
||||
"""Detect tables in document pages.
|
||||
|
||||
Uses computer vision techniques to identify table regions:
|
||||
1. Line detection for bordered tables
|
||||
2. Text alignment analysis for borderless tables
|
||||
3. Header keyword matching for maintenance schedule identification
|
||||
"""
|
||||
|
||||
# Keywords indicating maintenance schedule table headers
|
||||
MAINTENANCE_HEADERS = [
|
||||
"service", "maintenance", "item", "operation",
|
||||
"miles", "mi", "km", "kilometers",
|
||||
"months", "mo", "interval",
|
||||
"check", "replace", "inspect", "change",
|
||||
"schedule", "frequency",
|
||||
]
|
||||
|
||||
# Keywords in content that indicate maintenance
|
||||
MAINTENANCE_CONTENT_KEYWORDS = [
|
||||
"oil", "filter", "brake", "tire", "coolant",
|
||||
"fluid", "spark plug", "belt", "hose",
|
||||
"inspect", "replace", "change", "check",
|
||||
]
|
||||
|
||||
def detect_tables_in_image(
|
||||
self, image_bytes: bytes, page_number: int = 0
|
||||
) -> list[DetectedTable]:
|
||||
"""
|
||||
Detect tables in an image using line detection.
|
||||
|
||||
Args:
|
||||
image_bytes: PNG/JPEG image bytes
|
||||
page_number: Page number for the result
|
||||
|
||||
Returns:
|
||||
List of DetectedTable objects
|
||||
"""
|
||||
# Load image
|
||||
nparr = np.frombuffer(image_bytes, np.uint8)
|
||||
img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
if img is None:
|
||||
logger.warning("Failed to decode image for table detection")
|
||||
return []
|
||||
|
||||
# Apply threshold
|
||||
_, binary = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY_INV)
|
||||
|
||||
# Detect horizontal lines
|
||||
horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
|
||||
horizontal_lines = cv2.morphologyEx(
|
||||
binary, cv2.MORPH_OPEN, horizontal_kernel, iterations=2
|
||||
)
|
||||
|
||||
# Detect vertical lines
|
||||
vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
|
||||
vertical_lines = cv2.morphologyEx(
|
||||
binary, cv2.MORPH_OPEN, vertical_kernel, iterations=2
|
||||
)
|
||||
|
||||
# Combine lines
|
||||
table_mask = cv2.add(horizontal_lines, vertical_lines)
|
||||
|
||||
# Find contours
|
||||
contours, _ = cv2.findContours(
|
||||
table_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
|
||||
)
|
||||
|
||||
tables = []
|
||||
height, width = img.shape[:2]
|
||||
|
||||
for contour in contours:
|
||||
x, y, w, h = cv2.boundingRect(contour)
|
||||
|
||||
# Filter by size (tables should be reasonably large)
|
||||
if w < width * 0.3 or h < height * 0.05:
|
||||
continue
|
||||
if w > width * 0.95 and h > height * 0.95:
|
||||
continue # Skip full-page rectangles
|
||||
|
||||
# Calculate confidence based on aspect ratio and size
|
||||
aspect_ratio = w / h if h > 0 else 0
|
||||
size_ratio = (w * h) / (width * height)
|
||||
|
||||
# Tables typically have reasonable aspect ratios
|
||||
if 0.5 <= aspect_ratio <= 10 and 0.01 <= size_ratio <= 0.8:
|
||||
confidence = min(0.9, 0.5 + size_ratio + (1 - abs(aspect_ratio - 2) / 10))
|
||||
|
||||
tables.append(
|
||||
DetectedTable(
|
||||
page_number=page_number,
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=confidence,
|
||||
is_maintenance_table=False, # Will be determined later
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f"Detected {len(tables)} potential tables on page {page_number}")
|
||||
return tables
|
||||
|
||||
def detect_tables_in_text(
|
||||
self, text: str, page_number: int = 0
|
||||
) -> list[DetectedTable]:
|
||||
"""
|
||||
Detect table-like structures in text using pattern analysis.
|
||||
|
||||
Useful for native PDFs where text is available.
|
||||
|
||||
Args:
|
||||
text: Extracted text content
|
||||
page_number: Page number
|
||||
|
||||
Returns:
|
||||
List of DetectedTable with content populated
|
||||
"""
|
||||
tables = []
|
||||
lines = text.split("\n")
|
||||
|
||||
# Look for patterns that suggest tabular data
|
||||
# - Multiple columns separated by whitespace or tabs
|
||||
# - Consistent column alignment across rows
|
||||
|
||||
current_table_lines: list[str] = []
|
||||
in_table = False
|
||||
table_start_idx = 0
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
# Check if line looks like table row
|
||||
is_table_row = self._is_table_row(line)
|
||||
|
||||
if is_table_row:
|
||||
if not in_table:
|
||||
in_table = True
|
||||
table_start_idx = i
|
||||
current_table_lines = []
|
||||
current_table_lines.append(line)
|
||||
else:
|
||||
if in_table and len(current_table_lines) >= 3:
|
||||
# End of table, process it
|
||||
table = self._process_text_table(
|
||||
current_table_lines, page_number, table_start_idx
|
||||
)
|
||||
if table:
|
||||
tables.append(table)
|
||||
in_table = False
|
||||
current_table_lines = []
|
||||
|
||||
# Handle table at end of text
|
||||
if in_table and len(current_table_lines) >= 3:
|
||||
table = self._process_text_table(
|
||||
current_table_lines, page_number, table_start_idx
|
||||
)
|
||||
if table:
|
||||
tables.append(table)
|
||||
|
||||
return tables
|
||||
|
||||
def is_maintenance_table(
|
||||
self, table: DetectedTable, full_text: Optional[str] = None
|
||||
) -> bool:
|
||||
"""
|
||||
Determine if a detected table is a maintenance schedule.
|
||||
|
||||
Args:
|
||||
table: Detected table to analyze
|
||||
full_text: Optional surrounding text for context
|
||||
|
||||
Returns:
|
||||
True if likely a maintenance schedule table
|
||||
"""
|
||||
# Check header row for maintenance keywords
|
||||
if table.header_row:
|
||||
header_text = " ".join(table.header_row).lower()
|
||||
header_matches = sum(
|
||||
1 for kw in self.MAINTENANCE_HEADERS if kw in header_text
|
||||
)
|
||||
if header_matches >= 2:
|
||||
return True
|
||||
|
||||
# Check content for maintenance keywords
|
||||
if table.raw_content:
|
||||
content_text = " ".join(
|
||||
" ".join(row) for row in table.raw_content
|
||||
).lower()
|
||||
content_matches = sum(
|
||||
1 for kw in self.MAINTENANCE_CONTENT_KEYWORDS if kw in content_text
|
||||
)
|
||||
if content_matches >= 3:
|
||||
return True
|
||||
|
||||
# Check surrounding text
|
||||
if full_text:
|
||||
text_lower = full_text.lower()
|
||||
context_keywords = [
|
||||
"maintenance schedule",
|
||||
"service schedule",
|
||||
"maintenance interval",
|
||||
"recommended maintenance",
|
||||
]
|
||||
if any(kw in text_lower for kw in context_keywords):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _is_table_row(self, line: str) -> bool:
|
||||
"""Check if a line looks like a table row."""
|
||||
# Skip empty lines
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
return False
|
||||
|
||||
# Check for multiple whitespace-separated columns
|
||||
parts = re.split(r"\s{2,}|\t", stripped)
|
||||
if len(parts) >= 2:
|
||||
# At least 2 columns with content
|
||||
non_empty = [p for p in parts if p.strip()]
|
||||
return len(non_empty) >= 2
|
||||
|
||||
# Check for common table patterns
|
||||
# e.g., "Service Item 5,000 miles 6 months"
|
||||
if re.search(r"\d+[,.]?\d*\s*(miles?|mi\.?|km|months?|mo\.?)", stripped, re.I):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _process_text_table(
|
||||
self, lines: list[str], page_number: int, start_line: int
|
||||
) -> Optional[DetectedTable]:
|
||||
"""Process extracted text lines into a table structure."""
|
||||
if not lines:
|
||||
return None
|
||||
|
||||
# Parse rows
|
||||
rows = []
|
||||
for line in lines:
|
||||
# Split on multiple whitespace or tabs
|
||||
parts = re.split(r"\s{2,}|\t", line.strip())
|
||||
cells = [p.strip() for p in parts if p.strip()]
|
||||
if cells:
|
||||
rows.append(cells)
|
||||
|
||||
if len(rows) < 2:
|
||||
return None
|
||||
|
||||
# First row is likely header
|
||||
header_row = rows[0]
|
||||
|
||||
# Check if this looks like a maintenance table
|
||||
table = DetectedTable(
|
||||
page_number=page_number,
|
||||
x=0, # Text tables don't have coordinates
|
||||
y=start_line,
|
||||
width=0,
|
||||
height=len(rows),
|
||||
confidence=0.7,
|
||||
is_maintenance_table=False,
|
||||
header_row=header_row,
|
||||
raw_content=rows[1:],
|
||||
)
|
||||
|
||||
# Determine if it's a maintenance table
|
||||
table.is_maintenance_table = self.is_maintenance_table(table)
|
||||
|
||||
if table.is_maintenance_table:
|
||||
table.confidence = 0.85
|
||||
|
||||
return table
|
||||
|
||||
def extract_table_text_from_region(
|
||||
self, image_bytes: bytes, table: DetectedTable
|
||||
) -> list[list[str]]:
|
||||
"""
|
||||
Extract text from a table region using OCR.
|
||||
|
||||
Args:
|
||||
image_bytes: Full page image
|
||||
table: Detected table with coordinates
|
||||
|
||||
Returns:
|
||||
2D list of cell contents
|
||||
"""
|
||||
# This would use Tesseract on the cropped region
|
||||
# For now, return empty - actual OCR will be done in manual_extractor
|
||||
logger.debug(f"Table region: ({table.x}, {table.y}) {table.width}x{table.height}")
|
||||
return []
|
||||
|
||||
|
||||
# Singleton instance
|
||||
table_detector = TableDetector()
|
||||
357
ocr/app/table_extraction/parser.py
Normal file
357
ocr/app/table_extraction/parser.py
Normal file
@@ -0,0 +1,357 @@
|
||||
"""Parse maintenance schedule tables into structured data."""
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
from app.patterns.maintenance_patterns import maintenance_matcher
|
||||
from app.patterns.service_mapping import service_mapper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class ParsedScheduleRow:
    """One maintenance-schedule entry parsed from a table or text row."""

    service: str                        # raw service description from the row
    normalized_service: Optional[str]   # canonical service name, if mapped
    subtypes: list[str]                 # service subtypes from the mapper
    interval_miles: Optional[int]       # mileage interval, if found
    interval_months: Optional[int]      # time interval in months, if found
    details: Optional[str]              # free-text notes column, if any
    fluid_spec: Optional[str]           # fluid specification (e.g. viscosity), if any
    confidence: float                   # combined parsing confidence, 0.0–1.0
    raw_row: list[str] = field(default_factory=list)  # original cells for debugging
||||
|
||||
|
||||
class TableParser:
|
||||
"""Parse detected tables into maintenance schedules.
|
||||
|
||||
Handles various table formats:
|
||||
- Service | Miles | Months | Notes
|
||||
- Service | Interval | Description
|
||||
- Miles/Months header with service rows
|
||||
"""
|
||||
|
||||
# Common column header patterns
|
||||
COLUMN_PATTERNS = {
|
||||
"service": [
|
||||
r"service", r"item", r"maintenance", r"operation",
|
||||
r"component", r"part", r"system", r"description",
|
||||
],
|
||||
"miles": [
|
||||
r"miles?", r"mi\.?", r"mileage", r"odometer",
|
||||
r"km", r"kilometers?",
|
||||
],
|
||||
"months": [
|
||||
r"months?", r"mo\.?", r"time", r"interval",
|
||||
r"years?", r"yr\.?",
|
||||
],
|
||||
"details": [
|
||||
r"notes?", r"details?", r"remarks?", r"comments?",
|
||||
r"specification", r"specs?", r"procedure",
|
||||
],
|
||||
}
|
||||
|
||||
def parse_table(
|
||||
self,
|
||||
header_row: list[str],
|
||||
data_rows: list[list[str]],
|
||||
) -> list[ParsedScheduleRow]:
|
||||
"""
|
||||
Parse a maintenance table into structured schedule rows.
|
||||
|
||||
Args:
|
||||
header_row: Table header cells
|
||||
data_rows: Table data rows
|
||||
|
||||
Returns:
|
||||
List of ParsedScheduleRow objects
|
||||
"""
|
||||
# Identify column types
|
||||
column_types = self._identify_columns(header_row)
|
||||
|
||||
if not column_types:
|
||||
logger.warning("Could not identify table columns")
|
||||
return self._parse_without_headers(data_rows)
|
||||
|
||||
results = []
|
||||
|
||||
for row in data_rows:
|
||||
parsed = self._parse_row(row, column_types)
|
||||
if parsed:
|
||||
results.append(parsed)
|
||||
|
||||
return results
|
||||
|
||||
def parse_text_block(self, text: str) -> list[ParsedScheduleRow]:
|
||||
"""
|
||||
Parse maintenance schedules from unstructured text.
|
||||
|
||||
Useful when table detection fails but text contains schedule info.
|
||||
|
||||
Args:
|
||||
text: Text block that may contain maintenance schedules
|
||||
|
||||
Returns:
|
||||
List of ParsedScheduleRow objects
|
||||
"""
|
||||
results = []
|
||||
lines = text.split("\n")
|
||||
|
||||
for line in lines:
|
||||
# Look for lines with service + interval pattern
|
||||
service_match = service_mapper.map_service(line)
|
||||
mileage_match = maintenance_matcher.extract_mileage_interval(line)
|
||||
time_match = maintenance_matcher.extract_time_interval(line)
|
||||
|
||||
if service_match and (mileage_match or time_match):
|
||||
# Extract fluid spec if present
|
||||
fluid_match = maintenance_matcher.extract_fluid_spec(line)
|
||||
|
||||
results.append(
|
||||
ParsedScheduleRow(
|
||||
service=line.strip(),
|
||||
normalized_service=service_match.normalized_name,
|
||||
subtypes=service_match.subtypes,
|
||||
interval_miles=mileage_match.value if mileage_match else None,
|
||||
interval_months=time_match.value if time_match else None,
|
||||
details=None,
|
||||
fluid_spec=fluid_match.value if fluid_match else None,
|
||||
confidence=min(
|
||||
service_match.confidence,
|
||||
mileage_match.confidence if mileage_match else 1.0,
|
||||
time_match.confidence if time_match else 1.0,
|
||||
),
|
||||
raw_row=[line],
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
def _identify_columns(
|
||||
self, header_row: list[str]
|
||||
) -> dict[int, str]:
|
||||
"""
|
||||
Identify column types from header row.
|
||||
|
||||
Args:
|
||||
header_row: Table header cells
|
||||
|
||||
Returns:
|
||||
Dict mapping column index to type
|
||||
"""
|
||||
column_types: dict[int, str] = {}
|
||||
|
||||
for i, header in enumerate(header_row):
|
||||
header_lower = header.lower().strip()
|
||||
|
||||
for col_type, patterns in self.COLUMN_PATTERNS.items():
|
||||
for pattern in patterns:
|
||||
if re.search(pattern, header_lower, re.IGNORECASE):
|
||||
column_types[i] = col_type
|
||||
break
|
||||
if i in column_types:
|
||||
break
|
||||
|
||||
# If no service column found, assume first column
|
||||
if "service" not in column_types.values() and header_row:
|
||||
for i, header in enumerate(header_row):
|
||||
if i not in column_types:
|
||||
column_types[i] = "service"
|
||||
break
|
||||
|
||||
return column_types
|
||||
|
||||
def _parse_row(
|
||||
self,
|
||||
row: list[str],
|
||||
column_types: dict[int, str],
|
||||
) -> Optional[ParsedScheduleRow]:
|
||||
"""
|
||||
Parse a single data row using identified column types.
|
||||
|
||||
Args:
|
||||
row: Table row cells
|
||||
column_types: Column index to type mapping
|
||||
|
||||
Returns:
|
||||
ParsedScheduleRow or None
|
||||
"""
|
||||
service = ""
|
||||
interval_miles: Optional[int] = None
|
||||
interval_months: Optional[int] = None
|
||||
details: Optional[str] = None
|
||||
fluid_spec: Optional[str] = None
|
||||
|
||||
# Extract values based on column types
|
||||
for i, cell in enumerate(row):
|
||||
cell_value = cell.strip()
|
||||
if not cell_value:
|
||||
continue
|
||||
|
||||
col_type = column_types.get(i)
|
||||
|
||||
if col_type == "service":
|
||||
service = cell_value
|
||||
elif col_type == "miles":
|
||||
miles = self._extract_miles(cell_value)
|
||||
if miles:
|
||||
interval_miles = miles
|
||||
elif col_type == "months":
|
||||
months = self._extract_months(cell_value)
|
||||
if months:
|
||||
interval_months = months
|
||||
elif col_type == "details":
|
||||
details = cell_value
|
||||
# Also check for fluid specs in details
|
||||
fluid_match = maintenance_matcher.extract_fluid_spec(cell_value)
|
||||
if fluid_match:
|
||||
fluid_spec = fluid_match.value
|
||||
|
||||
# If no explicit miles/months columns, try to extract from service text
|
||||
if not interval_miles and not interval_months:
|
||||
mileage_match = maintenance_matcher.extract_mileage_interval(service)
|
||||
time_match = maintenance_matcher.extract_time_interval(service)
|
||||
if mileage_match:
|
||||
interval_miles = mileage_match.value
|
||||
if time_match:
|
||||
interval_months = time_match.value
|
||||
|
||||
# Check for intervals in any cell
|
||||
if not interval_miles:
|
||||
for cell in row:
|
||||
mileage_match = maintenance_matcher.extract_mileage_interval(cell)
|
||||
if mileage_match:
|
||||
interval_miles = mileage_match.value
|
||||
break
|
||||
|
||||
if not interval_months:
|
||||
for cell in row:
|
||||
time_match = maintenance_matcher.extract_time_interval(cell)
|
||||
if time_match:
|
||||
interval_months = time_match.value
|
||||
break
|
||||
|
||||
# Skip if no service identified
|
||||
if not service:
|
||||
return None
|
||||
|
||||
# Map service to normalized name and subtypes
|
||||
service_match = service_mapper.map_service(service)
|
||||
|
||||
normalized_service = service_match.normalized_name if service_match else None
|
||||
subtypes = service_match.subtypes if service_match else []
|
||||
service_confidence = service_match.confidence if service_match else 0.5
|
||||
|
||||
# Calculate overall confidence
|
||||
interval_confidence = 0.0
|
||||
if interval_miles:
|
||||
interval_confidence = max(interval_confidence, 0.8)
|
||||
if interval_months:
|
||||
interval_confidence = max(interval_confidence, 0.8)
|
||||
|
||||
confidence = (service_confidence + interval_confidence) / 2 if interval_confidence else service_confidence * 0.7
|
||||
|
||||
return ParsedScheduleRow(
|
||||
service=service,
|
||||
normalized_service=normalized_service,
|
||||
subtypes=subtypes,
|
||||
interval_miles=interval_miles,
|
||||
interval_months=interval_months,
|
||||
details=details,
|
||||
fluid_spec=fluid_spec,
|
||||
confidence=confidence,
|
||||
raw_row=row,
|
||||
)
|
||||
|
||||
def _parse_without_headers(
|
||||
self, data_rows: list[list[str]]
|
||||
) -> list[ParsedScheduleRow]:
|
||||
"""
|
||||
Parse table without clear headers by analyzing content.
|
||||
|
||||
Args:
|
||||
data_rows: Table rows
|
||||
|
||||
Returns:
|
||||
List of ParsedScheduleRow
|
||||
"""
|
||||
results = []
|
||||
|
||||
for row in data_rows:
|
||||
if not row:
|
||||
continue
|
||||
|
||||
# Join all cells and try to extract info
|
||||
row_text = " ".join(row)
|
||||
|
||||
service_match = service_mapper.map_service(row_text)
|
||||
mileage_match = maintenance_matcher.extract_mileage_interval(row_text)
|
||||
time_match = maintenance_matcher.extract_time_interval(row_text)
|
||||
fluid_match = maintenance_matcher.extract_fluid_spec(row_text)
|
||||
|
||||
if service_match:
|
||||
results.append(
|
||||
ParsedScheduleRow(
|
||||
service=row[0] if row else row_text,
|
||||
normalized_service=service_match.normalized_name,
|
||||
subtypes=service_match.subtypes,
|
||||
interval_miles=mileage_match.value if mileage_match else None,
|
||||
interval_months=time_match.value if time_match else None,
|
||||
details=None,
|
||||
fluid_spec=fluid_match.value if fluid_match else None,
|
||||
confidence=service_match.confidence * 0.8, # Reduce for no-header parsing
|
||||
raw_row=row,
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
def _extract_miles(self, text: str) -> Optional[int]:
|
||||
"""Extract mileage value from cell text."""
|
||||
# First try pattern matcher
|
||||
match = maintenance_matcher.extract_mileage_interval(text)
|
||||
if match:
|
||||
return match.value
|
||||
|
||||
# Try simple number extraction
|
||||
# Look for patterns like "5,000", "5000", "5K"
|
||||
number_match = re.search(r"([\d,]+)(?:K)?", text.replace(" ", ""), re.IGNORECASE)
|
||||
if number_match:
|
||||
num_str = number_match.group(1).replace(",", "")
|
||||
try:
|
||||
value = int(num_str)
|
||||
# Handle "5K" notation
|
||||
if "K" in text.upper() and value < 1000:
|
||||
value *= 1000
|
||||
if 500 <= value <= 150000:
|
||||
return value
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _extract_months(self, text: str) -> Optional[int]:
|
||||
"""Extract month interval from cell text."""
|
||||
# First try pattern matcher
|
||||
match = maintenance_matcher.extract_time_interval(text)
|
||||
if match:
|
||||
return match.value
|
||||
|
||||
# Try simple number extraction
|
||||
number_match = re.search(r"(\d+)", text)
|
||||
if number_match:
|
||||
try:
|
||||
value = int(number_match.group(1))
|
||||
if 1 <= value <= 120:
|
||||
return value
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# Singleton instance
|
||||
table_parser = TableParser()
|
||||
@@ -16,6 +16,9 @@ numpy>=1.24.0
|
||||
# OCR Engines
|
||||
pytesseract>=0.3.10
|
||||
|
||||
# PDF Processing
|
||||
PyMuPDF>=1.23.0
|
||||
|
||||
# Redis for job queue
|
||||
redis>=5.0.0
|
||||
|
||||
|
||||
164
ocr/tests/test_maintenance_patterns.py
Normal file
164
ocr/tests/test_maintenance_patterns.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""Tests for maintenance pattern matching."""
|
||||
import pytest
|
||||
|
||||
from app.patterns.maintenance_patterns import maintenance_matcher
|
||||
|
||||
|
||||
class TestMileageIntervalExtraction:
    """Mileage-interval extraction via maintenance_matcher."""

    def test_every_miles_pattern(self) -> None:
        """'every X miles' with thousands separator."""
        result = maintenance_matcher.extract_mileage_interval("every 5,000 miles")
        assert result is not None
        assert result.value == 5000
        assert result.confidence >= 0.9

    def test_every_miles_no_comma(self) -> None:
        """'every X miles' without a comma."""
        result = maintenance_matcher.extract_mileage_interval("every 5000 miles")
        assert result is not None
        assert result.value == 5000

    def test_at_miles_pattern(self) -> None:
        """'at X mi' form."""
        result = maintenance_matcher.extract_mileage_interval("at 30,000 mi")
        assert result is not None
        assert result.value == 30000

    def test_miles_or_pattern(self) -> None:
        """'X miles or Y months' picks the mileage part."""
        result = maintenance_matcher.extract_mileage_interval("7,500 miles or 12 months")
        assert result is not None
        assert result.value == 7500

    def test_miles_slash_pattern(self) -> None:
        """'X mi/Y months' slash form."""
        result = maintenance_matcher.extract_mileage_interval("5000 mi/6 months")
        assert result is not None
        assert result.value == 5000

    def test_no_mileage(self) -> None:
        """Text with no mileage yields None."""
        result = maintenance_matcher.extract_mileage_interval("check brake fluid")
        assert result is None

    def test_unreasonable_mileage(self) -> None:
        """Implausibly small or large mileage values are rejected."""
        result = maintenance_matcher.extract_mileage_interval("every 10 miles")
        assert result is None

        result = maintenance_matcher.extract_mileage_interval("every 1,000,000 miles")
        assert result is None
||||
|
||||
|
||||
class TestTimeIntervalExtraction:
    """Time-interval extraction via maintenance_matcher."""

    def test_every_months_pattern(self) -> None:
        """'every X months' form."""
        result = maintenance_matcher.extract_time_interval("every 6 months")
        assert result is not None
        assert result.value == 6
        assert result.confidence >= 0.9

    def test_months_or_pattern(self) -> None:
        """'X months or Y miles' picks the time part."""
        result = maintenance_matcher.extract_time_interval("12 months or 10,000 miles")
        assert result is not None
        assert result.value == 12

    def test_annually_pattern(self) -> None:
        """'annually' maps to 12 months."""
        result = maintenance_matcher.extract_time_interval("check annually")
        assert result is not None
        assert result.value == 12

    def test_semi_annual_pattern(self) -> None:
        """'semi-annually' maps to 6 months."""
        result = maintenance_matcher.extract_time_interval("inspect semi-annually")
        assert result is not None
        assert result.value == 6

    def test_every_years_pattern(self) -> None:
        """'every X years' is converted to months."""
        result = maintenance_matcher.extract_time_interval("replace every 2 years")
        assert result is not None
        assert result.value == 24

    def test_no_time_interval(self) -> None:
        """Text with no time interval yields None."""
        result = maintenance_matcher.extract_time_interval("change oil filter")
        assert result is None
||||
|
||||
|
||||
class TestFluidSpecExtraction:
    """Unit tests covering fluid-specification extraction."""

    def test_oil_viscosity(self) -> None:
        """SAE viscosity grades should be recognized as engine-oil specs."""
        spec = maintenance_matcher.extract_fluid_spec("Use 0W-20 oil")
        assert spec is not None
        assert spec.value == "0W-20"
        assert spec.fluid_type == "oil"

        spec = maintenance_matcher.extract_fluid_spec("5W-30 synthetic")
        assert spec is not None
        assert spec.value == "5W-30"

    def test_transmission_fluid(self) -> None:
        """ATF designations should be tagged as transmission fluid."""
        spec = maintenance_matcher.extract_fluid_spec("ATF-Z1 transmission fluid")
        assert spec is not None
        assert "ATF" in spec.value
        assert spec.fluid_type == "transmission"

        spec = maintenance_matcher.extract_fluid_spec("Dexron VI")
        assert spec is not None
        assert spec.fluid_type == "transmission"

    def test_brake_fluid(self) -> None:
        """DOT ratings should be tagged as brake fluid."""
        spec = maintenance_matcher.extract_fluid_spec("DOT 4 brake fluid")
        assert spec is not None
        assert "DOT" in spec.value
        assert spec.fluid_type == "brake"

    def test_extract_all_fluid_specs(self) -> None:
        """A sentence naming two fluids should yield at least two specs."""
        sample = "Use 0W-20 oil and DOT 4 brake fluid"
        specs = maintenance_matcher.extract_all_fluid_specs(sample)
        assert len(specs) >= 2
|
||||
|
||||
|
||||
class TestCombinedInterval:
    """Unit tests for combined mileage/time interval extraction."""

    def test_mileage_and_time(self) -> None:
        """Both components should come back from a combined phrase."""
        phrase = "every 5,000 miles or 6 months, whichever comes first"
        # Renamed from (mileage, time) to avoid shadowing the stdlib module name.
        miles_part, months_part = maintenance_matcher.extract_combined_interval(phrase)

        assert miles_part is not None
        assert miles_part.value == 5000

        assert months_part is not None
        assert months_part.value == 6

    def test_only_mileage(self) -> None:
        """A mileage-only phrase yields no time component."""
        phrase = "replace every 30,000 miles"
        miles_part, months_part = maintenance_matcher.extract_combined_interval(phrase)

        assert miles_part is not None
        assert miles_part.value == 30000
        assert months_part is None

    def test_only_time(self) -> None:
        """A time-only phrase yields no mileage component."""
        phrase = "inspect annually"
        miles_part, months_part = maintenance_matcher.extract_combined_interval(phrase)

        assert miles_part is None
        assert months_part is not None
        assert months_part.value == 12
|
||||
116
ocr/tests/test_service_mapping.py
Normal file
116
ocr/tests/test_service_mapping.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Tests for service name mapping."""
|
||||
import pytest
|
||||
|
||||
from app.patterns.service_mapping import service_mapper
|
||||
|
||||
|
||||
class TestServiceMapping:
    """Unit tests mapping raw service phrases onto known subtypes."""

    def test_engine_oil_mapping(self) -> None:
        """'engine oil' resolves to the oil-change service."""
        mapping = service_mapper.map_service("engine oil")
        assert mapping is not None
        assert mapping.normalized_name == "Engine Oil Change"
        assert "Engine Oil" in mapping.subtypes
        assert mapping.category == "routine_maintenance"

    def test_oil_change_mapping(self) -> None:
        """'oil change' also resolves to the engine-oil subtype."""
        mapping = service_mapper.map_service("oil change")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_air_filter_mapping(self) -> None:
        """'engine air filter' resolves to the air-filter service."""
        mapping = service_mapper.map_service("engine air filter")
        assert mapping is not None
        assert mapping.normalized_name == "Air Filter Replacement"
        assert "Air Filter Element" in mapping.subtypes

    def test_cabin_filter_mapping(self) -> None:
        """'cabin air filter' resolves to the cabin-filter subtype."""
        mapping = service_mapper.map_service("cabin air filter")
        assert mapping is not None
        assert "Cabin Air Filter / Purifier" in mapping.subtypes

    def test_tire_rotation_mapping(self) -> None:
        """'tire rotation' resolves to tires with high confidence."""
        mapping = service_mapper.map_service("tire rotation")
        assert mapping is not None
        assert "Tires" in mapping.subtypes
        assert mapping.confidence >= 0.95

    def test_brake_inspection_mapping(self) -> None:
        """'brake inspection' resolves to the brakes subtype."""
        mapping = service_mapper.map_service("brake inspection")
        assert mapping is not None
        assert "Brakes and Traction Control" in mapping.subtypes

    def test_coolant_mapping(self) -> None:
        """'engine coolant' resolves to the coolant subtype."""
        mapping = service_mapper.map_service("engine coolant")
        assert mapping is not None
        assert "Coolant" in mapping.subtypes

    def test_transmission_fluid_mapping(self) -> None:
        """'automatic transmission fluid' resolves to the A/T fluid subtype."""
        mapping = service_mapper.map_service("automatic transmission fluid")
        assert mapping is not None
        assert "Fluid - A/T" in mapping.subtypes

    def test_spark_plug_mapping(self) -> None:
        """'spark plugs' resolves to the spark-plug subtype."""
        mapping = service_mapper.map_service("spark plugs")
        assert mapping is not None
        assert "Spark Plug" in mapping.subtypes

    def test_wiper_blade_mapping(self) -> None:
        """'wiper blades' resolves to the wiper-blade subtype."""
        mapping = service_mapper.map_service("wiper blades")
        assert mapping is not None
        assert "Wiper Blade" in mapping.subtypes

    def test_unknown_service(self) -> None:
        """An unrecognized phrase maps to nothing."""
        mapping = service_mapper.map_service("quantum flux capacitor")
        assert mapping is None

    def test_case_insensitive(self) -> None:
        """Upper-case input maps the same as lower-case."""
        mapping = service_mapper.map_service("ENGINE OIL")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_partial_match(self) -> None:
        """A known phrase embedded in a longer sentence still maps."""
        mapping = service_mapper.map_service("Replace engine oil and filter")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes
|
||||
|
||||
|
||||
class TestFuzzyMapping:
    """Unit tests for fuzzy (approximate) service mapping."""

    def test_fuzzy_oil_change(self) -> None:
        """A paraphrased oil-change request should still map."""
        mapping = service_mapper.map_service_fuzzy("change the engine oil")
        assert mapping is not None
        assert "Engine Oil" in mapping.subtypes

    def test_fuzzy_low_threshold(self) -> None:
        """A very short token matches once the threshold is relaxed."""
        mapping = service_mapper.map_service_fuzzy("oil", threshold=0.3)
        assert mapping is not None  # Should match "engine oil" partially
|
||||
|
||||
|
||||
class TestKeywords:
    """Unit tests for the service keyword list."""

    def test_get_keywords(self) -> None:
        """The keyword list is non-empty and contains the core terms."""
        keywords = service_mapper.get_all_service_keywords()
        assert len(keywords) > 0
        assert "engine oil" in keywords
        assert "service" in keywords
        assert "maintenance" in keywords
|
||||
122
ocr/tests/test_table_parser.py
Normal file
122
ocr/tests/test_table_parser.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Tests for table parsing."""
|
||||
import pytest
|
||||
|
||||
from app.table_extraction.parser import table_parser
|
||||
|
||||
|
||||
class TestTableParsing:
    """Unit tests for maintenance-schedule table parsing."""

    def test_parse_simple_table(self) -> None:
        """A clean three-column table yields one schedule entry per row."""
        header = ["Service", "Miles", "Months"]
        rows = [
            ["Engine Oil", "5,000", "6"],
            ["Air Filter", "30,000", "24"],
            ["Cabin Filter", "15,000", "12"],
        ]

        entries = table_parser.parse_table(header, rows)

        assert len(entries) == 3

        # Spot-check the oil-change row in detail.
        oil_entry = next(e for e in entries if "oil" in e.service.lower())
        assert oil_entry.interval_miles == 5000
        assert oil_entry.interval_months == 6

    def test_parse_table_with_notes(self) -> None:
        """A table whose third column is free-text notes still parses."""
        header = ["Item", "Interval", "Notes"]
        rows = [
            ["Engine Oil", "5,000 miles or 6 months", "Use 0W-20"],
            ["Brake Fluid", "30,000 miles", "DOT 4"],
        ]

        entries = table_parser.parse_table(header, rows)

        assert len(entries) == 2

    def test_parse_without_headers(self) -> None:
        """Headerless rows are still recoverable via the fallback path."""
        rows = [
            ["Engine oil change", "5,000 miles", "6 months"],
            ["Tire rotation", "7,500 miles", ""],
        ]

        entries = table_parser._parse_without_headers(rows)

        assert len(entries) >= 1

    def test_parse_text_block(self) -> None:
        """Unstructured 'service: interval' lines are parsed as entries."""
        text = """
Engine oil: replace every 5,000 miles or 6 months
Air filter: replace every 30,000 miles
Tire rotation: every 7,500 miles
"""

        entries = table_parser.parse_text_block(text)

        assert len(entries) >= 2
|
||||
|
||||
|
||||
class TestColumnIdentification:
    """Unit tests for header-based column typing."""

    def test_identify_service_column(self) -> None:
        """'Service Item' / 'Miles' / 'Months' map to their column roles."""
        header = ["Service Item", "Miles", "Months"]
        roles = table_parser._identify_columns(header)

        assert roles.get(0) == "service"
        assert roles.get(1) == "miles"
        assert roles.get(2) == "months"

    def test_identify_maintenance_column(self) -> None:
        """A 'Maintenance' header is treated as the service column."""
        header = ["Maintenance", "Interval", "Notes"]
        roles = table_parser._identify_columns(header)

        assert roles.get(0) == "service"

    def test_identify_details_column(self) -> None:
        """A 'Notes' header is treated as the details column."""
        header = ["Item", "Miles", "Notes"]
        roles = table_parser._identify_columns(header)

        assert roles.get(2) == "details"
|
||||
|
||||
|
||||
class TestIntervalExtraction:
    """Unit tests for extracting numeric intervals from table cells."""

    def test_extract_miles_with_comma(self) -> None:
        """Comma-grouped mileage parses to an integer."""
        assert table_parser._extract_miles("5,000") == 5000

    def test_extract_miles_without_comma(self) -> None:
        """Plain-digit mileage parses to an integer."""
        assert table_parser._extract_miles("5000") == 5000

    def test_extract_miles_with_unit(self) -> None:
        """A trailing 'miles' unit is ignored."""
        assert table_parser._extract_miles("5,000 miles") == 5000

    def test_extract_miles_k_notation(self) -> None:
        """'5K' shorthand expands to 5000."""
        assert table_parser._extract_miles("5K") == 5000

    def test_extract_months(self) -> None:
        """A bare number parses as a month count."""
        assert table_parser._extract_months("6") == 6

    def test_extract_months_with_unit(self) -> None:
        """A trailing 'months' unit is ignored."""
        assert table_parser._extract_months("12 months") == 12
|
||||
Reference in New Issue
Block a user