# File: motovaultpro/mvp-platform-services/vehicles/etl/builders/normalized_vehicle_builder.py
# Commit: a052040e3a "Initial Commit" by Eric Gullickson, 2025-09-17 16:09:15 -05:00
# (Original listing: 376 lines, 17 KiB, Python)

import logging
from typing import Dict, List, Set, Optional
from datetime import datetime
from dateutil import tz
from tqdm import tqdm
from ..connections import db_connections
from ..extractors.mssql_extractor import MSSQLExtractor
from ..loaders.postgres_loader import PostgreSQLLoader
from ..config import config
from ..utils.make_filter import MakeFilter
logger = logging.getLogger(__name__)
class NormalizedVehicleBuilder:
    """Build normalized vehicle schema from pattern-based NHTSA source data.

    The build runs in three steps (see ``build``): load the make/model
    reference tables, derive model-year availability from WMI year ranges,
    then extract trims and engines from VIN pattern data. Source rows are read
    through ``MSSQLExtractor`` and written through ``PostgreSQLLoader``.
    """

    # Pattern ElementId values this builder treats as trim / engine attributes.
    _TRIM_ELEMENT_ID = 28
    _ENGINE_ELEMENT_ID = 18

    # How many years back (from the current year) an explicit VIN pattern
    # range may end and still count as evidence that a model is offered.
    _RECENT_YEARS_WINDOW = 5

    # (model-name ILIKE pattern, last production year) business rules,
    # applied to Chevrolet/GMC models only. 'Jimmy%' already covers
    # 'Jimmy Utility%'; both are kept to mirror the original rule list.
    # NOTE(review): 'Blazer%' will also cap the revived Blazer if it shares
    # this model name in the source data -- confirm that is intended.
    _DISCONTINUED_MODEL_PATTERNS = (
        ('Jimmy%', 1991),  # GMC Jimmy discontinued 1991
        ('S-10%', 2004),  # Chevrolet S-10 discontinued 2004
        ('Blazer%', 2005),  # Chevrolet Blazer discontinued 2005 (before recent revival)
        ('Astro%', 2005),  # Chevrolet Astro discontinued 2005
        ('Safari%', 2005),  # GMC Safari discontinued 2005
        ('Jimmy Utility%', 1991),  # GMC Jimmy Utility discontinued 1991
    )

    def __init__(self, make_filter: Optional["MakeFilter"] = None):
        """Create the builder.

        Args:
            make_filter: Restricts which makes are processed. A default
                ``MakeFilter()`` is created when omitted. The same filter is
                passed to the extractor and used when grouping pattern rows.
        """
        self.make_filter = make_filter or MakeFilter()
        self.extractor = MSSQLExtractor(self.make_filter)
        self.loader = PostgreSQLLoader()
        logger.info(
            f"Initialized normalized vehicle builder with make filtering: {len(self.make_filter.get_allowed_makes())} allowed makes"
        )

    def build(self):
        """Run the full normalized vehicle schema build.

        Returns:
            True when all three steps complete.

        Raises:
            Any exception raised by a step. It is logged with its traceback
            and then re-raised unchanged.
        """
        logger.info("Starting normalized vehicle schema build")
        try:
            logger.info("Step 1: Loading reference tables (makes, models, relationships)")
            self._load_reference_tables()

            logger.info("Step 2: Building model-year availability from WMI data")
            self._build_model_year_availability()

            logger.info("Step 3: Extracting trims and engines from pattern data")
            self._extract_trims_and_engines()

            logger.info("Normalized vehicle schema build completed successfully")
            return True
        except Exception as e:
            # logger.exception records the traceback; a bare raise re-raises
            # without adding this frame to the exception's traceback.
            logger.exception(f"Normalized schema build failed: {e}")
            raise

    def _load_reference_tables(self):
        """Load the make and model reference tables.

        Makes come from the extractor, which was constructed with the make
        filter. Each model record is tagged in place with the ``MakeId`` from
        the make-model relationship rows before it is passed to the loader.
        Models without a relationship are dropped as orphans. If a model
        appears in several relationships, the last one wins.
        """
        makes_data = self.extractor.extract_reference_table('Make')
        if makes_data:
            self.loader.load_reference_table('make', makes_data)
            logger.info(f"Loaded {len(makes_data)} makes")

        # Fetch relationships first so models can be linked to their make.
        make_model_rels = self.extractor.extract_make_model_relationships()
        models_data = self.extractor.extract_reference_table('Model')
        if not (models_data and make_model_rels):
            logger.warning("No models or relationships loaded")
            return

        model_to_make = {rel['ModelId']: rel['MakeId'] for rel in make_model_rels}
        for model in models_data:
            model['MakeId'] = model_to_make.get(model['Id'])

        valid_models = [m for m in models_data if m.get('MakeId') is not None]
        self.loader.load_reference_table('model', valid_models)
        logger.info(f"Loaded {len(valid_models)} models with make relationships")
        logger.info(f"Filtered out {len(models_data) - len(valid_models)} orphaned models")

    @staticmethod
    def _build_wmi_to_models(make_model_rels, wmi_make_rels) -> Dict:
        """Map each WmiId to the ModelIds reachable through its make(s).

        Args:
            make_model_rels: Rows with ``MakeId`` and ``ModelId`` keys.
            wmi_make_rels: Rows with ``WmiId`` and ``MakeId`` keys.

        Returns:
            ``{wmi_id: [model_id, ...]}``. A WMI linked to several makes gets
            the concatenation of their model lists, so duplicates are possible.
            WMIs whose make has no models are omitted.
        """
        make_to_models: Dict = {}
        for rel in make_model_rels:
            make_to_models.setdefault(rel['MakeId'], []).append(rel['ModelId'])

        wmi_to_models: Dict = {}
        for wmi_make in wmi_make_rels:
            models = make_to_models.get(wmi_make['MakeId'])
            if models is not None:
                wmi_to_models.setdefault(wmi_make['WmiId'], []).extend(models)
        return wmi_to_models

    @staticmethod
    def _collect_model_year_evidence(wmi_data, wmi_to_models: Dict, first_year: int, last_year: int) -> Dict:
        """Collect the years each model has explicit WMI pattern evidence for.

        Only WMI rows with both ``YearFrom`` and ``YearTo`` set are used. An
        open-ended range (``YearTo`` None) is skipped. A row counts if its
        range overlaps ``[first_year, last_year]``, and only the overlapping
        years are recorded.

        Returns:
            ``{model_id: set(years)}``. A model is present once any of its WMIs
            qualifies.
        """
        evidence: Dict = {}
        for wmi_mapping in wmi_data:
            year_from = wmi_mapping['YearFrom']
            year_to = wmi_mapping['YearTo']
            if year_from is None or year_to is None:
                continue
            if year_to < first_year or year_from > last_year:
                continue
            models = wmi_to_models.get(wmi_mapping['WmiId'])
            if models is None:
                continue
            years = range(max(year_from, first_year), min(year_to, last_year) + 1)
            for model_id in models:
                evidence.setdefault(model_id, set()).update(years)
        return evidence

    def _build_model_year_availability(self):
        """Build and load model-year availability from WMI year ranges.

        A (model, year) pair is produced only when an explicit WMI pattern
        range covers that year. The window runs from
        ``current_year - _RECENT_YEARS_WINDOW`` through next model year, and
        the pair must also pass the discontinuation business rules.
        """
        logger.info("Extracting model-year availability from WMI data with realistic year bounds")

        current_year = datetime.now().year
        max_year = current_year + 1  # Allow next model year
        recent_threshold = current_year - self._RECENT_YEARS_WINDOW
        logger.info(f"Using realistic year range: {recent_threshold} to {max_year}")

        wmi_data = self.extractor.extract_wmi_vin_schema_mappings()
        make_model_rels = self.extractor.extract_make_model_relationships()
        wmi_make_rels = self.extractor.extract_wmi_make_relationships()
        wmi_to_models = self._build_wmi_to_models(make_model_rels, wmi_make_rels)

        logger.info("Building model-year availability - using only models with EXPLICIT recent VIN pattern evidence")
        recent_models_with_years = self._collect_model_year_evidence(
            wmi_data, wmi_to_models, recent_threshold, max_year
        )
        logger.info(f"Found {len(recent_models_with_years)} models with explicit recent VIN pattern evidence (patterns with defined year ranges since {recent_threshold})")

        discontinued_models = self._get_discontinued_models()
        # Evidence is a set per model, so the pairs below are already unique.
        # Years are sorted so the load order is deterministic.
        model_years = []
        for model_id, years_with_evidence in recent_models_with_years.items():
            last_year = discontinued_models.get(model_id)
            if last_year is not None:
                logger.info(f"Applying discontinuation rule: Model ID {model_id} discontinued after {last_year}")
                years_with_evidence = {y for y in years_with_evidence if y <= last_year}
            model_years.extend(
                {'model_id': model_id, 'year': year} for year in sorted(years_with_evidence)
            )
        logger.info(f"Created {len(model_years)} model-year combinations based on explicit VIN pattern evidence")

        if model_years:
            self.loader.load_model_years(model_years)
        logger.info(f"Generated {len(model_years)} model-year availability records")

    def _extract_trims_and_engines(self):
        """Extract trims and engines from VIN pattern batches and load them.

        Pattern rows of allowed makes are grouped by (VinSchemaId, make name).
        The trim and engine attribute values are collected and stripped, and
        empty values are dropped. Engines are de-duplicated globally on their
        stripped name. Trims are attached to model_year rows through the
        simplified make-level mapping in ``_map_trims_to_model_years``.
        """
        logger.info("Extracting trims and engines from pattern data")
        model_year_mapping = self._get_model_year_mapping()

        trims_data: List[Dict] = []
        engines_data: List[Dict] = []
        engine_names: Set[str] = set()

        for pattern_batch in self.extractor.extract_patterns_data():
            logger.info(f"Processing pattern batch: {len(pattern_batch)} patterns")

            vehicle_combinations: Dict = {}
            for pattern in pattern_batch:
                element_id = pattern['ElementId']
                attribute_id = pattern.get('AttributeId', '')
                make_name = pattern.get('MakeName', '')
                if not self.make_filter.is_make_allowed(make_name):
                    continue

                # Year is not known at this point; grouping is per VIN schema.
                vin_schema_id = pattern['VinSchemaId']
                key = (vin_schema_id, make_name)
                combo = vehicle_combinations.get(key)
                if combo is None:
                    combo = vehicle_combinations[key] = {
                        'make_name': make_name,
                        'vin_schema_id': vin_schema_id,
                        'trims': set(),
                        'engines': set(),
                    }

                if element_id == self._TRIM_ELEMENT_ID and attribute_id:
                    combo['trims'].add(attribute_id)
                elif element_id == self._ENGINE_ELEMENT_ID and attribute_id:
                    combo['engines'].add(attribute_id)

            for combo in vehicle_combinations.values():
                make_name = combo['make_name']
                for raw_trim in combo['trims']:
                    trim_name = raw_trim.strip()
                    if trim_name:
                        # make_name is kept only for the later model_year mapping.
                        trims_data.append({
                            'name': trim_name,
                            'make_name': make_name,
                            'source_schema': combo['vin_schema_id'],
                        })
                for raw_engine in combo['engines']:
                    engine_name = raw_engine.strip()
                    # De-duplicate on the stripped name that is actually stored,
                    # so "V6" and "V6 " do not produce two identical engines.
                    if engine_name and engine_name not in engine_names:
                        engine_names.add(engine_name)
                        engines_data.append({
                            'name': engine_name,
                            'code': None,
                            'displacement_l': None,
                            'cylinders': None,
                            'fuel_type': None,
                            'aspiration': None,
                        })

        # Engines do not reference model years, so they are loaded first.
        if engines_data:
            self.loader.load_engines(engines_data)
            logger.info(f"Loaded {len(engines_data)} unique engines")

        if trims_data:
            simplified_trims = self._map_trims_to_model_years(trims_data, model_year_mapping)
            if simplified_trims:
                self.loader.load_trims(simplified_trims)
                logger.info(f"Loaded {len(simplified_trims)} trims")

    @staticmethod
    def _row_value(row, key: str, index: int):
        """Read one column from a DB row that is either a dict (by column name) or a positional sequence."""
        return row[key] if isinstance(row, dict) else row[index]

    def _get_model_year_mapping(self) -> Dict:
        """Return ``{(make_name, year): model_year_id}`` for every model_year row.

        NOTE(review): several models of one make share a (make, year) key, so
        later rows overwrite earlier ones. Only one model_year id survives per
        make-year, and the query has no ORDER BY, so which one survives is not
        defined. The simplified trim mapping tolerates this; revisit before
        relying on model-accurate trim placement.
        """
        query = """
            SELECT my.id, my.model_id, my.year, m.name as model_name, mk.name as make_name
            FROM vehicles.model_year my
            JOIN vehicles.model m ON my.model_id = m.id
            JOIN vehicles.make mk ON m.make_id = mk.id
        """
        with db_connections.postgres_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()

        mapping = {}
        for row in rows:
            key = (self._row_value(row, 'make_name', 4), self._row_value(row, 'year', 2))
            mapping[key] = self._row_value(row, 'id', 0)
        return mapping

    def _map_trims_to_model_years(self, trims_data: List[Dict], model_year_mapping: Dict,
                                  max_model_years_per_trim: int = 5) -> List[Dict]:
        """Attach extracted trims to model_year ids using a simplified make-level mapping.

        Each trim is linked to the first ``max_model_years_per_trim`` model_year
        ids of its make, taken in the iteration order of ``model_year_mapping``.
        The cap prevents a combinatorial explosion. Duplicate
        ``(model_year_id, name)`` pairs, such as the same trim seen under
        several VIN schemas, are emitted only once.

        Args:
            trims_data: Dicts with at least ``name`` and ``make_name`` keys.
            model_year_mapping: ``{(make_name, year): model_year_id}``.
            max_model_years_per_trim: Per-trim cap on linked model_year ids
                (non-negative).

        Returns:
            A list of ``{'model_year_id': ..., 'name': ...}`` records.
        """
        # Index model_year ids by make once instead of rescanning the mapping per trim.
        ids_by_make: Dict = {}
        for (mapped_make, _year), model_year_id in model_year_mapping.items():
            ids_by_make.setdefault(mapped_make, []).append(model_year_id)

        mapped_trims = []
        seen = set()
        for trim in trims_data:
            trim_name = trim['name']
            for model_year_id in ids_by_make.get(trim['make_name'], [])[:max_model_years_per_trim]:
                key = (model_year_id, trim_name)
                if key in seen:
                    continue
                seen.add(key)
                mapped_trims.append({
                    'model_year_id': model_year_id,
                    'name': trim_name,
                })
        return mapped_trims

    def _get_discontinued_models(self) -> Dict[int, int]:
        """Map discontinued model IDs to their last production year.

        Each rule in ``_DISCONTINUED_MODEL_PATTERNS`` is matched with ILIKE
        against model names of the Chevrolet and GMC makes. Matched models
        are capped at the rule's year when building model-year availability.
        If a model matches several rules, the later rule wins.
        """
        query = """
            SELECT m.id, m.name, mk.name as make_name
            FROM vehicles.model m
            JOIN vehicles.make mk ON m.make_id = mk.id
            WHERE m.name ILIKE %s
            AND mk.name IN ('Chevrolet', 'GMC')
        """
        discontinued_models: Dict[int, int] = {}
        with db_connections.postgres_connection() as conn:
            cursor = conn.cursor()
            for pattern, last_year in self._DISCONTINUED_MODEL_PATTERNS:
                cursor.execute(query, (pattern,))
                for row in cursor.fetchall():
                    model_id = self._row_value(row, 'id', 0)
                    model_name = self._row_value(row, 'name', 1)
                    make_name = self._row_value(row, 'make_name', 2)
                    discontinued_models[model_id] = last_year
                    logger.info(f"Marked {make_name} {model_name} (ID: {model_id}) as discontinued after {last_year}")
        return discontinued_models