feat: add core OCR API integration (refs #65)
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 5m59s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped

OCR Service (Python/FastAPI):
- POST /extract for synchronous OCR extraction
- POST /jobs and GET /jobs/{job_id} for async processing
- Image preprocessing (deskew, denoise) for accuracy
- HEIC conversion via pillow-heif
- Redis job queue for async processing

Backend (Fastify):
- POST /api/ocr/extract - authenticated proxy to OCR
- POST /api/ocr/jobs - async job submission
- GET /api/ocr/jobs/:jobId - job polling
- Multipart file upload handling
- JWT authentication required

File size limits: 10MB sync, 200MB async
Processing time target: <3 seconds for typical photos

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Eric Gullickson
2026-02-01 16:02:11 -06:00
parent 94e49306dc
commit 852c9013b5
25 changed files with 1931 additions and 3 deletions

View File

@@ -0,0 +1,176 @@
"""Image preprocessing service for OCR accuracy improvement."""
import io
import logging
from typing import Optional
import cv2
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
class ImagePreprocessor:
    """Prepares photos and scans for OCR by straightening, cleaning, and
    optionally binarizing them before text extraction."""

    def preprocess(
        self,
        image_bytes: bytes,
        deskew: bool = True,
        denoise: bool = True,
        binarize: bool = False,
    ) -> bytes:
        """
        Run the preprocessing pipeline and return the result as PNG bytes.

        Args:
            image_bytes: Raw image bytes in any format PIL can open
            deskew: Correct small rotational skew via Hough line analysis
            denoise: Apply non-local means noise reduction
            binarize: Adaptive-threshold to pure black and white (off by
                default; helps some documents, hurts photos)

        Returns:
            The processed grayscale image encoded as PNG bytes
        """
        source = Image.open(io.BytesIO(image_bytes))

        # Normalize exotic modes (RGBA, palette, 16-bit, ...) to RGB;
        # plain grayscale ("L") is already usable as-is.
        if source.mode not in ("RGB", "L"):
            source = source.convert("RGB")

        frame = np.array(source)

        # Color frames arrive as RGB from PIL; OpenCV expects BGR.
        if len(frame.shape) == 3:
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        else:
            gray = frame

        # Pipeline stages, each optional and each a no-op on failure.
        if denoise:
            gray = self._denoise(gray)
        if deskew:
            gray = self._deskew(gray)
        if binarize:
            gray = self._binarize(gray)

        # Re-encode the grayscale result losslessly.
        out = io.BytesIO()
        Image.fromarray(gray).save(out, format="PNG")
        return out.getvalue()

    def _denoise(self, image: np.ndarray) -> np.ndarray:
        """Reduce sensor noise with non-local means filtering.

        Falls back to the unmodified image if OpenCV rejects the input.
        """
        try:
            return cv2.fastNlMeansDenoising(
                image, h=10, templateWindowSize=7, searchWindowSize=21
            )
        except cv2.error as e:
            logger.warning(f"Denoising failed: {e}")
            return image

    def _deskew(self, image: np.ndarray) -> np.ndarray:
        """Straighten a skewed scan by measuring dominant line angles.

        Canny edges feed a probabilistic Hough transform; the median
        angle of near-horizontal segments estimates the skew. Rotation
        is applied only for moderate skews (0.5-15 degrees): smaller is
        noise, larger is likely an intentionally rotated photo. Returns
        the input unchanged on any failure.
        """
        try:
            edges = cv2.Canny(image, 50, 150, apertureSize=3)
            segments = cv2.HoughLinesP(
                edges,
                rho=1,
                theta=np.pi / 180,
                threshold=100,
                minLineLength=100,
                maxLineGap=10,
            )
            if segments is None:
                return image

            # Keep only near-horizontal (within 45 degrees) segments;
            # perfectly vertical ones (x1 == x2) are excluded up front.
            angles = [
                theta
                for x1, y1, x2, y2 in (seg[0] for seg in segments)
                if x2 - x1 != 0
                for theta in (np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi,)
                if -45 < theta < 45
            ]
            if not angles:
                return image

            # Median resists outlier segments (table borders, artwork).
            skew = np.median(angles)
            if abs(skew) < 0.5 or abs(skew) > 15:
                return image

            h, w = image.shape[:2]
            matrix = cv2.getRotationMatrix2D((w // 2, h // 2), skew, 1.0)

            # Grow the output canvas so rotated corners are not clipped,
            # then shift the transform to re-center the content.
            cos_a = abs(matrix[0, 0])
            sin_a = abs(matrix[0, 1])
            out_w = int(h * sin_a + w * cos_a)
            out_h = int(h * cos_a + w * sin_a)
            matrix[0, 2] += (out_w - w) / 2
            matrix[1, 2] += (out_h - h) / 2

            straightened = cv2.warpAffine(
                image,
                matrix,
                (out_w, out_h),
                borderMode=cv2.BORDER_REPLICATE,
            )
            logger.debug(f"Deskewed image by {skew:.2f} degrees")
            return straightened
        except Exception as e:
            logger.warning(f"Deskewing failed: {e}")
            return image

    def _binarize(self, image: np.ndarray) -> np.ndarray:
        """Convert to pure black/white via Gaussian adaptive thresholding.

        Falls back to the unmodified image if OpenCV rejects the input.
        """
        try:
            return cv2.adaptiveThreshold(
                image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY,
                blockSize=11,
                C=2,
            )
        except cv2.error as e:
            logger.warning(f"Binarization failed: {e}")
            return image

    def get_image_info(self, image_bytes: bytes) -> dict:
        """Return width, height, color mode, and format for raw image bytes."""
        img = Image.open(io.BytesIO(image_bytes))
        return {
            "width": img.width,
            "height": img.height,
            "mode": img.mode,
            "format": img.format,
        }
# Module-level singleton; callers import this shared instance instead of
# constructing their own (the class holds no per-instance state).
preprocessor = ImagePreprocessor()