feat: add Gemini engine module and configuration (refs #133)

Add standalone GeminiEngine class for maintenance schedule extraction
from PDF owner's manuals using Vertex AI Gemini 2.5 Flash with structured
JSON output enforcement, a 20 MB size limit, and lazy initialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Eric Gullickson
2026-02-11 10:00:47 -06:00
parent d8dec64538
commit 3705e63fde
7 changed files with 603 additions and 0 deletions

View File

@@ -29,6 +29,13 @@ class Settings:
os.getenv("VISION_MONTHLY_LIMIT", "1000")
)
# Vertex AI / Gemini configuration
self.vertex_ai_project: str = os.getenv("VERTEX_AI_PROJECT", "")
self.vertex_ai_location: str = os.getenv(
"VERTEX_AI_LOCATION", "us-central1"
)
self.gemini_model: str = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
# Redis configuration for job queue
self.redis_host: str = os.getenv("REDIS_HOST", "mvp-redis")
self.redis_port: int = int(os.getenv("REDIS_PORT", "6379"))

View File

@@ -0,0 +1,228 @@
"""Gemini 2.5 Flash engine for maintenance schedule extraction from PDFs.
Standalone module (does NOT extend OcrEngine) because Gemini performs
semantic document understanding, not traditional OCR word-box extraction.
Uses Vertex AI SDK with structured JSON output enforcement.
"""
import json
import logging
import os
from dataclasses import dataclass
from typing import Any
from app.config import settings
logger = logging.getLogger(__name__)
# 20 MB hard limit for inline base64 PDF delivery
_MAX_PDF_BYTES = 20 * 1024 * 1024
# Instruction text sent to Gemini alongside the PDF part. The field names it
# asks for (serviceName, intervalMiles, intervalMonths, details) and the
# top-level "maintenanceSchedule" array must stay in sync with
# _RESPONSE_SCHEMA. A trailing backslash joins a wrapped source line to the
# next one, so no newline is embedded at that point in the prompt.
_EXTRACTION_PROMPT = """\
Extract all routine scheduled maintenance items from this vehicle owners manual.
For each maintenance item, extract:
- serviceName: The maintenance task name (e.g., "Engine Oil Change", "Tire Rotation", \
"Cabin Air Filter Replacement")
- intervalMiles: The mileage interval as a number, or null if not specified \
(e.g., 5000, 30000)
- intervalMonths: The time interval in months as a number, or null if not specified \
(e.g., 6, 12, 24)
- details: Any additional details such as fluid specifications, part numbers, \
or special instructions (e.g., "Use 0W-20 full synthetic oil")
Only include routine scheduled maintenance items with clear intervals. \
Do not include one-time procedures, troubleshooting steps, or warranty information.
Return the results as a JSON object with a single "maintenanceSchedule" array.\
"""
_RESPONSE_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"maintenanceSchedule": {
"type": "array",
"items": {
"type": "object",
"properties": {
"serviceName": {"type": "string"},
"intervalMiles": {"type": "number", "nullable": True},
"intervalMonths": {"type": "number", "nullable": True},
"details": {"type": "string", "nullable": True},
},
"required": ["serviceName"],
},
},
},
"required": ["maintenanceSchedule"],
}
class GeminiEngineError(Exception):
    """Base exception for Gemini engine errors.

    Parent of GeminiUnavailableError and GeminiProcessingError, so callers
    can handle both with a single ``except GeminiEngineError`` clause.
    """
class GeminiUnavailableError(GeminiEngineError):
    """Raised when the Gemini engine cannot be initialized.

    GeminiEngine raises this during lazy initialization when the credential
    file is missing, the Vertex AI SDK cannot be imported, or client setup
    fails for any other reason.
    """
class GeminiProcessingError(GeminiEngineError):
    """Raised when Gemini fails to process a document.

    GeminiEngine.extract_maintenance raises this when the PDF exceeds the
    inline size limit, when the response is not valid JSON, or when the
    request or result parsing fails for any other reason.
    """
@dataclass
class MaintenanceItem:
    """A single extracted maintenance schedule item.

    Fields are the snake_case counterparts of the camelCase keys in the
    Gemini response (serviceName, intervalMiles, intervalMonths, details).
    """

    # Maintenance task name; the only field without a default.
    service_name: str
    # Mileage interval; None when the response omits it or gives null.
    interval_miles: int | None = None
    # Time interval in months; None when the response omits it or gives null.
    interval_months: int | None = None
    # Free-text extras (fluid specs, part numbers, instructions); None if absent.
    details: str | None = None
@dataclass
class MaintenanceExtractionResult:
    """Result from Gemini maintenance schedule extraction."""

    # Parsed schedule entries, in the order they appear in the response.
    items: list[MaintenanceItem]
    # Gemini model name, taken from settings.gemini_model.
    model: str
