New Vehicle Database
This commit is contained in:
480
data/make-model-import/etl_generate_sql.py
Executable file
480
data/make-model-import/etl_generate_sql.py
Executable file
@@ -0,0 +1,480 @@
|
||||
#!/usr/bin/env python3
"""
ETL Script for Automotive Vehicle Selection Database
Generates SQL import files for loading into PostgreSQL
No database connection required - pure file-based processing
"""

import json
import os
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional

class VehicleSQLGenerator:
    """File-based ETL for the vehicle selection database.

    Reads engines.json, automobiles.json, brands.json and the per-make
    files under ``makes-filter/``, then emits three PostgreSQL import
    scripts (engines, transmissions, vehicle options) plus a statistics
    summary. No database connection is required.
    """

    def __init__(self):
        # Directory containing one JSON file per vehicle make
        self.makes_filter_dir = Path('makes-filter')
        self.engines_data = []
        self.automobiles_data = []
        self.brands_data = []

        # Year filter - only include vehicles 1980 or newer
        self.min_year = 1980

        # In-memory caches for fast lookups
        self.engine_cache = {}  # Key: (displacement, config) -> engine id
        self.transmission_cache = {}  # Key: (type, speeds, drive) -> transmission id
        self.vehicle_records = []

        # Output SQL files
        self.engines_sql_file = 'output/01_engines.sql'
        self.transmissions_sql_file = 'output/02_transmissions.sql'
        self.vehicles_sql_file = 'output/03_vehicle_options.sql'

    def load_json_files(self):
        """Load the large JSON data files into memory."""
        print("\n📂 Loading source JSON files...")

        print(" Loading engines.json...")
        with open('engines.json', 'r', encoding='utf-8') as f:
            self.engines_data = json.load(f)
        print(f" ✓ Loaded {len(self.engines_data)} engine records")

        print(" Loading automobiles.json...")
        with open('automobiles.json', 'r', encoding='utf-8') as f:
            self.automobiles_data = json.load(f)
        print(f" ✓ Loaded {len(self.automobiles_data)} automobile records")

        print(" Loading brands.json...")
        with open('brands.json', 'r', encoding='utf-8') as f:
            self.brands_data = json.load(f)
        print(f" ✓ Loaded {len(self.brands_data)} brand records")

    def parse_engine_string(self, engine_str: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse an engine string like '2.0L I4' into (displacement, configuration).

        Returns (None, None) when the string does not contain a
        recognizable displacement + configuration pair.
        """
        pattern = r'(\d+\.?\d*L?)\s*([IVL]\d+|[A-Z]+\d*)'
        match = re.search(pattern, engine_str, re.IGNORECASE)

        if match:
            displacement = match.group(1).upper()
            # Normalize '2.0' -> '2.0L' so cache keys are uniform
            if not displacement.endswith('L'):
                displacement += 'L'
            configuration = match.group(2).upper()
            return (displacement, configuration)

        return (None, None)

    def extract_engine_specs(self, engine_record: Dict) -> Dict:
        """Extract the relevant flat fields from an engine JSON record.

        Missing spec sections or keys yield empty strings, never KeyError.
        """
        specs = engine_record.get('specs', {})
        engine_specs = specs.get('Engine Specs', {})
        trans_specs = specs.get('Transmission Specs', {})

        return {
            'name': engine_record.get('name', ''),
            'displacement': engine_specs.get('Displacement', ''),
            'configuration': engine_specs.get('Cylinders', ''),
            'horsepower': engine_specs.get('Power', ''),
            'torque': engine_specs.get('Torque', ''),
            'fuel_type': engine_specs.get('Fuel', ''),
            'fuel_system': engine_specs.get('Fuel System', ''),
            'aspiration': engine_specs.get('Aspiration', ''),
            'transmission_type': trans_specs.get('Gearbox', ''),
            'drive_type': trans_specs.get('Drive Type', ''),
            'specs_json': specs
        }

    def sql_escape(self, value):
        """Escape a Python value as a SQL literal.

        None -> NULL, numbers pass through, dicts become quoted JSON
        strings, everything else is stringified and single-quote doubled.
        """
        if value is None:
            return 'NULL'
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, dict):
            # Convert dict to JSON string and escape
            json_str = json.dumps(value)
            return "'" + json_str.replace("'", "''") + "'"
        # String - escape single quotes
        return "'" + str(value).replace("'", "''") + "'"

    def generate_engines_sql(self):
        """Generate SQL files for engines and transmissions.

        Side effects: populates ``engine_cache`` and ``transmission_cache``
        so later stages can map parsed engine strings to the ids assigned
        here, and writes the two SQL files under ``output/``.
        """
        print("\n⚙️ Generating engine and transmission SQL...")

        os.makedirs('output', exist_ok=True)

        # Process engines
        engines_insert_values = []
        transmissions_set = set()
        engine_id = 1

        for engine_record in self.engines_data:
            specs = self.extract_engine_specs(engine_record)

            values = (
                engine_id,
                self.sql_escape(specs['name']),
                self.sql_escape(specs['displacement']),
                self.sql_escape(specs['configuration']),
                self.sql_escape(specs['horsepower']),
                self.sql_escape(specs['torque']),
                self.sql_escape(specs['fuel_type']),
                self.sql_escape(specs['fuel_system']),
                self.sql_escape(specs['aspiration']),
                self.sql_escape(specs['specs_json'])
            )

            engines_insert_values.append(f"({','.join(map(str, values))})")

            # Build engine cache (first engine seen wins for duplicate keys)
            if specs['displacement'] and specs['configuration']:
                disp_norm = specs['displacement'].upper().strip()
                config_norm = specs['configuration'].upper().strip()
                key = (disp_norm, config_norm)
                if key not in self.engine_cache:
                    self.engine_cache[key] = engine_id

            # Extract transmission
            if specs['transmission_type'] or specs['drive_type']:
                speeds = None
                if specs['transmission_type']:
                    speed_match = re.search(r'(\d+)', specs['transmission_type'])
                    if speed_match:
                        speeds = speed_match.group(1)

                trans_tuple = (
                    specs['transmission_type'] or 'Unknown',
                    speeds,
                    specs['drive_type'] or 'Unknown'
                )
                transmissions_set.add(trans_tuple)

            engine_id += 1

        # Write engines SQL file
        print(f" Writing {len(engines_insert_values)} engines to SQL file...")
        with open(self.engines_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Engines data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")

            # Write in batches of 500 for better performance
            batch_size = 500
            for i in range(0, len(engines_insert_values), batch_size):
                batch = engines_insert_values[i:i+batch_size]
                f.write("INSERT INTO engines (id, name, displacement, configuration, horsepower, torque, fuel_type, fuel_system, aspiration, specs_json) VALUES\n")
                f.write(",\n".join(batch))
                f.write(";\n\n")

            # Reset sequence. setval expects the LAST id actually used;
            # engine_id has already been advanced past it, so subtract one
            # (floored at 1 because setval requires a value >= 1).
            f.write(f"SELECT setval('engines_id_seq', {max(engine_id - 1, 1)});\n\n")
            f.write("COMMIT;\n")

        print(f" ✓ Wrote engines SQL to {self.engines_sql_file}")

        # Build transmission rows and cache before writing so we can skip
        # the INSERT entirely when there are no rows ("INSERT ... VALUES;"
        # with no tuples is invalid SQL).
        print(f" Writing {len(transmissions_set)} transmissions to SQL file...")
        trans_id = 1
        trans_values = []
        for trans_type, speeds, drive_type in sorted(transmissions_set):
            values = (
                trans_id,
                self.sql_escape(trans_type),
                self.sql_escape(speeds),
                self.sql_escape(drive_type)
            )
            trans_values.append(f"({','.join(map(str, values))})")

            # Build transmission cache
            key = (trans_type, speeds, drive_type)
            self.transmission_cache[key] = trans_id
            trans_id += 1

        with open(self.transmissions_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Transmissions data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")

            if trans_values:
                f.write("INSERT INTO transmissions (id, type, speeds, drive_type) VALUES\n")
                f.write(",\n".join(trans_values))
                f.write(";\n\n")

            # Same off-by-one fix as the engines sequence above.
            f.write(f"SELECT setval('transmissions_id_seq', {max(trans_id - 1, 1)});\n\n")
            f.write("COMMIT;\n")

        print(f" ✓ Wrote transmissions SQL to {self.transmissions_sql_file}")
        print(f" ✓ Built engine cache with {len(self.engine_cache)} combinations")

    def find_matching_engine_id(self, engine_str: str) -> Optional[int]:
        """Find an engine id from the cache for a free-form engine string.

        Tries an exact (displacement, configuration) hit first, then a
        fuzzy configuration match (e.g. 'I4' vs 'L4'). Returns None when
        the string cannot be parsed or nothing matches.
        """
        disp, config = self.parse_engine_string(engine_str)
        if disp and config:
            key = (disp, config)
            if key in self.engine_cache:
                return self.engine_cache[key]

            # Try normalized variations
            for cached_key, engine_id in self.engine_cache.items():
                cached_disp, cached_config = cached_key
                if cached_disp == disp and self.config_matches(config, cached_config):
                    return engine_id

        return None

    def config_matches(self, config1: str, config2: str) -> bool:
        """Check whether two engine configuration strings are equivalent."""
        c1 = config1.upper().replace('-', '').replace(' ', '')
        c2 = config2.upper().replace('-', '').replace(' ', '')

        if c1 == c2:
            return True

        # 'I4' and 'L4' both denote an inline-four
        if c1.replace('I', 'L') == c2.replace('I', 'L'):
            return True

        # 'INLINE 4' style: compare cylinder counts only
        if 'INLINE' in c1 or 'INLINE' in c2:
            c1_num = re.search(r'\d+', c1)
            c2_num = re.search(r'\d+', c2)
            if c1_num and c2_num and c1_num.group() == c2_num.group():
                return True

        return False

    def process_makes_filter(self):
        """Process all makes-filter JSON files and build vehicle records."""
        print(f"\n🚗 Processing makes-filter JSON files (filtering for {self.min_year}+)...")

        json_files = list(self.makes_filter_dir.glob('*.json'))
        print(f" Found {len(json_files)} make files to process")

        total_records = 0
        filtered_records = 0

        for json_file in sorted(json_files):
            make_name = json_file.stem.replace('_', ' ').title()
            print(f" Processing {make_name}...")

            with open(json_file, 'r', encoding='utf-8') as f:
                make_data = json.load(f)

            for brand_key, year_entries in make_data.items():
                for year_entry in year_entries:
                    try:
                        year = int(year_entry.get('year', 0))
                    except (TypeError, ValueError):
                        # Malformed year value - skip the entry rather
                        # than abort the whole run.
                        continue
                    if year == 0:
                        continue

                    # Filter out vehicles older than min_year
                    if year < self.min_year:
                        filtered_records += 1
                        continue

                    models = year_entry.get('models', [])
                    for model in models:
                        model_name = model.get('name', '')
                        engines = model.get('engines', [])
                        submodels = model.get('submodels', [])

                        # Every model gets at least a 'Base' trim row
                        if not submodels:
                            submodels = ['Base']

                        for trim in submodels:
                            for engine_str in engines:
                                engine_id = self.find_matching_engine_id(engine_str)
                                transmission_id = None

                                self.vehicle_records.append({
                                    'year': year,
                                    'make': make_name,
                                    'model': model_name,
                                    'trim': trim,
                                    'engine_id': engine_id,
                                    'transmission_id': transmission_id
                                })
                                total_records += 1

        print(f" ✓ Processed {total_records} vehicle configuration records")
        print(f" ✓ Filtered out {filtered_records} records older than {self.min_year}")

    def hybrid_backfill(self):
        """Backfill recent (2023-2025) vehicles from automobiles.json.

        Only adds (year, make, model) combinations not already produced by
        ``process_makes_filter``, restricted to the makes present in the
        makes-filter directory.
        """
        print("\n🔄 Performing hybrid backfill for recent years...")

        existing_combos = set()
        for record in self.vehicle_records:
            key = (record['year'], record['make'].lower(), record['model'].lower())
            existing_combos.add(key)

        brand_map = {}
        for brand in self.brands_data:
            brand_id = brand.get('id')
            brand_name = brand.get('name', '').lower()
            brand_map[brand_id] = brand_name

        filtered_makes = set()
        for json_file in self.makes_filter_dir.glob('*.json'):
            make_name = json_file.stem.replace('_', ' ').lower()
            filtered_makes.add(make_name)

        # Index engines by automobile_id once, instead of rescanning the
        # full engines list for every automobile (was O(n*m)).
        engines_by_auto = {}
        for engine_record in self.engines_data:
            engines_by_auto.setdefault(engine_record.get('automobile_id'), []).append(engine_record)

        backfill_count = 0
        recent_years = [2023, 2024, 2025]

        for auto in self.automobiles_data:
            brand_id = auto.get('brand_id')
            brand_name = brand_map.get(brand_id, '').lower()

            if brand_name not in filtered_makes:
                continue

            auto_name = auto.get('name', '')
            year_match = re.search(r'(202[3-5])', auto_name)
            if not year_match:
                continue

            year = int(year_match.group(1))
            if year not in recent_years:
                continue

            # Apply year filter to backfill as well
            if year < self.min_year:
                continue

            # Strip the year and brand from the automobile name to get the
            # model. brand_name is lower-cased while auto_name keeps its
            # original casing, so a plain str.replace would rarely match -
            # use a case-insensitive regex instead.
            model_name = auto_name
            for remove_str in (str(year), brand_name):
                if remove_str:
                    model_name = re.sub(re.escape(remove_str), '', model_name, flags=re.IGNORECASE)
            model_name = model_name.strip()

            key = (year, brand_name, model_name.lower())
            if key in existing_combos:
                continue

            auto_id = auto.get('id')
            matching_engines = engines_by_auto.get(auto_id, [])

            if not matching_engines:
                continue

            # Cap at three engine variants per automobile
            for engine_record in matching_engines[:3]:
                specs = self.extract_engine_specs(engine_record)

                engine_id = None
                if specs['displacement'] and specs['configuration']:
                    disp_norm = specs['displacement'].upper().strip()
                    config_norm = specs['configuration'].upper().strip()
                    key = (disp_norm, config_norm)
                    engine_id = self.engine_cache.get(key)

                self.vehicle_records.append({
                    'year': year,
                    'make': brand_name.title(),
                    'model': model_name,
                    'trim': 'Base',
                    'engine_id': engine_id,
                    'transmission_id': None
                })
                backfill_count += 1
            existing_combos.add((year, brand_name, model_name.lower()))

        print(f" ✓ Backfilled {backfill_count} recent vehicle configurations")

    def generate_vehicles_sql(self):
        """Generate the SQL file for vehicle_options."""
        print("\n📝 Generating vehicle options SQL...")

        with open(self.vehicles_sql_file, 'w', encoding='utf-8') as f:
            f.write("-- Vehicle options data import\n")
            f.write("-- Generated by ETL script\n\n")
            f.write("BEGIN;\n\n")

            # Write in batches of 1000
            batch_size = 1000
            total_batches = (len(self.vehicle_records) + batch_size - 1) // batch_size

            for batch_num in range(total_batches):
                start_idx = batch_num * batch_size
                end_idx = min(start_idx + batch_size, len(self.vehicle_records))
                batch = self.vehicle_records[start_idx:end_idx]

                f.write("INSERT INTO vehicle_options (year, make, model, trim, engine_id, transmission_id) VALUES\n")

                values_list = []
                for record in batch:
                    values = (
                        record['year'],
                        self.sql_escape(record['make']),
                        self.sql_escape(record['model']),
                        self.sql_escape(record['trim']),
                        record['engine_id'] if record['engine_id'] else 'NULL',
                        record['transmission_id'] if record['transmission_id'] else 'NULL'
                    )
                    values_list.append(f"({','.join(map(str, values))})")

                f.write(",\n".join(values_list))
                f.write(";\n\n")

                print(f" Batch {batch_num + 1}/{total_batches} written ({len(batch)} records)")

            f.write("COMMIT;\n")

        print(f" ✓ Wrote {len(self.vehicle_records)} vehicle options to {self.vehicles_sql_file}")

    def generate_stats(self):
        """Generate a human-readable statistics file under output/."""
        print("\n📊 Generating statistics...")

        years = [r['year'] for r in self.vehicle_records]
        stats = {
            'total_engines': len(self.engines_data),
            'total_transmissions': len(self.transmission_cache),
            'total_vehicles': len(self.vehicle_records),
            'unique_years': len(set(years)),
            'unique_makes': len(set(r['make'] for r in self.vehicle_records)),
            'unique_models': len(set(r['model'] for r in self.vehicle_records)),
            # min()/max() raise ValueError on an empty sequence - report
            # 'n/a' instead of crashing when no vehicles were produced.
            'year_range': f"{min(years)}-{max(years)}" if years else 'n/a'
        }

        with open('output/stats.txt', 'w') as f:
            f.write("=" * 60 + "\n")
            f.write("ETL Statistics\n")
            f.write("=" * 60 + "\n\n")
            for key, value in stats.items():
                formatted_value = f"{value:,}" if isinstance(value, int) else value
                f.write(f"{key.replace('_', ' ').title()}: {formatted_value}\n")

        print("\n📊 Statistics:")
        for key, value in stats.items():
            formatted_value = f"{value:,}" if isinstance(value, int) else value
            print(f" {key.replace('_', ' ').title()}: {formatted_value}")

    def run(self):
        """Execute the complete ETL pipeline."""
        try:
            print("=" * 60)
            print("🚀 Automotive Vehicle ETL - SQL Generator")
            print(f" Year Filter: {self.min_year} and newer")
            print("=" * 60)

            self.load_json_files()
            self.generate_engines_sql()
            self.process_makes_filter()
            self.hybrid_backfill()
            self.generate_vehicles_sql()
            self.generate_stats()

            print("\n" + "=" * 60)
            print("✅ SQL Files Generated Successfully!")
            print("=" * 60)
            print("\nGenerated files:")
            print(f" - {self.engines_sql_file}")
            print(f" - {self.transmissions_sql_file}")
            print(f" - {self.vehicles_sql_file}")
            print(f" - output/stats.txt")
            print("\nNext step: Import SQL files into database")
            print(" cat output/*.sql | docker exec -i mvp-postgres psql -U postgres -d motovaultpro")

        except Exception as e:
            print(f"\n❌ ETL Pipeline Failed: {e}")
            import traceback
            traceback.print_exc()
            raise
if __name__ == '__main__':
    # Script entry point: build the generator and run the full pipeline.
    VehicleSQLGenerator().run()
Reference in New Issue
Block a user