feat: add owner's manual OCR pipeline (refs #71)
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 3m1s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
All checks were successful
Deploy to Staging / Build Images (pull_request) Successful in 3m1s
Deploy to Staging / Deploy to Staging (pull_request) Successful in 31s
Deploy to Staging / Verify Staging (pull_request) Successful in 2m19s
Deploy to Staging / Notify Staging Ready (pull_request) Successful in 7s
Deploy to Staging / Notify Staging Failure (pull_request) Has been skipped
Implement async PDF processing for owner's manuals with maintenance schedule extraction: - Add PDF preprocessor with PyMuPDF for text/scanned PDF handling - Add maintenance pattern matching (mileage, time, fluid specs) - Add service name mapping to maintenance subtypes - Add table detection and parsing for schedule tables - Add manual extractor orchestrating the complete pipeline - Add POST /extract/manual endpoint for async job submission - Add Redis job queue support for manual extraction jobs - Add progress tracking during processing Processing pipeline: 1. Analyze PDF structure (text layer vs scanned) 2. Find maintenance schedule sections 3. Extract text or OCR scanned pages at 300 DPI 4. Detect and parse maintenance tables 5. Normalize service names and extract intervals 6. Return structured maintenance schedules with confidence scores Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
322
ocr/app/table_extraction/detector.py
Normal file
322
ocr/app/table_extraction/detector.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""Table detection for maintenance schedule extraction."""
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DetectedTable:
|
||||
"""A detected table in a document."""
|
||||
|
||||
page_number: int
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
confidence: float
|
||||
is_maintenance_table: bool
|
||||
header_row: Optional[list[str]] = None
|
||||
raw_content: list[list[str]] = field(default_factory=list)
|
||||
|
||||
|
||||
class TableDetector:
|
||||
"""Detect tables in document pages.
|
||||
|
||||
Uses computer vision techniques to identify table regions:
|
||||
1. Line detection for bordered tables
|
||||
2. Text alignment analysis for borderless tables
|
||||
3. Header keyword matching for maintenance schedule identification
|
||||
"""
|
||||
|
||||
# Keywords indicating maintenance schedule table headers
|
||||
MAINTENANCE_HEADERS = [
|
||||
"service", "maintenance", "item", "operation",
|
||||
"miles", "mi", "km", "kilometers",
|
||||
"months", "mo", "interval",
|
||||
"check", "replace", "inspect", "change",
|
||||
"schedule", "frequency",
|
||||
]
|
||||
|
||||
# Keywords in content that indicate maintenance
|
||||
MAINTENANCE_CONTENT_KEYWORDS = [
|
||||
"oil", "filter", "brake", "tire", "coolant",
|
||||
"fluid", "spark plug", "belt", "hose",
|
||||
"inspect", "replace", "change", "check",
|
||||
]
|
||||
|
||||
def detect_tables_in_image(
|
||||
self, image_bytes: bytes, page_number: int = 0
|
||||
) -> list[DetectedTable]:
|
||||
"""
|
||||
Detect tables in an image using line detection.
|
||||
|
||||
Args:
|
||||
image_bytes: PNG/JPEG image bytes
|
||||
page_number: Page number for the result
|
||||
|
||||
Returns:
|
||||
List of DetectedTable objects
|
||||
"""
|
||||
# Load image
|
||||
nparr = np.frombuffer(image_bytes, np.uint8)
|
||||
img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
if img is None:
|
||||
logger.warning("Failed to decode image for table detection")
|
||||
return []
|
||||
|
||||
# Apply threshold
|
||||
_, binary = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY_INV)
|
||||
|
||||
# Detect horizontal lines
|
||||
horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
|
||||
horizontal_lines = cv2.morphologyEx(
|
||||
binary, cv2.MORPH_OPEN, horizontal_kernel, iterations=2
|
||||
)
|
||||
|
||||
# Detect vertical lines
|
||||
vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
|
||||
vertical_lines = cv2.morphologyEx(
|
||||
binary, cv2.MORPH_OPEN, vertical_kernel, iterations=2
|
||||
)
|
||||
|
||||
# Combine lines
|
||||
table_mask = cv2.add(horizontal_lines, vertical_lines)
|
||||
|
||||
# Find contours
|
||||
contours, _ = cv2.findContours(
|
||||
table_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
|
||||
)
|
||||
|
||||
tables = []
|
||||
height, width = img.shape[:2]
|
||||
|
||||
for contour in contours:
|
||||
x, y, w, h = cv2.boundingRect(contour)
|
||||
|
||||
# Filter by size (tables should be reasonably large)
|
||||
if w < width * 0.3 or h < height * 0.05:
|
||||
continue
|
||||
if w > width * 0.95 and h > height * 0.95:
|
||||
continue # Skip full-page rectangles
|
||||
|
||||
# Calculate confidence based on aspect ratio and size
|
||||
aspect_ratio = w / h if h > 0 else 0
|
||||
size_ratio = (w * h) / (width * height)
|
||||
|
||||
# Tables typically have reasonable aspect ratios
|
||||
if 0.5 <= aspect_ratio <= 10 and 0.01 <= size_ratio <= 0.8:
|
||||
confidence = min(0.9, 0.5 + size_ratio + (1 - abs(aspect_ratio - 2) / 10))
|
||||
|
||||
tables.append(
|
||||
DetectedTable(
|
||||
page_number=page_number,
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=confidence,
|
||||
is_maintenance_table=False, # Will be determined later
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(f"Detected {len(tables)} potential tables on page {page_number}")
|
||||
return tables
|
||||
|
||||
def detect_tables_in_text(
|
||||
self, text: str, page_number: int = 0
|
||||
) -> list[DetectedTable]:
|
||||
"""
|
||||
Detect table-like structures in text using pattern analysis.
|
||||
|
||||
Useful for native PDFs where text is available.
|
||||
|
||||
Args:
|
||||
text: Extracted text content
|
||||
page_number: Page number
|
||||
|
||||
Returns:
|
||||
List of DetectedTable with content populated
|
||||
"""
|
||||
tables = []
|
||||
lines = text.split("\n")
|
||||
|
||||
# Look for patterns that suggest tabular data
|
||||
# - Multiple columns separated by whitespace or tabs
|
||||
# - Consistent column alignment across rows
|
||||
|
||||
current_table_lines: list[str] = []
|
||||
in_table = False
|
||||
table_start_idx = 0
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
# Check if line looks like table row
|
||||
is_table_row = self._is_table_row(line)
|
||||
|
||||
if is_table_row:
|
||||
if not in_table:
|
||||
in_table = True
|
||||
table_start_idx = i
|
||||
current_table_lines = []
|
||||
current_table_lines.append(line)
|
||||
else:
|
||||
if in_table and len(current_table_lines) >= 3:
|
||||
# End of table, process it
|
||||
table = self._process_text_table(
|
||||
current_table_lines, page_number, table_start_idx
|
||||
)
|
||||
if table:
|
||||
tables.append(table)
|
||||
in_table = False
|
||||
current_table_lines = []
|
||||
|
||||
# Handle table at end of text
|
||||
if in_table and len(current_table_lines) >= 3:
|
||||
table = self._process_text_table(
|
||||
current_table_lines, page_number, table_start_idx
|
||||
)
|
||||
if table:
|
||||
tables.append(table)
|
||||
|
||||
return tables
|
||||
|
||||
def is_maintenance_table(
|
||||
self, table: DetectedTable, full_text: Optional[str] = None
|
||||
) -> bool:
|
||||
"""
|
||||
Determine if a detected table is a maintenance schedule.
|
||||
|
||||
Args:
|
||||
table: Detected table to analyze
|
||||
full_text: Optional surrounding text for context
|
||||
|
||||
Returns:
|
||||
True if likely a maintenance schedule table
|
||||
"""
|
||||
# Check header row for maintenance keywords
|
||||
if table.header_row:
|
||||
header_text = " ".join(table.header_row).lower()
|
||||
header_matches = sum(
|
||||
1 for kw in self.MAINTENANCE_HEADERS if kw in header_text
|
||||
)
|
||||
if header_matches >= 2:
|
||||
return True
|
||||
|
||||
# Check content for maintenance keywords
|
||||
if table.raw_content:
|
||||
content_text = " ".join(
|
||||
" ".join(row) for row in table.raw_content
|
||||
).lower()
|
||||
content_matches = sum(
|
||||
1 for kw in self.MAINTENANCE_CONTENT_KEYWORDS if kw in content_text
|
||||
)
|
||||
if content_matches >= 3:
|
||||
return True
|
||||
|
||||
# Check surrounding text
|
||||
if full_text:
|
||||
text_lower = full_text.lower()
|
||||
context_keywords = [
|
||||
"maintenance schedule",
|
||||
"service schedule",
|
||||
"maintenance interval",
|
||||
"recommended maintenance",
|
||||
]
|
||||
if any(kw in text_lower for kw in context_keywords):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _is_table_row(self, line: str) -> bool:
|
||||
"""Check if a line looks like a table row."""
|
||||
# Skip empty lines
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
return False
|
||||
|
||||
# Check for multiple whitespace-separated columns
|
||||
parts = re.split(r"\s{2,}|\t", stripped)
|
||||
if len(parts) >= 2:
|
||||
# At least 2 columns with content
|
||||
non_empty = [p for p in parts if p.strip()]
|
||||
return len(non_empty) >= 2
|
||||
|
||||
# Check for common table patterns
|
||||
# e.g., "Service Item 5,000 miles 6 months"
|
||||
if re.search(r"\d+[,.]?\d*\s*(miles?|mi\.?|km|months?|mo\.?)", stripped, re.I):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _process_text_table(
|
||||
self, lines: list[str], page_number: int, start_line: int
|
||||
) -> Optional[DetectedTable]:
|
||||
"""Process extracted text lines into a table structure."""
|
||||
if not lines:
|
||||
return None
|
||||
|
||||
# Parse rows
|
||||
rows = []
|
||||
for line in lines:
|
||||
# Split on multiple whitespace or tabs
|
||||
parts = re.split(r"\s{2,}|\t", line.strip())
|
||||
cells = [p.strip() for p in parts if p.strip()]
|
||||
if cells:
|
||||
rows.append(cells)
|
||||
|
||||
if len(rows) < 2:
|
||||
return None
|
||||
|
||||
# First row is likely header
|
||||
header_row = rows[0]
|
||||
|
||||
# Check if this looks like a maintenance table
|
||||
table = DetectedTable(
|
||||
page_number=page_number,
|
||||
x=0, # Text tables don't have coordinates
|
||||
y=start_line,
|
||||
width=0,
|
||||
height=len(rows),
|
||||
confidence=0.7,
|
||||
is_maintenance_table=False,
|
||||
header_row=header_row,
|
||||
raw_content=rows[1:],
|
||||
)
|
||||
|
||||
# Determine if it's a maintenance table
|
||||
table.is_maintenance_table = self.is_maintenance_table(table)
|
||||
|
||||
if table.is_maintenance_table:
|
||||
table.confidence = 0.85
|
||||
|
||||
return table
|
||||
|
||||
def extract_table_text_from_region(
|
||||
self, image_bytes: bytes, table: DetectedTable
|
||||
) -> list[list[str]]:
|
||||
"""
|
||||
Extract text from a table region using OCR.
|
||||
|
||||
Args:
|
||||
image_bytes: Full page image
|
||||
table: Detected table with coordinates
|
||||
|
||||
Returns:
|
||||
2D list of cell contents
|
||||
"""
|
||||
# This would use Tesseract on the cropped region
|
||||
# For now, return empty - actual OCR will be done in manual_extractor
|
||||
logger.debug(f"Table region: ({table.x}, {table.y}) {table.width}x{table.height}")
|
||||
return []
|
||||
|
||||
|
||||
# Singleton instance
|
||||
table_detector = TableDetector()
|
||||
Reference in New Issue
Block a user