"""Redis-based job queue for async OCR processing."""

import asyncio
import json
import logging
import uuid
from typing import TYPE_CHECKING, Any, Coroutine, Dict, Optional, Set, Tuple

import redis.asyncio as redis

from app.config import settings
from app.models import JobResponse, JobStatus, OcrResponse

if TYPE_CHECKING:
    from app.models import ManualExtractionResponse, ManualJobResponse

logger = logging.getLogger(__name__)

# Job TTL in seconds (1 hour)
JOB_TTL = 3600
# Manual job TTL in seconds (2 hours for larger files)
MANUAL_JOB_TTL = 7200

# Key prefixes for OCR jobs.
# NOTE: the data/result prefixes extend JOB_PREFIX, so an unvalidated job ID
# such as "data:<id>" would address a data/result key instead of the metadata
# hash. Status lookups therefore validate job IDs first.
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"

# Manual job prefixes (same layout as the OCR job prefixes)
MANUAL_JOB_PREFIX = "ocr:manual:job:"
MANUAL_JOB_DATA_PREFIX = "ocr:manual:job:data:"
MANUAL_JOB_RESULT_PREFIX = "ocr:manual:job:result:"


def _is_valid_job_id(job_id: str) -> bool:
    """Return True if *job_id* is a canonical UUID string (the form submit generates)."""
    try:
        return str(uuid.UUID(job_id)) == job_id
    except (TypeError, ValueError, AttributeError):
        return False


