Files
motovaultpro/ocr/app/routers/extract.py
Eric Gullickson 653c535165
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 38s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 22s
Deploy to Staging / Verify Staging (pull_request) Successful in 8s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
chore: add PDF support to receipt OCR pipeline (refs #182)
The receipt extractor only accepted image MIME types, rejecting PDFs at
the OCR layer. Added application/pdf to supported types and PDF-to-image
conversion (first page at 300 DPI) before OCR preprocessing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 21:22:40 -06:00

519 lines
17 KiB
Python

"""OCR extraction endpoints."""
import logging
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, Query, UploadFile
from app.extractors.vin_extractor import vin_extractor
from app.extractors.receipt_extractor import receipt_extractor
from app.extractors.maintenance_receipt_extractor import maintenance_receipt_extractor
from app.extractors.manual_extractor import manual_extractor
from app.models import (
BoundingBox,
ManualExtractionResponse,
ManualJobResponse,
ManualMaintenanceSchedule,
ManualVehicleInfo,
OcrResponse,
ReceiptExtractedField,
ReceiptExtractionResponse,
VinAlternative,
VinExtractionResponse,
)
from app.services import ocr_service, job_queue
# Logger named after this module.
logger = logging.getLogger(__name__)

# Every route declared below carries the "/extract" prefix and "extract" tag.
router = APIRouter(prefix="/extract", tags=["extract"])

# Bytes in one megabyte (binary), shared by the upload size limits below.
_BYTES_PER_MB = 1024 * 1024

# Upload size ceiling for the synchronous extraction endpoints (10MB).
MAX_SYNC_SIZE = 10 * _BYTES_PER_MB

# Upload size ceiling for owner's-manual PDF uploads (200MB).
MAX_MANUAL_SIZE = 200 * _BYTES_PER_MB
@router.post("", response_model=OcrResponse)
async def extract_text(
    file: UploadFile = File(..., description="Image file to process"),
    preprocess: bool = Query(True, description="Apply image preprocessing"),
) -> OcrResponse:
    """
    Run general-purpose OCR on an uploaded file and return the recognized text.

    Accepted formats: HEIC, JPEG, PNG, and PDF (first page only).

    Processing time target: <3 seconds for typical photos.

    - **file**: Image file (max 10MB for sync processing)
    - **preprocess**: Whether to apply deskew/denoise preprocessing (default: true)

    Errors:

    - **400**: no filename was supplied, or the upload is empty
    - **413**: the upload is larger than the synchronous size limit
    - **422**: the OCR service reported an unsuccessful extraction
    """
    # Reject requests that carry no usable upload.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # The whole upload is read into memory before it is measured.
    payload = await file.read()
    file_size = len(payload)

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")
    if file_size > MAX_SYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large for sync processing. Max: {MAX_SYNC_SIZE // (1024*1024)}MB. Use /jobs for larger files.",
        )

    logger.info(
        f"Processing file: {file.filename}, "
        f"size: {file_size} bytes, "
        f"content_type: {file.content_type}"
    )

    # Hand the raw bytes to the OCR service; it returns the response model.
    result = ocr_service.extract(
        file_bytes=payload,
        content_type=file.content_type,
        preprocess=preprocess,
    )
    if result.success:
        return result

    # An unsuccessful extraction is surfaced as 422 rather than returned.
    logger.warning(f"OCR extraction failed for {file.filename}")
    raise HTTPException(
        status_code=422,
        detail="Failed to extract text from image. Ensure the file is a valid image format.",
    )
@router.post("/vin", response_model=VinExtractionResponse)
async def extract_vin(
    file: UploadFile = File(..., description="Image file containing VIN"),
) -> VinExtractionResponse:
    """
    Locate and read a VIN (Vehicle Identification Number) in an uploaded image.

    Uses VIN-optimized preprocessing and pattern matching:

    - HEIC conversion (if needed)
    - Grayscale conversion
    - Deskew correction
    - CLAHE contrast enhancement
    - Noise reduction
    - Adaptive thresholding
    - VIN pattern matching (17 chars, excludes I/O/Q)
    - Check digit validation
    - Common OCR error correction (I->1, O->0, Q->0)

    Supports HEIC, JPEG, PNG formats.

    Processing time target: <3 seconds.

    - **file**: Image file (max 10MB)

    Returns:

    - **vin**: Extracted VIN (17 alphanumeric characters)
    - **confidence**: Confidence score (0.0-1.0)
    - **boundingBox**: Location of VIN in image (if detected)
    - **alternatives**: Other VIN candidates with confidence scores
    - **processingTimeMs**: Processing time in milliseconds

    An unsuccessful extraction is not raised as an HTTP error here: the
    extractor's `success` and `error` values are returned in the response body.
    """
    # Reject requests that carry no usable upload.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # The whole upload is read into memory before it is measured.
    payload = await file.read()
    file_size = len(payload)

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")
    if file_size > MAX_SYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_SYNC_SIZE // (1024*1024)}MB",
        )

    logger.info(
        f"VIN extraction: {file.filename}, "
        f"size: {file_size} bytes, "
        f"content_type: {file.content_type}"
    )

    result = vin_extractor.extract(
        image_bytes=payload,
        content_type=file.content_type,
    )

    # Translate the extractor's internal result objects into API models.
    box = result.bounding_box
    bounding_box = (
        BoundingBox(x=box.x, y=box.y, width=box.width, height=box.height)
        if box
        else None
    )

    alternatives = []
    for candidate in result.alternatives:
        alternatives.append(
            VinAlternative(vin=candidate.vin, confidence=candidate.confidence)
        )

    return VinExtractionResponse(
        success=result.success,
        vin=result.vin,
        confidence=result.confidence,
        boundingBox=bounding_box,
        alternatives=alternatives,
        processingTimeMs=result.processing_time_ms,
        error=result.error,
    )
