#!/usr/bin/env python3
"""
ETL Script for Automotive Vehicle Selection Database
Generates SQL import files for loading into PostgreSQL
No database connection required - pure file-based processing
"""
import json
import os
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional


class VehicleSQLGenerator:
    """Generate PostgreSQL import files from local JSON source data.

    Reads engines.json, automobiles.json, brands.json and the per-make
    files under makes-filter/, then writes three SQL files (engines,
    transmissions, vehicle options) plus a stats summary into output/.
    No database connection is used; everything is file based.
    """

    def __init__(self):
        self.makes_filter_dir = Path('makes-filter')
        self.engines_data = []
        self.automobiles_data = []
        self.brands_data = []
        # Year filter - only include vehicles 1980 or newer
        self.min_year = 1980
        # In-memory caches for fast lookups
        self.engine_cache = {}  # Key: (displacement, config) -> engine id
        self.transmission_cache = {}  # Key: (type, speeds, drive) -> transmission id
        self.vehicle_records = []
        self.brand_name_map = {}  # Key: lowercase_slug -> canonical brand name
        # Output SQL files
        self.engines_sql_file = 'output/01_engines.sql'
        self.transmissions_sql_file = 'output/02_transmissions.sql'
        self.vehicles_sql_file = 'output/03_vehicle_options.sql'

    def load_json_files(self):
        """Load the large JSON data files"""
        print("\nšŸ“‚ Loading source JSON files...")
        print(" Loading engines.json...")
        with open('engines.json', 'r', encoding='utf-8') as f:
            self.engines_data = json.load(f)
        print(f" āœ“ Loaded {len(self.engines_data)} engine records")
        print(" Loading automobiles.json...")
        with open('automobiles.json', 'r', encoding='utf-8') as f:
            self.automobiles_data = json.load(f)
        print(f" āœ“ Loaded {len(self.automobiles_data)} automobile records")
        print(" Loading brands.json...")
        with open('brands.json', 'r', encoding='utf-8') as f:
            self.brands_data = json.load(f)
        print(f" āœ“ Loaded {len(self.brands_data)} brand records")
        # Build brand name mapping for proper casing
        self.build_brand_name_map()

    def build_brand_name_map(self):
        """Build a mapping from lowercase brand names to Title Case brand names"""
        # Acronyms and special cases that should stay uppercase
        keep_uppercase = {'BMW', 'GMC', 'AC', 'MG', 'KIA', 'MINI', 'FIAT', 'RAM',
                          'KTM', 'FSO', 'ARO', 'TVR', 'NIO'}
        # Brands with internal capitalization that .title() would mangle
        special_cases = {
            'delorean': 'DeLorean',
            'mclaren': 'McLaren'
        }
        for brand in self.brands_data:
            brand_name_upper = brand.get('name', '').strip()
            if not brand_name_upper:
                continue
            # Create slug from uppercase name
            slug = brand_name_upper.lower().replace(' ', '_')
            # Determine canonical name based on special cases
            if slug in special_cases:
                canonical_name = special_cases[slug]
            elif brand_name_upper in keep_uppercase:
                canonical_name = brand_name_upper
            else:
                # Convert to Title Case
                canonical_name = brand_name_upper.title()
            # Store both slug and space-separated versions as keys
            self.brand_name_map[slug] = canonical_name
            self.brand_name_map[brand_name_upper.lower()] = canonical_name

    def get_canonical_make_name(self, make_slug: str) -> str:
        """Get the canonical brand name from a filename slug or lowercase name"""
        slug_lower = make_slug.lower().strip()
        # Try direct lookup
        if slug_lower in self.brand_name_map:
            return self.brand_name_map[slug_lower]
        # Try with underscores converted to spaces
        slug_spaced = slug_lower.replace('_', ' ')
        if slug_spaced in self.brand_name_map:
            return self.brand_name_map[slug_spaced]
        # Fallback: title case (shouldn't reach here if brand mapping is complete)
        return slug_spaced.title()

    def format_model_name(self, model_slug: str) -> str:
        """
        Format model name from slug to human-readable format.
        Examples:
            "bronco_sport" -> "Bronco Sport"
            "f-150" -> "F-150"
            "expedition_max" -> "Expedition Max"
            "sierra_1500" -> "Sierra 1500"
        """
        if not model_slug:
            return ''
        # Replace underscores with spaces
        model_name = model_slug.replace('_', ' ')
        # Apply Title Case
        model_name = model_name.title()
        return model_name.strip()

    def normalize_displacement(self, disp_str: str) -> Optional[str]:
        """Normalize displacement to L format (e.g., '3506 Cm3' -> '3.5L', '3.5L' -> '3.5L')"""
        if not disp_str:
            return None
        disp_str = disp_str.strip()
        # Check if already in L format
        if disp_str.upper().endswith('L'):
            # Extract numeric part and normalize to one decimal place
            match = re.search(r'(\d+\.?\d*)', disp_str)
            if match:
                liters = float(match.group(1))
                return f"{liters:.1f}L"
            return None
        # Check if in Cm3 format
        cm3_match = re.search(r'(\d+)\s*Cm3', disp_str, re.IGNORECASE)
        if cm3_match:
            cm3 = int(cm3_match.group(1))
            liters = cm3 / 1000.0
            return f"{liters:.1f}L"
        return None

    def parse_engine_string(self, engine_str: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse engine string like '2.0L I4' into displacement and configuration"""
        pattern = r'(\d+\.?\d*L?)\s*([IVL]\d+|[A-Z]+\d*)'
        match = re.search(pattern, engine_str, re.IGNORECASE)
        if match:
            displacement = match.group(1).upper()
            if not displacement.endswith('L'):
                displacement += 'L'
            configuration = match.group(2).upper()
            return (displacement, configuration)
        return (None, None)

    def extract_engine_specs(self, engine_record: Dict) -> Dict:
        """Extract relevant specs from engine JSON record"""
        specs = engine_record.get('specs', {})
        engine_specs = specs.get('Engine Specs', {})
        trans_specs = specs.get('Transmission Specs', {})
        # NOTE: the trailing colons in the keys match the scraped JSON schema.
        return {
            'name': engine_record.get('name', ''),
            'displacement': engine_specs.get('Displacement:', ''),
            'configuration': engine_specs.get('Cylinders:', ''),
            'horsepower': engine_specs.get('Power:', ''),
            'torque': engine_specs.get('Torque:', ''),
            'fuel_type': engine_specs.get('Fuel:', ''),
            'fuel_system': engine_specs.get('Fuel System:', ''),
            'aspiration': engine_specs.get('Aspiration:', ''),
            'transmission_type': trans_specs.get('Gearbox:', ''),
            'drive_type': trans_specs.get('Drive Type:', ''),
            'specs_json': specs
        }

    def format_engine_display(self, specs: Dict) -> str:
        """Format engine display string like 'V8 3.5L Turbo' or 'I4 2.0L Supercharged'"""
        parts = []
        # Configuration (V8, I4, L6, etc.)
        config = specs.get('configuration', '').strip()
        if config:
            parts.append(config)
        # Displacement in L format
        disp = self.normalize_displacement(specs.get('displacement', ''))
        # If displacement not in specs, try to extract from name
        if not disp:
            name = specs.get('name', '')
            disp_match = re.search(r'(\d+\.?\d*)\s*L', name, re.IGNORECASE)
            if disp_match:
                disp = f"{float(disp_match.group(1)):.1f}L"
        if disp:
            parts.append(disp)
        # Aspiration (Turbo, Supercharged) - check both aspiration and fuel_system fields
        aspiration = specs.get('aspiration', '').strip()
        fuel_system = specs.get('fuel_system', '').strip()
        combined_text = f"{aspiration} {fuel_system}".lower()
        if combined_text and 'naturally aspirated' not in combined_text:
            if 'turbo' in combined_text:
                parts.append('Turbo')
            elif 'supercharg' in combined_text:
                parts.append('Supercharged')
        return ' '.join(parts) if parts else 'Unknown'

    def format_transmission_display(self, trans_type: str, speeds: Optional[str]) -> str:
        """Format transmission display string like '8-Speed Automatic' or '6-Speed Manual'"""
        parts = []
        trans_type_clean = trans_type.strip() if trans_type else ''
        # Extract speed if provided
        if speeds:
            parts.append(f"{speeds}-Speed")
        elif trans_type_clean:
            # Try to extract speed from transmission type string
            speed_match = re.search(r'(\d+)[- ]?[Ss]peed', trans_type_clean)
            if speed_match:
                parts.append(f"{speed_match.group(1)}-Speed")
        # Determine transmission type
        if trans_type_clean:
            trans_lower = trans_type_clean.lower()
            if 'manual' in trans_lower:
                parts.append('Manual')
            elif 'automatic' in trans_lower or 'auto' in trans_lower:
                parts.append('Automatic')
            elif 'cvt' in trans_lower:
                # CVT doesn't have speeds
                return 'CVT'
            elif 'direct' in trans_lower or 'dct' in trans_lower:
                parts.append('Dual-Clutch')
            else:
                parts.append('Automatic')  # Default assumption
        return ' '.join(parts) if parts else 'Unknown'

    def sql_escape(self, value):
        """Escape values for SQL with proper handling of special characters.

        Returns the literal 'NULL' for None and empty strings, bare numerics
        unquoted, and everything else as a single-quoted SQL string literal.
        NOTE(review): the backslash/newline escape scheme (\\n, \\\\) is only
        interpreted by PostgreSQL inside E'...' literals; plain '...' literals
        are emitted here, so embedded newlines round-trip as literal backslash
        sequences unless standard_conforming_strings is off — confirm intent.
        """
        if value is None:
            return 'NULL'
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, dict):
            # Convert dict to JSON string and escape
            json_str = json.dumps(value)
            # Escape single quotes, newlines, carriage returns, and backslashes for PostgreSQL
            escaped = json_str.replace('\\', '\\\\').replace("'", "''").replace('\n', '\\n').replace('\r', '\\r')
            return "'" + escaped + "'"
        # Convert to string
        str_value = str(value).strip()
        # Empty strings should be NULL
        if not str_value:
            return 'NULL'
        # String - escape single quotes, newlines, carriage returns, and backslashes
        escaped = str_value.replace('\\', '\\\\').replace("'", "''").replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
        return "'" + escaped + "'"

    def generate_engines_sql(self):
        """Generate SQL file for engines and transmissions.

        Also populates self.engine_cache and self.transmission_cache so that
        later vehicle processing can resolve engine/transmission ids.
        """
        print("\nāš™ļø Generating engine and transmission SQL...")
        os.makedirs('output', exist_ok=True)
        # Process engines
        engines_insert_values = []
        transmissions_set = set()
        engine_id = 1
        for engine_record in self.engines_data:
            specs = self.extract_engine_specs(engine_record)
            # Create simplified display name
            display_name = self.format_engine_display(specs)
            # Only store ID and name
            values = (
                engine_id,
                self.sql_escape(display_name)
            )
            engines_insert_values.append(f"({','.join(map(str, values))})")
            # Build engine cache with normalized displacement
            if specs['displacement'] and specs['configuration']:
                disp_norm = self.normalize_displacement(specs['displacement'])
                config_norm = specs['configuration'].upper().strip()
                if disp_norm and config_norm:
                    key = (disp_norm, config_norm)
                    # First engine wins for a given (displacement, config) pair
                    if key not in self.engine_cache:
                        self.engine_cache[key] = engine_id
            # Extract transmission
            if specs['transmission_type'] or specs['drive_type']:
                speeds = None
                if specs['transmission_type']:
                    speed_match = re.search(r'(\d+)', specs['transmission_type'])
                    if speed_match:
                        speeds = speed_match.group(1)
                trans_tuple = (
                    specs['transmission_type'] or 'Unknown',
                    speeds,
                    specs['drive_type'] or 'Unknown'
                )
                transmissions_set.add(trans_tuple)
            engine_id += 1
        # Write engines SQL file
        print(f" Writing {len(engines_insert_values)} engines to SQL file...")
        with open(self.engines_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Engines data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")
            # Write in batches of 500 for better performance
            batch_size = 500
            for i in range(0, len(engines_insert_values), batch_size):
                batch = engines_insert_values[i:i + batch_size]
                f.write("INSERT INTO engines (id, name) VALUES\n")
                f.write(",\n".join(batch))
                f.write(";\n\n")
            # Reset sequence: setval(seq, n) makes the NEXT value n+1, so pass
            # the last id actually used (engine_id has been incremented past it).
            f.write(f"SELECT setval('engines_id_seq', {max(engine_id - 1, 1)});\n\n")
            f.write("COMMIT;\n")
        print(f" āœ“ Wrote engines SQL to {self.engines_sql_file}")
        # Write transmissions SQL file
        print(f" Writing {len(transmissions_set)} transmissions to SQL file...")
        trans_id = 1
        with open(self.transmissions_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Transmissions data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")
            trans_values = []
            # Sort for deterministic output; speeds may be None, so substitute
            # '' in the key to avoid None-vs-str comparison errors.
            for trans_type, speeds, drive_type in sorted(
                    transmissions_set, key=lambda t: (t[0], t[1] or '', t[2])):
                # Create simplified display name for transmission
                display_name = self.format_transmission_display(trans_type, speeds)
                # Only store ID and type
                values = (
                    trans_id,
                    self.sql_escape(display_name)
                )
                trans_values.append(f"({','.join(map(str, values))})")
                # Build transmission cache
                key = (trans_type, speeds, drive_type)
                self.transmission_cache[key] = trans_id
                trans_id += 1
            # Guard against an empty set: a header-only INSERT is invalid SQL.
            if trans_values:
                f.write("INSERT INTO transmissions (id, type) VALUES\n")
                f.write(",\n".join(trans_values))
                f.write(";\n\n")
            f.write(f"SELECT setval('transmissions_id_seq', {max(trans_id - 1, 1)});\n\n")
            f.write("COMMIT;\n")
        print(f" āœ“ Wrote transmissions SQL to {self.transmissions_sql_file}")
        print(f" āœ“ Built engine cache with {len(self.engine_cache)} combinations")

    def find_matching_engine_id(self, engine_str: str) -> Optional[int]:
        """Find engine_id from cache based on engine string"""
        disp, config = self.parse_engine_string(engine_str)
        if disp and config:
            # Normalize displacement to match cache format
            disp_norm = self.normalize_displacement(disp)
            if not disp_norm:
                return None
            key = (disp_norm, config)
            if key in self.engine_cache:
                return self.engine_cache[key]
            # Try normalized variations for configuration
            for cached_key, engine_id in self.engine_cache.items():
                cached_disp, cached_config = cached_key
                if cached_disp == disp_norm and self.config_matches(config, cached_config):
                    return engine_id
        return None

    def config_matches(self, config1: str, config2: str) -> bool:
        """Check if two engine configurations match"""
        c1 = config1.upper().replace('-', '').replace(' ', '')
        c2 = config2.upper().replace('-', '').replace(' ', '')
        if c1 == c2:
            return True
        # Treat I4 and L4 (inline) notations as equivalent
        if c1.replace('I', 'L') == c2.replace('I', 'L'):
            return True
        # Match 'INLINE4' style against the cylinder count
        if 'INLINE' in c1 or 'INLINE' in c2:
            c1_num = re.search(r'\d+', c1)
            c2_num = re.search(r'\d+', c2)
            if c1_num and c2_num and c1_num.group() == c2_num.group():
                return True
        return False

    def find_transmission_id_for_engine(self, engine_id: int) -> Optional[int]:
        """Find transmission_id for a given engine_id by looking up the engine's transmission specs"""
        if engine_id < 1 or engine_id > len(self.engines_data):
            return None
        # Engine IDs are 1-indexed, list is 0-indexed
        engine_record = self.engines_data[engine_id - 1]
        specs = self.extract_engine_specs(engine_record)
        # Extract transmission details (same defaults as the cache builder)
        trans_type = specs.get('transmission_type', '') or 'Unknown'
        drive_type = specs.get('drive_type', '') or 'Unknown'
        # Extract speeds from transmission type
        speeds = None
        if trans_type:
            speed_match = re.search(r'(\d+)', trans_type)
            if speed_match:
                speeds = speed_match.group(1)
        # Look up in transmission cache
        trans_key = (trans_type, speeds, drive_type)
        return self.transmission_cache.get(trans_key)

    def process_makes_filter(self):
        """Process all makes-filter JSON files and build vehicle records"""
        print(f"\nšŸš— Processing makes-filter JSON files (filtering for {self.min_year}+)...")
        json_files = list(self.makes_filter_dir.glob('*.json'))
        print(f" Found {len(json_files)} make files to process")
        total_records = 0
        filtered_records = 0
        for json_file in sorted(json_files):
            make_name = self.get_canonical_make_name(json_file.stem)
            print(f" Processing {make_name}...")
            with open(json_file, 'r', encoding='utf-8') as f:
                make_data = json.load(f)
            for brand_key, year_entries in make_data.items():
                for year_entry in year_entries:
                    year = int(year_entry.get('year', 0))
                    if year == 0:
                        continue
                    # Filter out vehicles older than min_year
                    if year < self.min_year:
                        filtered_records += 1
                        continue
                    models = year_entry.get('models', [])
                    for model in models:
                        model_name = self.format_model_name(model.get('name', ''))
                        engines = model.get('engines', [])
                        submodels = model.get('submodels', [])
                        if not submodels:
                            submodels = ['Base']
                        # One record per (trim, engine) combination
                        for trim in submodels:
                            for engine_str in engines:
                                engine_id = self.find_matching_engine_id(engine_str)
                                # Link transmission if engine was matched
                                transmission_id = None
                                if engine_id:
                                    transmission_id = self.find_transmission_id_for_engine(engine_id)
                                self.vehicle_records.append({
                                    'year': year,
                                    'make': make_name,
                                    'model': model_name,
                                    'trim': trim,
                                    'engine_id': engine_id,
                                    'transmission_id': transmission_id
                                })
                                total_records += 1
        print(f" āœ“ Processed {total_records} vehicle configuration records")
        print(f" āœ“ Filtered out {filtered_records} records older than {self.min_year}")

    def hybrid_backfill(self):
        """Hybrid backfill for recent years from automobiles.json"""
        print("\nšŸ”„ Performing hybrid backfill for recent years...")
        existing_combos = set()
        for record in self.vehicle_records:
            key = (record['year'], record['make'].lower(), record['model'].lower())
            existing_combos.add(key)
        brand_map = {}
        for brand in self.brands_data:
            brand_id = brand.get('id')
            brand_name = brand.get('name', '').lower()
            brand_map[brand_id] = brand_name
        filtered_makes = set()
        for json_file in self.makes_filter_dir.glob('*.json'):
            make_name = json_file.stem.replace('_', ' ').lower()
            filtered_makes.add(make_name)
        backfill_count = 0
        recent_years = [2023, 2024, 2025]
        for auto in self.automobiles_data:
            brand_id = auto.get('brand_id')
            brand_name = brand_map.get(brand_id, '').lower()
            if brand_name not in filtered_makes:
                continue
            auto_name = auto.get('name', '')
            year_match = re.search(r'(202[3-5])', auto_name)
            if not year_match:
                continue
            year = int(year_match.group(1))
            if year not in recent_years:
                continue
            # Apply year filter to backfill as well
            if year < self.min_year:
                continue
            model_name = auto_name
            # Strip the year and brand from the automobile name.  The brand in
            # auto_name may be cased differently than the lowercased brand_name,
            # so remove it case-insensitively (plain str.replace would miss it).
            for remove_str in (str(year), brand_name):
                model_name = re.sub(re.escape(remove_str), '', model_name, flags=re.IGNORECASE)
            model_name = model_name.strip()
            model_name = self.format_model_name(model_name)
            key = (year, brand_name, model_name.lower())
            if key in existing_combos:
                continue
            auto_id = auto.get('id')
            matching_engines = [e for e in self.engines_data if e.get('automobile_id') == auto_id]
            if not matching_engines:
                continue
            # Cap at three engine variants per backfilled model
            for engine_record in matching_engines[:3]:
                specs = self.extract_engine_specs(engine_record)
                engine_id = None
                if specs['displacement'] and specs['configuration']:
                    disp_norm = self.normalize_displacement(specs['displacement'])
                    config_norm = specs['configuration'].upper().strip()
                    if disp_norm and config_norm:
                        engine_id = self.engine_cache.get((disp_norm, config_norm))
                # Link transmission if engine was matched
                transmission_id = None
                if engine_id:
                    transmission_id = self.find_transmission_id_for_engine(engine_id)
                self.vehicle_records.append({
                    'year': year,
                    'make': self.get_canonical_make_name(brand_name),
                    'model': model_name,
                    'trim': 'Base',
                    'engine_id': engine_id,
                    'transmission_id': transmission_id
                })
                backfill_count += 1
            existing_combos.add((year, brand_name, model_name.lower()))
        print(f" āœ“ Backfilled {backfill_count} recent vehicle configurations")

    def generate_vehicles_sql(self):
        """Generate SQL file for vehicle_options"""
        print("\nšŸ“ Generating vehicle options SQL...")
        with open(self.vehicles_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Vehicle options data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")
            # Write in batches of 1000
            batch_size = 1000
            total_batches = (len(self.vehicle_records) + batch_size - 1) // batch_size
            for batch_num in range(total_batches):
                start_idx = batch_num * batch_size
                end_idx = min(start_idx + batch_size, len(self.vehicle_records))
                batch = self.vehicle_records[start_idx:end_idx]
                f.write("INSERT INTO vehicle_options (year, make, model, trim, engine_id, transmission_id) VALUES\n")
                values_list = []
                for record in batch:
                    values = (
                        record['year'],
                        self.sql_escape(record['make']),
                        self.sql_escape(record['model']),
                        self.sql_escape(record['trim']),
                        record['engine_id'] if record['engine_id'] else 'NULL',
                        record['transmission_id'] if record['transmission_id'] else 'NULL'
                    )
                    values_list.append(f"({','.join(map(str, values))})")
                f.write(",\n".join(values_list))
                f.write(";\n\n")
                print(f" Batch {batch_num + 1}/{total_batches} written ({len(batch)} records)")
            f.write("COMMIT;\n")
        print(f" āœ“ Wrote {len(self.vehicle_records)} vehicle options to {self.vehicles_sql_file}")

    def generate_stats(self):
        """Generate statistics file"""
        print("\nšŸ“Š Generating statistics...")
        # Guard the min()/max() aggregation against an empty record set
        if self.vehicle_records:
            year_range = (f"{min(r['year'] for r in self.vehicle_records)}"
                          f"-{max(r['year'] for r in self.vehicle_records)}")
        else:
            year_range = 'N/A'
        stats = {
            'total_engines': len(self.engines_data),
            'total_transmissions': len(self.transmission_cache),
            'total_vehicles': len(self.vehicle_records),
            'unique_years': len(set(r['year'] for r in self.vehicle_records)),
            'unique_makes': len(set(r['make'] for r in self.vehicle_records)),
            'unique_models': len(set(r['model'] for r in self.vehicle_records)),
            'year_range': year_range
        }
        with open('output/stats.txt', 'w') as f:
            f.write("=" * 60 + "\n")
            f.write("ETL Statistics\n")
            f.write("=" * 60 + "\n\n")
            for key, value in stats.items():
                formatted_value = f"{value:,}" if isinstance(value, int) else value
                f.write(f"{key.replace('_', ' ').title()}: {formatted_value}\n")
        print("\nšŸ“Š Statistics:")
        for key, value in stats.items():
            formatted_value = f"{value:,}" if isinstance(value, int) else value
            print(f" {key.replace('_', ' ').title()}: {formatted_value}")

    def run(self):
        """Execute the complete ETL pipeline"""
        try:
            print("=" * 60)
            print("šŸš€ Automotive Vehicle ETL - SQL Generator")
            print(f" Year Filter: {self.min_year} and newer")
            print("=" * 60)
            self.load_json_files()
            self.generate_engines_sql()
            self.process_makes_filter()
            self.hybrid_backfill()
            self.generate_vehicles_sql()
            self.generate_stats()
            print("\n" + "=" * 60)
            print("āœ… SQL Files Generated Successfully!")
            print("=" * 60)
            print("\nGenerated files:")
            print(f" - {self.engines_sql_file}")
            print(f" - {self.transmissions_sql_file}")
            print(f" - {self.vehicles_sql_file}")
            print(f" - output/stats.txt")
            print("\nNext step: Import SQL files into database")
            print(" cat output/*.sql | docker exec -i mvp-postgres psql -U postgres -d motovaultpro")
        except Exception as e:
            print(f"\nāŒ ETL Pipeline Failed: {e}")
            import traceback
            traceback.print_exc()
            raise


if __name__ == '__main__':
    etl = VehicleSQLGenerator()
    etl.run()