Initial Commit
This commit is contained in:
117
mvp-platform-services/vehicles/etl/transformers/pattern_analyzer.py
Executable file
117
mvp-platform-services/vehicles/etl/transformers/pattern_analyzer.py
Executable file
@@ -0,0 +1,117 @@
|
||||
from typing import Dict, List, Set, Optional
|
||||
import re
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||||
|
||||
class PatternAnalyzer:
    """Analyze VIN patterns to extract vehicle information.

    Pattern keys are strings shaped like ``"VDS"`` or ``"VDS|VIS"``.  The part
    before the pipe (Vehicle Descriptor Section) is compared against the VIN
    starting at position 4, the part after it (Vehicle Identifier Section)
    against the VIN starting at position 10 (both 1-based).  ``*`` in a key
    stands for any single character.
    """

    # 1-based VIN positions at which each key segment starts.
    _VDS_START = 4
    _VIS_START = 10

    # Element ID -> name of the attribute set that element feeds.
    _ELEMENT_FIELDS = {
        26: 'makes',          # Make
        27: 'models',         # Model
        28: 'trims',          # Trim
        18: 'engines',        # Engine Model
        24: 'transmissions',  # Transmission
    }

    def __init__(self):
        self.pattern_cache = {}

    @staticmethod
    def _split_keys(keys: str) -> List[str]:
        """Return ``[vds, vis]`` for *keys*; segments after a second '|' are ignored."""
        vds, _, remainder = keys.partition('|')
        vis = remainder.partition('|')[0]
        return [vds, vis]

    @staticmethod
    def _segment_matches(pattern: str, vin: str, start: int) -> bool:
        """Match *pattern* against the VIN slice beginning at 1-based *start*."""
        offset = start - 1
        segment = vin[offset:offset + len(pattern)]
        # Only '*' is translated (to the regex any-character '.'); every other
        # character reaches the regex engine unchanged, and the match is
        # anchored at the start of the segment only.
        return re.match(pattern.replace('*', '.'), segment) is not None

    def analyze_pattern_keys(self, keys: Optional[str]) -> Dict:
        """Break a pattern-keys string into its segments and VIN positions.

        Example: "ABCDE|FGHIJKLM" means positions 4-8 and 10-17.

        Args:
            keys: Pattern keys, ``"VDS"`` or ``"VDS|VIS"``.  ``None`` or an
                empty string yields an empty analysis.

        Returns:
            Dict with ``'vds'`` and ``'vis'`` (segment strings, ``''`` when
            absent) and ``'positions'`` (1-based VIN positions the segments
            cover, VDS positions first).
        """
        vds, vis = self._split_keys(keys or '')

        positions = list(range(self._VDS_START, self._VDS_START + len(vds)))
        positions.extend(range(self._VIS_START, self._VIS_START + len(vis)))

        return {
            'vds': vds,  # Vehicle Descriptor Section
            'vis': vis,  # Vehicle Identifier Section
            'positions': positions,
        }

    def extract_make_model_from_patterns(self, patterns: List[Dict]) -> Dict:
        """Group pattern rows into per-pattern vehicle attribute sets.

        Args:
            patterns: Dicts read via the keys ``'vin_schema_id'``, ``'keys'``,
                ``'element_id'``, ``'attribute_id'`` and ``'weight'``.

        Returns:
            Plain dict keyed by ``(vin_schema_id, keys)``.  Each value holds
            the sets ``'makes'``, ``'models'``, ``'trims'``, ``'engines'`` and
            ``'transmissions'`` plus an accumulated ``'confidence'``.
        """
        def new_entry() -> Dict:
            entry = {field: set() for field in (
                'makes', 'models', 'trims', 'engines', 'transmissions')}
            entry['confidence'] = 0
            return entry

        vehicle_combinations = defaultdict(new_entry)

        for pattern in patterns:
            key = (pattern.get('vin_schema_id'), pattern.get('keys'))
            entry = vehicle_combinations[key]

            field = self._ELEMENT_FIELDS.get(pattern.get('element_id'))
            if field is not None:
                entry[field].add(pattern.get('attribute_id', ''))

            # Every row adds its weight, mapped element or not.  A missing or
            # explicit None weight counts as 0 instead of raising TypeError.
            entry['confidence'] += pattern.get('weight') or 0

        return dict(vehicle_combinations)

    def match_vin_to_pattern(self, vin: str, pattern_keys: str) -> bool:
        """Check whether a VIN matches a pattern-keys string.

        ``*`` in the pattern matches any single character.  A ``"VDS|VIS"``
        pattern needs a VIN of at least 17 characters and both segments must
        match; a single-segment pattern needs at least 8 characters.  Segments
        after a second ``'|'`` are ignored, as in ``analyze_pattern_keys``.

        Args:
            vin: The VIN to test.
            pattern_keys: Pattern keys, ``"VDS"`` or ``"VDS|VIS"``.

        Returns:
            True on a match; False on a mismatch, an empty argument, or a VIN
            that is too short for the pattern shape.

        Raises:
            re.error: If a segment is not a valid regular expression once
                ``*`` has been replaced.
        """
        if not vin or not pattern_keys:
            return False

        if '|' in pattern_keys:
            if len(vin) < 17:
                return False
            vds_pattern, vis_pattern = self._split_keys(pattern_keys)
            return (self._segment_matches(vds_pattern, vin, self._VDS_START)
                    and self._segment_matches(vis_pattern, vin, self._VIS_START))

        # Single segment pattern (positions 4-8)
        if len(vin) < 8:
            return False
        return self._segment_matches(pattern_keys, vin, self._VDS_START)

    def calculate_pattern_confidence(self,
                                     pattern_matches: List[Dict],
                                     element_weights: Dict[int, int]) -> int:
        """Sum the weights of the distinct elements among the matches.

        Args:
            pattern_matches: Dicts read via the key ``'element_id'``.
            element_weights: Weight per element ID; IDs not listed count as 0.

        Returns:
            Total weight, counting each distinct element ID once.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order.
        distinct_ids = dict.fromkeys(
            match.get('element_id') for match in pattern_matches)
        return sum(element_weights.get(element_id, 0)
                   for element_id in distinct_ids)
|
||||
Reference in New Issue
Block a user