feat: add VIN photo OCR pipeline (refs #67)
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 31s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 8s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 31s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 8s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
Implement VIN-specific OCR extraction with optimized preprocessing: - Add POST /extract/vin endpoint for VIN extraction - VIN preprocessor: CLAHE, deskew, denoise, adaptive threshold - VIN validator: check digit validation, OCR error correction (I->1, O->0) - VIN extractor: PSM modes 6/7/8, character whitelist, alternatives - Response includes confidence, bounding box, and alternatives - Unit tests for validator and preprocessor - Integration tests for VIN extraction endpoint Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
10
ocr/app/extractors/__init__.py
Normal file
10
ocr/app/extractors/__init__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Extractors package for domain-specific OCR extraction."""
|
||||
from app.extractors.base import BaseExtractor, ExtractionResult
|
||||
from app.extractors.vin_extractor import VinExtractor, vin_extractor
|
||||
|
||||
__all__ = [
|
||||
"BaseExtractor",
|
||||
"ExtractionResult",
|
||||
"VinExtractor",
|
||||
"vin_extractor",
|
||||
]
|
||||
47
ocr/app/extractors/base.py
Normal file
47
ocr/app/extractors/base.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Base extractor class for domain-specific OCR extraction."""
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExtractionResult:
|
||||
"""Base result for extraction operations."""
|
||||
|
||||
success: bool
|
||||
confidence: float
|
||||
raw_text: str
|
||||
processing_time_ms: int
|
||||
extracted_data: dict[str, Any] = field(default_factory=dict)
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
class BaseExtractor(ABC):
|
||||
"""Abstract base class for domain-specific extractors."""
|
||||
|
||||
@abstractmethod
|
||||
def extract(self, image_bytes: bytes, content_type: Optional[str] = None) -> ExtractionResult:
|
||||
"""
|
||||
Extract domain-specific data from an image.
|
||||
|
||||
Args:
|
||||
image_bytes: Raw image bytes
|
||||
content_type: MIME type of the image
|
||||
|
||||
Returns:
|
||||
ExtractionResult with extracted data
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def validate(self, data: Any) -> bool:
|
||||
"""
|
||||
Validate extracted data.
|
||||
|
||||
Args:
|
||||
data: Extracted data to validate
|
||||
|
||||
Returns:
|
||||
True if data is valid
|
||||
"""
|
||||
pass
|
||||
275
ocr/app/extractors/vin_extractor.py
Normal file
275
ocr/app/extractors/vin_extractor.py
Normal file
@@ -0,0 +1,275 @@
|
||||
"""VIN-specific OCR extractor with preprocessing and validation."""
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
import magic
|
||||
import pytesseract
|
||||
from PIL import Image
|
||||
from pillow_heif import register_heif_opener
|
||||
|
||||
from app.config import settings
|
||||
from app.extractors.base import BaseExtractor
|
||||
from app.preprocessors.vin_preprocessor import vin_preprocessor, BoundingBox
|
||||
from app.validators.vin_validator import vin_validator
|
||||
|
||||
# Register HEIF/HEIC opener
|
||||
register_heif_opener()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VinAlternative:
|
||||
"""Alternative VIN candidate with confidence."""
|
||||
|
||||
vin: str
|
||||
confidence: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class VinExtractionResult:
|
||||
"""Result of VIN extraction."""
|
||||
|
||||
success: bool
|
||||
vin: Optional[str] = None
|
||||
confidence: float = 0.0
|
||||
bounding_box: Optional[BoundingBox] = None
|
||||
alternatives: list[VinAlternative] = field(default_factory=list)
|
||||
processing_time_ms: int = 0
|
||||
error: Optional[str] = None
|
||||
raw_text: Optional[str] = None
|
||||
|
||||
|
||||
class VinExtractor(BaseExtractor):
|
||||
"""VIN-specific OCR extractor optimized for VIN plates and stickers."""
|
||||
|
||||
# Supported MIME types
|
||||
SUPPORTED_TYPES = {
|
||||
"image/jpeg",
|
||||
"image/png",
|
||||
"image/heic",
|
||||
"image/heif",
|
||||
}
|
||||
|
||||
# VIN character whitelist for Tesseract
|
||||
VIN_WHITELIST = "ABCDEFGHJKLMNPRSTUVWXYZ0123456789"
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize VIN extractor."""
|
||||
pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd
|
||||
|
||||
def extract(
|
||||
self, image_bytes: bytes, content_type: Optional[str] = None
|
||||
) -> VinExtractionResult:
|
||||
"""
|
||||
Extract VIN from an image using optimized preprocessing and OCR.
|
||||
|
||||
Args:
|
||||
image_bytes: Raw image bytes (HEIC, JPEG, PNG)
|
||||
content_type: MIME type (auto-detected if not provided)
|
||||
|
||||
Returns:
|
||||
VinExtractionResult with extracted VIN and metadata
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# Detect content type if not provided
|
||||
if not content_type:
|
||||
content_type = self._detect_mime_type(image_bytes)
|
||||
|
||||
# Validate content type
|
||||
if content_type not in self.SUPPORTED_TYPES:
|
||||
return VinExtractionResult(
|
||||
success=False,
|
||||
error=f"Unsupported file type: {content_type}",
|
||||
processing_time_ms=int((time.time() - start_time) * 1000),
|
||||
)
|
||||
|
||||
try:
|
||||
# Apply VIN-optimized preprocessing
|
||||
preprocessing_result = vin_preprocessor.preprocess(image_bytes)
|
||||
preprocessed_bytes = preprocessing_result.image_bytes
|
||||
|
||||
# Perform OCR with VIN-optimized settings
|
||||
raw_text, word_confidences = self._perform_ocr(preprocessed_bytes)
|
||||
|
||||
# Extract VIN candidates from raw text
|
||||
candidates = vin_validator.extract_candidates(raw_text)
|
||||
|
||||
if not candidates:
|
||||
# No VIN candidates found - try with different PSM modes
|
||||
candidates = self._try_alternate_ocr(preprocessed_bytes)
|
||||
|
||||
if not candidates:
|
||||
return VinExtractionResult(
|
||||
success=False,
|
||||
error="No VIN pattern found in image",
|
||||
raw_text=raw_text,
|
||||
processing_time_ms=int((time.time() - start_time) * 1000),
|
||||
)
|
||||
|
||||
# Validate and score candidates
|
||||
scored_candidates = []
|
||||
for vin, start_pos, end_pos in candidates:
|
||||
validation = vin_validator.validate(vin)
|
||||
|
||||
# Calculate confidence
|
||||
base_confidence = self._calculate_base_confidence(word_confidences)
|
||||
adjusted_confidence = min(
|
||||
1.0, max(0.0, base_confidence + validation.confidence_adjustment)
|
||||
)
|
||||
|
||||
scored_candidates.append(
|
||||
(validation.vin, adjusted_confidence, validation.is_valid)
|
||||
)
|
||||
|
||||
# Sort by confidence
|
||||
scored_candidates.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# Primary result is the highest confidence valid candidate
|
||||
primary_vin = None
|
||||
primary_confidence = 0.0
|
||||
|
||||
for vin, confidence, is_valid in scored_candidates:
|
||||
if is_valid:
|
||||
primary_vin = vin
|
||||
primary_confidence = confidence
|
||||
break
|
||||
|
||||
# If no valid candidate, use the highest confidence one
|
||||
if primary_vin is None and scored_candidates:
|
||||
primary_vin = scored_candidates[0][0]
|
||||
primary_confidence = scored_candidates[0][1]
|
||||
|
||||
# Build alternatives list (excluding primary)
|
||||
alternatives = [
|
||||
VinAlternative(vin=vin, confidence=conf)
|
||||
for vin, conf, _ in scored_candidates[1:5] # Max 4 alternatives
|
||||
]
|
||||
|
||||
processing_time_ms = int((time.time() - start_time) * 1000)
|
||||
|
||||
logger.info(
|
||||
f"VIN extraction: {primary_vin}, confidence={primary_confidence:.2%}, "
|
||||
f"time={processing_time_ms}ms"
|
||||
)
|
||||
|
||||
return VinExtractionResult(
|
||||
success=True,
|
||||
vin=primary_vin,
|
||||
confidence=primary_confidence,
|
||||
bounding_box=preprocessing_result.bounding_box,
|
||||
alternatives=alternatives,
|
||||
processing_time_ms=processing_time_ms,
|
||||
raw_text=raw_text,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"VIN extraction failed: {e}", exc_info=True)
|
||||
return VinExtractionResult(
|
||||
success=False,
|
||||
error=str(e),
|
||||
processing_time_ms=int((time.time() - start_time) * 1000),
|
||||
)
|
||||
|
||||
def _detect_mime_type(self, file_bytes: bytes) -> str:
|
||||
"""Detect MIME type using python-magic."""
|
||||
mime = magic.Magic(mime=True)
|
||||
detected = mime.from_buffer(file_bytes)
|
||||
return detected or "application/octet-stream"
|
||||
|
||||
def _perform_ocr(
|
||||
self, image_bytes: bytes, psm: int = 6
|
||||
) -> tuple[str, list[float]]:
|
||||
"""
|
||||
Perform OCR with VIN-optimized settings.
|
||||
|
||||
Args:
|
||||
image_bytes: Preprocessed image bytes
|
||||
psm: Tesseract page segmentation mode
|
||||
6 = Uniform block of text
|
||||
7 = Single text line
|
||||
8 = Single word
|
||||
|
||||
Returns:
|
||||
Tuple of (raw_text, word_confidences)
|
||||
"""
|
||||
image = Image.open(io.BytesIO(image_bytes))
|
||||
|
||||
# Configure Tesseract for VIN extraction
|
||||
# Use character whitelist to exclude I, O, Q
|
||||
config = (
|
||||
f"--psm {psm} "
|
||||
f"-c tessedit_char_whitelist={self.VIN_WHITELIST}"
|
||||
)
|
||||
|
||||
# Get detailed OCR data
|
||||
ocr_data = pytesseract.image_to_data(
|
||||
image, config=config, output_type=pytesseract.Output.DICT
|
||||
)
|
||||
|
||||
# Extract words and confidences
|
||||
words = []
|
||||
confidences = []
|
||||
|
||||
for i, text in enumerate(ocr_data["text"]):
|
||||
conf = int(ocr_data["conf"][i])
|
||||
if text.strip() and conf > 0:
|
||||
words.append(text.strip())
|
||||
confidences.append(conf / 100.0)
|
||||
|
||||
raw_text = " ".join(words)
|
||||
return raw_text, confidences
|
||||
|
||||
def _try_alternate_ocr(self, image_bytes: bytes) -> list[tuple[str, int, int]]:
|
||||
"""
|
||||
Try alternate OCR configurations when initial extraction fails.
|
||||
|
||||
Returns:
|
||||
List of VIN candidates
|
||||
"""
|
||||
# Try PSM 7 (single text line)
|
||||
raw_text, _ = self._perform_ocr(image_bytes, psm=7)
|
||||
candidates = vin_validator.extract_candidates(raw_text)
|
||||
if candidates:
|
||||
return candidates
|
||||
|
||||
# Try PSM 8 (single word)
|
||||
raw_text, _ = self._perform_ocr(image_bytes, psm=8)
|
||||
candidates = vin_validator.extract_candidates(raw_text)
|
||||
if candidates:
|
||||
return candidates
|
||||
|
||||
return []
|
||||
|
||||
def _calculate_base_confidence(self, word_confidences: list[float]) -> float:
|
||||
"""Calculate base confidence from word confidences."""
|
||||
if not word_confidences:
|
||||
return 0.5
|
||||
|
||||
# Use average confidence, weighted slightly toward minimum
|
||||
avg_conf = sum(word_confidences) / len(word_confidences)
|
||||
min_conf = min(word_confidences)
|
||||
|
||||
# Blend: 70% average, 30% minimum
|
||||
return 0.7 * avg_conf + 0.3 * min_conf
|
||||
|
||||
def validate(self, data: str) -> bool:
|
||||
"""
|
||||
Validate a VIN string.
|
||||
|
||||
Args:
|
||||
data: VIN string to validate
|
||||
|
||||
Returns:
|
||||
True if VIN is valid
|
||||
"""
|
||||
result = vin_validator.validate(data)
|
||||
return result.is_valid
|
||||
|
||||
|
||||
# Singleton instance
|
||||
vin_extractor = VinExtractor()
|
||||
@@ -55,6 +55,7 @@ async def root() -> dict:
|
||||
"log_level": settings.log_level,
|
||||
"endpoints": [
|
||||
"POST /extract - Synchronous OCR extraction",
|
||||
"POST /extract/vin - VIN-specific extraction with validation",
|
||||
"POST /jobs - Submit async OCR job",
|
||||
"GET /jobs/{job_id} - Get async job status",
|
||||
],
|
||||
|
||||
@@ -1,18 +1,24 @@
|
||||
"""Pydantic models for OCR service."""
|
||||
from .schemas import (
|
||||
BoundingBox,
|
||||
DocumentType,
|
||||
ExtractedField,
|
||||
JobResponse,
|
||||
JobStatus,
|
||||
JobSubmitRequest,
|
||||
OcrResponse,
|
||||
VinAlternative,
|
||||
VinExtractionResponse,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"BoundingBox",
|
||||
"DocumentType",
|
||||
"ExtractedField",
|
||||
"JobResponse",
|
||||
"JobStatus",
|
||||
"JobSubmitRequest",
|
||||
"OcrResponse",
|
||||
"VinAlternative",
|
||||
"VinExtractionResponse",
|
||||
]
|
||||
|
||||
@@ -21,6 +21,36 @@ class ExtractedField(BaseModel):
|
||||
confidence: float = Field(ge=0.0, le=1.0)
|
||||
|
||||
|
||||
class BoundingBox(BaseModel):
|
||||
"""Bounding box for detected region."""
|
||||
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
|
||||
|
||||
class VinAlternative(BaseModel):
|
||||
"""Alternative VIN candidate."""
|
||||
|
||||
vin: str
|
||||
confidence: float = Field(ge=0.0, le=1.0)
|
||||
|
||||
|
||||
class VinExtractionResponse(BaseModel):
|
||||
"""Response from VIN extraction endpoint."""
|
||||
|
||||
success: bool
|
||||
vin: Optional[str] = None
|
||||
confidence: float = Field(ge=0.0, le=1.0)
|
||||
bounding_box: Optional[BoundingBox] = Field(default=None, alias="boundingBox")
|
||||
alternatives: list[VinAlternative] = Field(default_factory=list)
|
||||
processing_time_ms: int = Field(alias="processingTimeMs")
|
||||
error: Optional[str] = None
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class OcrResponse(BaseModel):
|
||||
"""Response from OCR extraction."""
|
||||
|
||||
|
||||
10
ocr/app/preprocessors/__init__.py
Normal file
10
ocr/app/preprocessors/__init__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Image preprocessors for OCR optimization."""
|
||||
from app.services.preprocessor import ImagePreprocessor, preprocessor
|
||||
from app.preprocessors.vin_preprocessor import VinPreprocessor, vin_preprocessor
|
||||
|
||||
__all__ = [
|
||||
"ImagePreprocessor",
|
||||
"preprocessor",
|
||||
"VinPreprocessor",
|
||||
"vin_preprocessor",
|
||||
]
|
||||
309
ocr/app/preprocessors/vin_preprocessor.py
Normal file
309
ocr/app/preprocessors/vin_preprocessor.py
Normal file
@@ -0,0 +1,309 @@
|
||||
"""VIN-optimized image preprocessing pipeline."""
|
||||
import io
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from pillow_heif import register_heif_opener
|
||||
|
||||
# Register HEIF/HEIC opener
|
||||
register_heif_opener()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BoundingBox:
|
||||
"""Represents a region in an image."""
|
||||
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class PreprocessingResult:
|
||||
"""Result of VIN preprocessing."""
|
||||
|
||||
image_bytes: bytes
|
||||
bounding_box: Optional[BoundingBox] = None
|
||||
preprocessing_applied: list[str] = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.preprocessing_applied is None:
|
||||
self.preprocessing_applied = []
|
||||
|
||||
|
||||
class VinPreprocessor:
|
||||
"""VIN-optimized image preprocessing for improved OCR accuracy."""
|
||||
|
||||
def preprocess(
|
||||
self,
|
||||
image_bytes: bytes,
|
||||
apply_clahe: bool = True,
|
||||
apply_deskew: bool = True,
|
||||
apply_denoise: bool = True,
|
||||
apply_threshold: bool = True,
|
||||
) -> PreprocessingResult:
|
||||
"""
|
||||
Apply VIN-optimized preprocessing pipeline.
|
||||
|
||||
Pipeline:
|
||||
1. HEIC conversion (if needed)
|
||||
2. Grayscale conversion
|
||||
3. Deskew (correct rotation/tilt)
|
||||
4. Contrast enhancement (CLAHE)
|
||||
5. Noise reduction (fastNlMeansDenoising)
|
||||
6. Adaptive thresholding
|
||||
|
||||
Args:
|
||||
image_bytes: Raw image bytes (HEIC, JPEG, PNG)
|
||||
apply_clahe: Apply CLAHE contrast enhancement
|
||||
apply_deskew: Apply deskew correction
|
||||
apply_denoise: Apply noise reduction
|
||||
apply_threshold: Apply adaptive thresholding
|
||||
|
||||
Returns:
|
||||
PreprocessingResult with processed image bytes
|
||||
"""
|
||||
steps_applied = []
|
||||
|
||||
# Load image with PIL (handles HEIC via pillow-heif)
|
||||
pil_image = Image.open(io.BytesIO(image_bytes))
|
||||
steps_applied.append("loaded")
|
||||
|
||||
# Convert to RGB if needed
|
||||
if pil_image.mode not in ("RGB", "L"):
|
||||
pil_image = pil_image.convert("RGB")
|
||||
steps_applied.append("convert_rgb")
|
||||
|
||||
# Convert to OpenCV format
|
||||
cv_image = np.array(pil_image)
|
||||
if len(cv_image.shape) == 3:
|
||||
cv_image = cv2.cvtColor(cv_image, cv2.COLOR_RGB2BGR)
|
||||
|
||||
# Convert to grayscale
|
||||
if len(cv_image.shape) == 3:
|
||||
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
|
||||
else:
|
||||
gray = cv_image
|
||||
steps_applied.append("grayscale")
|
||||
|
||||
# Apply deskew
|
||||
if apply_deskew:
|
||||
gray = self._deskew(gray)
|
||||
steps_applied.append("deskew")
|
||||
|
||||
# Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
|
||||
if apply_clahe:
|
||||
gray = self._apply_clahe(gray)
|
||||
steps_applied.append("clahe")
|
||||
|
||||
# Apply denoising
|
||||
if apply_denoise:
|
||||
gray = self._denoise(gray)
|
||||
steps_applied.append("denoise")
|
||||
|
||||
# Apply adaptive thresholding
|
||||
if apply_threshold:
|
||||
gray = self._adaptive_threshold(gray)
|
||||
steps_applied.append("threshold")
|
||||
|
||||
# Convert back to PNG bytes
|
||||
result_image = Image.fromarray(gray)
|
||||
buffer = io.BytesIO()
|
||||
result_image.save(buffer, format="PNG")
|
||||
|
||||
return PreprocessingResult(
|
||||
image_bytes=buffer.getvalue(),
|
||||
preprocessing_applied=steps_applied,
|
||||
)
|
||||
|
||||
def _apply_clahe(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Apply CLAHE (Contrast Limited Adaptive Histogram Equalization).
|
||||
|
||||
CLAHE improves contrast in images with varying illumination,
|
||||
which is common in VIN photos taken in different lighting conditions.
|
||||
"""
|
||||
try:
|
||||
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
|
||||
return clahe.apply(image)
|
||||
except cv2.error as e:
|
||||
logger.warning(f"CLAHE failed: {e}")
|
||||
return image
|
||||
|
||||
def _deskew(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Correct image rotation using Hough transform line detection.
|
||||
|
||||
VIN plates/stickers are often photographed at slight angles.
|
||||
"""
|
||||
try:
|
||||
# Detect edges
|
||||
edges = cv2.Canny(image, 50, 150, apertureSize=3)
|
||||
|
||||
# Detect lines
|
||||
lines = cv2.HoughLinesP(
|
||||
edges,
|
||||
rho=1,
|
||||
theta=np.pi / 180,
|
||||
threshold=100,
|
||||
minLineLength=100,
|
||||
maxLineGap=10,
|
||||
)
|
||||
|
||||
if lines is None:
|
||||
return image
|
||||
|
||||
# Calculate angles of detected lines
|
||||
angles = []
|
||||
for line in lines:
|
||||
x1, y1, x2, y2 = line[0]
|
||||
if x2 - x1 != 0:
|
||||
angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi
|
||||
# Only consider nearly horizontal lines
|
||||
if -45 < angle < 45:
|
||||
angles.append(angle)
|
||||
|
||||
if not angles:
|
||||
return image
|
||||
|
||||
# Use median angle to avoid outliers
|
||||
median_angle = np.median(angles)
|
||||
|
||||
# Only correct if skew is significant but not extreme
|
||||
if abs(median_angle) < 0.5 or abs(median_angle) > 20:
|
||||
return image
|
||||
|
||||
# Rotate to correct skew
|
||||
height, width = image.shape[:2]
|
||||
center = (width // 2, height // 2)
|
||||
rotation_matrix = cv2.getRotationMatrix2D(center, median_angle, 1.0)
|
||||
|
||||
# Calculate new bounds
|
||||
cos_val = abs(rotation_matrix[0, 0])
|
||||
sin_val = abs(rotation_matrix[0, 1])
|
||||
new_width = int(height * sin_val + width * cos_val)
|
||||
new_height = int(height * cos_val + width * sin_val)
|
||||
|
||||
rotation_matrix[0, 2] += (new_width - width) / 2
|
||||
rotation_matrix[1, 2] += (new_height - height) / 2
|
||||
|
||||
rotated = cv2.warpAffine(
|
||||
image,
|
||||
rotation_matrix,
|
||||
(new_width, new_height),
|
||||
borderMode=cv2.BORDER_REPLICATE,
|
||||
)
|
||||
|
||||
logger.debug(f"Deskewed by {median_angle:.2f} degrees")
|
||||
return rotated
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Deskew failed: {e}")
|
||||
return image
|
||||
|
||||
def _denoise(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Apply non-local means denoising.
|
||||
|
||||
This helps remove noise while preserving VIN character edges.
|
||||
"""
|
||||
try:
|
||||
return cv2.fastNlMeansDenoising(
|
||||
image, h=10, templateWindowSize=7, searchWindowSize=21
|
||||
)
|
||||
except cv2.error as e:
|
||||
logger.warning(f"Denoising failed: {e}")
|
||||
return image
|
||||
|
||||
def _adaptive_threshold(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Apply adaptive thresholding for binarization.
|
||||
|
||||
Adaptive thresholding handles varying illumination across the image,
|
||||
which is common in VIN photos.
|
||||
"""
|
||||
try:
|
||||
return cv2.adaptiveThreshold(
|
||||
image,
|
||||
255,
|
||||
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
|
||||
cv2.THRESH_BINARY,
|
||||
blockSize=11,
|
||||
C=2,
|
||||
)
|
||||
except cv2.error as e:
|
||||
logger.warning(f"Adaptive threshold failed: {e}")
|
||||
return image
|
||||
|
||||
def detect_vin_region(self, image_bytes: bytes) -> Optional[BoundingBox]:
|
||||
"""
|
||||
Attempt to detect the VIN region in an image.
|
||||
|
||||
Uses contour detection to find rectangular regions that might contain VINs.
|
||||
|
||||
Args:
|
||||
image_bytes: Raw image bytes
|
||||
|
||||
Returns:
|
||||
BoundingBox of detected VIN region, or None if not found
|
||||
"""
|
||||
try:
|
||||
pil_image = Image.open(io.BytesIO(image_bytes))
|
||||
if pil_image.mode != "L":
|
||||
pil_image = pil_image.convert("L")
|
||||
|
||||
cv_image = np.array(pil_image)
|
||||
|
||||
# Apply preprocessing for better contour detection
|
||||
blurred = cv2.GaussianBlur(cv_image, (5, 5), 0)
|
||||
edges = cv2.Canny(blurred, 50, 150)
|
||||
|
||||
# Find contours
|
||||
contours, _ = cv2.findContours(
|
||||
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
|
||||
)
|
||||
|
||||
if not contours:
|
||||
return None
|
||||
|
||||
# Find rectangular contours with appropriate aspect ratio for VIN
|
||||
# VIN is typically 17 characters, roughly 5:1 to 10:1 aspect ratio
|
||||
vin_candidates = []
|
||||
|
||||
for contour in contours:
|
||||
x, y, w, h = cv2.boundingRect(contour)
|
||||
if h == 0:
|
||||
continue
|
||||
|
||||
aspect_ratio = w / h
|
||||
area = w * h
|
||||
|
||||
# VIN regions typically have:
|
||||
# - Aspect ratio between 4:1 and 12:1
|
||||
# - Minimum area (to filter out noise)
|
||||
if 4 <= aspect_ratio <= 12 and area > 1000:
|
||||
vin_candidates.append((x, y, w, h, area))
|
||||
|
||||
if not vin_candidates:
|
||||
return None
|
||||
|
||||
# Return the largest candidate
|
||||
vin_candidates.sort(key=lambda c: c[4], reverse=True)
|
||||
x, y, w, h, _ = vin_candidates[0]
|
||||
|
||||
return BoundingBox(x=x, y=y, width=w, height=h)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"VIN region detection failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# Singleton instance
|
||||
vin_preprocessor = VinPreprocessor()
|
||||
@@ -3,7 +3,8 @@ import logging
|
||||
|
||||
from fastapi import APIRouter, File, HTTPException, Query, UploadFile
|
||||
|
||||
from app.models import OcrResponse
|
||||
from app.extractors.vin_extractor import vin_extractor
|
||||
from app.models import BoundingBox, OcrResponse, VinAlternative, VinExtractionResponse
|
||||
from app.services import ocr_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -67,3 +68,89 @@ async def extract_text(
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/vin", response_model=VinExtractionResponse)
|
||||
async def extract_vin(
|
||||
file: UploadFile = File(..., description="Image file containing VIN"),
|
||||
) -> VinExtractionResponse:
|
||||
"""
|
||||
Extract VIN (Vehicle Identification Number) from an uploaded image.
|
||||
|
||||
Uses VIN-optimized preprocessing and pattern matching:
|
||||
- HEIC conversion (if needed)
|
||||
- Grayscale conversion
|
||||
- Deskew correction
|
||||
- CLAHE contrast enhancement
|
||||
- Noise reduction
|
||||
- Adaptive thresholding
|
||||
- VIN pattern matching (17 chars, excludes I/O/Q)
|
||||
- Check digit validation
|
||||
- Common OCR error correction (I->1, O->0, Q->0)
|
||||
|
||||
Supports HEIC, JPEG, PNG formats.
|
||||
Processing time target: <3 seconds.
|
||||
|
||||
- **file**: Image file (max 10MB)
|
||||
|
||||
Returns:
|
||||
- **vin**: Extracted VIN (17 alphanumeric characters)
|
||||
- **confidence**: Confidence score (0.0-1.0)
|
||||
- **boundingBox**: Location of VIN in image (if detected)
|
||||
- **alternatives**: Other VIN candidates with confidence scores
|
||||
- **processingTimeMs**: Processing time in milliseconds
|
||||
"""
|
||||
# Validate file presence
|
||||
if not file.filename:
|
||||
raise HTTPException(status_code=400, detail="No file provided")
|
||||
|
||||
# Read file content
|
||||
content = await file.read()
|
||||
file_size = len(content)
|
||||
|
||||
# Validate file size
|
||||
if file_size > MAX_SYNC_SIZE:
|
||||
raise HTTPException(
|
||||
status_code=413,
|
||||
detail=f"File too large. Max: {MAX_SYNC_SIZE // (1024*1024)}MB",
|
||||
)
|
||||
|
||||
if file_size == 0:
|
||||
raise HTTPException(status_code=400, detail="Empty file provided")
|
||||
|
||||
logger.info(
|
||||
f"VIN extraction: {file.filename}, "
|
||||
f"size: {file_size} bytes, "
|
||||
f"content_type: {file.content_type}"
|
||||
)
|
||||
|
||||
# Perform VIN extraction
|
||||
result = vin_extractor.extract(
|
||||
image_bytes=content,
|
||||
content_type=file.content_type,
|
||||
)
|
||||
|
||||
# Convert internal result to API response
|
||||
bounding_box = None
|
||||
if result.bounding_box:
|
||||
bounding_box = BoundingBox(
|
||||
x=result.bounding_box.x,
|
||||
y=result.bounding_box.y,
|
||||
width=result.bounding_box.width,
|
||||
height=result.bounding_box.height,
|
||||
)
|
||||
|
||||
alternatives = [
|
||||
VinAlternative(vin=alt.vin, confidence=alt.confidence)
|
||||
for alt in result.alternatives
|
||||
]
|
||||
|
||||
return VinExtractionResponse(
|
||||
success=result.success,
|
||||
vin=result.vin,
|
||||
confidence=result.confidence,
|
||||
boundingBox=bounding_box,
|
||||
alternatives=alternatives,
|
||||
processingTimeMs=result.processing_time_ms,
|
||||
error=result.error,
|
||||
)
|
||||
|
||||
4
ocr/app/validators/__init__.py
Normal file
4
ocr/app/validators/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""Validators package for OCR data validation."""
|
||||
from app.validators.vin_validator import VinValidator, vin_validator
|
||||
|
||||
__all__ = ["VinValidator", "vin_validator"]
|
||||
259
ocr/app/validators/vin_validator.py
Normal file
259
ocr/app/validators/vin_validator.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""VIN validation with check digit verification and OCR error correction."""
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class VinValidationResult:
|
||||
"""Result of VIN validation."""
|
||||
|
||||
is_valid: bool
|
||||
vin: str
|
||||
confidence_adjustment: float
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
class VinValidator:
|
||||
"""Validates and corrects VIN strings."""
|
||||
|
||||
# VIN character set (excludes I, O, Q)
|
||||
VALID_CHARS = set("ABCDEFGHJKLMNPRSTUVWXYZ0123456789")
|
||||
|
||||
# Common OCR misreads and their corrections
|
||||
TRANSLITERATION = {
|
||||
"I": "1",
|
||||
"O": "0",
|
||||
"Q": "0",
|
||||
"i": "1",
|
||||
"o": "0",
|
||||
"q": "0",
|
||||
"l": "1",
|
||||
"L": "1",
|
||||
"B": "8", # Sometimes confused
|
||||
"S": "5", # Sometimes confused
|
||||
}
|
||||
|
||||
# Weights for check digit calculation (positions 1-17)
|
||||
CHECK_WEIGHTS = [8, 7, 6, 5, 4, 3, 2, 10, 0, 9, 8, 7, 6, 5, 4, 3, 2]
|
||||
|
||||
# Character to value mapping for check digit
|
||||
CHAR_VALUES = {
|
||||
"A": 1,
|
||||
"B": 2,
|
||||
"C": 3,
|
||||
"D": 4,
|
||||
"E": 5,
|
||||
"F": 6,
|
||||
"G": 7,
|
||||
"H": 8,
|
||||
"J": 1,
|
||||
"K": 2,
|
||||
"L": 3,
|
||||
"M": 4,
|
||||
"N": 5,
|
||||
"P": 7,
|
||||
"R": 9,
|
||||
"S": 2,
|
||||
"T": 3,
|
||||
"U": 4,
|
||||
"V": 5,
|
||||
"W": 6,
|
||||
"X": 7,
|
||||
"Y": 8,
|
||||
"Z": 9,
|
||||
"0": 0,
|
||||
"1": 1,
|
||||
"2": 2,
|
||||
"3": 3,
|
||||
"4": 4,
|
||||
"5": 5,
|
||||
"6": 6,
|
||||
"7": 7,
|
||||
"8": 8,
|
||||
"9": 9,
|
||||
}
|
||||
|
||||
# Modern VIN pattern (1981+): exactly 17 alphanumeric, no I/O/Q
|
||||
MODERN_VIN_PATTERN = re.compile(r"^[A-HJ-NPR-Z0-9]{17}$")
|
||||
|
||||
# Pre-1981 VIN pattern: 11-17 characters
|
||||
LEGACY_VIN_PATTERN = re.compile(r"^[A-HJ-NPR-Z0-9]{11,17}$")
|
||||
|
||||
def correct_ocr_errors(self, vin: str) -> str:
|
||||
"""
|
||||
Apply common OCR error corrections to a VIN string.
|
||||
|
||||
Args:
|
||||
vin: Raw VIN string from OCR
|
||||
|
||||
Returns:
|
||||
Corrected VIN string
|
||||
"""
|
||||
corrected = vin.upper().strip()
|
||||
|
||||
# Remove any spaces or dashes (common in formatted VINs)
|
||||
corrected = corrected.replace(" ", "").replace("-", "")
|
||||
|
||||
# Apply transliteration for common OCR errors
|
||||
result = []
|
||||
for char in corrected:
|
||||
if char in self.TRANSLITERATION:
|
||||
result.append(self.TRANSLITERATION[char])
|
||||
else:
|
||||
result.append(char)
|
||||
|
||||
return "".join(result)
|
||||
|
||||
def calculate_check_digit(self, vin: str) -> Optional[str]:
|
||||
"""
|
||||
Calculate the check digit (position 9) for a VIN.
|
||||
|
||||
Args:
|
||||
vin: 17-character VIN string
|
||||
|
||||
Returns:
|
||||
Expected check digit character, or None if calculation fails
|
||||
"""
|
||||
if len(vin) != 17:
|
||||
return None
|
||||
|
||||
try:
|
||||
total = 0
|
||||
for i, char in enumerate(vin.upper()):
|
||||
if i == 8: # Skip check digit position
|
||||
continue
|
||||
value = self.CHAR_VALUES.get(char)
|
||||
if value is None:
|
||||
return None
|
||||
total += value * self.CHECK_WEIGHTS[i]
|
||||
|
||||
remainder = total % 11
|
||||
if remainder == 10:
|
||||
return "X"
|
||||
return str(remainder)
|
||||
except (KeyError, ValueError):
|
||||
return None
|
||||
|
||||
def validate_check_digit(self, vin: str) -> bool:
|
||||
"""
|
||||
Validate the check digit of a VIN.
|
||||
|
||||
Args:
|
||||
vin: 17-character VIN string
|
||||
|
||||
Returns:
|
||||
True if check digit is valid
|
||||
"""
|
||||
if len(vin) != 17:
|
||||
return False
|
||||
|
||||
expected = self.calculate_check_digit(vin)
|
||||
if expected is None:
|
||||
return False
|
||||
|
||||
return vin[8].upper() == expected
|
||||
|
||||
def validate(
|
||||
self, vin: str, correct_errors: bool = True, allow_legacy: bool = False
|
||||
) -> VinValidationResult:
|
||||
"""
|
||||
Validate a VIN string and optionally correct OCR errors.
|
||||
|
||||
Args:
|
||||
vin: VIN string to validate
|
||||
correct_errors: Whether to apply OCR error corrections
|
||||
allow_legacy: Whether to allow pre-1981 VINs (11-17 chars)
|
||||
|
||||
Returns:
|
||||
VinValidationResult with validation status and corrected VIN
|
||||
"""
|
||||
if not vin:
|
||||
return VinValidationResult(
|
||||
is_valid=False, vin="", confidence_adjustment=-1.0, error="Empty VIN"
|
||||
)
|
||||
|
||||
# Apply error corrections if enabled
|
||||
corrected_vin = self.correct_ocr_errors(vin) if correct_errors else vin.upper()
|
||||
|
||||
# Check length
|
||||
if len(corrected_vin) != 17:
|
||||
if allow_legacy and 11 <= len(corrected_vin) <= 17:
|
||||
# Legacy VIN - reduced confidence
|
||||
if self.LEGACY_VIN_PATTERN.match(corrected_vin):
|
||||
return VinValidationResult(
|
||||
is_valid=True,
|
||||
vin=corrected_vin,
|
||||
confidence_adjustment=-0.2,
|
||||
)
|
||||
return VinValidationResult(
|
||||
is_valid=False,
|
||||
vin=corrected_vin,
|
||||
confidence_adjustment=-0.5,
|
||||
error=f"Invalid length: {len(corrected_vin)} (expected 17)",
|
||||
)
|
||||
|
||||
# Check character set
|
||||
if not self.MODERN_VIN_PATTERN.match(corrected_vin):
|
||||
invalid_chars = [c for c in corrected_vin if c not in self.VALID_CHARS]
|
||||
return VinValidationResult(
|
||||
is_valid=False,
|
||||
vin=corrected_vin,
|
||||
confidence_adjustment=-0.3,
|
||||
error=f"Invalid characters: {invalid_chars}",
|
||||
)
|
||||
|
||||
# Validate check digit
|
||||
if self.validate_check_digit(corrected_vin):
|
||||
# Valid check digit - boost confidence
|
||||
return VinValidationResult(
|
||||
is_valid=True, vin=corrected_vin, confidence_adjustment=0.1
|
||||
)
|
||||
else:
|
||||
# Invalid check digit - could be OCR error or old VIN
|
||||
return VinValidationResult(
|
||||
is_valid=True, # Still return as valid but with reduced confidence
|
||||
vin=corrected_vin,
|
||||
confidence_adjustment=-0.15,
|
||||
error="Check digit validation failed",
|
||||
)
|
||||
|
||||
def extract_candidates(
|
||||
self, text: str, max_candidates: int = 5
|
||||
) -> list[tuple[str, int, int]]:
|
||||
"""
|
||||
Extract VIN candidates from raw OCR text.
|
||||
|
||||
Args:
|
||||
text: Raw OCR text
|
||||
max_candidates: Maximum number of candidates to return
|
||||
|
||||
Returns:
|
||||
List of (vin, start_pos, end_pos) tuples
|
||||
"""
|
||||
# Pattern to find potential VIN sequences
|
||||
# Allow some flexibility for OCR errors (include I, O, Q for correction later)
|
||||
potential_vin_pattern = re.compile(r"[A-Z0-9IOQ]{11,17}", re.IGNORECASE)
|
||||
|
||||
candidates = []
|
||||
for match in potential_vin_pattern.finditer(text.upper()):
|
||||
candidate = match.group()
|
||||
corrected = self.correct_ocr_errors(candidate)
|
||||
|
||||
# Only include if it could be a valid VIN after correction
|
||||
if len(corrected) == 17 and self.MODERN_VIN_PATTERN.match(corrected):
|
||||
candidates.append((corrected, match.start(), match.end()))
|
||||
|
||||
# Sort by likelihood of being valid (check digit validation)
|
||||
def score_candidate(c: tuple[str, int, int]) -> int:
|
||||
vin = c[0]
|
||||
if self.validate_check_digit(vin):
|
||||
return 0 # Best score
|
||||
return 1
|
||||
|
||||
candidates.sort(key=score_candidate)
|
||||
return candidates[:max_candidates]
|
||||
|
||||
|
||||
# Singleton instance
|
||||
vin_validator = VinValidator()
|
||||
Reference in New Issue
Block a user