class JobQueue:
    """Manages async OCR and manual-extraction jobs using Redis.

    Each job uses three keys, all written with the job type's TTL:
    - ``<meta prefix><job_id>``: hash holding status / progress / error metadata
    - ``<data prefix><job_id>``: raw uploaded file bytes (deleted when the job finishes)
    - ``<result prefix><job_id>``: JSON-encoded result (written on completion)
    """

    def __init__(self) -> None:
        """Initialize job queue; Redis connections are created lazily."""
        self._redis: Optional[redis.Redis] = None
        # Second client with decode_responses=False, used to read binary file data.
        self._raw_redis: Optional[redis.Redis] = None
        # Strong references to fire-and-forget tasks. The event loop keeps only
        # weak references, so an unreferenced task may be garbage-collected
        # before it completes.
        self._background_tasks: Set["asyncio.Task[None]"] = set()

    async def get_redis(self) -> redis.Redis:
        """Get or create the (string-decoding) Redis connection."""
        if self._redis is None:
            self._redis = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=True,
            )
        return self._redis

    async def _get_raw_redis(self) -> redis.Redis:
        """Get or create a Redis connection that returns values as raw bytes."""
        if self._raw_redis is None:
            self._raw_redis = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=False,
            )
        return self._raw_redis

    async def close(self) -> None:
        """Close any open Redis connections."""
        if self._redis is not None:
            await self._redis.close()
            self._redis = None
        if self._raw_redis is not None:
            await self._raw_redis.close()
            self._raw_redis = None

    # ------------------------------------------------------------------
    # Shared storage helpers (used by both OCR and manual jobs)
    # ------------------------------------------------------------------

    async def _create_job(
        self,
        meta_key: str,
        data_key: str,
        job_meta: Dict[str, Any],
        file_bytes: bytes,
        ttl: int,
    ) -> None:
        """Store job metadata and file bytes in one pipeline, both expiring after *ttl* seconds."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            # Store metadata as hash
            await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            await pipe.expire(meta_key, ttl)
            # Store binary data
            await pipe.set(data_key, file_bytes)
            await pipe.expire(data_key, ttl)
            await pipe.execute()

    async def _set_progress(
        self, meta_key: str, fields: Dict[str, Any], ttl: int
    ) -> None:
        """Write progress fields to the metadata hash and re-apply its TTL."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping=fields)  # type: ignore
            # HSET on an expired/missing key creates a brand-new hash with no
            # TTL; re-applying the TTL prevents such a hash from living forever.
            await pipe.expire(meta_key, ttl)
            await pipe.execute()

    async def _mark_completed(
        self,
        meta_key: str,
        result_key: str,
        data_key: str,
        result_json: str,
        ttl: int,
    ) -> None:
        """Mark a job completed, store its JSON result and drop its file data."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            # Update status
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            # Restart the metadata TTL so status and result expire together
            # (otherwise the status could vanish while the result is still
            # stored), and so HSET cannot leave a TTL-less hash behind.
            await pipe.expire(meta_key, ttl)
            # Store result
            await pipe.set(result_key, result_json, ex=ttl)
            # Delete file data (no longer needed)
            await pipe.delete(data_key)
            await pipe.execute()

    async def _mark_failed(
        self, meta_key: str, data_key: str, error: str, ttl: int
    ) -> None:
        """Mark a job failed with *error* and drop its file data."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            # Same TTL reasoning as in _mark_completed.
            await pipe.expire(meta_key, ttl)
            # Delete file data
            await pipe.delete(data_key)
            await pipe.execute()

    async def _load_job_fields(
        self, job_id: str, meta_key: str, result_key: str
    ) -> Optional[Tuple[Dict[str, Any], Optional[Dict[str, Any]]]]:
        """Read a job's metadata (and result, if completed).

        Returns:
            ``None`` if the metadata hash is missing. Otherwise a tuple of
            (keyword fields for a job response model, decoded result dict or
            ``None`` if no result is stored).
        """
        r = await self.get_redis()
        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None

        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")

        result_dict: Optional[Dict[str, Any]] = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result_dict = json.loads(result_json)

        fields: Dict[str, Any] = {
            "jobId": job_id,
            "status": status,
            # Progress is only meaningful while processing; error only on failure.
            "progress": progress if status == JobStatus.PROCESSING else None,
            "error": error if status == JobStatus.FAILED else None,
        }
        return fields, result_dict

    async def _get_raw_data(self, data_key: str) -> Optional[bytes]:
        """Get raw binary data from Redis (``None`` if the key is missing)."""
        # The main client decodes responses to str, which would corrupt binary
        # payloads, so reads go through the dedicated bytes client.
        raw_redis = await self._get_raw_redis()
        data = await raw_redis.get(data_key)
        return data  # type: ignore

    def _spawn_background(self, coro: Coroutine[Any, Any, None]) -> None:
        """Run *coro* as a detached task, holding a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _send_callback(
        self, url: str, job_id: str, result: Dict[str, Any]
    ) -> None:
        """POST ``{"jobId": ..., "result": ...}`` to *url*.

        Best-effort: this runs as a detached task, so any failure (network
        error or non-2xx status) is logged rather than raised.
        """
        # NOTE(review): callback_url appears to come from the job submitter;
        # if that is an external client, consider validating it (SSRF risk).
        try:
            import httpx  # imported lazily: only used for callbacks

            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    url,
                    json={"jobId": job_id, "result": result},
                )
                # httpx does not raise on 4xx/5xx by itself.
                response.raise_for_status()
            logger.info("Callback sent for job %s", job_id)
        except Exception as e:
            logger.error("Callback failed for job %s: %s", job_id, e)

    # ------------------------------------------------------------------
    # OCR job methods
    # ------------------------------------------------------------------

    async def submit_job(
        self,
        file_bytes: bytes,
        content_type: str,
        callback_url: Optional[str] = None,
    ) -> str:
        """
        Submit a new OCR job.

        Args:
            file_bytes: Raw file bytes to process
            content_type: MIME type of the file
            callback_url: Optional URL to POST the result to when the job completes

        Returns:
            Job ID (a UUID4 string)
        """
        job_id = str(uuid.uuid4())
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "content_type": content_type,
            "callback_url": callback_url or "",
        }
        await self._create_job(
            f"{JOB_PREFIX}{job_id}",
            f"{JOB_DATA_PREFIX}{job_id}",
            job_meta,
            file_bytes,
            JOB_TTL,
        )
        logger.info("Job %s submitted", job_id)
        return job_id

    async def get_job_status(self, job_id: str) -> Optional[JobResponse]:
        """
        Get the status of a job.

        Args:
            job_id: Job ID to check

        Returns:
            JobResponse, or None if the job doesn't exist or the ID is malformed
        """
        if not _is_valid_job_id(job_id):
            return None
        loaded = await self._load_job_fields(
            job_id, f"{JOB_PREFIX}{job_id}", f"{JOB_RESULT_PREFIX}{job_id}"
        )
        if loaded is None:
            return None
        fields, result_dict = loaded
        result = OcrResponse(**result_dict) if result_dict is not None else None
        return JobResponse(result=result, **fields)

    async def update_job_progress(self, job_id: str, progress: int) -> None:
        """Mark job as processing and update its progress percentage."""
        await self._set_progress(
            f"{JOB_PREFIX}{job_id}",
            {
                "status": JobStatus.PROCESSING.value,
                "progress": progress,
            },
            JOB_TTL,
        )

    async def complete_job(self, job_id: str, result: OcrResponse) -> None:
        """Mark job as completed with result and fire its callback, if any."""
        meta_key = f"{JOB_PREFIX}{job_id}"
        # mode="json" converts non-JSON-native values (e.g. datetime, UUID) to
        # JSON-compatible types so json.dumps and the callback payload succeed.
        result_dict = result.model_dump(mode="json", by_alias=True)
        await self._mark_completed(
            meta_key,
            f"{JOB_RESULT_PREFIX}{job_id}",
            f"{JOB_DATA_PREFIX}{job_id}",
            json.dumps(result_dict),
            JOB_TTL,
        )
        logger.info("Job %s completed", job_id)

        r = await self.get_redis()
        callback_url = await r.hget(meta_key, "callback_url")  # type: ignore
        if callback_url:
            # Fire-and-forget so completion isn't blocked on the remote endpoint.
            self._spawn_background(
                self._send_callback(callback_url, job_id, result_dict)
            )

    async def fail_job(self, job_id: str, error: str) -> None:
        """Mark job as failed with error message."""
        await self._mark_failed(
            f"{JOB_PREFIX}{job_id}", f"{JOB_DATA_PREFIX}{job_id}", error, JOB_TTL
        )
        logger.error("Job %s failed: %s", job_id, error)

    async def get_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the file data for a job."""
        return await self._get_raw_data(f"{JOB_DATA_PREFIX}{job_id}")

    # ------------------------------------------------------------------
    # Manual extraction job methods
    # ------------------------------------------------------------------

    async def get_manual_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the file data for a manual extraction job."""
        return await self._get_raw_data(f"{MANUAL_JOB_DATA_PREFIX}{job_id}")

    async def submit_manual_job(
        self,
        file_bytes: bytes,
        vehicle_id: Optional[str] = None,
    ) -> str:
        """
        Submit a new manual extraction job.

        Args:
            file_bytes: Raw PDF bytes
            vehicle_id: Optional vehicle ID for context

        Returns:
            Job ID (a UUID4 string)
        """
        job_id = str(uuid.uuid4())
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "progress_message": "",
            "vehicle_id": vehicle_id or "",
            "job_type": "manual",
        }
        await self._create_job(
            f"{MANUAL_JOB_PREFIX}{job_id}",
            f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            job_meta,
            file_bytes,
            MANUAL_JOB_TTL,
        )
        logger.info("Manual job %s submitted", job_id)
        return job_id

    async def get_manual_job_status(
        self, job_id: str
    ) -> Optional["ManualJobResponse"]:
        """
        Get the status of a manual extraction job.

        Args:
            job_id: Job ID to check

        Returns:
            ManualJobResponse, or None if the job doesn't exist or the ID is malformed
        """
        from app.models import ManualJobResponse, ManualExtractionResponse

        if not _is_valid_job_id(job_id):
            return None
        loaded = await self._load_job_fields(
            job_id,
            f"{MANUAL_JOB_PREFIX}{job_id}",
            f"{MANUAL_JOB_RESULT_PREFIX}{job_id}",
        )
        if loaded is None:
            return None
        fields, result_dict = loaded
        result = (
            ManualExtractionResponse(**result_dict)
            if result_dict is not None
            else None
        )
        return ManualJobResponse(result=result, **fields)

    async def update_manual_job_progress(
        self, job_id: str, progress: int, message: str = ""
    ) -> None:
        """Mark manual job as processing and update its progress and message."""
        await self._set_progress(
            f"{MANUAL_JOB_PREFIX}{job_id}",
            {
                "status": JobStatus.PROCESSING.value,
                "progress": progress,
                "progress_message": message,
            },
            MANUAL_JOB_TTL,
        )

    async def complete_manual_job(
        self, job_id: str, result: "ManualExtractionResponse"
    ) -> None:
        """Mark manual job as completed with result."""
        # mode="json": see complete_job.
        result_dict = result.model_dump(mode="json", by_alias=True)
        await self._mark_completed(
            f"{MANUAL_JOB_PREFIX}{job_id}",
            f"{MANUAL_JOB_RESULT_PREFIX}{job_id}",
            f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            json.dumps(result_dict),
            MANUAL_JOB_TTL,
        )
        logger.info("Manual job %s completed", job_id)

    async def fail_manual_job(self, job_id: str, error: str) -> None:
        """Mark manual job as failed with error message."""
        await self._mark_failed(
            f"{MANUAL_JOB_PREFIX}{job_id}",
            f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            error,
            MANUAL_JOB_TTL,
        )
        logger.error("Manual job %s failed: %s", job_id, error)


# Singleton instance
job_queue = JobQueue()