"""Async OCR job endpoints."""

import asyncio
import logging
from typing import Optional

from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile

from app.models import JobResponse, JobSubmitRequest
from app.services import job_queue, ocr_service

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/jobs", tags=["jobs"])

# Maximum file size for async processing (200MB)
MAX_ASYNC_SIZE = 200 * 1024 * 1024


@router.post("", response_model=JobResponse)
async def submit_job(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Image file to process"),
    callback_url: Optional[str] = Form(
        None, description="URL to call when job completes"
    ),
) -> JobResponse:
    """
    Submit an async OCR job for large files.

    Use this endpoint for files larger than 10MB or when you don't want
    to wait for processing to complete. Poll GET /jobs/{job_id} for status.

    - **file**: Image file (max 200MB)
    - **callback_url**: Optional webhook URL to receive job completion notification
    """
    # Validate file presence
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # Read file content fully into memory; size is only known after reading.
    content = await file.read()
    file_size = len(content)

    # Validate file size (reject oversized before queuing; 413 = Payload Too Large)
    if file_size > MAX_ASYNC_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Max: {MAX_ASYNC_SIZE // (1024 * 1024)}MB.",
        )

    if file_size == 0:
        raise HTTPException(status_code=400, detail="Empty file provided")

    # Lazy %-style args avoid formatting cost when the level is disabled.
    logger.info(
        "Submitting async job: %s, size: %d bytes, content_type: %s",
        file.filename,
        file_size,
        file.content_type,
    )

    # Submit job to queue
    job_id = await job_queue.submit_job(
        file_bytes=content,
        content_type=file.content_type or "application/octet-stream",
        callback_url=callback_url,
    )

    # Schedule background processing (runs after the response is sent)
    background_tasks.add_task(process_job, job_id)

    # Return initial status
    return JobResponse(
        jobId=job_id,
        status="pending",
        progress=0,
    )


@router.get("/{job_id}", response_model=JobResponse)
async def get_job_status(job_id: str) -> JobResponse:
    """
    Get the status of an async OCR job.

    Poll this endpoint to check job progress and retrieve results.

    Returns:
    - **pending**: Job is queued
    - **processing**: Job is being processed (includes progress %)
    - **completed**: Job finished successfully (includes result)
    - **failed**: Job failed (includes error message)
    """
    result = await job_queue.get_job_status(job_id)

    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job {job_id} not found. Jobs expire after 1 hour.",
        )

    return result


async def process_job(job_id: str) -> None:
    """Background task to process an OCR job.

    Drives the job through progress milestones (10/30/50/90), runs the
    CPU-bound OCR in a thread-pool executor, and records completion or
    failure on the job queue. Any unexpected exception marks the job failed.
    """
    logger.info("Starting job %s", job_id)

    try:
        # Update status to processing
        await job_queue.update_job_progress(job_id, 10)

        # Get job data
        file_bytes = await job_queue.get_job_data(job_id)
        if not file_bytes:
            await job_queue.fail_job(job_id, "Job data not found")
            return

        await job_queue.update_job_progress(job_id, 30)

        # Get metadata for content type; a missing status means the job
        # expired or was removed, so there is nothing left to do.
        status = await job_queue.get_job_status(job_id)
        if not status:
            return

        # Perform OCR in thread pool (CPU-bound operation).
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here since Python 3.10.
        await job_queue.update_job_progress(job_id, 50)
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: ocr_service.extract(
                file_bytes=file_bytes,
                preprocess=True,
            ),
        )

        await job_queue.update_job_progress(job_id, 90)

        if result.success:
            await job_queue.complete_job(job_id, result)
        else:
            await job_queue.fail_job(job_id, "OCR extraction failed")

    except Exception as e:
        logger.error("Job %s failed: %s", job_id, e, exc_info=True)
        await job_queue.fail_job(job_id, str(e))