All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 5m59s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
OCR Service (Python/FastAPI):
- POST /extract for synchronous OCR extraction
- POST /jobs and GET /jobs/{job_id} for async processing
- Image preprocessing (deskew, denoise) for accuracy
- HEIC conversion via pillow-heif
- Redis job queue for async processing
Backend (Fastify):
- POST /api/ocr/extract - authenticated proxy to OCR
- POST /api/ocr/jobs - async job submission
- GET /api/ocr/jobs/:jobId - job polling
- Multipart file upload handling
- JWT authentication required
File size limits: 10MB sync, 200MB async
Processing time target: <3 seconds for typical photos
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
143 lines
4.1 KiB
Python
143 lines
4.1 KiB
Python
"""Async OCR job endpoints."""
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
|
|
|
|
from app.models import JobResponse, JobSubmitRequest
|
|
from app.services import job_queue, ocr_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/jobs", tags=["jobs"])
|
|
|
|
# Maximum file size for async processing (200MB)
|
|
MAX_ASYNC_SIZE = 200 * 1024 * 1024
|
|
|
|
|
|
@router.post("", response_model=JobResponse)
|
|
async def submit_job(
|
|
background_tasks: BackgroundTasks,
|
|
file: UploadFile = File(..., description="Image file to process"),
|
|
callback_url: Optional[str] = Form(None, description="URL to call when job completes"),
|
|
) -> JobResponse:
|
|
"""
|
|
Submit an async OCR job for large files.
|
|
|
|
Use this endpoint for files larger than 10MB or when you don't want to wait
|
|
for processing to complete. Poll GET /jobs/{job_id} for status.
|
|
|
|
- **file**: Image file (max 200MB)
|
|
- **callback_url**: Optional webhook URL to receive job completion notification
|
|
"""
|
|
# Validate file presence
|
|
if not file.filename:
|
|
raise HTTPException(status_code=400, detail="No file provided")
|
|
|
|
# Read file content
|
|
content = await file.read()
|
|
file_size = len(content)
|
|
|
|
# Validate file size
|
|
if file_size > MAX_ASYNC_SIZE:
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=f"File too large. Max: {MAX_ASYNC_SIZE // (1024*1024)}MB.",
|
|
)
|
|
|
|
if file_size == 0:
|
|
raise HTTPException(status_code=400, detail="Empty file provided")
|
|
|
|
logger.info(
|
|
f"Submitting async job: {file.filename}, "
|
|
f"size: {file_size} bytes, "
|
|
f"content_type: {file.content_type}"
|
|
)
|
|
|
|
# Submit job to queue
|
|
job_id = await job_queue.submit_job(
|
|
file_bytes=content,
|
|
content_type=file.content_type or "application/octet-stream",
|
|
callback_url=callback_url,
|
|
)
|
|
|
|
# Schedule background processing
|
|
background_tasks.add_task(process_job, job_id)
|
|
|
|
# Return initial status
|
|
return JobResponse(
|
|
jobId=job_id,
|
|
status="pending",
|
|
progress=0,
|
|
)
|
|
|
|
|
|
@router.get("/{job_id}", response_model=JobResponse)
|
|
async def get_job_status(job_id: str) -> JobResponse:
|
|
"""
|
|
Get the status of an async OCR job.
|
|
|
|
Poll this endpoint to check job progress and retrieve results.
|
|
|
|
Returns:
|
|
- **pending**: Job is queued
|
|
- **processing**: Job is being processed (includes progress %)
|
|
- **completed**: Job finished successfully (includes result)
|
|
- **failed**: Job failed (includes error message)
|
|
"""
|
|
result = await job_queue.get_job_status(job_id)
|
|
|
|
if result is None:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Job {job_id} not found. Jobs expire after 1 hour.",
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
async def process_job(job_id: str) -> None:
|
|
"""Background task to process an OCR job."""
|
|
logger.info(f"Starting job {job_id}")
|
|
|
|
try:
|
|
# Update status to processing
|
|
await job_queue.update_job_progress(job_id, 10)
|
|
|
|
# Get job data
|
|
file_bytes = await job_queue.get_job_data(job_id)
|
|
if not file_bytes:
|
|
await job_queue.fail_job(job_id, "Job data not found")
|
|
return
|
|
|
|
await job_queue.update_job_progress(job_id, 30)
|
|
|
|
# Get metadata for content type
|
|
status = await job_queue.get_job_status(job_id)
|
|
if not status:
|
|
return
|
|
|
|
# Perform OCR in thread pool (CPU-bound operation)
|
|
await job_queue.update_job_progress(job_id, 50)
|
|
|
|
loop = asyncio.get_event_loop()
|
|
result = await loop.run_in_executor(
|
|
None,
|
|
lambda: ocr_service.extract(
|
|
file_bytes=file_bytes,
|
|
preprocess=True,
|
|
),
|
|
)
|
|
|
|
await job_queue.update_job_progress(job_id, 90)
|
|
|
|
if result.success:
|
|
await job_queue.complete_job(job_id, result)
|
|
else:
|
|
await job_queue.fail_job(job_id, "OCR extraction failed")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Job {job_id} failed: {e}", exc_info=True)
|
|
await job_queue.fail_job(job_id, str(e))
|