Files
motovaultpro/ocr/app/services/job_queue.py
Eric Gullickson a078962d3f
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 35s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 51s
Deploy to Staging / Verify Staging (pull_request) Successful in 8s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
fix: Manual scanning
2026-02-11 19:57:32 -06:00

400 lines
12 KiB
Python

"""Redis-based job queue for async OCR processing."""
import asyncio
import json
import logging
import uuid
from typing import Optional, TYPE_CHECKING
import redis.asyncio as redis
from app.config import settings
from app.models import JobResponse, JobStatus, OcrResponse
if TYPE_CHECKING:
from app.models import ManualExtractionResponse, ManualJobResponse
logger = logging.getLogger(__name__)

# Lifetimes of job keys in Redis, in seconds.
# Regular OCR jobs are kept for 1 hour.
JOB_TTL = 60 * 60
# Manual jobs are kept for 2 hours because their files are larger.
MANUAL_JOB_TTL = 2 * 60 * 60

# Redis key prefixes for regular OCR jobs: metadata hash, uploaded file
# bytes, and JSON-encoded result.
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"

# The same three prefixes for manual extraction jobs.
MANUAL_JOB_PREFIX = "ocr:manual:job:"
MANUAL_JOB_DATA_PREFIX = "ocr:manual:job:data:"
MANUAL_JOB_RESULT_PREFIX = "ocr:manual:job:result:"
class JobQueue:
    """Manages async OCR jobs using Redis.

    Each job is spread over up to three Redis keys: a metadata hash
    (status, progress, ...), a string holding the uploaded file bytes, and
    a string holding the JSON-encoded result once the job has completed.
    Regular OCR jobs and manual extraction jobs use separate key prefixes
    and TTLs but share the same storage helpers.
    """

    def __init__(self) -> None:
        """Initialize job queue without opening a Redis connection."""
        self._redis: Optional[redis.Redis] = None
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references to tasks, so a task nobody else references
        # could be garbage-collected before it finishes.
        self._background_tasks: set = set()

    async def get_redis(self) -> redis.Redis:
        """Get or create the shared Redis connection (string responses)."""
        if self._redis is None:
            self._redis = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=True,
            )
        return self._redis

    async def close(self) -> None:
        """Close the Redis connection, if one was opened."""
        if self._redis is not None:
            await self._redis.close()
            self._redis = None

    async def submit_job(
        self,
        file_bytes: bytes,
        content_type: str,
        callback_url: Optional[str] = None,
    ) -> str:
        """
        Submit a new OCR job.

        Args:
            file_bytes: Raw file bytes to process
            content_type: MIME type of the file
            callback_url: Optional URL to call when job completes
        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "content_type": content_type,
            # Hash fields cannot hold None, so "no callback" is stored as ""
            "callback_url": callback_url or "",
        }
        await self._store_new_job(
            meta_key=f"{JOB_PREFIX}{job_id}",
            data_key=f"{JOB_DATA_PREFIX}{job_id}",
            job_meta=job_meta,
            file_bytes=file_bytes,
            ttl=JOB_TTL,
        )
        logger.info("Job %s submitted", job_id)
        return job_id

    async def get_job_status(self, job_id: str) -> Optional[JobResponse]:
        """
        Get the status of a job.

        Args:
            job_id: Job ID to check
        Returns:
            JobResponse or None if job doesn't exist
        """
        return await self._read_status(
            job_id=job_id,
            meta_key=f"{JOB_PREFIX}{job_id}",
            result_key=f"{JOB_RESULT_PREFIX}{job_id}",
            result_model=OcrResponse,
            response_model=JobResponse,
        )

    async def update_job_progress(self, job_id: str, progress: int) -> None:
        """Mark the job as processing and update its progress percentage."""
        await self._store_progress(
            meta_key=f"{JOB_PREFIX}{job_id}",
            fields={
                "status": JobStatus.PROCESSING.value,
                "progress": progress,
            },
            ttl=JOB_TTL,
        )

    async def complete_job(self, job_id: str, result: OcrResponse) -> None:
        """
        Mark job as completed with result.

        Stores the result, deletes the uploaded file data and, if the job
        was submitted with a callback URL, posts the result to that URL in
        a background task.

        Args:
            job_id: Job ID to complete
            result: OCR result to store for the job
        """
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_dict = result.model_dump(by_alias=True)
        await self._store_completion(
            meta_key=meta_key,
            result_key=f"{JOB_RESULT_PREFIX}{job_id}",
            data_key=f"{JOB_DATA_PREFIX}{job_id}",
            result_json=json.dumps(result_dict),
            ttl=JOB_TTL,
        )
        logger.info("Job %s completed", job_id)

        r = await self.get_redis()
        meta = await r.hgetall(meta_key)  # type: ignore
        callback_url = meta.get("callback_url")
        if callback_url:
            # Fire-and-forget callback (don't block), but keep a reference
            # until the task is done so it cannot be garbage-collected
            # mid-execution.
            task = asyncio.create_task(
                self._send_callback(callback_url, job_id, result_dict)
            )
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    async def fail_job(self, job_id: str, error: str) -> None:
        """Mark job as failed with error message and delete its file data."""
        await self._store_failure(
            meta_key=f"{JOB_PREFIX}{job_id}",
            data_key=f"{JOB_DATA_PREFIX}{job_id}",
            error=error,
            ttl=JOB_TTL,
        )
        logger.error("Job %s failed: %s", job_id, error)

    async def get_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the file data for a job."""
        return await self._get_raw_data(f"{JOB_DATA_PREFIX}{job_id}")

    async def get_manual_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the file data for a manual extraction job."""
        return await self._get_raw_data(f"{MANUAL_JOB_DATA_PREFIX}{job_id}")

    async def _get_raw_data(self, data_key: str) -> Optional[bytes]:
        """Get raw binary data from Redis.

        Args:
            data_key: Redis key holding the file bytes
        Returns:
            Whatever Redis returns for the key (None when it is missing)
        """
        # Need separate connection with decode_responses=False for binary
        # data; the shared connection decodes every response to str.
        raw_redis = redis.Redis(
            host=settings.redis_host,
            port=settings.redis_port,
            db=settings.redis_db,
            decode_responses=False,
        )
        try:
            data = await raw_redis.get(data_key)
            return data  # type: ignore
        finally:
            await raw_redis.close()

    async def _send_callback(
        self, url: str, job_id: str, result: dict
    ) -> None:
        """Send callback notification when job completes.

        Best effort: any failure is logged and swallowed so it can never
        affect the job itself.

        Args:
            url: URL to POST the notification to
            job_id: ID of the completed job
            result: Result payload included in the notification
        """
        # NOTE(review): the URL is posted to exactly as stored at submit
        # time. If it can come from untrusted clients it should be validated
        # before use -- TODO confirm against the API layer.
        try:
            import httpx

            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    url,
                    json={"jobId": job_id, "result": result},
                )
                # Treat non-success responses as failures instead of
                # logging them as sent.
                response.raise_for_status()
            logger.info("Callback sent for job %s", job_id)
        except Exception as e:
            logger.error("Callback failed for job %s: %s", job_id, e)

    # Manual extraction job methods

    async def submit_manual_job(
        self,
        file_bytes: bytes,
        vehicle_id: Optional[str] = None,
    ) -> str:
        """
        Submit a new manual extraction job.

        Args:
            file_bytes: Raw PDF bytes
            vehicle_id: Optional vehicle ID for context
        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "progress_message": "",
            # Hash fields cannot hold None, so "no vehicle" is stored as ""
            "vehicle_id": vehicle_id or "",
            "job_type": "manual",
        }
        await self._store_new_job(
            meta_key=f"{MANUAL_JOB_PREFIX}{job_id}",
            data_key=f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            job_meta=job_meta,
            file_bytes=file_bytes,
            ttl=MANUAL_JOB_TTL,
        )
        logger.info("Manual job %s submitted", job_id)
        return job_id

    async def get_manual_job_status(self, job_id: str) -> Optional["ManualJobResponse"]:
        """
        Get the status of a manual extraction job.

        Args:
            job_id: Job ID to check
        Returns:
            ManualJobResponse or None if job doesn't exist
        """
        # Imported here because the module-level import of these models is
        # guarded by TYPE_CHECKING and therefore absent at runtime.
        from app.models import ManualJobResponse, ManualExtractionResponse

        return await self._read_status(
            job_id=job_id,
            meta_key=f"{MANUAL_JOB_PREFIX}{job_id}",
            result_key=f"{MANUAL_JOB_RESULT_PREFIX}{job_id}",
            result_model=ManualExtractionResponse,
            response_model=ManualJobResponse,
        )

    async def update_manual_job_progress(
        self, job_id: str, progress: int, message: str = ""
    ) -> None:
        """Mark the manual job as processing and update progress and message."""
        await self._store_progress(
            meta_key=f"{MANUAL_JOB_PREFIX}{job_id}",
            fields={
                "status": JobStatus.PROCESSING.value,
                "progress": progress,
                "progress_message": message,
            },
            ttl=MANUAL_JOB_TTL,
        )

    async def complete_manual_job(
        self, job_id: str, result: "ManualExtractionResponse"
    ) -> None:
        """Mark manual job as completed with result and delete its file data."""
        result_dict = result.model_dump(by_alias=True)
        await self._store_completion(
            meta_key=f"{MANUAL_JOB_PREFIX}{job_id}",
            result_key=f"{MANUAL_JOB_RESULT_PREFIX}{job_id}",
            data_key=f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            result_json=json.dumps(result_dict),
            ttl=MANUAL_JOB_TTL,
        )
        logger.info("Manual job %s completed", job_id)

    async def fail_manual_job(self, job_id: str, error: str) -> None:
        """Mark manual job as failed with error message and delete its file data."""
        await self._store_failure(
            meta_key=f"{MANUAL_JOB_PREFIX}{job_id}",
            data_key=f"{MANUAL_JOB_DATA_PREFIX}{job_id}",
            error=error,
            ttl=MANUAL_JOB_TTL,
        )
        logger.error("Manual job %s failed: %s", job_id, error)

    # Shared storage helpers used by both regular and manual jobs

    async def _store_new_job(
        self,
        meta_key: str,
        data_key: str,
        job_meta: dict,
        file_bytes: bytes,
        ttl: int,
    ) -> None:
        """Write a new job's metadata hash and file bytes, both expiring after ttl."""
        r = await self.get_redis()
        # Use pipeline so metadata and file data are written together
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            await pipe.expire(meta_key, ttl)
            await pipe.set(data_key, file_bytes)
            await pipe.expire(data_key, ttl)
            await pipe.execute()

    async def _read_status(
        self,
        job_id: str,
        meta_key: str,
        result_key: str,
        result_model: type,
        response_model: type,
    ):
        """Build a status response from a job's metadata hash.

        Args:
            job_id: Job ID reported back in the response
            meta_key: Redis key of the job's metadata hash
            result_key: Redis key of the job's JSON-encoded result
            result_model: Model class used to rebuild the stored result
            response_model: Model class of the response to return
        Returns:
            An instance of response_model, or None if the metadata hash
            does not exist
        Raises:
            ValueError: If the stored status is not a valid JobStatus or the
                stored progress is not an integer
        """
        r = await self.get_redis()
        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None

        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")

        # The result is only looked up once the job has completed
        result = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result = result_model(**json.loads(result_json))

        return response_model(
            jobId=job_id,
            status=status,
            # Progress is only reported while processing, error only on failure
            progress=progress if status == JobStatus.PROCESSING else None,
            result=result,
            error=error if status == JobStatus.FAILED else None,
        )

    async def _store_progress(self, meta_key: str, fields: dict, ttl: int) -> None:
        """Write progress fields to a job's metadata hash and refresh its TTL."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping=fields)  # type: ignore
            # HSET on an already-expired hash re-creates it without any
            # expiry; setting the TTL here keeps the key from leaking.
            await pipe.expire(meta_key, ttl)
            await pipe.execute()

    async def _store_completion(
        self,
        meta_key: str,
        result_key: str,
        data_key: str,
        result_json: str,
        ttl: int,
    ) -> None:
        """Mark a job completed, store its result and delete its file data."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            # Give metadata the same lifetime as the result: the result can
            # only be reached through the metadata hash.
            await pipe.expire(meta_key, ttl)
            await pipe.set(result_key, result_json)
            await pipe.expire(result_key, ttl)
            # Delete file data (no longer needed)
            await pipe.delete(data_key)
            await pipe.execute()

    async def _store_failure(
        self, meta_key: str, data_key: str, error: str, ttl: int
    ) -> None:
        """Mark a job failed with an error message and delete its file data."""
        r = await self.get_redis()
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            # Make sure the failure record expires even if the original
            # hash had already expired and HSET re-created it.
            await pipe.expire(meta_key, ttl)
            # Delete file data
            await pipe.delete(data_key)
            await pipe.execute()
# Singleton instance, created when this module is imported
job_queue: JobQueue = JobQueue()