Some checks failed
Deploy to Staging / Build Images (pull_request) Failing after 4m14s
Deploy to Staging / Deploy to Staging (pull_request) Has been skipped
Deploy to Staging / Verify Staging (pull_request) Has been skipped
Deploy to Staging / Notify Staging Ready (pull_request) Has been skipped
Deploy to Staging / Notify Staging Failure (pull_request) Successful in 8s
323 lines
10 KiB
Python
323 lines
10 KiB
Python
"""Table detection for maintenance schedule extraction."""
|
|
import io
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class DetectedTable:
    """One table candidate found on a document page.

    Image-based detection fills ``x``/``y``/``width``/``height`` with the
    bounding box of the detected contour.  Text-based detection has no
    coordinates: it sets ``x`` and ``width`` to 0, ``y`` to the index of the
    first table line and ``height`` to the number of parsed rows.
    """

    # Page the table was found on (passed through from the caller).
    page_number: int
    # Bounding box; see the class docstring for the text-detection meaning.
    x: int
    y: int
    width: int
    height: int
    # Detection confidence score.
    confidence: float
    # Whether the table was classified as a maintenance schedule.
    is_maintenance_table: bool
    # Cells of the first row, when table text is available.
    header_row: Optional[list[str]] = field(default=None)
    # Remaining rows as lists of cell strings; a fresh list per instance.
    raw_content: list[list[str]] = field(default_factory=list)