@router.post("/receipt", response_model=ReceiptExtractionResponse)
async def extract_receipt(
    file: UploadFile = File(..., description="Receipt image file"),
    receipt_type: Optional[str] = Form(
        default=None,
        description="Receipt type hint: 'fuel' for specialized extraction",
    ),
) -> ReceiptExtractionResponse:
    """
    Read structured fields from a receipt image using OCR.

    Optimized for fuel receipts with pattern-based field extraction:

    - HEIC conversion (if needed)
    - Grayscale conversion
    - High contrast enhancement (for thermal receipts)
    - Adaptive thresholding
    - Pattern matching for dates, amounts, fuel quantities

    Supports HEIC, JPEG, PNG formats.

    Processing time target: <3 seconds.

    - **file**: Receipt image file (max 10MB)
    - **receipt_type**: Optional hint ("fuel" for gas station receipts)

    Returns:

    - **receiptType**: Detected type ("fuel" or "unknown")
    - **extractedFields**: Dictionary of extracted fields with confidence scores
        - merchantName: Gas station or store name
        - transactionDate: Date in YYYY-MM-DD format
        - totalAmount: Total purchase amount
        - fuelQuantity: Gallons/liters purchased (fuel receipts)
        - pricePerUnit: Price per gallon/liter (fuel receipts)
        - fuelGrade: Octane rating or fuel type (fuel receipts)
    - **rawText**: Full OCR text
    - **processingTimeMs**: Processing time in milliseconds
    """
    # NOTE(review): the commit message attached to this revision says PDF
    # support was added to the receipt OCR pipeline, but the docstring above
    # still lists images only -- confirm whether PDF belongs in that list.

    # Reject requests that carry no usable upload.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # The whole upload is read into memory before it is measured.
    payload = await file.read()
    file_size = len(payload)

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")
    if file_size > MAX_SYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_SYNC_SIZE // (1024*1024)}MB",
        )

    logger.info(
        f"Receipt extraction: {file.filename}, "
        f"size: {file_size} bytes, "
        f"content_type: {file.content_type}, "
        f"receipt_type: {receipt_type}"
    )

    result = receipt_extractor.extract(
        image_bytes=payload,
        content_type=file.content_type,
        receipt_type=receipt_type,
    )

    # A failed extraction becomes a 422 carrying the extractor's own message
    # when it supplied one.
    if not result.success:
        logger.warning(f"Receipt extraction failed for {file.filename}: {result.error}")
        raise HTTPException(
            status_code=422,
            detail=result.error or "Failed to extract data from receipt image",
        )

    # Re-wrap each internal field as the API's value/confidence model.
    extracted_fields = {}
    for name, field in result.extracted_fields.items():
        extracted_fields[name] = ReceiptExtractedField(
            value=field.value,
            confidence=field.confidence,
        )

    return ReceiptExtractionResponse(
        success=result.success,
        receiptType=result.receipt_type,
        extractedFields=extracted_fields,
        rawText=result.raw_text,
        processingTimeMs=result.processing_time_ms,
        error=result.error,
    )
