690 lines
27 KiB
Python
Executable File
690 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ETL Script for Automotive Vehicle Selection Database
|
|
Generates SQL import files for loading into PostgreSQL
|
|
No database connection required - pure file-based processing
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, List, Set, Tuple, Optional
|
|
|
|
class VehicleSQLGenerator:
    """Generates PostgreSQL import files from raw automotive JSON data."""

    def __init__(self):
        """Set up source locations, the year filter, caches, and output paths."""
        # Source data locations and raw JSON payloads
        self.makes_filter_dir = Path('makes-filter')
        self.engines_data = []
        self.automobiles_data = []
        self.brands_data = []

        # Only vehicles from this model year onward are kept
        self.min_year = 1980

        # In-memory lookup caches built during SQL generation
        self.engine_cache = {}        # (displacement, config) -> engine record id
        self.transmission_cache = {}  # (type, speeds, drive) -> transmission record id
        self.vehicle_records = []
        self.brand_name_map = {}      # lowercase slug -> canonical brand name

        # Destination SQL files
        self.engines_sql_file = 'output/01_engines.sql'
        self.transmissions_sql_file = 'output/02_transmissions.sql'
        self.vehicles_sql_file = 'output/03_vehicle_options.sql'
|
|
def load_json_files(self):
|
|
"""Load the large JSON data files"""
|
|
print("\n📂 Loading source JSON files...")
|
|
|
|
print(" Loading engines.json...")
|
|
with open('engines.json', 'r', encoding='utf-8') as f:
|
|
self.engines_data = json.load(f)
|
|
print(f" ✓ Loaded {len(self.engines_data)} engine records")
|
|
|
|
print(" Loading automobiles.json...")
|
|
with open('automobiles.json', 'r', encoding='utf-8') as f:
|
|
self.automobiles_data = json.load(f)
|
|
print(f" ✓ Loaded {len(self.automobiles_data)} automobile records")
|
|
|
|
print(" Loading brands.json...")
|
|
with open('brands.json', 'r', encoding='utf-8') as f:
|
|
self.brands_data = json.load(f)
|
|
print(f" ✓ Loaded {len(self.brands_data)} brand records")
|
|
|
|
# Build brand name mapping for proper casing
|
|
self.build_brand_name_map()
|
|
|
|
def build_brand_name_map(self):
|
|
"""Build a mapping from lowercase brand names to Title Case brand names"""
|
|
# Acronyms and special cases that should stay uppercase
|
|
keep_uppercase = {'BMW', 'GMC', 'AC', 'MG', 'KIA', 'MINI', 'FIAT', 'RAM', 'KTM', 'FSO', 'ARO', 'TVR', 'NIO'}
|
|
special_cases = {
|
|
'delorean': 'DeLorean',
|
|
'mclaren': 'McLaren'
|
|
}
|
|
|
|
for brand in self.brands_data:
|
|
brand_name_upper = brand.get('name', '').strip()
|
|
if not brand_name_upper:
|
|
continue
|
|
|
|
# Create slug from uppercase name
|
|
slug = brand_name_upper.lower().replace(' ', '_')
|
|
|
|
# Determine canonical name based on special cases
|
|
if slug in special_cases:
|
|
canonical_name = special_cases[slug]
|
|
elif brand_name_upper in keep_uppercase:
|
|
canonical_name = brand_name_upper
|
|
else:
|
|
# Convert to Title Case
|
|
canonical_name = brand_name_upper.title()
|
|
|
|
# Store both slug and space-separated versions as keys
|
|
self.brand_name_map[slug] = canonical_name
|
|
self.brand_name_map[brand_name_upper.lower()] = canonical_name
|
|
|
|
def get_canonical_make_name(self, make_slug: str) -> str:
|
|
"""Get the canonical brand name from a filename slug or lowercase name"""
|
|
slug_lower = make_slug.lower().strip()
|
|
|
|
# Try direct lookup
|
|
if slug_lower in self.brand_name_map:
|
|
return self.brand_name_map[slug_lower]
|
|
|
|
# Try with underscores converted to spaces
|
|
slug_spaced = slug_lower.replace('_', ' ')
|
|
if slug_spaced in self.brand_name_map:
|
|
return self.brand_name_map[slug_spaced]
|
|
|
|
# Fallback: title case (shouldn't reach here if brand mapping is complete)
|
|
return slug_spaced.title()
|
|
|
|
def format_model_name(self, model_slug: str) -> str:
|
|
"""
|
|
Format model name from slug to human-readable format.
|
|
Examples:
|
|
"bronco_sport" -> "Bronco Sport"
|
|
"f-150" -> "F-150"
|
|
"expedition_max" -> "Expedition Max"
|
|
"sierra_1500" -> "Sierra 1500"
|
|
"""
|
|
if not model_slug:
|
|
return ''
|
|
|
|
# Replace underscores with spaces
|
|
model_name = model_slug.replace('_', ' ')
|
|
|
|
# Apply Title Case
|
|
model_name = model_name.title()
|
|
|
|
return model_name.strip()
|
|
|
|
def normalize_displacement(self, disp_str: str) -> Optional[str]:
|
|
"""Normalize displacement to L format (e.g., '3506 Cm3' -> '3.5L', '3.5L' -> '3.5L')"""
|
|
if not disp_str:
|
|
return None
|
|
|
|
disp_str = disp_str.strip()
|
|
|
|
# Check if already in L format
|
|
if disp_str.upper().endswith('L'):
|
|
# Extract numeric part and normalize
|
|
match = re.search(r'(\d+\.?\d*)', disp_str)
|
|
if match:
|
|
liters = float(match.group(1))
|
|
return f"{liters:.1F}L"
|
|
return None
|
|
|
|
# Check if in Cm3 format
|
|
cm3_match = re.search(r'(\d+)\s*Cm3', disp_str, re.IGNORECASE)
|
|
if cm3_match:
|
|
cm3 = int(cm3_match.group(1))
|
|
liters = cm3 / 1000.0
|
|
return f"{liters:.1F}L"
|
|
|
|
return None
|
|
|
|
def parse_engine_string(self, engine_str: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""Parse engine string like '2.0L I4' into displacement and configuration"""
|
|
pattern = r'(\d+\.?\d*L?)\s*([IVL]\d+|[A-Z]+\d*)'
|
|
match = re.search(pattern, engine_str, re.IGNORECASE)
|
|
|
|
if match:
|
|
displacement = match.group(1).upper()
|
|
if not displacement.endswith('L'):
|
|
displacement += 'L'
|
|
configuration = match.group(2).upper()
|
|
return (displacement, configuration)
|
|
|
|
return (None, None)
|
|
|
|
def extract_engine_specs(self, engine_record: Dict) -> Dict:
|
|
"""Extract relevant specs from engine JSON record"""
|
|
specs = engine_record.get('specs', {})
|
|
engine_specs = specs.get('Engine Specs', {})
|
|
trans_specs = specs.get('Transmission Specs', {})
|
|
|
|
return {
|
|
'name': engine_record.get('name', ''),
|
|
'displacement': engine_specs.get('Displacement:', ''),
|
|
'configuration': engine_specs.get('Cylinders:', ''),
|
|
'horsepower': engine_specs.get('Power:', ''),
|
|
'torque': engine_specs.get('Torque:', ''),
|
|
'fuel_type': engine_specs.get('Fuel:', ''),
|
|
'fuel_system': engine_specs.get('Fuel System:', ''),
|
|
'aspiration': engine_specs.get('Aspiration:', ''),
|
|
'transmission_type': trans_specs.get('Gearbox:', ''),
|
|
'drive_type': trans_specs.get('Drive Type:', ''),
|
|
'specs_json': specs
|
|
}
|
|
|
|
def format_engine_display(self, specs: Dict) -> str:
|
|
"""Format engine display string like 'V8 3.5L Turbo' or 'I4 2.0L Supercharged'"""
|
|
parts = []
|
|
|
|
# Configuration (V8, I4, L6, etc.)
|
|
config = specs.get('configuration', '').strip()
|
|
if config:
|
|
parts.append(config)
|
|
|
|
# Displacement in L format
|
|
disp = self.normalize_displacement(specs.get('displacement', ''))
|
|
|
|
# If displacement not in specs, try to extract from name
|
|
if not disp:
|
|
name = specs.get('name', '')
|
|
disp_match = re.search(r'(\d+\.?\d*)\s*L', name, re.IGNORECASE)
|
|
if disp_match:
|
|
disp = f"{float(disp_match.group(1)):.1f}L"
|
|
|
|
if disp:
|
|
parts.append(disp)
|
|
|
|
# Aspiration (Turbo, Supercharged) - check both aspiration and fuel_system fields
|
|
aspiration = specs.get('aspiration', '').strip()
|
|
fuel_system = specs.get('fuel_system', '').strip()
|
|
combined_text = f"{aspiration} {fuel_system}".lower()
|
|
|
|
if combined_text and 'naturally aspirated' not in combined_text:
|
|
if 'turbo' in combined_text:
|
|
parts.append('Turbo')
|
|
elif 'supercharg' in combined_text:
|
|
parts.append('Supercharged')
|
|
|
|
return ' '.join(parts) if parts else 'Unknown'
|
|
|
|
def format_transmission_display(self, trans_type: str, speeds: Optional[str]) -> str:
|
|
"""Format transmission display string like '8-Speed Automatic' or '6-Speed Manual'"""
|
|
parts = []
|
|
|
|
trans_type_clean = trans_type.strip() if trans_type else ''
|
|
|
|
# Extract speed if provided
|
|
if speeds:
|
|
parts.append(f"{speeds}-Speed")
|
|
elif trans_type_clean:
|
|
# Try to extract speed from transmission type string
|
|
speed_match = re.search(r'(\d+)[- ]?[Ss]peed', trans_type_clean)
|
|
if speed_match:
|
|
parts.append(f"{speed_match.group(1)}-Speed")
|
|
|
|
# Determine transmission type
|
|
if trans_type_clean:
|
|
trans_lower = trans_type_clean.lower()
|
|
if 'manual' in trans_lower:
|
|
parts.append('Manual')
|
|
elif 'automatic' in trans_lower or 'auto' in trans_lower:
|
|
parts.append('Automatic')
|
|
elif 'cvt' in trans_lower:
|
|
# CVT doesn't have speeds
|
|
return 'CVT'
|
|
elif 'direct' in trans_lower or 'dct' in trans_lower:
|
|
parts.append('Dual-Clutch')
|
|
else:
|
|
parts.append('Automatic') # Default assumption
|
|
|
|
return ' '.join(parts) if parts else 'Unknown'
|
|
|
|
def sql_escape(self, value):
|
|
"""Escape values for SQL with proper handling of special characters"""
|
|
if value is None:
|
|
return 'NULL'
|
|
if isinstance(value, (int, float)):
|
|
return str(value)
|
|
if isinstance(value, dict):
|
|
# Convert dict to JSON string and escape
|
|
json_str = json.dumps(value)
|
|
# Escape single quotes, newlines, carriage returns, and backslashes for PostgreSQL
|
|
escaped = json_str.replace('\\', '\\\\').replace("'", "''").replace('\n', '\\n').replace('\r', '\\r')
|
|
return "'" + escaped + "'"
|
|
# Convert to string
|
|
str_value = str(value).strip()
|
|
# Empty strings should be NULL
|
|
if not str_value:
|
|
return 'NULL'
|
|
# String - escape single quotes, newlines, carriage returns, and backslashes
|
|
escaped = str_value.replace('\\', '\\\\').replace("'", "''").replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
|
|
return "'" + escaped + "'"
|
|
|
|
def generate_engines_sql(self):
|
|
"""Generate SQL file for engines and transmissions"""
|
|
print("\n⚙️ Generating engine and transmission SQL...")
|
|
|
|
os.makedirs('output', exist_ok=True)
|
|
|
|
# Process engines
|
|
engines_insert_values = []
|
|
transmissions_set = set()
|
|
engine_id = 1
|
|
|
|
for engine_record in self.engines_data:
|
|
specs = self.extract_engine_specs(engine_record)
|
|
|
|
# Create simplified display name
|
|
display_name = self.format_engine_display(specs)
|
|
|
|
# Only store ID and name
|
|
values = (
|
|
engine_id,
|
|
self.sql_escape(display_name)
|
|
)
|
|
|
|
engines_insert_values.append(f"({','.join(map(str, values))})")
|
|
|
|
# Build engine cache with normalized displacement
|
|
if specs['displacement'] and specs['configuration']:
|
|
disp_norm = self.normalize_displacement(specs['displacement'])
|
|
config_norm = specs['configuration'].upper().strip()
|
|
if disp_norm and config_norm:
|
|
key = (disp_norm, config_norm)
|
|
if key not in self.engine_cache:
|
|
self.engine_cache[key] = engine_id
|
|
|
|
# Extract transmission
|
|
if specs['transmission_type'] or specs['drive_type']:
|
|
speeds = None
|
|
if specs['transmission_type']:
|
|
speed_match = re.search(r'(\d+)', specs['transmission_type'])
|
|
if speed_match:
|
|
speeds = speed_match.group(1)
|
|
|
|
trans_tuple = (
|
|
specs['transmission_type'] or 'Unknown',
|
|
speeds,
|
|
specs['drive_type'] or 'Unknown'
|
|
)
|
|
transmissions_set.add(trans_tuple)
|
|
|
|
engine_id += 1
|
|
|
|
# Write engines SQL file
|
|
print(f" Writing {len(engines_insert_values)} engines to SQL file...")
|
|
with open(self.engines_sql_file, 'w', encoding='utf-8') as f:
|
|
f.write("-- Engines data import\n")
|
|
f.write("-- Generated by ETL script\n\n")
|
|
f.write("BEGIN;\n\n")
|
|
|
|
# Write in batches of 500 for better performance
|
|
batch_size = 500
|
|
for i in range(0, len(engines_insert_values), batch_size):
|
|
batch = engines_insert_values[i:i+batch_size]
|
|
f.write("INSERT INTO engines (id, name) VALUES\n")
|
|
f.write(",\n".join(batch))
|
|
f.write(";\n\n")
|
|
|
|
# Reset sequence
|
|
f.write(f"SELECT setval('engines_id_seq', {engine_id});\n\n")
|
|
f.write("COMMIT;\n")
|
|
|
|
print(f" ✓ Wrote engines SQL to {self.engines_sql_file}")
|
|
|
|
# Write transmissions SQL file
|
|
print(f" Writing {len(transmissions_set)} transmissions to SQL file...")
|
|
trans_id = 1
|
|
with open(self.transmissions_sql_file, 'w', encoding='utf-8') as f:
|
|
f.write("-- Transmissions data import\n")
|
|
f.write("-- Generated by ETL script\n\n")
|
|
f.write("BEGIN;\n\n")
|
|
f.write("INSERT INTO transmissions (id, type) VALUES\n")
|
|
|
|
trans_values = []
|
|
for trans_type, speeds, drive_type in sorted(transmissions_set):
|
|
# Create simplified display name for transmission
|
|
display_name = self.format_transmission_display(trans_type, speeds)
|
|
|
|
# Only store ID and type
|
|
values = (
|
|
trans_id,
|
|
self.sql_escape(display_name)
|
|
)
|
|
trans_values.append(f"({','.join(map(str, values))})")
|
|
|
|
# Build transmission cache
|
|
key = (trans_type, speeds, drive_type)
|
|
self.transmission_cache[key] = trans_id
|
|
trans_id += 1
|
|
|
|
f.write(",\n".join(trans_values))
|
|
f.write(";\n\n")
|
|
f.write(f"SELECT setval('transmissions_id_seq', {trans_id});\n\n")
|
|
f.write("COMMIT;\n")
|
|
|
|
print(f" ✓ Wrote transmissions SQL to {self.transmissions_sql_file}")
|
|
print(f" ✓ Built engine cache with {len(self.engine_cache)} combinations")
|
|
|
|
def find_matching_engine_id(self, engine_str: str) -> Optional[int]:
|
|
"""Find engine_id from cache based on engine string"""
|
|
disp, config = self.parse_engine_string(engine_str)
|
|
if disp and config:
|
|
# Normalize displacement to match cache format
|
|
disp_norm = self.normalize_displacement(disp)
|
|
if not disp_norm:
|
|
return None
|
|
|
|
key = (disp_norm, config)
|
|
if key in self.engine_cache:
|
|
return self.engine_cache[key]
|
|
|
|
# Try normalized variations for configuration
|
|
for cached_key, engine_id in self.engine_cache.items():
|
|
cached_disp, cached_config = cached_key
|
|
if cached_disp == disp_norm and self.config_matches(config, cached_config):
|
|
return engine_id
|
|
|
|
return None
|
|
|
|
def config_matches(self, config1: str, config2: str) -> bool:
|
|
"""Check if two engine configurations match"""
|
|
c1 = config1.upper().replace('-', '').replace(' ', '')
|
|
c2 = config2.upper().replace('-', '').replace(' ', '')
|
|
|
|
if c1 == c2:
|
|
return True
|
|
|
|
if c1.replace('I', 'L') == c2.replace('I', 'L'):
|
|
return True
|
|
|
|
if 'INLINE' in c1 or 'INLINE' in c2:
|
|
c1_num = re.search(r'\d+', c1)
|
|
c2_num = re.search(r'\d+', c2)
|
|
if c1_num and c2_num and c1_num.group() == c2_num.group():
|
|
return True
|
|
|
|
return False
|
|
|
|
def find_transmission_id_for_engine(self, engine_id: int) -> Optional[int]:
|
|
"""Find transmission_id for a given engine_id by looking up the engine's transmission specs"""
|
|
if engine_id < 1 or engine_id > len(self.engines_data):
|
|
return None
|
|
|
|
# Engine IDs are 1-indexed, list is 0-indexed
|
|
engine_record = self.engines_data[engine_id - 1]
|
|
specs = self.extract_engine_specs(engine_record)
|
|
|
|
# Extract transmission details
|
|
trans_type = specs.get('transmission_type', '') or 'Unknown'
|
|
drive_type = specs.get('drive_type', '') or 'Unknown'
|
|
|
|
# Extract speeds from transmission type
|
|
speeds = None
|
|
if trans_type:
|
|
speed_match = re.search(r'(\d+)', trans_type)
|
|
if speed_match:
|
|
speeds = speed_match.group(1)
|
|
|
|
# Look up in transmission cache
|
|
trans_key = (trans_type, speeds, drive_type)
|
|
return self.transmission_cache.get(trans_key)
|
|
|
|
    def process_makes_filter(self) -> None:
        """Process all makes-filter JSON files and build vehicle records.

        Reads every *.json file in self.makes_filter_dir, expands each
        (year, model, trim, engine) combination into a flat record, and
        appends it to self.vehicle_records. Years below self.min_year are
        skipped and counted. Engine/transmission ids are resolved through
        the caches built by generate_engines_sql(), so that method must run
        first; unmatched engines are stored as None.
        """
        print(f"\n🚗 Processing makes-filter JSON files (filtering for {self.min_year}+)...")

        json_files = list(self.makes_filter_dir.glob('*.json'))
        print(f" Found {len(json_files)} make files to process")

        total_records = 0
        filtered_records = 0

        for json_file in sorted(json_files):
            # File stem is the make slug (e.g. 'alfa_romeo.json' -> 'Alfa Romeo')
            make_name = self.get_canonical_make_name(json_file.stem)
            print(f" Processing {make_name}...")

            with open(json_file, 'r', encoding='utf-8') as f:
                make_data = json.load(f)

            for brand_key, year_entries in make_data.items():
                for year_entry in year_entries:
                    # NOTE(review): assumes 'year' is numeric or a numeric
                    # string; a non-numeric value would raise ValueError here.
                    year = int(year_entry.get('year', 0))
                    if year == 0:
                        continue

                    # Filter out vehicles older than min_year
                    if year < self.min_year:
                        filtered_records += 1
                        continue

                    models = year_entry.get('models', [])
                    for model in models:
                        model_name = self.format_model_name(model.get('name', ''))
                        engines = model.get('engines', [])
                        submodels = model.get('submodels', [])

                        # Every model gets at least a 'Base' trim
                        if not submodels:
                            submodels = ['Base']

                        # One record per (trim, engine) combination
                        for trim in submodels:
                            for engine_str in engines:
                                engine_id = self.find_matching_engine_id(engine_str)

                                # Link transmission if engine was matched
                                transmission_id = None
                                if engine_id:
                                    transmission_id = self.find_transmission_id_for_engine(engine_id)

                                self.vehicle_records.append({
                                    'year': year,
                                    'make': make_name,
                                    'model': model_name,
                                    'trim': trim,
                                    'engine_id': engine_id,
                                    'transmission_id': transmission_id
                                })
                                total_records += 1

        print(f" ✓ Processed {total_records} vehicle configuration records")
        print(f" ✓ Filtered out {filtered_records} records older than {self.min_year}")
|
|
    def hybrid_backfill(self) -> None:
        """Backfill 2023-2025 models from automobiles.json that the
        makes-filter files missed.

        Only brands present in the makes-filter directory are considered, and
        (year, make, model) combinations already in self.vehicle_records are
        skipped. Each backfilled model gets a 'Base' trim and up to three of
        its engines.
        """
        print("\n🔄 Performing hybrid backfill for recent years...")

        # Index of combinations we already have, to avoid duplicates
        existing_combos = set()
        for record in self.vehicle_records:
            key = (record['year'], record['make'].lower(), record['model'].lower())
            existing_combos.add(key)

        # brand id -> lowercase brand name
        brand_map = {}
        for brand in self.brands_data:
            brand_id = brand.get('id')
            brand_name = brand.get('name', '').lower()
            brand_map[brand_id] = brand_name

        # Makes we actually track (derived from the filter filenames)
        filtered_makes = set()
        for json_file in self.makes_filter_dir.glob('*.json'):
            make_name = json_file.stem.replace('_', ' ').lower()
            filtered_makes.add(make_name)

        backfill_count = 0
        recent_years = [2023, 2024, 2025]

        for auto in self.automobiles_data:
            brand_id = auto.get('brand_id')
            brand_name = brand_map.get(brand_id, '').lower()

            if brand_name not in filtered_makes:
                continue

            # The model year is embedded in the automobile name (e.g. '2024 ...')
            auto_name = auto.get('name', '')
            year_match = re.search(r'(202[3-5])', auto_name)
            if not year_match:
                continue

            year = int(year_match.group(1))
            if year not in recent_years:
                continue

            # Apply year filter to backfill as well
            if year < self.min_year:
                continue

            # Derive the model name by stripping the year and brand tokens
            model_name = auto_name
            for remove_str in [str(year), brand_name]:
                model_name = model_name.replace(remove_str, '')
            model_name = model_name.strip()
            model_name = self.format_model_name(model_name)

            key = (year, brand_name, model_name.lower())
            if key in existing_combos:
                continue

            # NOTE(review): linear scan per automobile — O(autos * engines);
            # acceptable for a one-shot ETL run.
            auto_id = auto.get('id')
            matching_engines = [e for e in self.engines_data if e.get('automobile_id') == auto_id]

            if not matching_engines:
                continue

            # Cap at three engine variants per backfilled model
            for engine_record in matching_engines[:3]:
                specs = self.extract_engine_specs(engine_record)

                # Resolve engine id through the (displacement, config) cache
                engine_id = None
                if specs['displacement'] and specs['configuration']:
                    disp_norm = self.normalize_displacement(specs['displacement'])
                    config_norm = specs['configuration'].upper().strip()
                    if disp_norm and config_norm:
                        key = (disp_norm, config_norm)
                        engine_id = self.engine_cache.get(key)

                # Link transmission if engine was matched
                transmission_id = None
                if engine_id:
                    transmission_id = self.find_transmission_id_for_engine(engine_id)

                self.vehicle_records.append({
                    'year': year,
                    'make': self.get_canonical_make_name(brand_name),
                    'model': model_name,
                    'trim': 'Base',
                    'engine_id': engine_id,
                    'transmission_id': transmission_id
                })
                backfill_count += 1
                existing_combos.add((year, brand_name, model_name.lower()))

        print(f" ✓ Backfilled {backfill_count} recent vehicle configurations")
|
|
def generate_vehicles_sql(self):
|
|
"""Generate SQL file for vehicle_options"""
|
|
print("\n📝 Generating vehicle options SQL...")
|
|
|
|
with open(self.vehicles_sql_file, 'w', encoding='utf-8') as f:
|
|
f.write("-- Vehicle options data import\n")
|
|
f.write("-- Generated by ETL script\n\n")
|
|
f.write("BEGIN;\n\n")
|
|
|
|
# Write in batches of 1000
|
|
batch_size = 1000
|
|
total_batches = (len(self.vehicle_records) + batch_size - 1) // batch_size
|
|
|
|
for batch_num in range(total_batches):
|
|
start_idx = batch_num * batch_size
|
|
end_idx = min(start_idx + batch_size, len(self.vehicle_records))
|
|
batch = self.vehicle_records[start_idx:end_idx]
|
|
|
|
f.write("INSERT INTO vehicle_options (year, make, model, trim, engine_id, transmission_id) VALUES\n")
|
|
|
|
values_list = []
|
|
for record in batch:
|
|
values = (
|
|
record['year'],
|
|
self.sql_escape(record['make']),
|
|
self.sql_escape(record['model']),
|
|
self.sql_escape(record['trim']),
|
|
record['engine_id'] if record['engine_id'] else 'NULL',
|
|
record['transmission_id'] if record['transmission_id'] else 'NULL'
|
|
)
|
|
values_list.append(f"({','.join(map(str, values))})")
|
|
|
|
f.write(",\n".join(values_list))
|
|
f.write(";\n\n")
|
|
|
|
print(f" Batch {batch_num + 1}/{total_batches} written ({len(batch)} records)")
|
|
|
|
f.write("COMMIT;\n")
|
|
|
|
print(f" ✓ Wrote {len(self.vehicle_records)} vehicle options to {self.vehicles_sql_file}")
|
|
|
|
def generate_stats(self):
|
|
"""Generate statistics file"""
|
|
print("\n📊 Generating statistics...")
|
|
|
|
stats = {
|
|
'total_engines': len(self.engines_data),
|
|
'total_transmissions': len(self.transmission_cache),
|
|
'total_vehicles': len(self.vehicle_records),
|
|
'unique_years': len(set(r['year'] for r in self.vehicle_records)),
|
|
'unique_makes': len(set(r['make'] for r in self.vehicle_records)),
|
|
'unique_models': len(set(r['model'] for r in self.vehicle_records)),
|
|
'year_range': f"{min(r['year'] for r in self.vehicle_records)}-{max(r['year'] for r in self.vehicle_records)}"
|
|
}
|
|
|
|
with open('output/stats.txt', 'w') as f:
|
|
f.write("=" * 60 + "\n")
|
|
f.write("ETL Statistics\n")
|
|
f.write("=" * 60 + "\n\n")
|
|
for key, value in stats.items():
|
|
formatted_value = f"{value:,}" if isinstance(value, int) else value
|
|
f.write(f"{key.replace('_', ' ').title()}: {formatted_value}\n")
|
|
|
|
print("\n📊 Statistics:")
|
|
for key, value in stats.items():
|
|
formatted_value = f"{value:,}" if isinstance(value, int) else value
|
|
print(f" {key.replace('_', ' ').title()}: {formatted_value}")
|
|
|
|
def run(self):
|
|
"""Execute the complete ETL pipeline"""
|
|
try:
|
|
print("=" * 60)
|
|
print("🚀 Automotive Vehicle ETL - SQL Generator")
|
|
print(f" Year Filter: {self.min_year} and newer")
|
|
print("=" * 60)
|
|
|
|
self.load_json_files()
|
|
self.generate_engines_sql()
|
|
self.process_makes_filter()
|
|
self.hybrid_backfill()
|
|
self.generate_vehicles_sql()
|
|
self.generate_stats()
|
|
|
|
print("\n" + "=" * 60)
|
|
print("✅ SQL Files Generated Successfully!")
|
|
print("=" * 60)
|
|
print("\nGenerated files:")
|
|
print(f" - {self.engines_sql_file}")
|
|
print(f" - {self.transmissions_sql_file}")
|
|
print(f" - {self.vehicles_sql_file}")
|
|
print(f" - output/stats.txt")
|
|
print("\nNext step: Import SQL files into database")
|
|
print(" ./import_data.sh")
|
|
|
|
except Exception as e:
|
|
print(f"\n❌ ETL Pipeline Failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the generator and run the full pipeline
    VehicleSQLGenerator().run()