class GeminiEngine:
    """Gemini 2.5 Flash wrapper for maintenance schedule extraction.

    Standalone class (not an OcrEngine subclass) because Gemini performs
    semantic document understanding rather than traditional OCR.

    Uses lazy initialization: the Vertex AI client is not created until
    the first ``extract_maintenance()`` call.
    """

    def __init__(self) -> None:
        # Both attributes are populated together by _get_model(); None means
        # the engine has not been initialized yet.
        self._model: Any | None = None
        self._generation_config: Any | None = None

    def _get_model(self) -> Any:
        """Create the GenerativeModel on first use and cache it.

        Authentication uses the same WIF credential path as Google Vision.

        Returns:
            The cached ``GenerativeModel`` instance.

        Raises:
            GeminiUnavailableError: If the credential file is missing, the
                Vertex AI SDK is not installed, or initialization fails.
        """
        if self._model is not None:
            return self._model

        key_path = settings.google_vision_key_path
        if not os.path.isfile(key_path):
            raise GeminiUnavailableError(
                f"Google credential config not found at {key_path}. "
                "Set GOOGLE_VISION_KEY_PATH or mount the secret."
            )

        try:
            # Imported lazily so the module can be loaded without the SDK.
            from google.cloud import aiplatform  # type: ignore[import-untyped]
            from vertexai.generative_models import (  # type: ignore[import-untyped]
                GenerationConfig,
                GenerativeModel,
            )

            # Point ADC at the WIF credential config
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
            os.environ["GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES"] = "1"

            aiplatform.init(
                project=settings.vertex_ai_project,
                location=settings.vertex_ai_location,
            )

            model_name = settings.gemini_model
            model = GenerativeModel(model_name)
            generation_config = GenerationConfig(
                response_mime_type="application/json",
                response_schema=_RESPONSE_SCHEMA,
            )
        except ImportError as exc:
            raise GeminiUnavailableError(
                "google-cloud-aiplatform is not installed. "
                "Install with: pip install google-cloud-aiplatform"
            ) from exc
        except Exception as exc:
            raise GeminiUnavailableError(
                f"Failed to initialize Gemini engine: {exc}"
            ) from exc

        # Publish only after BOTH objects exist, so a failure above can never
        # leave a cached model without its generation config.
        self._generation_config = generation_config
        self._model = model

        logger.info(
            "Gemini engine initialized (model=%s, project=%s, location=%s)",
            model_name,
            settings.vertex_ai_project,
            settings.vertex_ai_location,
        )
        return model

    @staticmethod
    def _to_int(value: Any) -> int | None:
        """Coerce a JSON number to ``int`` (truncating), passing ``None`` through.

        The response schema types intervals as "number", so values may decode
        as floats (e.g. ``5000.0``) even though MaintenanceItem declares ints.
        """
        return None if value is None else int(value)

    @staticmethod
    def _parse_items(raw: dict[str, Any]) -> list[MaintenanceItem]:
        """Convert the decoded Gemini JSON payload into MaintenanceItem objects."""
        return [
            MaintenanceItem(
                service_name=entry["serviceName"],
                interval_miles=GeminiEngine._to_int(entry.get("intervalMiles")),
                interval_months=GeminiEngine._to_int(entry.get("intervalMonths")),
                details=entry.get("details"),
            )
            for entry in raw.get("maintenanceSchedule", [])
        ]

    def extract_maintenance(
        self, pdf_bytes: bytes
    ) -> MaintenanceExtractionResult:
        """Extract maintenance schedules from a PDF owner's manual.

        Args:
            pdf_bytes: Raw PDF file bytes (non-empty, <= 20 MB).

        Returns:
            Structured maintenance extraction result.

        Raises:
            GeminiProcessingError: If the PDF is empty or too large, or if
                extraction fails.
            GeminiUnavailableError: If the engine cannot be initialized.
        """
        # Validate the input before paying for client initialization.
        if not pdf_bytes:
            raise GeminiProcessingError("PDF is empty; nothing to extract.")

        if len(pdf_bytes) > _MAX_PDF_BYTES:
            size_mb = len(pdf_bytes) / (1024 * 1024)
            raise GeminiProcessingError(
                f"PDF size ({size_mb:.1f} MB) exceeds the 20 MB limit for "
                "inline processing. Upload to GCS and use a gs:// URI instead."
            )

        model = self._get_model()

        try:
            from vertexai.generative_models import Part  # type: ignore[import-untyped]

            pdf_part = Part.from_data(
                data=pdf_bytes,
                mime_type="application/pdf",
            )
            response = model.generate_content(
                [pdf_part, _EXTRACTION_PROMPT],
                generation_config=self._generation_config,
            )
            raw = json.loads(response.text)
            items = self._parse_items(raw)
        except GeminiEngineError:
            # Never re-wrap one of our own errors.
            raise
        except json.JSONDecodeError as exc:
            raise GeminiProcessingError(
                f"Gemini returned invalid JSON: {exc}"
            ) from exc
        except Exception as exc:
            # SDK/network failures and malformed payloads (missing keys,
            # non-numeric intervals) all surface as processing errors.
            raise GeminiProcessingError(
                f"Gemini maintenance extraction failed: {exc}"
            ) from exc

        logger.info(
            "Gemini extracted %d maintenance items from PDF (%d bytes)",
            len(items),
            len(pdf_bytes),
        )
        return MaintenanceExtractionResult(
            items=items,
            model=settings.gemini_model,
        )