"""Table detection for maintenance schedule extraction."""

import io
import logging
import re
from dataclasses import dataclass, field
from typing import Optional

import cv2
import numpy as np
from PIL import Image

logger = logging.getLogger(__name__)

# Column separator in extracted text: a run of two or more whitespace
# characters, or a single tab.
_COLUMN_SEPARATOR = re.compile(r"\s{2,}|\t")

# A number followed by a distance/time unit, e.g. "5,000 miles" or "6 mo.".
# The trailing lookahead stops the unit from matching the start of a longer
# word ("5 more", "3 minutes").
_INTERVAL_PATTERN = re.compile(
    r"\d+[,.]?\d*\s*(?:miles?|mi\.?|km|months?|mo\.?)(?![a-z])", re.I
)

# Alphabetic words of an already lower-cased text.
_WORD_PATTERN = re.compile(r"[a-z]+")


@dataclass
class DetectedTable:
    """A detected table in a document."""

    # Page the table was found on, as passed in by the caller.
    page_number: int
    # Bounding box. Image-based detection fills these with pixel coordinates;
    # text-based detection sets x and width to 0, y to the index of the first
    # table line and height to the number of parsed rows.
    x: int
    y: int
    width: int
    height: int
    # Heuristic score assigned by the detector.
    confidence: float
    # True when the table looks like a maintenance schedule.
    is_maintenance_table: bool
    # Cells of the first row (text-based detection only).
    header_row: Optional[list[str]] = None
    # Remaining rows, each a list of cell strings.
    raw_content: list[list[str]] = field(default_factory=list)


