All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 5m59s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
OCR Service (Python/FastAPI):
- POST /extract for synchronous OCR extraction
- POST /jobs and GET /jobs/{job_id} for async processing
- Image preprocessing (deskew, denoise) for accuracy
- HEIC conversion via pillow-heif
- Redis job queue for async processing
Backend (Fastify):
- POST /api/ocr/extract - authenticated proxy to OCR
- POST /api/ocr/jobs - async job submission
- GET /api/ocr/jobs/:jobId - job polling
- Multipart file upload handling
- JWT authentication required
File size limits: 10MB sync, 200MB async
Processing time target: <3 seconds for typical photos
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
177 lines
5.6 KiB
Python
177 lines
5.6 KiB
Python
"""Image preprocessing service for OCR accuracy improvement."""
|
|
import io
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ImagePreprocessor:
    """Handles image preprocessing for improved OCR accuracy."""

    def preprocess(
        self,
        image_bytes: bytes,
        deskew: bool = True,
        denoise: bool = True,
        binarize: bool = False,
    ) -> bytes:
        """
        Run the configured cleanup pipeline on one image.

        Args:
            image_bytes: Raw image bytes in any format PIL can open
            deskew: Correct small rotations detected via Hough lines
            denoise: Apply non-local-means noise reduction
            binarize: Threshold to pure black and white (off by default)

        Returns:
            The processed grayscale image, re-encoded as PNG bytes
        """
        source = Image.open(io.BytesIO(image_bytes))

        # Normalise exotic modes (RGBA, palette, CMYK, ...) down to RGB;
        # plain grayscale ("L") passes through untouched.
        if source.mode not in ("RGB", "L"):
            source = source.convert("RGB")

        arr = np.array(source)

        # OpenCV expects BGR channel order for colour arrays.
        if arr.ndim == 3:
            arr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)

        # Every processing step below operates on a single-channel image.
        working = cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY) if arr.ndim == 3 else arr

        if denoise:
            working = self._denoise(working)
        if deskew:
            working = self._deskew(working)
        # Optional: binarization can help with some documents.
        if binarize:
            working = self._binarize(working)

        # Re-encode losslessly as PNG for the downstream OCR engine.
        out = io.BytesIO()
        Image.fromarray(working).save(out, format="PNG")
        return out.getvalue()

    def _denoise(self, image: np.ndarray) -> np.ndarray:
        """Reduce noise with OpenCV's non-local means filter (grayscale)."""
        try:
            return cv2.fastNlMeansDenoising(
                image, h=10, templateWindowSize=7, searchWindowSize=21
            )
        except cv2.error as exc:
            # Best-effort: a failed filter must not abort the pipeline.
            logger.warning(f"Denoising failed: {exc}")
            return image

    def _deskew(self, image: np.ndarray) -> np.ndarray:
        """Straighten slightly rotated scans using Hough line detection."""
        try:
            edge_map = cv2.Canny(image, 50, 150, apertureSize=3)

            segments = cv2.HoughLinesP(
                edge_map,
                rho=1,
                theta=np.pi / 180,
                threshold=100,
                minLineLength=100,
                maxLineGap=10,
            )
            if segments is None:
                return image

            # Collect the slope (in degrees) of every roughly horizontal
            # segment; vertical segments carry no usable skew information.
            slopes = []
            for seg in segments:
                x1, y1, x2, y2 = seg[0]
                if x2 - x1 == 0:  # avoid division by zero
                    continue
                slope_deg = np.arctan2(y2 - y1, x2 - x1) * 180 / np.pi
                # Only consider nearly horizontal lines (within 45 degrees).
                if -45 < slope_deg < 45:
                    slopes.append(slope_deg)

            if not slopes:
                return image

            # Median is robust against a few wildly misdetected lines.
            skew = np.median(slopes)

            # Skip negligible skew (<0.5 deg) and implausibly large skew
            # (>15 deg), which usually means misdetected content.
            if abs(skew) < 0.5 or abs(skew) > 15:
                return image

            height, width = image.shape[:2]
            matrix = cv2.getRotationMatrix2D((width // 2, height // 2), skew, 1.0)

            # Expand the output canvas so rotation does not clip corners.
            cos_v = abs(matrix[0, 0])
            sin_v = abs(matrix[0, 1])
            out_w = int(height * sin_v + width * cos_v)
            out_h = int(height * cos_v + width * sin_v)
            matrix[0, 2] += (out_w - width) / 2
            matrix[1, 2] += (out_h - height) / 2

            corrected = cv2.warpAffine(
                image,
                matrix,
                (out_w, out_h),
                borderMode=cv2.BORDER_REPLICATE,
            )

            logger.debug(f"Deskewed image by {skew:.2f} degrees")
            return corrected

        except Exception as exc:
            # Deskew is an enhancement only; fall back to the input image.
            logger.warning(f"Deskewing failed: {exc}")
            return image

    def _binarize(self, image: np.ndarray) -> np.ndarray:
        """Convert to black and white via Gaussian adaptive thresholding."""
        try:
            return cv2.adaptiveThreshold(
                image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY,
                blockSize=11,
                C=2,
            )
        except cv2.error as exc:
            logger.warning(f"Binarization failed: {exc}")
            return image

    def get_image_info(self, image_bytes: bytes) -> dict:
        """Return width, height, mode and format metadata for an image."""
        probe = Image.open(io.BytesIO(image_bytes))
        return {
            "width": probe.width,
            "height": probe.height,
            "mode": probe.mode,
            "format": probe.format,
        }
|
|
|
|
|
# Module-level singleton shared by the service; the class holds no mutable
# state, so one instance is safe to reuse across requests.
preprocessor = ImagePreprocessor()