"""Redis-based job queue for async OCR processing."""

import asyncio
import json
import logging
import uuid
from typing import Optional, Set

import redis.asyncio as redis

from app.config import settings
from app.models import JobResponse, JobStatus, OcrResponse

logger = logging.getLogger(__name__)

# Job TTL in seconds (1 hour)
JOB_TTL = 3600

# Key prefixes
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"


class JobQueue:
    """Manages async OCR jobs using Redis.

    Each job is stored under three keys sharing the job ID:

    * ``JOB_PREFIX + id`` -- hash holding status, progress, content type,
      callback URL and (on failure) the error message;
    * ``JOB_DATA_PREFIX + id`` -- the raw uploaded file bytes, deleted once
      the job completes or fails;
    * ``JOB_RESULT_PREFIX + id`` -- the JSON-serialized ``OcrResponse``.

    Every write to the metadata hash re-applies ``JOB_TTL`` so the hash is
    never left in Redis without an expiry.
    """

    def __init__(self) -> None:
        """Initialize job queue; Redis connections are created lazily."""
        self._redis: Optional[redis.Redis] = None
        # Second client with decode_responses=False, used to read the stored
        # file bytes back without attempting to decode them as text.
        self._raw_redis: Optional[redis.Redis] = None
        # Strong references to fire-and-forget callback tasks: the event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: Set[asyncio.Task] = set()

    @staticmethod
    def _create_client(decode_responses: bool) -> redis.Redis:
        """Build a Redis client from the application settings."""
        return redis.Redis(
            host=settings.redis_host,
            port=settings.redis_port,
            db=settings.redis_db,
            decode_responses=decode_responses,
        )

    async def get_redis(self) -> redis.Redis:
        """Get or create the shared Redis connection (responses decoded to str)."""
        if self._redis is None:
            self._redis = self._create_client(decode_responses=True)
        return self._redis

    async def _get_raw_redis(self) -> redis.Redis:
        """Get or create the shared Redis connection that returns raw bytes."""
        if self._raw_redis is None:
            self._raw_redis = self._create_client(decode_responses=False)
        return self._raw_redis

    async def close(self) -> None:
        """Close every Redis connection held by the queue."""
        if self._redis is not None:
            await self._redis.close()
            self._redis = None
        if self._raw_redis is not None:
            await self._raw_redis.close()
            self._raw_redis = None

    async def submit_job(
        self,
        file_bytes: bytes,
        content_type: str,
        callback_url: Optional[str] = None,
    ) -> str:
        """
        Submit a new OCR job.

        Args:
            file_bytes: Raw file bytes to process
            content_type: MIME type of the file
            callback_url: Optional URL to call when job completes

        Returns:
            Job ID
        """
        r = await self.get_redis()
        job_id = str(uuid.uuid4())

        # Store job metadata (Redis hashes cannot hold None, so an absent
        # callback is stored as an empty string).
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "content_type": content_type,
            "callback_url": callback_url or "",
        }

        # Store file data separately (binary)
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        meta_key = f"{JOB_PREFIX}{job_id}"

        # Pipeline (MULTI/EXEC) so metadata and data are written together
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            await pipe.expire(meta_key, JOB_TTL)
            await pipe.set(data_key, file_bytes)
            await pipe.expire(data_key, JOB_TTL)
            await pipe.execute()

        logger.info("Job %s submitted", job_id)
        return job_id

    async def get_job_status(self, job_id: str) -> Optional[JobResponse]:
        """
        Get the status of a job.

        Args:
            job_id: Job ID to check

        Returns:
            JobResponse or None if job doesn't exist
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"

        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None

        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")

        # Result is only loaded for completed jobs
        result = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result = OcrResponse(**json.loads(result_json))

        return JobResponse(
            jobId=job_id,
            status=status,
            progress=progress if status == JobStatus.PROCESSING else None,
            result=result,
            error=error if status == JobStatus.FAILED else None,
        )

    async def update_job_progress(self, job_id: str, progress: int) -> None:
        """Mark a job as processing and record its progress percentage.

        The metadata TTL is re-applied in the same transaction: if the hash
        had already expired, HSET would otherwise re-create it with no expiry.
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"

        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.PROCESSING.value,
                "progress": progress,
            })
            await pipe.expire(meta_key, JOB_TTL)
            await pipe.execute()

    async def complete_job(self, job_id: str, result: OcrResponse) -> None:
        """Mark job as completed, store its result and fire the callback.

        The result is serialized with ``model_dump(by_alias=True)``. The
        uploaded file data is deleted. If a callback URL was registered, a
        background task POSTs the result to it without blocking this call.
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"

        result_dict = result.model_dump(by_alias=True)
        result_json = json.dumps(result_dict)

        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            # Refresh the metadata TTL so it expires together with the result;
            # otherwise the status hash could vanish first and orphan the result.
            await pipe.expire(meta_key, JOB_TTL)
            await pipe.set(result_key, result_json)
            await pipe.expire(result_key, JOB_TTL)
            # File data is no longer needed once the result exists
            await pipe.delete(data_key)
            await pipe.execute()

        logger.info("Job %s completed", job_id)

        callback_url = await r.hget(meta_key, "callback_url")  # type: ignore
        if callback_url:
            # Fire-and-forget, but keep a strong reference until it finishes.
            task = asyncio.create_task(
                self._send_callback(callback_url, job_id, result_dict)
            )
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    async def fail_job(self, job_id: str, error: str) -> None:
        """Mark job as failed with an error message and drop its file data."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"

        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            # Keep the failure visible for a full TTL and never leave the
            # hash without an expiry.
            await pipe.expire(meta_key, JOB_TTL)
            await pipe.delete(data_key)
            await pipe.execute()

        logger.error("Job %s failed: %s", job_id, error)

    async def get_job_data(self, job_id: str) -> Optional[bytes]:
        """Return the raw uploaded file bytes for a job, or None if absent."""
        raw_redis = await self._get_raw_redis()
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        return await raw_redis.get(data_key)  # type: ignore

    async def _send_callback(
        self, url: str, job_id: str, result: dict
    ) -> None:
        """POST ``{"jobId": ..., "result": ...}`` to *url*.

        Best-effort: any failure (including a non-2xx reply) is logged and
        not raised.
        """
        try:
            # Imported here so httpx is only needed when a callback is sent.
            import httpx

            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    url,
                    json={"jobId": job_id, "result": result},
                )
                # Treat error status codes as failures so they get logged below.
                response.raise_for_status()
            logger.info("Callback sent for job %s", job_id)
        except Exception as e:
            logger.error("Callback failed for job %s: %s", job_id, e)


# Singleton instance
job_queue = JobQueue()