@router.post("/maintenance-receipt", response_model=ReceiptExtractionResponse)
async def extract_maintenance_receipt(
    file: UploadFile = File(..., description="Maintenance receipt image file"),
) -> ReceiptExtractionResponse:
    """
    Read structured fields from a maintenance receipt using OCR + Gemini.

    Gemini-primary extraction with regex cross-validation:

    - OCR preprocessing (HEIC conversion, contrast, thresholding)
    - PaddleOCR text extraction
    - Gemini semantic field extraction from OCR text
    - Regex cross-validation for dates, amounts, odometer

    Supports HEIC, JPEG, PNG, and PDF formats.

    - **file**: Maintenance receipt image or PDF file (max 10MB)

    Returns:

    - **receiptType**: "maintenance"
    - **extractedFields**: Dictionary of extracted fields with confidence scores
        - serviceName: Service performed (e.g., "Oil Change")
        - serviceDate: Date in YYYY-MM-DD format
        - totalCost: Total cost
        - shopName: Shop or business name
        - laborCost: Labor cost (if broken out)
        - partsCost: Parts cost (if broken out)
        - odometerReading: Odometer reading (if present)
        - vehicleInfo: Vehicle description (if present)
    - **rawText**: Full OCR text
    - **processingTimeMs**: Processing time in milliseconds
    """
    # Reject requests that carry no usable upload.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # The whole upload is read into memory before it is measured.
    payload = await file.read()
    file_size = len(payload)

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")
    if file_size > MAX_SYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_SYNC_SIZE // (1024*1024)}MB",
        )

    logger.info(
        f"Maintenance receipt extraction: {file.filename}, "
        f"size: {file_size} bytes, "
        f"content_type: {file.content_type}"
    )

    result = maintenance_receipt_extractor.extract(
        image_bytes=payload,
        content_type=file.content_type,
    )

    # A failed extraction becomes a 422 carrying the extractor's own message
    # when it supplied one.
    if not result.success:
        logger.warning(
            f"Maintenance receipt extraction failed for {file.filename}: {result.error}"
        )
        raise HTTPException(
            status_code=422,
            detail=result.error or "Failed to extract data from maintenance receipt",
        )

    # Re-wrap each internal field as the API's value/confidence model.
    extracted_fields = dict(
        (
            field_name,
            ReceiptExtractedField(value=raw.value, confidence=raw.confidence),
        )
        for field_name, raw in result.extracted_fields.items()
    )

    return ReceiptExtractionResponse(
        success=result.success,
        receiptType=result.receipt_type,
        extractedFields=extracted_fields,
        rawText=result.raw_text,
        processingTimeMs=result.processing_time_ms,
        error=result.error,
    )
@router.post("/manual", response_model=ManualJobResponse)
async def extract_manual(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Owner's manual PDF file"),
    vehicle_id: Optional[str] = Form(None, description="Vehicle ID for context"),
) -> ManualJobResponse:
    """
    Queue an async job that extracts maintenance schedules from an owner's manual.

    Supports PDF files up to 200MB. Processing is done asynchronously due to
    the time required for large documents.

    Pipeline:

    1. Send entire PDF to Gemini for semantic extraction
    2. Map extracted service names to system maintenance subtypes
    3. Return structured results with confidence scores

    - **file**: Owner's manual PDF (max 200MB)
    - **vehicle_id**: Optional vehicle ID for context

    Returns immediately with job_id. Poll GET /jobs/{job_id} for status and results.

    Response when completed:

    - **vehicleInfo**: Detected make/model/year
    - **maintenanceSchedules**: List of extracted maintenance items with intervals
    - **rawTables**: Metadata about detected tables
    - **processingTimeMs**: Total processing time
    """
    # Reject requests that carry no usable upload.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # The upload passes as a PDF when EITHER the declared content type or the
    # filename extension says so; it is rejected only when both disagree.
    declared_type = file.content_type or ""
    has_pdf_mime = declared_type.startswith("application/pdf")
    has_pdf_suffix = file.filename.lower().endswith(".pdf")
    if not (has_pdf_mime or has_pdf_suffix):
        raise HTTPException(
            status_code=400,
            detail="File must be a PDF document",
        )

    # The whole upload is read into memory before it is measured.
    payload = await file.read()
    file_size = len(payload)

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")
    if file_size > MAX_MANUAL_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_MANUAL_SIZE // (1024*1024)}MB.",
        )

    logger.info(
        f"Manual extraction: {file.filename}, "
        f"size: {file_size} bytes, "
        f"vehicle_id: {vehicle_id}"
    )

    # Heuristic ETA reported to the client: 2 seconds per whole MB of upload,
    # never less than 30 seconds.
    whole_megabytes = file_size // (1024 * 1024)
    estimated_seconds = max(30, whole_megabytes * 2)

    # Persist the job first, then schedule its processing to run after the
    # response has been sent.
    job_id = await job_queue.submit_manual_job(
        file_bytes=payload,
        vehicle_id=vehicle_id,
    )
    background_tasks.add_task(process_manual_job, job_id)

    return ManualJobResponse(
        jobId=job_id,
        status="pending",
        progress=0,
        estimatedSeconds=estimated_seconds,
    )
