Files
motovaultpro/ocr/app/services/preprocessor.py
Eric Gullickson 852c9013b5
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 5m59s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
feat: add core OCR API integration (refs #65)
OCR Service (Python/FastAPI):
- POST /extract for synchronous OCR extraction
- POST /jobs and GET /jobs/{job_id} for async processing
- Image preprocessing (deskew, denoise) for accuracy
- HEIC conversion via pillow-heif
- Redis job queue for async processing

Backend (Fastify):
- POST /api/ocr/extract - authenticated proxy to OCR
- POST /api/ocr/jobs - async job submission
- GET /api/ocr/jobs/:jobId - job polling
- Multipart file upload handling
- JWT authentication required

File size limits: 10MB sync, 200MB async
Processing time target: <3 seconds for typical photos

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 16:02:11 -06:00

177 lines
5.6 KiB
Python

"""Image preprocessing service for OCR accuracy improvement."""
import io
import logging
from typing import Optional
import cv2
import numpy as np
from PIL import Image
logger = logging.getLogger(__name__)
class ImagePreprocessor:
    """Handles image preprocessing for improved OCR accuracy.

    Stateless: all methods operate only on their arguments, so a single
    shared instance can serve concurrent requests.
    """

    def preprocess(
        self,
        image_bytes: bytes,
        deskew: bool = True,
        denoise: bool = True,
        binarize: bool = False,
    ) -> bytes:
        """
        Apply preprocessing to an image for better OCR results.

        Args:
            image_bytes: Raw image bytes in any format PIL can decode.
            deskew: Whether to correct image rotation.
            denoise: Whether to apply noise reduction.
            binarize: Whether to convert to black and white.

        Returns:
            Preprocessed grayscale image encoded as PNG bytes.

        Raises:
            PIL.UnidentifiedImageError: If the bytes are not a decodable image.
        """
        pil_image = Image.open(io.BytesIO(image_bytes))
        # NOTE(review): EXIF orientation is not applied here; rotated phone
        # photos may rely on the deskew step alone — confirm upstream handling.
        # Normalize exotic modes (RGBA, palette, CMYK, ...) to RGB; keep "L"
        # (already grayscale) as-is to avoid a pointless round trip.
        if pil_image.mode not in ("RGB", "L"):
            pil_image = pil_image.convert("RGB")

        cv_image = np.array(pil_image)
        # The array is RGB-ordered (straight from PIL), so collapse to
        # grayscale with RGB2GRAY directly — the original RGB->BGR->GRAY
        # two-step conversion produced identical values with an extra pass.
        if cv_image.ndim == 3:
            gray = cv2.cvtColor(cv_image, cv2.COLOR_RGB2GRAY)
        else:
            gray = cv_image

        # Denoise before deskew so edge/line detection sees cleaner input.
        if denoise:
            gray = self._denoise(gray)
        if deskew:
            gray = self._deskew(gray)
        # Optional: binarization helps some documents but can hurt photos,
        # hence off by default.
        if binarize:
            gray = self._binarize(gray)

        # Always emit PNG so callers get a lossless, predictable format.
        buffer = io.BytesIO()
        Image.fromarray(gray).save(buffer, format="PNG")
        return buffer.getvalue()

    def _denoise(self, image: np.ndarray) -> np.ndarray:
        """Apply noise reduction using non-local means denoising.

        Best-effort: on OpenCV failure, log a warning and return the input
        unchanged rather than failing the whole request.
        """
        try:
            # fastNlMeansDenoising is effective for grayscale images.
            return cv2.fastNlMeansDenoising(
                image, h=10, templateWindowSize=7, searchWindowSize=21
            )
        except cv2.error as e:
            # Lazy %-formatting: the message is only built if the record
            # is actually emitted.
            logger.warning("Denoising failed: %s", e)
            return image

    def _deskew(self, image: np.ndarray) -> np.ndarray:
        """Correct image rotation using the Hough transform.

        Best-effort: returns the input unchanged when no usable lines are
        found, when the skew is negligible (< 0.5 deg) or implausibly large
        (> 15 deg), or when any step raises.
        """
        try:
            # Detect edges, then candidate text lines.
            edges = cv2.Canny(image, 50, 150, apertureSize=3)
            lines = cv2.HoughLinesP(
                edges,
                rho=1,
                theta=np.pi / 180,
                threshold=100,
                minLineLength=100,
                maxLineGap=10,
            )
            if lines is None:
                return image

            # Collect angles of nearly horizontal segments (|angle| < 45 deg);
            # steeper lines are likely rules/borders, not text baselines.
            angles = []
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x2 - x1 != 0:  # Avoid division by zero
                    angle = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi
                    if -45 < angle < 45:
                        angles.append(angle)
            if not angles:
                return image

            # Median resists outlier segments better than the mean.
            median_angle = np.median(angles)
            # Skip correction when skew is insignificant or too extreme to
            # be a plausible document skew (likely mis-detected lines).
            if abs(median_angle) < 0.5 or abs(median_angle) > 15:
                return image

            height, width = image.shape[:2]
            center = (width // 2, height // 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, median_angle, 1.0)

            # Expand the canvas so the rotated image is not cropped, and
            # shift the transform to keep the content centered.
            cos_val = abs(rotation_matrix[0, 0])
            sin_val = abs(rotation_matrix[0, 1])
            new_width = int(height * sin_val + width * cos_val)
            new_height = int(height * cos_val + width * sin_val)
            rotation_matrix[0, 2] += (new_width - width) / 2
            rotation_matrix[1, 2] += (new_height - height) / 2

            rotated = cv2.warpAffine(
                image,
                rotation_matrix,
                (new_width, new_height),
                # Replicate edge pixels instead of introducing black borders,
                # which would confuse downstream OCR/binarization.
                borderMode=cv2.BORDER_REPLICATE,
            )
            logger.debug("Deskewed image by %.2f degrees", median_angle)
            return rotated
        except Exception as e:
            # Deliberately broad: deskew is an optional enhancement and must
            # never fail the request.
            logger.warning("Deskewing failed: %s", e)
            return image

    def _binarize(self, image: np.ndarray) -> np.ndarray:
        """Convert to binary (black and white) using adaptive thresholding.

        Best-effort: on OpenCV failure, log a warning and return the input
        unchanged.
        """
        try:
            # Adaptive (per-neighborhood) thresholding handles uneven
            # lighting better than a single global threshold.
            return cv2.adaptiveThreshold(
                image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY,
                blockSize=11,
                C=2,
            )
        except cv2.error as e:
            logger.warning("Binarization failed: %s", e)
            return image

    def get_image_info(self, image_bytes: bytes) -> dict:
        """Return width, height, mode, and format of the given image bytes.

        Raises:
            PIL.UnidentifiedImageError: If the bytes are not a decodable image.
        """
        pil_image = Image.open(io.BytesIO(image_bytes))
        return {
            "width": pil_image.width,
            "height": pil_image.height,
            "mode": pil_image.mode,
            "format": pil_image.format,
        }
# Module-level singleton shared by request handlers. ImagePreprocessor
# holds no instance state, so one shared instance is sufficient.
preprocessor = ImagePreprocessor()