feat: add optional Google Vision cloud fallback engine (refs #118)

CloudEngine wraps Google Vision TEXT_DETECTION with lazy init.
HybridEngine runs primary engine, falls back to cloud when confidence
is below threshold. Disabled by default (OCR_FALLBACK_ENGINE=none).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Eric Gullickson
2026-02-07 11:12:08 -06:00
parent 013fb0c67a
commit 4ef942cb9d
6 changed files with 351 additions and 18 deletions

View File

@@ -17,6 +17,15 @@ class Settings:
os.getenv("OCR_CONFIDENCE_THRESHOLD", "0.6")
)
# Cloud fallback configuration (disabled by default)
self.ocr_fallback_engine: str = os.getenv("OCR_FALLBACK_ENGINE", "none")
self.ocr_fallback_threshold: float = float(
os.getenv("OCR_FALLBACK_THRESHOLD", "0.6")
)
self.google_vision_key_path: str = os.getenv(
"GOOGLE_VISION_KEY_PATH", "/run/secrets/google-vision-key.json"
)
# Redis configuration for job queue
self.redis_host: str = os.getenv("REDIS_HOST", "mvp-redis")
self.redis_port: int = int(os.getenv("REDIS_PORT", "6379"))

View File

@@ -2,6 +2,12 @@
Provides a pluggable engine interface for OCR processing,
decoupling extractors from specific OCR libraries.
Engines:
- PaddleOcrEngine: PaddleOCR PP-OCRv4 (primary, CPU-only)
- TesseractEngine: pytesseract wrapper (backward compatibility)
- CloudEngine: Google Vision TEXT_DETECTION (optional cloud fallback)
- HybridEngine: Primary + fallback with confidence threshold
"""
from app.engines.base_engine import (

View File

@@ -0,0 +1,166 @@
"""Google Vision cloud OCR engine with lazy initialization."""
import logging
import os
from typing import Any
from app.engines.base_engine import (
EngineProcessingError,
EngineUnavailableError,
OcrConfig,
OcrEngine,
OcrEngineResult,
WordBox,
)
logger = logging.getLogger(__name__)
# Default path for Google Vision service account key (Docker secret mount)
_DEFAULT_KEY_PATH = "/run/secrets/google-vision-key.json"
class CloudEngine(OcrEngine):
"""Google Vision TEXT_DETECTION wrapper with lazy initialization.
The client is not created until the first ``recognize()`` call,
so the container starts normally even when the secret file is
missing or the dependency is not installed.
"""
def __init__(self, key_path: str | None = None) -> None:
self._key_path = key_path or os.getenv(
"GOOGLE_VISION_KEY_PATH", _DEFAULT_KEY_PATH
)
self._client: Any | None = None
@property
def name(self) -> str:
return "google_vision"
# ------------------------------------------------------------------
# Lazy init
# ------------------------------------------------------------------
def _get_client(self) -> Any:
"""Create the Vision client on first use."""
if self._client is not None:
return self._client
# Verify credentials file exists
if not os.path.isfile(self._key_path):
raise EngineUnavailableError(
f"Google Vision key not found at {self._key_path}. "
"Set GOOGLE_VISION_KEY_PATH or mount the secret."
)
try:
from google.cloud import vision # type: ignore[import-untyped]
# Point the SDK at the service account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._key_path
self._client = vision.ImageAnnotatorClient()
logger.info(
"Google Vision client initialized (key: %s)", self._key_path
)
return self._client
except ImportError as exc:
raise EngineUnavailableError(
"google-cloud-vision is not installed. "
"Install with: pip install google-cloud-vision"
) from exc
except Exception as exc:
raise EngineUnavailableError(
f"Failed to initialize Google Vision client: {exc}"
) from exc
# ------------------------------------------------------------------
# OCR
# ------------------------------------------------------------------
def recognize(self, image_bytes: bytes, config: OcrConfig) -> OcrEngineResult:
"""Run Google Vision TEXT_DETECTION on image bytes."""
client = self._get_client()
try:
from google.cloud import vision # type: ignore[import-untyped]
image = vision.Image(content=image_bytes)
response = client.text_detection(image=image)
if response.error.message:
raise EngineProcessingError(
f"Google Vision API error: {response.error.message}"
)
annotations = response.text_annotations
if not annotations:
return OcrEngineResult(
text="",
confidence=0.0,
word_boxes=[],
engine_name=self.name,
)
# First annotation is the full-page text; the rest are words
full_text = annotations[0].description.strip()
word_boxes: list[WordBox] = []
confidences: list[float] = []
for annotation in annotations[1:]:
text = annotation.description
vertices = annotation.bounding_poly.vertices
# Apply character whitelist filter if configured
if config.char_whitelist:
allowed = set(config.char_whitelist)
text = "".join(ch for ch in text if ch in allowed)
if not text.strip():
continue
xs = [v.x for v in vertices]
ys = [v.y for v in vertices]
x_min, y_min = min(xs), min(ys)
x_max, y_max = max(xs), max(ys)
# Google Vision TEXT_DETECTION does not return per-word
# confidence in annotations. Use 0.95 as the documented
# typical accuracy for clear images so comparisons with
# PaddleOCR are meaningful.
word_conf = 0.95
word_boxes.append(
WordBox(
text=text.strip(),
confidence=word_conf,
x=x_min,
y=y_min,
width=x_max - x_min,
height=y_max - y_min,
)
)
confidences.append(word_conf)
# Apply whitelist to full text too
if config.char_whitelist:
allowed = set(config.char_whitelist)
full_text = "".join(
ch for ch in full_text if ch in allowed or ch in " \n"
)
avg_confidence = (
sum(confidences) / len(confidences) if confidences else 0.0
)
return OcrEngineResult(
text=full_text,
confidence=avg_confidence,
word_boxes=word_boxes,
engine_name=self.name,
)
except (EngineUnavailableError, EngineProcessingError):
raise
except Exception as exc:
raise EngineProcessingError(
f"Google Vision recognition failed: {exc}"
) from exc

View File

@@ -1,5 +1,6 @@
"""Factory function for creating OCR engine instances from configuration."""
import importlib
import logging
from app.config import settings
@@ -7,28 +8,16 @@ from app.engines.base_engine import EngineUnavailableError, OcrEngine
logger = logging.getLogger(__name__)
# Valid engine identifiers
# Valid engine identifiers (primary engines only; hybrid is constructed separately)
_ENGINE_REGISTRY: dict[str, str] = {
"paddleocr": "app.engines.paddle_engine.PaddleOcrEngine",
"tesseract": "app.engines.tesseract_engine.TesseractEngine",
"google_vision": "app.engines.cloud_engine.CloudEngine",
}
def create_engine(engine_name: str | None = None) -> OcrEngine:
"""Instantiate an OCR engine by name (defaults to config value).
Args:
engine_name: Engine identifier ("paddleocr", "tesseract").
Falls back to ``settings.ocr_primary_engine``.
Returns:
Initialized OcrEngine instance.
Raises:
EngineUnavailableError: If the engine cannot be loaded or initialized.
"""
name = (engine_name or settings.ocr_primary_engine).lower().strip()
def _create_single_engine(name: str) -> OcrEngine:
"""Instantiate a single engine by registry name."""
if name not in _ENGINE_REGISTRY:
raise EngineUnavailableError(
f"Unknown engine '{name}'. Available: {list(_ENGINE_REGISTRY.keys())}"
@@ -37,8 +26,6 @@ def create_engine(engine_name: str | None = None) -> OcrEngine:
module_path, class_name = _ENGINE_REGISTRY[name].rsplit(".", 1)
try:
import importlib
module = importlib.import_module(module_path)
engine_cls = getattr(module, class_name)
engine: OcrEngine = engine_cls()
@@ -50,3 +37,51 @@ def create_engine(engine_name: str | None = None) -> OcrEngine:
raise EngineUnavailableError(
f"Failed to create engine '{name}': {exc}"
) from exc
def create_engine(engine_name: str | None = None) -> OcrEngine:
"""Instantiate an OCR engine by name (defaults to config value).
When a fallback engine is configured (``OCR_FALLBACK_ENGINE != "none"``),
returns a ``HybridEngine`` that wraps the primary with the fallback.
Args:
engine_name: Engine identifier ("paddleocr", "tesseract").
Falls back to ``settings.ocr_primary_engine``.
Returns:
Initialized OcrEngine instance (possibly a HybridEngine wrapper).
Raises:
EngineUnavailableError: If the primary engine cannot be loaded.
"""
name = (engine_name or settings.ocr_primary_engine).lower().strip()
primary = _create_single_engine(name)
# Check for cloud fallback configuration
fallback_name = settings.ocr_fallback_engine.lower().strip()
if fallback_name == "none" or not fallback_name:
return primary
# Create fallback engine (failure is non-fatal -- log and return primary only)
try:
fallback = _create_single_engine(fallback_name)
except EngineUnavailableError as exc:
logger.warning(
"Fallback engine '%s' unavailable, proceeding without fallback: %s",
fallback_name,
exc,
)
return primary
from app.engines.hybrid_engine import HybridEngine
threshold = settings.ocr_fallback_threshold
hybrid = HybridEngine(primary=primary, fallback=fallback, threshold=threshold)
logger.info(
"Created hybrid engine: primary=%s, fallback=%s, threshold=%.2f",
name,
fallback_name,
threshold,
)
return hybrid

View File

@@ -0,0 +1,116 @@
"""Hybrid OCR engine: primary engine with optional cloud fallback."""
import logging
import time
from app.engines.base_engine import (
EngineError,
EngineProcessingError,
OcrConfig,
OcrEngine,
OcrEngineResult,
)
logger = logging.getLogger(__name__)
# Maximum time (seconds) to wait for the cloud fallback
_CLOUD_TIMEOUT_SECONDS = 5.0
class HybridEngine(OcrEngine):
"""Runs a primary engine and falls back to a cloud engine when
the primary result confidence is below the configured threshold.
If the fallback is ``None`` (default), this engine behaves identically
to the primary engine. Cloud failures are handled gracefully -- the
primary result is returned whenever the fallback is unavailable,
times out, or errors.
"""
def __init__(
self,
primary: OcrEngine,
fallback: OcrEngine | None = None,
threshold: float = 0.6,
) -> None:
self._primary = primary
self._fallback = fallback
self._threshold = threshold
@property
def name(self) -> str:
fallback_name = self._fallback.name if self._fallback else "none"
return f"hybrid({self._primary.name}+{fallback_name})"
def recognize(self, image_bytes: bytes, config: OcrConfig) -> OcrEngineResult:
"""Run primary OCR, optionally falling back to cloud engine."""
primary_result = self._primary.recognize(image_bytes, config)
# Happy path: primary confidence meets threshold
if primary_result.confidence >= self._threshold:
logger.debug(
"Primary engine confidence %.2f >= threshold %.2f, no fallback",
primary_result.confidence,
self._threshold,
)
return primary_result
# No fallback configured -- return primary result as-is
if self._fallback is None:
logger.debug(
"Primary confidence %.2f < threshold %.2f but no fallback configured",
primary_result.confidence,
self._threshold,
)
return primary_result
# Attempt cloud fallback with timeout guard
logger.info(
"Primary confidence %.2f < threshold %.2f, trying fallback (%s)",
primary_result.confidence,
self._threshold,
self._fallback.name,
)
try:
start = time.monotonic()
fallback_result = self._fallback.recognize(image_bytes, config)
elapsed = time.monotonic() - start
if elapsed > _CLOUD_TIMEOUT_SECONDS:
logger.warning(
"Cloud fallback took %.1fs (> %.1fs limit), using primary result",
elapsed,
_CLOUD_TIMEOUT_SECONDS,
)
return primary_result
# Return whichever result has higher confidence
if fallback_result.confidence > primary_result.confidence:
logger.info(
"Fallback confidence %.2f > primary %.2f, using fallback result",
fallback_result.confidence,
primary_result.confidence,
)
return fallback_result
logger.info(
"Primary confidence %.2f >= fallback %.2f, keeping primary result",
primary_result.confidence,
fallback_result.confidence,
)
return primary_result
except EngineError as exc:
logger.warning(
"Cloud fallback failed (%s), returning primary result: %s",
self._fallback.name,
exc,
)
return primary_result
except Exception as exc:
logger.warning(
"Unexpected cloud fallback error, returning primary result: %s",
exc,
)
return primary_result

View File

@@ -17,6 +17,7 @@ numpy>=1.24.0
pytesseract>=0.3.10
paddlepaddle>=2.6.0
paddleocr>=2.8.0
google-cloud-vision>=3.7.0
# PDF Processing
PyMuPDF>=1.23.0