async def process_manual_job(job_id: str) -> None:
    """Background task that runs a queued owner's-manual extraction job.

    Loads the stored PDF bytes for ``job_id``, runs ``manual_extractor.extract``
    in the event loop's default executor so the blocking call does not stall
    the loop, and records the outcome through ``job_queue``.

    Args:
        job_id: Identifier of the manual job to process.

    Returns:
        None. Progress, success and failure are all reported via
        ``job_queue``. Any exception raised while processing is logged and
        recorded as a job failure instead of propagating.
    """
    import asyncio

    logger.info(f"Starting manual extraction job {job_id}")
    try:
        # Mark the job as started before doing any real work.
        await job_queue.update_manual_job_progress(job_id, 5, "Starting extraction")

        # Manual jobs are fetched through their own manual-specific accessor.
        file_bytes = await job_queue.get_manual_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_manual_job(job_id, "Job data not found")
            return

        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the supported way to obtain it here.
        loop = asyncio.get_running_loop()

        def log_progress_failure(future) -> None:
            # Progress updates are best-effort: a failed update must not abort
            # the extraction, but it should not vanish silently either.
            if future.cancelled():
                return
            error = future.exception()
            if error is not None:
                logger.warning(
                    f"Progress update for manual job {job_id} failed: {error}"
                )

        def sync_progress_callback(percent: int, message: str) -> None:
            # Called from the worker thread: hand the async update back to the
            # event loop instead of awaiting it here.
            future = asyncio.run_coroutine_threadsafe(
                job_queue.update_manual_job_progress(job_id, percent, message),
                loop,
            )
            future.add_done_callback(log_progress_failure)

        # Run the blocking extraction off the event loop thread.
        result = await loop.run_in_executor(
            None,
            lambda: manual_extractor.extract(
                pdf_bytes=file_bytes,
                progress_callback=sync_progress_callback,
            ),
        )

        if not result.success:
            await job_queue.fail_manual_job(job_id, result.error or "Extraction failed")
            return

        # Convert the extractor's internal result into the API response model.
        vehicle_info = None
        if result.vehicle_info:
            vehicle_info = ManualVehicleInfo(
                make=result.vehicle_info.make,
                model=result.vehicle_info.model,
                year=result.vehicle_info.year,
            )

        schedules = [
            ManualMaintenanceSchedule(
                service=s.service,
                intervalMiles=s.interval_miles,
                intervalMonths=s.interval_months,
                details=s.details,
                confidence=s.confidence,
                subtypes=s.subtypes,
            )
            for s in result.maintenance_schedules
        ]

        response = ManualExtractionResponse(
            success=True,
            vehicleInfo=vehicle_info,
            maintenanceSchedules=schedules,
            rawTables=result.raw_tables,
            processingTimeMs=result.processing_time_ms,
            totalPages=result.total_pages,
            pagesProcessed=result.pages_processed,
        )
        await job_queue.complete_manual_job(job_id, response)
    except Exception as e:
        # Top-level boundary of the background task: log with traceback and
        # record the failure on the job so pollers see a terminal state.
        logger.exception(f"Manual job {job_id} failed: {e}")
        await job_queue.fail_manual_job(job_id, str(e))