Files
motovaultpro/ocr/app/services/job_queue.py
Eric Gullickson 3eb54211cb
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 3m1s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
feat: add owner's manual OCR pipeline (refs #71)
Implement async PDF processing for owner's manuals with maintenance
schedule extraction:

- Add PDF preprocessor with PyMuPDF for text/scanned PDF handling
- Add maintenance pattern matching (mileage, time, fluid specs)
- Add service name mapping to maintenance subtypes
- Add table detection and parsing for schedule tables
- Add manual extractor orchestrating the complete pipeline
- Add POST /extract/manual endpoint for async job submission
- Add Redis job queue support for manual extraction jobs
- Add progress tracking during processing

Processing pipeline:
1. Analyze PDF structure (text layer vs scanned)
2. Find maintenance schedule sections
3. Extract text or OCR scanned pages at 300 DPI
4. Detect and parse maintenance tables
5. Normalize service names and extract intervals
6. Return structured maintenance schedules with confidence scores

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:30:20 -06:00

395 lines
12 KiB
Python

"""Redis-based job queue for async OCR processing."""
import asyncio
import json
import logging
import uuid
from typing import Optional, TYPE_CHECKING
import redis.asyncio as redis
from app.config import settings
from app.models import JobResponse, JobStatus, OcrResponse
if TYPE_CHECKING:
from app.models import ManualExtractionResponse, ManualJobResponse
logger = logging.getLogger(__name__)

# Standard OCR job TTL in seconds (1 hour) — applied to the metadata hash,
# the uploaded file bytes, and the stored result.
JOB_TTL = 3600
# Manual-extraction job TTL in seconds (2 hours) — owner's manuals are
# larger files that take longer to process.
MANUAL_JOB_TTL = 7200

# Redis key prefixes for standard OCR jobs. Each job uses three keys:
#   {JOB_PREFIX}{id}        -> hash of status/progress/content_type/callback_url
#   {JOB_DATA_PREFIX}{id}   -> raw uploaded file bytes (deleted on completion)
#   {JOB_RESULT_PREFIX}{id} -> JSON-serialized OcrResponse
JOB_PREFIX = "ocr:job:"
JOB_DATA_PREFIX = "ocr:job:data:"
JOB_RESULT_PREFIX = "ocr:job:result:"

