All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 3m1s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
Implement async PDF processing for owner's manuals with maintenance schedule extraction: - Add PDF preprocessor with PyMuPDF for text/scanned PDF handling - Add maintenance pattern matching (mileage, time, fluid specs) - Add service name mapping to maintenance subtypes - Add table detection and parsing for schedule tables - Add manual extractor orchestrating the complete pipeline - Add POST /extract/manual endpoint for async job submission - Add Redis job queue support for manual extraction jobs - Add progress tracking during processing Processing pipeline: 1. Analyze PDF structure (text layer vs scanned) 2. Find maintenance schedule sections 3. Extract text or OCR scanned pages at 300 DPI 4. Detect and parse maintenance tables 5. Normalize service names and extract intervals 6. Return structured maintenance schedules with confidence scores Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
149 lines
4.4 KiB
Python
"""Async OCR job endpoints."""
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional, Union
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
|
|
|
|
from app.models import JobResponse, JobSubmitRequest, ManualJobResponse
|
|
from app.services import job_queue, ocr_service
|
|
|
|
# Module-scoped logger named after this module's dotted path.
logger = logging.getLogger(__name__)

# All endpoints in this module are mounted under the /jobs prefix.
router = APIRouter(prefix="/jobs", tags=["jobs"])

# Maximum file size for async processing (200MB)
MAX_ASYNC_SIZE = 200 * 1024 * 1024
|
|
|
|
|
|
@router.post("", response_model=JobResponse)
async def submit_job(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Image file to process"),
    callback_url: Optional[str] = Form(None, description="URL to call when job completes"),
) -> JobResponse:
    """
    Submit an async OCR job for large files.

    Use this endpoint for files larger than 10MB or when you don't want to wait
    for processing to complete. Poll GET /jobs/{job_id} for status.

    - **file**: Image file (max 200MB)
    - **callback_url**: Optional webhook URL to receive job completion notification

    Raises:
        HTTPException: 400 for a missing or empty file; 413 when the upload
            exceeds MAX_ASYNC_SIZE.
    """
    # Validate file presence before reading the body.
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # Read the whole upload into memory; the queue stores the raw bytes.
    content = await file.read()
    file_size = len(content)

    # Enforce the async-processing size cap (200MB).
    if file_size > MAX_ASYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_ASYNC_SIZE // (1024*1024)}MB.",
        )

    # Reject empty uploads — there is nothing to process.
    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")

    # Lazy %-style args: formatting is deferred until the record is emitted.
    logger.info(
        "Submitting async job: %s, size: %d bytes, content_type: %s",
        file.filename,
        file_size,
        file.content_type,
    )

    # Persist the job payload and metadata in the queue.
    job_id = await job_queue.submit_job(
        file_bytes=content,
        content_type=file.content_type or "application/octet-stream",
        callback_url=callback_url,
    )

    # Schedule processing after the response is sent.
    background_tasks.add_task(process_job, job_id)

    # Return the initial (pending) status immediately.
    return JobResponse(
        jobId=job_id,
        status="pending",
        progress=0,
    )
|
|
|
|
|
|
@router.get("/{job_id}", response_model=Union[JobResponse, ManualJobResponse])
async def get_job_status(job_id: str) -> Union[JobResponse, ManualJobResponse]:
    """
    Get the status of an async OCR job.

    Poll this endpoint to check job progress and retrieve results.
    Works for both regular OCR jobs and manual extraction jobs.

    Returns:
        - **pending**: Job is queued
        - **processing**: Job is being processed (includes progress %)
        - **completed**: Job finished successfully (includes result)
        - **failed**: Job failed (includes error message)
    """
    # Consult the regular OCR queue first, then the manual-extraction queue;
    # the first queue that knows the id wins.
    for lookup in (job_queue.get_job_status, job_queue.get_manual_job_status):
        found = await lookup(job_id)
        if found is not None:
            return found

    raise HTTPException(
        status_code=404,
        detail=f"Job {job_id} not found. Jobs expire after 1-2 hours.",
    )
|
|
|
|
|
|
async def process_job(job_id: str) -> None:
    """Background task to process an OCR job.

    Loads the stored upload bytes, reports coarse progress milestones,
    runs the CPU-bound OCR in the default thread-pool executor so the
    event loop stays responsive, and records the final result or failure
    in the job queue.

    Args:
        job_id: Identifier previously returned by ``job_queue.submit_job``.
    """
    logger.info("Starting job %s", job_id)

    try:
        # Mark the job as picked up.
        await job_queue.update_job_progress(job_id, 10)

        # Fetch the uploaded bytes stored at submission time.
        file_bytes = await job_queue.get_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_job(job_id, "Job data not found")
            return

        await job_queue.update_job_progress(job_id, 30)

        # Get metadata for content type; if the job record is gone
        # (e.g. expired), there is nothing left to update.
        status = await job_queue.get_job_status(job_id)
        if not status:
            # Log instead of returning silently so lost jobs are visible.
            logger.warning("Job %s metadata missing; aborting processing", job_id)
            return

        # Perform OCR in the default executor: the work is CPU-bound and
        # must not block the event loop.
        await job_queue.update_job_progress(job_id, 50)

        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: ocr_service.extract(
                file_bytes=file_bytes,
                preprocess=True,
            ),
        )

        await job_queue.update_job_progress(job_id, 90)

        if result.success:
            await job_queue.complete_job(job_id, result)
        else:
            await job_queue.fail_job(job_id, "OCR extraction failed")

    except Exception as e:
        # Top-level boundary: any unexpected error marks the job as failed
        # so pollers are never left with a stuck "processing" status.
        logger.error("Job %s failed: %s", job_id, e, exc_info=True)
        await job_queue.fail_job(job_id, str(e))