class TableDetector:
    """Detect tables in document pages.

    Uses computer vision techniques to identify table regions:
    1. Line detection for bordered tables
    2. Text alignment analysis for borderless tables
    3. Header keyword matching for maintenance schedule identification
    """

    # Keywords indicating maintenance schedule table headers. Entries are
    # matched at the start of a word; two-letter abbreviations must match a
    # whole word. "mile" and "month" cover "mileage" and singular "month",
    # which used to match only by accident through the "mi"/"mo" substrings.
    MAINTENANCE_HEADERS = [
        "service",
        "maintenance",
        "item",
        "operation",
        "miles",
        "mile",
        "mi",
        "km",
        "kilometers",
        "months",
        "month",
        "mo",
        "interval",
        "check",
        "replace",
        "inspect",
        "change",
        "schedule",
        "frequency",
    ]

    # Keywords in content that indicate maintenance
    MAINTENANCE_CONTENT_KEYWORDS = [
        "oil",
        "filter",
        "brake",
        "tire",
        "coolant",
        "fluid",
        "spark plug",
        "belt",
        "hose",
        "inspect",
        "replace",
        "change",
        "check",
    ]

    # Phrases in the surrounding text that mark a page as maintenance-related.
    _CONTEXT_PHRASES = (
        "maintenance schedule",
        "service schedule",
        "maintenance interval",
        "recommended maintenance",
    )

    # A run of table-like text lines shorter than this is not reported.
    _MIN_TEXT_TABLE_LINES = 3

    def detect_tables_in_image(
        self, image_bytes: bytes, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect tables in an image using line detection.

        Args:
            image_bytes: PNG/JPEG image bytes
            page_number: Page number for the result

        Returns:
            List of DetectedTable objects. Empty when the image cannot be
            decoded or no region passes the size and shape filters.
        """
        # Load image. An empty buffer cannot be decoded, so skip the OpenCV
        # call and let empty input take the same warning path as corrupt data.
        img = None
        if image_bytes:
            img = cv2.imdecode(
                np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_GRAYSCALE
            )
        if img is None:
            logger.warning("Failed to decode image for table detection")
            return []

        # Inverted threshold: dark ink becomes white foreground.
        _, binary = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY_INV)

        # Opening with a long, thin kernel keeps only long horizontal strokes.
        horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (40, 1))
        horizontal_lines = cv2.morphologyEx(
            binary, cv2.MORPH_OPEN, horizontal_kernel, iterations=2
        )

        # Same idea with a tall kernel for vertical strokes.
        vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 40))
        vertical_lines = cv2.morphologyEx(
            binary, cv2.MORPH_OPEN, vertical_kernel, iterations=2
        )

        # Combine lines and take the outer contour of each connected grid.
        table_mask = cv2.add(horizontal_lines, vertical_lines)
        contours, _ = cv2.findContours(
            table_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )

        tables = []
        height, width = img.shape[:2]

        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)

            # Filter by size (tables should be reasonably large)
            if w < width * 0.3 or h < height * 0.05:
                continue
            if w > width * 0.95 and h > height * 0.95:
                continue  # Skip full-page rectangles

            aspect_ratio = w / h if h > 0 else 0
            size_ratio = (w * h) / (width * height)

            # Tables typically have reasonable aspect ratios
            if not (0.5 <= aspect_ratio <= 10 and 0.01 <= size_ratio <= 0.8):
                continue

            # NOTE(review): this expression reaches the 0.9 cap for every
            # aspect ratio up to about 8, so nearly all accepted regions get
            # the same score - confirm the intended weighting.
            confidence = min(
                0.9, 0.5 + size_ratio + (1 - abs(aspect_ratio - 2) / 10)
            )
            tables.append(
                DetectedTable(
                    page_number=page_number,
                    x=x,
                    y=y,
                    width=w,
                    height=h,
                    confidence=confidence,
                    is_maintenance_table=False,  # Will be determined later
                )
            )

        logger.debug(
            "Detected %s potential tables on page %s", len(tables), page_number
        )
        return tables

    def detect_tables_in_text(
        self, text: str, page_number: int = 0
    ) -> list[DetectedTable]:
        """
        Detect table-like structures in text using pattern analysis.

        Useful for native PDFs where text is available. A table is a run of
        at least three consecutive lines that each look like a table row.

        Args:
            text: Extracted text content
            page_number: Page number

        Returns:
            List of DetectedTable with content populated
        """
        tables: list[DetectedTable] = []
        run: list[str] = []
        run_start = 0

        # The trailing empty line is never a table row, so it flushes a table
        # that runs up to the end of the text through the same code path.
        for index, line in enumerate([*text.split("\n"), ""]):
            if self._is_table_row(line):
                if not run:
                    run_start = index
                run.append(line)
                continue

            if len(run) >= self._MIN_TEXT_TABLE_LINES:
                table = self._process_text_table(run, page_number, run_start)
                if table:
                    tables.append(table)
            run = []

        return tables

    def is_maintenance_table(
        self, table: DetectedTable, full_text: Optional[str] = None
    ) -> bool:
        """
        Determine if a detected table is a maintenance schedule.

        The checks run in order and the first one that succeeds wins:
        at least two header keywords, at least three content keywords, or a
        maintenance phrase anywhere in the surrounding text.

        Args:
            table: Detected table to analyze
            full_text: Optional surrounding text for context

        Returns:
            True if likely a maintenance schedule table
        """
        # Check header row for maintenance keywords
        if table.header_row:
            header_text = " ".join(table.header_row)
            if self._count_keyword_hits(header_text, self.MAINTENANCE_HEADERS) >= 2:
                return True

        # Check content for maintenance keywords
        if table.raw_content:
            content_text = " ".join(" ".join(row) for row in table.raw_content)
            hits = self._count_keyword_hits(
                content_text, self.MAINTENANCE_CONTENT_KEYWORDS
            )
            if hits >= 3:
                return True

        # Check surrounding text
        if full_text:
            text_lower = full_text.lower()
            if any(phrase in text_lower for phrase in self._CONTEXT_PHRASES):
                return True

        return False

    @staticmethod
    def _count_keyword_hits(text: str, keywords: list[str]) -> int:
        """Count the distinct keywords that occur in *text*.

        Matching is case-insensitive and anchored to word starts, so a
        keyword never matches in the middle of another word ("hose" does not
        match "those"). Keywords of one or two letters must match a whole
        word ("mo" does not match "model"). Each word is credited only to the
        longest keyword it starts with, so "months" counts once rather than
        once for "months" and again for "mo".

        Args:
            text: Text to search
            keywords: Keywords to look for; entries that are not a single
                alphabetic word (e.g. "spark plug") are matched as phrases

        Returns:
            Number of distinct keywords found
        """
        lowered = text.lower()

        stems: list[str] = []
        phrases: list[str] = []
        for keyword in keywords:
            keyword = keyword.lower()
            if _WORD_PATTERN.fullmatch(keyword):
                stems.append(keyword)
            else:
                phrases.append(keyword)
        # Longest first, so a word is credited to its most specific keyword.
        stems.sort(key=len, reverse=True)

        credited: set[str] = set()
        for word in set(_WORD_PATTERN.findall(lowered)):
            for stem in stems:
                if word == stem or (len(stem) > 2 and word.startswith(stem)):
                    credited.add(stem)
                    break

        for phrase in phrases:
            if re.search(r"(?<![a-z])" + re.escape(phrase), lowered):
                credited.add(phrase)

        return len(credited)

    @staticmethod
    def _split_cells(line: str) -> list[str]:
        """Split a text line into its non-empty, stripped column cells."""
        parts = _COLUMN_SEPARATOR.split(line.strip())
        return [part.strip() for part in parts if part.strip()]

    def _is_table_row(self, line: str) -> bool:
        """Check if a line looks like a table row.

        A line qualifies when it has at least two columns separated by a tab
        or by two or more whitespace characters, or when it contains a
        mileage/time interval such as "5,000 miles" or "6 months".
        """
        stripped = line.strip()
        if not stripped:
            return False

        # Check for multiple whitespace-separated columns
        if len(self._split_cells(stripped)) >= 2:
            return True

        # Check for common table patterns
        # e.g., "Service Item 5,000 miles 6 months"
        return _INTERVAL_PATTERN.search(stripped) is not None

    def _process_text_table(
        self, lines: list[str], page_number: int, start_line: int
    ) -> Optional[DetectedTable]:
        """Process extracted text lines into a table structure.

        Args:
            lines: Consecutive text lines that make up the table
            page_number: Page number for the result
            start_line: Index of the first table line within the page text

        Returns:
            The parsed table, or None when fewer than two rows contain cells
        """
        if not lines:
            return None

        # Parse rows, dropping lines that yield no cells.
        rows = [cells for cells in map(self._split_cells, lines) if cells]
        if len(rows) < 2:
            return None

        # First row is likely header
        table = DetectedTable(
            page_number=page_number,
            x=0,  # Text tables don't have coordinates
            y=start_line,
            width=0,
            height=len(rows),
            confidence=0.7,
            is_maintenance_table=False,
            header_row=rows[0],
            raw_content=rows[1:],
        )

        # Determine if it's a maintenance table
        table.is_maintenance_table = self.is_maintenance_table(table)
        if table.is_maintenance_table:
            table.confidence = 0.85

        return table

    def extract_table_text_from_region(
        self, image_bytes: bytes, table: DetectedTable
    ) -> list[list[str]]:
        """
        Extract text from a table region using OCR.

        Currently a placeholder: it logs the region and returns an empty list.

        Args:
            image_bytes: Full page image
            table: Detected table with coordinates

        Returns:
            2D list of cell contents (always empty for now)
        """
        # This would use Tesseract on the cropped region
        # For now, return empty - actual OCR will be done in manual_extractor
        logger.debug(
            "Table region: (%s, %s) %sx%s",
            table.x,
            table.y,
            table.width,
            table.height,
        )
        return []


# Singleton instance
table_detector = TableDetector()