|
|
|
|
|
|
class TableDetector:
    """Detect tables in document pages.

    Uses computer vision techniques to identify table regions:
    1. Line detection for bordered tables
    2. Text alignment analysis for borderless tables
    3. Header keyword matching for maintenance schedule identification
    """

    # Keywords indicating maintenance schedule table headers
    MAINTENANCE_HEADERS = [
        "service", "maintenance", "item", "operation",
        "miles", "mi", "km", "kilometers",
        "months", "mo", "interval",
        "check", "replace", "inspect", "change",
        "schedule", "frequency",
    ]

    # Keywords in content that indicate maintenance
    MAINTENANCE_CONTENT_KEYWORDS = [
        "oil", "filter", "brake", "tire", "coolant",
        "fluid", "spark plug", "belt", "hose",
        "inspect", "replace", "change", "check",
    ]

    # Phrases in the surrounding page text that mark a maintenance schedule
    _CONTEXT_PHRASES = (
        "maintenance schedule",
        "service schedule",
        "maintenance interval",
        "recommended maintenance",
    )

    # Column separator: a run of two or more whitespace characters, or a tab
    _COLUMN_SEPARATOR = re.compile(r"\s{2,}|\t")

    # A number followed by a distance/time unit, e.g. "5,000 miles", "6 mo."
    # The trailing lookahead stops the unit from matching the start of a
    # longer word ("5 minutes", "5 more").
    _INTERVAL_PATTERN = re.compile(
        r"\d+[,.]?\d*\s*(?:miles?|mi\.?|kms?|months?|mos?\.?)(?![a-z])",
        re.IGNORECASE,
    )

    def detect_tables_in_image(
        self, image_bytes: bytes, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect tables in an image using line detection.

        Args:
            image_bytes: PNG/JPEG image bytes
            page_number: Page number for the result

        Returns:
            List of DetectedTable objects. Only the bounding box and
            confidence are filled in; ``is_maintenance_table`` is always
            False here and no header/content is extracted. Empty or
            undecodable input yields an empty list.
        """
        # An empty buffer cannot be decoded; OpenCV 4.x raises on it instead
        # of returning None, so handle it like any other decode failure.
        if not image_bytes:
            logger.warning("Failed to decode image for table detection")
            return []

        # Load image
        encoded = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(encoded, cv2.IMREAD_GRAYSCALE)
        if img is None:
            logger.warning("Failed to decode image for table detection")
            return []

        # Inverted threshold: dark strokes (<= 150) become white foreground
        _, binary = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY_INV)

        # Keep only long horizontal / vertical strokes, then merge them
        horizontal_lines = self._extract_lines(binary, (40, 1))
        vertical_lines = self._extract_lines(binary, (1, 40))
        table_mask = cv2.add(horizontal_lines, vertical_lines)

        # Find contours
        contours, _ = cv2.findContours(
            table_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )

        tables = []
        height, width = img.shape[:2]

        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)

            # Filter by size (tables should be reasonably large)
            if w < width * 0.3 or h < height * 0.05:
                continue
            if w > width * 0.95 and h > height * 0.95:
                continue  # Skip full-page rectangles

            aspect_ratio = w / h if h > 0 else 0
            size_ratio = (w * h) / (width * height)

            # Tables typically have reasonable aspect ratios
            if not (0.5 <= aspect_ratio <= 10 and 0.01 <= size_ratio <= 0.8):
                continue

            # NOTE(review): within the accepted ranges this score only drops
            # below the 0.9 cap for aspect ratios above roughly 8.
            confidence = min(
                0.9, 0.5 + size_ratio + (1 - abs(aspect_ratio - 2) / 10)
            )
            tables.append(
                DetectedTable(
                    page_number=page_number,
                    x=x,
                    y=y,
                    width=w,
                    height=h,
                    confidence=confidence,
                    is_maintenance_table=False,  # Will be determined later
                )
            )

        logger.debug(
            "Detected %d potential tables on page %s", len(tables), page_number
        )
        return tables

    @staticmethod
    def _extract_lines(binary, kernel_size):
        """Return the strokes of *binary* that survive opening with a
        rectangular kernel of *kernel_size* (width, height)."""
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
        return cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel, iterations=2)

    def detect_tables_in_text(
        self, text: str, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect table-like structures in text using pattern analysis.

        Useful for native PDFs where text is available. A table is a run of
        at least three consecutive lines that each look like a table row
        (see ``_is_table_row``).

        Args:
            text: Extracted text content
            page_number: Page number

        Returns:
            List of DetectedTable with content populated
        """
        tables = []
        run: list[str] = []
        run_start = 0

        # The trailing empty line is never a table row, so it flushes a table
        # that runs to the very end of the text through the same code path.
        for index, line in enumerate([*text.split("\n"), ""]):
            if self._is_table_row(line):
                if not run:
                    run_start = index
                run.append(line)
                continue

            # A non-row line ends the current run; shorter runs are dropped.
            if len(run) >= 3:
                table = self._process_text_table(run, page_number, run_start)
                if table is not None:
                    tables.append(table)
            run = []

        return tables

    def is_maintenance_table(
        self, table: DetectedTable, full_text: Optional[str] = None
    ) -> bool:
        """
        Determine if a detected table is a maintenance schedule.

        The checks run in order and the first one that succeeds wins:
        at least 2 distinct header keywords, then at least 3 distinct
        content keywords, then a schedule phrase in the surrounding text.

        Args:
            table: Detected table to analyze
            full_text: Optional surrounding text for context

        Returns:
            True if likely a maintenance schedule table
        """
        # Check header row for maintenance keywords
        if table.header_row:
            header_text = " ".join(table.header_row).lower()
            if self._count_keywords(self.MAINTENANCE_HEADERS, header_text) >= 2:
                return True

        # Check content for maintenance keywords
        if table.raw_content:
            content_text = " ".join(
                " ".join(row) for row in table.raw_content
            ).lower()
            hits = self._count_keywords(
                self.MAINTENANCE_CONTENT_KEYWORDS, content_text
            )
            if hits >= 3:
                return True

        # Check surrounding text
        if full_text:
            text_lower = full_text.lower()
            return any(phrase in text_lower for phrase in self._CONTEXT_PHRASES)

        return False

    @staticmethod
    def _count_keywords(keywords: list[str], text: str) -> int:
        """Count how many distinct *keywords* occur in *text*.

        A keyword must not be preceded by a letter, so "oil" does not match
        inside "boil". Keywords longer than two characters may be followed by
        more letters ("item" matches "items"); abbreviations of one or two
        characters ("mi", "mo", "km") must stand alone, so "miles" is not
        also counted as "mi" and "model" is not counted as "mo".
        """
        hits = 0
        for keyword in keywords:
            tail = r"(?![a-z])" if len(keyword) <= 2 else ""
            pattern = rf"(?<![a-z]){re.escape(keyword)}{tail}"
            if re.search(pattern, text, re.IGNORECASE):
                hits += 1
        return hits

    def _split_cells(self, line: str) -> list[str]:
        """Split *line* into its stripped, non-empty column cells."""
        pieces = self._COLUMN_SEPARATOR.split(line.strip())
        return [piece.strip() for piece in pieces if piece.strip()]

    def _is_table_row(self, line: str) -> bool:
        """Check if a line looks like a table row.

        A row is either a line with two or more columns separated by a tab
        or by two-plus whitespace characters, or a line that mentions a
        numeric interval such as "5,000 miles" or "6 months".
        """
        stripped = line.strip()
        if not stripped:
            return False

        # Check for multiple whitespace-separated columns
        if len(self._split_cells(stripped)) >= 2:
            return True

        # Check for common table patterns
        # e.g., "Service Item 5,000 miles 6 months"
        return self._INTERVAL_PATTERN.search(stripped) is not None

    def _process_text_table(
        self, lines: list[str], page_number: int, start_line: int
    ) -> Optional[DetectedTable]:
        """Process extracted text lines into a table structure.

        Args:
            lines: Consecutive text lines forming the table
            page_number: Page number for the result
            start_line: Index of the first table line; stored as ``y``

        Returns:
            A DetectedTable whose first parsed row is the header, or None
            when fewer than two non-empty rows can be parsed.
        """
        if not lines:
            return None

        # Parse rows, dropping lines that yield no cells
        rows = [cells for cells in map(self._split_cells, lines) if cells]
        if len(rows) < 2:
            return None

        # First row is likely header
        header_row, *body_rows = rows

        table = DetectedTable(
            page_number=page_number,
            x=0,  # Text tables don't have coordinates
            y=start_line,
            width=0,
            height=len(rows),
            confidence=0.7,
            is_maintenance_table=False,
            header_row=header_row,
            raw_content=body_rows,
        )

        # Determine if it's a maintenance table
        table.is_maintenance_table = self.is_maintenance_table(table)
        if table.is_maintenance_table:
            table.confidence = 0.85

        return table

    def extract_table_text_from_region(
        self, image_bytes: bytes, table: DetectedTable
    ) -> list[list[str]]:
        """
        Extract text from a table region using OCR.

        Currently a placeholder: it only logs the region and returns an
        empty list.

        Args:
            image_bytes: Full page image (unused for now)
            table: Detected table with coordinates

        Returns:
            2D list of cell contents (always empty at present)
        """
        # This would use OCR on the cropped region
        # For now, return empty - actual OCR will be done in manual_extractor
        logger.debug(
            "Table region: (%s, %s) %sx%s",
            table.x, table.y, table.width, table.height,
        )
        return []
|
|
|
|
|
|
# Singleton instance
|
|
table_detector = TableDetector()  # built once at import; the class defines no __init__
|