# Same three-key layout for manual-extraction jobs, in a separate namespace.
MANUAL_JOB_PREFIX = "ocr:manual:job:"
MANUAL_JOB_DATA_PREFIX = "ocr:manual:job:data:"
MANUAL_JOB_RESULT_PREFIX = "ocr:manual:job:result:"
class JobQueue:
    """Manages async OCR jobs using Redis.

    Two job families share one lazily-created connection:

    - Standard OCR jobs under the ``ocr:job:*`` keys (1 hour TTL), with an
      optional completion callback URL registered at submit time.
    - Manual-extraction jobs under the ``ocr:manual:job:*`` keys (2 hour
      TTL), which additionally carry a progress message and vehicle ID.

    For each job, metadata lives in a Redis hash, the uploaded bytes in a
    separate string key (deleted once the job finishes either way), and the
    JSON-serialized result in a third key.
    """

    def __init__(self) -> None:
        """Initialize the queue; the Redis connection is created lazily."""
        self._redis: Optional[redis.Redis] = None

    async def get_redis(self) -> redis.Redis:
        """Get or create the shared Redis connection.

        Created with decode_responses=True, so every value read back through
        this connection is a str (see get_job_data for the binary exception).
        """
        if self._redis is None:
            self._redis = redis.Redis(
                host=settings.redis_host,
                port=settings.redis_port,
                db=settings.redis_db,
                decode_responses=True,
            )
        return self._redis

    async def close(self) -> None:
        """Close the Redis connection (safe to call when never connected)."""
        if self._redis:
            await self._redis.close()
            self._redis = None

    async def submit_job(
        self,
        file_bytes: bytes,
        content_type: str,
        callback_url: Optional[str] = None,
    ) -> str:
        """
        Submit a new OCR job.

        Args:
            file_bytes: Raw file bytes to process
            content_type: MIME type of the file
            callback_url: Optional URL POSTed to when the job completes

        Returns:
            Job ID (a fresh UUID4 string)
        """
        r = await self.get_redis()
        job_id = str(uuid.uuid4())
        # Metadata hash; callback_url is stored as "" rather than omitted so
        # the field is always present when read back.
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "content_type": content_type,
            "callback_url": callback_url or "",
        }
        # File bytes go under a separate key so the payload can be deleted
        # independently of the metadata once processing finishes.
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        meta_key = f"{JOB_PREFIX}{job_id}"
        # Pipeline so metadata, payload, and both TTLs commit together.
        async with r.pipeline() as pipe:
            # Store metadata as hash
            await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            await pipe.expire(meta_key, JOB_TTL)
            # Store binary data
            await pipe.set(data_key, file_bytes)
            await pipe.expire(data_key, JOB_TTL)
            await pipe.execute()
        logger.info(f"Job {job_id} submitted")
        return job_id

    async def get_job_status(self, job_id: str) -> Optional[JobResponse]:
        """
        Get the status of a job.

        Args:
            job_id: Job ID to check

        Returns:
            JobResponse, or None if the job doesn't exist (unknown ID or
            keys already expired via TTL)
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"
        # hgetall returns {} for a missing key — that is the "no such job" signal.
        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None
        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")
        # The result payload only exists for completed jobs.
        result = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result_dict = json.loads(result_json)
                result = OcrResponse(**result_dict)
        # progress and error are surfaced only in the states they apply to.
        return JobResponse(
            jobId=job_id,
            status=status,
            progress=progress if status == JobStatus.PROCESSING else None,
            result=result,
            error=error if status == JobStatus.FAILED else None,
        )

    async def update_job_progress(self, job_id: str, progress: int) -> None:
        """Update job progress percentage (also flips status to PROCESSING)."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        await r.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.PROCESSING.value,
            "progress": progress,
        })

    async def complete_job(self, job_id: str, result: OcrResponse) -> None:
        """Mark job as completed, store its result, and drop the file bytes.

        Also fires the completion callback, if one was registered at submit
        time, without awaiting it.
        """
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        result_key = f"{JOB_RESULT_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        # Serialize with by_alias so the stored JSON uses the API field names.
        result_dict = result.model_dump(by_alias=True)
        result_json = json.dumps(result_dict)
        async with r.pipeline() as pipe:
            # Update status
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            # Store result
            await pipe.set(result_key, result_json)
            await pipe.expire(result_key, JOB_TTL)
            # Delete file data (no longer needed)
            await pipe.delete(data_key)
            await pipe.execute()
        logger.info(f"Job {job_id} completed")
        # Trigger the callback if one was configured at submit time.
        meta = await r.hgetall(meta_key)  # type: ignore
        callback_url = meta.get("callback_url")
        if callback_url:
            # Fire-and-forget callback (don't block)
            # NOTE(review): the task reference is not retained; per the
            # asyncio docs an unreferenced task can be garbage-collected
            # before it finishes — consider keeping a reference.
            asyncio.create_task(self._send_callback(callback_url, job_id, result_dict))

    async def fail_job(self, job_id: str, error: str) -> None:
        """Mark job as failed with error message and drop the file bytes."""
        r = await self.get_redis()
        meta_key = f"{JOB_PREFIX}{job_id}"
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            # Delete file data
            await pipe.delete(data_key)
            await pipe.execute()
        logger.error(f"Job {job_id} failed: {error}")

    async def get_job_data(self, job_id: str) -> Optional[bytes]:
        """Get the raw file bytes for a job, or None if deleted/expired.

        Opens a throwaway connection with decode_responses=False: the shared
        connection decodes everything to str, which would corrupt binary
        payloads.
        """
        # NOTE(review): `r` is unused below — the shared decoded connection
        # cannot return raw bytes, hence the dedicated raw connection.
        r = await self.get_redis()
        data_key = f"{JOB_DATA_PREFIX}{job_id}"
        # Get raw bytes (not decoded)
        raw_redis = redis.Redis(
            host=settings.redis_host,
            port=settings.redis_port,
            db=settings.redis_db,
            decode_responses=False,
        )
        try:
            data = await raw_redis.get(data_key)
            return data  # type: ignore
        finally:
            await raw_redis.close()

    async def _send_callback(
        self, url: str, job_id: str, result: dict
    ) -> None:
        """POST {"jobId", "result"} to the callback URL when a job completes.

        Best-effort: any failure is logged and swallowed so a bad callback
        endpoint can never affect the job itself.
        """
        try:
            # Imported lazily so httpx is only needed when callbacks are used.
            import httpx
            async with httpx.AsyncClient(timeout=10.0) as client:
                await client.post(
                    url,
                    json={"jobId": job_id, "result": result},
                )
            logger.info(f"Callback sent for job {job_id}")
        except Exception as e:
            logger.error(f"Callback failed for job {job_id}: {e}")

    # ------------------------------------------------------------------
    # Manual extraction job methods — mirror the standard-job methods above
    # but use the ocr:manual:* key namespace, the longer MANUAL_JOB_TTL, a
    # progress message, and no completion callback.
    # ------------------------------------------------------------------

    async def submit_manual_job(
        self,
        file_bytes: bytes,
        vehicle_id: Optional[str] = None,
    ) -> str:
        """
        Submit a new manual extraction job.

        Args:
            file_bytes: Raw PDF bytes
            vehicle_id: Optional vehicle ID for context

        Returns:
            Job ID (a fresh UUID4 string)
        """
        r = await self.get_redis()
        job_id = str(uuid.uuid4())
        # Metadata hash; empty-string placeholders keep all fields present.
        job_meta = {
            "status": JobStatus.PENDING.value,
            "progress": 0,
            "progress_message": "",
            "vehicle_id": vehicle_id or "",
            "job_type": "manual",
        }
        # File bytes stored under a separate key, as for standard jobs.
        data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
        meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
        # Pipeline so metadata, payload, and both TTLs commit together.
        async with r.pipeline() as pipe:
            # Store metadata as hash
            await pipe.hset(meta_key, mapping=job_meta)  # type: ignore
            await pipe.expire(meta_key, MANUAL_JOB_TTL)
            # Store binary data
            await pipe.set(data_key, file_bytes)
            await pipe.expire(data_key, MANUAL_JOB_TTL)
            await pipe.execute()
        logger.info(f"Manual job {job_id} submitted")
        return job_id

    async def get_manual_job_status(self, job_id: str) -> Optional["ManualJobResponse"]:
        """
        Get the status of a manual extraction job.

        Args:
            job_id: Job ID to check

        Returns:
            ManualJobResponse, or None if the job doesn't exist (unknown ID
            or keys already expired via TTL)
        """
        # Imported here at call time, matching the TYPE_CHECKING guard at
        # module top — presumably to avoid a circular import with app.models.
        from app.models import ManualJobResponse, ManualExtractionResponse
        r = await self.get_redis()
        meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
        result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"
        # hgetall returns {} for a missing key — the "no such job" signal.
        meta = await r.hgetall(meta_key)  # type: ignore
        if not meta:
            return None
        status = JobStatus(meta.get("status", JobStatus.PENDING.value))
        progress = int(meta.get("progress", 0))
        error = meta.get("error")
        # The result payload only exists for completed jobs.
        result = None
        if status == JobStatus.COMPLETED:
            result_json = await r.get(result_key)
            if result_json:
                result_dict = json.loads(result_json)
                result = ManualExtractionResponse(**result_dict)
        return ManualJobResponse(
            jobId=job_id,
            status=status,
            progress=progress if status == JobStatus.PROCESSING else None,
            result=result,
            error=error if status == JobStatus.FAILED else None,
        )

    async def update_manual_job_progress(
        self, job_id: str, progress: int, message: str = ""
    ) -> None:
        """Update manual job progress percentage and human-readable message
        (also flips status to PROCESSING)."""
        r = await self.get_redis()
        meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
        await r.hset(meta_key, mapping={  # type: ignore
            "status": JobStatus.PROCESSING.value,
            "progress": progress,
            "progress_message": message,
        })

    async def complete_manual_job(
        self, job_id: str, result: "ManualExtractionResponse"
    ) -> None:
        """Mark manual job as completed, store its result, and drop the PDF.

        Unlike complete_job, no callback is fired — manual jobs do not
        register a callback URL at submit time.
        """
        r = await self.get_redis()
        meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
        result_key = f"{MANUAL_JOB_RESULT_PREFIX}{job_id}"
        data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
        # Serialize with by_alias so the stored JSON uses the API field names.
        result_dict = result.model_dump(by_alias=True)
        result_json = json.dumps(result_dict)
        async with r.pipeline() as pipe:
            # Update status
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.COMPLETED.value,
                "progress": 100,
            })
            # Store result
            await pipe.set(result_key, result_json)
            await pipe.expire(result_key, MANUAL_JOB_TTL)
            # Delete file data (no longer needed)
            await pipe.delete(data_key)
            await pipe.execute()
        logger.info(f"Manual job {job_id} completed")

    async def fail_manual_job(self, job_id: str, error: str) -> None:
        """Mark manual job as failed with error message and drop the PDF."""
        r = await self.get_redis()
        meta_key = f"{MANUAL_JOB_PREFIX}{job_id}"
        data_key = f"{MANUAL_JOB_DATA_PREFIX}{job_id}"
        async with r.pipeline() as pipe:
            await pipe.hset(meta_key, mapping={  # type: ignore
                "status": JobStatus.FAILED.value,
                "error": error,
            })
            # Delete file data
            await pipe.delete(data_key)
            await pipe.execute()
        logger.error(f"Manual job {job_id} failed: {error}")
# Module-level singleton: import this instance rather than constructing a
# new JobQueue, so the whole process shares one lazy Redis connection.
job_queue = JobQueue()