Files
motovaultpro/data/make-model-import/etl_generate_sql.py
2025-12-14 14:53:45 -06:00

654 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
ETL Script for Automotive Vehicle Selection Database
Generates SQL import files from local scraped data only (no network).
Output is constrained to a configurable year window (default 2000-2026; override with the MIN_YEAR / MAX_YEAR environment variables).
"""
import json
import os
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
class VehicleSQLGenerator:
    """Build SQL import files for the vehicle-selection database from local JSON.

    Pipeline (driven by :meth:`run`):

    1. Load ``engines.json``, ``automobiles.json`` and ``brands.json`` from the
       current working directory.
    2. Read the per-make files in ``makes-filter/`` to collect known model
       names and the baseline year/make/model/trim/engine records.
    3. Match automobile titles against the known models to gather extra
       "evidence" (trims, engines, transmissions, fuel labels) per model.
    4. Merge baseline and evidence into de-duplicated vehicle option rows,
       assign engine/transmission IDs, and write the SQL and stats files.

    Only model years inside ``[min_year, max_year]`` are emitted; the window
    defaults to 2000-2026 and can be overridden with the ``MIN_YEAR`` /
    ``MAX_YEAR`` environment variables.
    """

    # A model year or year range: "1984", "1984-1996" or "2021-Present".
    # Word boundaries stop digits embedded in longer tokens ("12005", "A2000",
    # "1998cc") from being read as years. Shared by the year parser and the
    # title cleaner so both agree on what counts as a year.
    _YEAR_PATTERN = re.compile(
        r"\b(?P<start>(?:19|20)\d{2})(?:-(?P<end>\d{4}|Present))?\b"
    )

    # Priority used to pick a placeholder engine name when a trim has no
    # engine data at all; "Gas" is the final default.
    _FUEL_FALLBACK_PRIORITY = ("Electric", "Diesel", "Hybrid")

    def __init__(self):
        self.makes_filter_dir = Path("makes-filter")
        self.engines_data: List[Dict] = []
        self.automobiles_data: List[Dict] = []
        self.brands_data: List[Dict] = []
        # Year window (configurable via MIN_YEAR / MAX_YEAR env vars)
        self.min_year = int(os.getenv("MIN_YEAR", "2000"))
        self.max_year = int(os.getenv("MAX_YEAR", "2026"))
        # Output files (paths relative to the current working directory)
        self.engines_sql_file = "output/01_engines.sql"
        self.transmissions_sql_file = "output/02_transmissions.sql"
        self.vehicles_sql_file = "output/03_vehicle_options.sql"
        self.stats_file = "output/stats.txt"
        # Data structures populated during ETL
        # slug / lower-cased brand name -> display name
        self.brand_name_map: Dict[str, str] = {}
        # canonical make -> formatted model names seen in makes-filter
        self.known_models_by_make: Dict[str, Set[str]] = defaultdict(set)
        # Baseline records from makes-filter with trims/engines per year/make/model
        self.baseline_records: List[Dict] = []
        # (make, model) -> evidence entries derived from automobiles.json
        self.evidence_by_model: Dict[Tuple[str, str], List[Dict]] = defaultdict(list)
        self.vehicle_records: List[Dict] = []
        # Dimension maps (populated after vehicle records are built)
        self.engine_name_to_id: Dict[str, int] = {}
        self.trans_name_to_id: Dict[str, int] = {}

    # ------------------------------------------------------------------
    # Loading and helper utilities
    # ------------------------------------------------------------------
    def load_json_files(self):
        """Load the three scraped source files and build the brand name map.

        Reads ``engines.json``, ``automobiles.json`` and ``brands.json`` from
        the current working directory; a missing file raises
        ``FileNotFoundError``.
        """
        print("\n📂 Loading source JSON files...")
        with open("engines.json", "r", encoding="utf-8") as f:
            self.engines_data = json.load(f)
        print(f" ✓ Loaded {len(self.engines_data):,} engine records")
        with open("automobiles.json", "r", encoding="utf-8") as f:
            self.automobiles_data = json.load(f)
        print(f" ✓ Loaded {len(self.automobiles_data):,} automobile records")
        with open("brands.json", "r", encoding="utf-8") as f:
            self.brands_data = json.load(f)
        print(f" ✓ Loaded {len(self.brands_data):,} brand records")
        self.build_brand_name_map()

    def build_brand_name_map(self):
        """Map slug and lower-cased brand names to their display spelling.

        Both the slug form ("alfa_romeo") and the lower-cased raw name
        ("alfa romeo") are registered so lookups from makes-filter file stems
        and from brands.json names both resolve. Acronym brands in
        ``keep_uppercase`` stay upper-case (matched case-insensitively), a few
        special cases get hand-written spellings, everything else is
        title-cased.
        """
        keep_uppercase = {
            "BMW",
            "GMC",
            "AC",
            "MG",
            "KIA",
            "MINI",
            "FIAT",
            "RAM",
            "KTM",
            "FSO",
            "ARO",
            "TVR",
            "NIO",
        }
        special_cases = {"delorean": "DeLorean", "mclaren": "McLaren"}
        for brand in self.brands_data:
            raw = (brand.get("name") or "").strip()
            if not raw:
                continue
            slug = raw.lower().replace(" ", "_")
            if slug in special_cases:
                canonical = special_cases[slug]
            elif raw.upper() in keep_uppercase:
                # Case-insensitive so "Bmw" and "BMW" both become "BMW".
                canonical = raw.upper()
            else:
                canonical = raw.title()
            self.brand_name_map[slug] = canonical
            self.brand_name_map[raw.lower()] = canonical

    def get_canonical_make_name(self, name: str) -> str:
        """Return the display name for a make given a slug or free-form name.

        Falls back to title-casing the space-separated form when the make is
        not in ``brand_name_map``. ``None``/empty input yields ``""``.
        """
        slug = (name or "").lower().replace(" ", "_")
        if slug in self.brand_name_map:
            return self.brand_name_map[slug]
        spaced = slug.replace("_", " ")
        if spaced in self.brand_name_map:
            return self.brand_name_map[spaced]
        return spaced.title()

    def format_model_name(self, model_slug: str) -> str:
        """Turn a slug like ``"grand_cherokee"`` into ``"Grand Cherokee"``."""
        if not model_slug:
            return ""
        return model_slug.replace("_", " ").strip().title()

    def classify_fuel_label(self, fuel: str) -> str:
        """Collapse a free-form fuel description into Electric/Diesel/Hybrid/Gas."""
        fuel_lower = (fuel or "").lower()
        if "electric" in fuel_lower:
            return "Electric"
        if "diesel" in fuel_lower:
            return "Diesel"
        if "hybrid" in fuel_lower:
            return "Hybrid"
        return "Gas"

    def normalize_engine_display(self, display: Optional[str]) -> Optional[str]:
        """Strip an engine display string; blank or missing becomes ``None``."""
        if not display:
            return None
        cleaned = display.strip()
        return cleaned if cleaned else None

    # ------------------------------------------------------------------
    # Parsing helpers for evidence building
    # ------------------------------------------------------------------
    def parse_year_range_from_name(self, name: str) -> Optional[Tuple[int, int]]:
        """Extract the first year or year range from an automobile title.

        The range is clamped to ``[min_year, max_year]``. ``None`` is returned
        when no year is present or when the clamped range is empty (the title
        lies entirely outside the window, or the range is reversed).

        Examples (default 2000-2026 window):
            "CHEVROLET Corvette C4 Convertible 1984-1996 ..." -> None
            "FORD Focus 1998-2004 ..." -> (2000, 2004)
            "2021-Present" -> (2021, self.max_year)
            "2024 ..." -> (2024, 2024)
        """
        match = self._YEAR_PATTERN.search(name or "")
        if not match:
            return None
        start = int(match.group("start"))
        end_text = match.group("end")
        if end_text is None:
            end = start
        elif end_text == "Present":
            end = self.max_year
        else:
            end = int(end_text)
        start = max(start, self.min_year)
        end = min(end, self.max_year)
        if start > end:
            # An empty window would create evidence that never applies but
            # still suppresses the trim in build_vehicle_records.
            return None
        return (start, end)

    def split_model_and_trim(
        self, make: str, candidate: str, known_models: Set[str]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Split a cleaned automobile title into (model, trim).

        The longest known model that prefixes ``candidate`` (case-insensitive,
        ending at a word boundary) wins; the remainder is the trim, or
        ``"Base"`` when nothing remains. Returns ``(None, None)`` when no known
        model matches, to avoid inventing models. ``make`` is currently unused
        and kept for interface compatibility.
        """
        candidate_clean = (candidate or "").strip()
        if not candidate_clean:
            return (None, None)
        # Longest first so "Model S" beats "Model"; ties broken alphabetically
        # for deterministic output.
        for model in sorted(known_models, key=lambda m: (-len(m), m)):
            # (?!\w) rather than \b: \b after a model ending in a non-word
            # character (e.g. "...+") would wrongly require a word char next.
            match = re.match(rf"{re.escape(model)}(?!\w)", candidate_clean, re.IGNORECASE)
            if match:
                remaining = candidate_clean[match.end():].strip()
                return (model, remaining or "Base")
        return (None, None)

    # ------------------------------------------------------------------
    # Engine / transmission formatting helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _spec_text(section: Dict, key: str) -> str:
        """Return ``section[key]`` as text, mapping missing/null to ``""``."""
        value = section.get(key)
        return "" if value is None else str(value)

    def extract_engine_specs(self, engine_record: Dict) -> Dict:
        """Flatten the nested spec sections of an engine record into one dict.

        Missing or null values become ``""`` so downstream string handling
        never sees ``None``. The raw ``specs`` mapping is kept as
        ``specs_json``.
        """
        specs = engine_record.get("specs") or {}
        engine_specs = specs.get("Engine Specs") or {}
        trans_specs = specs.get("Transmission Specs") or {}
        text = self._spec_text
        return {
            "name": engine_record.get("name") or "",
            "displacement": text(engine_specs, "Displacement:"),
            "configuration": text(engine_specs, "Cylinders:"),
            "horsepower": text(engine_specs, "Power:"),
            "torque": text(engine_specs, "Torque:"),
            "fuel_type": text(engine_specs, "Fuel:"),
            "fuel_system": text(engine_specs, "Fuel System:"),
            "aspiration": text(engine_specs, "Aspiration:"),
            "transmission_type": text(trans_specs, "Gearbox:"),
            "drive_type": text(trans_specs, "Drive Type:"),
            "specs_json": specs,
        }

    def normalize_displacement(self, disp_str: str) -> Optional[str]:
        """Convert a displacement like ``"2.0 L"`` or ``"1998 Cm3"`` to ``"2.0L"``.

        Returns ``None`` when no recognisable value is found.
        """
        disp_str = (disp_str or "").strip()
        if not disp_str:
            return None
        if disp_str.upper().endswith("L"):
            match = re.search(r"(\d+\.?\d*)", disp_str)
            if match:
                return f"{float(match.group(1)):.1f}L"
        cm3_match = re.search(r"(\d+)\s*Cm3", disp_str, re.IGNORECASE)
        if cm3_match:
            return f"{int(cm3_match.group(1)) / 1000.0:.1f}L"
        return None

    def format_engine_display(self, specs: Dict) -> Optional[str]:
        """Build a label such as ``"V6 3.5L Turbo"`` from flattened engine specs.

        Parts: upper-cased cylinder configuration, displacement in litres
        (falling back to an ``"N.NL"`` pattern in the engine name), and a
        forced-induction tag. Returns ``None`` when no part is available.
        """
        parts: List[str] = []
        config = (specs.get("configuration") or "").strip()
        if config:
            parts.append(config.upper())
        disp = self.normalize_displacement(specs.get("displacement") or "")
        if not disp:
            name = specs.get("name") or ""
            disp_match = re.search(r"(\d+\.?\d*)\s*L", name, re.IGNORECASE)
            if disp_match:
                disp = f"{float(disp_match.group(1)):.1f}L"
        if disp:
            parts.append(disp)
        aspiration = (specs.get("aspiration") or "").strip()
        fuel_system = (specs.get("fuel_system") or "").strip()
        combined = f"{aspiration} {fuel_system}".strip().lower()
        if combined and "naturally aspirated" not in combined:
            if "turbo" in combined:
                parts.append("Turbo")
            elif "supercharg" in combined:
                parts.append("Supercharged")
        return " ".join(parts) if parts else None

    def format_transmission_display(self, trans_type: str, speeds: Optional[str]) -> str:
        """Build a transmission label such as ``"6-Speed Manual"`` or ``"CVT"``.

        ``speeds`` (if given) takes precedence over a speed count found in
        ``trans_type``. Unknown kinds default to ``"Automatic"``; the result
        is never empty.
        """
        trans_clean = trans_type.strip() if trans_type else ""
        lower = trans_clean.lower()
        if "cvt" in lower:
            return "CVT"
        speed_text = None
        if speeds:
            speed_text = f"{speeds}-Speed"
        elif trans_clean:
            speed_match = re.search(r"(\d+)[- ]?[Ss]peed", trans_clean)
            if speed_match:
                speed_text = f"{speed_match.group(1)}-Speed"
        kind = None
        if "manual" in lower:
            kind = "Manual"
        elif "automatic" in lower or "auto" in lower:
            kind = "Automatic"
        elif "direct" in lower or "dct" in lower:
            # NOTE(review): "direct" may also match EV "direct drive" gearboxes;
            # confirm against the scraped Gearbox values.
            kind = "Dual-Clutch"
        if speed_text and kind:
            return f"{speed_text} {kind}"
        if kind:
            return kind
        if speed_text:
            return f"{speed_text} Automatic"
        return "Automatic"

    def normalize_engine_string(self, engine: str) -> Optional[str]:
        """Strip a makes-filter engine string; blank or missing becomes ``None``."""
        if not engine:
            return None
        eng = engine.strip()
        return eng if eng else None

    # ------------------------------------------------------------------
    # Phase 1: Build baseline from makes-filter (year/make/model + trims/engines)
    # ------------------------------------------------------------------
    def _makes_filter_models(self) -> List[Tuple[str, int, Dict]]:
        """Return ``(make, year, model_entry)`` for every in-window makes-filter model.

        Files are read in sorted order; the make comes from the file stem.

        Raises:
            FileNotFoundError: if ``makes_filter_dir`` does not exist.
        """
        if not self.makes_filter_dir.exists():
            raise FileNotFoundError("makes-filter directory not found")
        rows: List[Tuple[str, int, Dict]] = []
        for json_file in sorted(self.makes_filter_dir.glob("*.json")):
            make_name = self.get_canonical_make_name(json_file.stem)
            with open(json_file, "r", encoding="utf-8") as f:
                make_data = json.load(f)
            for year_entries in make_data.values():
                for year_entry in year_entries:
                    year = int(year_entry.get("year") or 0)
                    if year < self.min_year or year > self.max_year:
                        continue
                    for model in year_entry.get("models") or []:
                        rows.append((make_name, year, model))
        return rows

    def build_known_models(self):
        """Collect formatted model names per make from makes-filter (in-window years only)."""
        print("\n📖 Building known models from makes-filter...")
        for make_name, _year, model in self._makes_filter_models():
            model_name = self.format_model_name(model.get("name", ""))
            if model_name:
                self.known_models_by_make[make_name].add(model_name)
        total_models = sum(len(v) for v in self.known_models_by_make.values())
        print(f" ✓ Collected {total_models} model names across makes")

    def build_baseline_records(self):
        """Build one record per makes-filter (year, make, model) with its trims.

        Every trim (submodel, or ``"Base"`` when none are listed) carries the
        model's full engine list. Records are sorted by year, then make and
        model case-insensitively.
        """
        print("\n🧩 Building baseline records (year/make/model + trims/engines)...")
        records: List[Dict] = []
        for make_name, year, model in self._makes_filter_models():
            model_name = self.format_model_name(model.get("name", ""))
            if not model_name:
                continue
            engines = [
                norm
                for norm in (self.normalize_engine_string(e) for e in model.get("engines") or [])
                if norm
            ]
            submodels = model.get("submodels") or ["Base"]
            trims_payload = [
                {"trim": self.format_model_name(trim) or "Base", "engines": engines}
                for trim in submodels
            ]
            records.append({"year": year, "make": make_name, "model": model_name, "trims": trims_payload})
        self.baseline_records = sorted(records, key=lambda r: (r["year"], r["make"].lower(), r["model"].lower()))
        print(f" ✓ Baseline records: {len(self.baseline_records):,}")

    # ------------------------------------------------------------------
    # Phase 2: Build evidence from automobiles + engines
    # ------------------------------------------------------------------
    def _summarize_engine_records(self, engine_records: List[Dict]) -> Tuple[Set[str], Set[str], Set[str]]:
        """Return (engine displays, transmission displays, fuel labels) for one automobile."""
        engine_displays: Set[str] = set()
        transmission_displays: Set[str] = set()
        fuel_labels: Set[str] = set()
        for engine_record in engine_records:
            specs = self.extract_engine_specs(engine_record)
            display = self.normalize_engine_display(self.format_engine_display(specs))
            if display:
                engine_displays.add(display)
            fuel_labels.add(self.classify_fuel_label(specs.get("fuel_type", "")))
            trans_type = specs.get("transmission_type", "")
            # First number in the gearbox text is taken as the speed count.
            speed_match = re.search(r"\d+", trans_type) if trans_type else None
            speeds = speed_match.group(0) if speed_match else None
            trans_display = self.format_transmission_display(trans_type, speeds)
            if trans_display:
                transmission_displays.add(trans_display)
        return engine_displays, transmission_displays, fuel_labels

    def build_automobile_evidence(self):
        """Attach trims/engines/transmissions from automobiles.json to known models.

        Each automobile title is stripped of its make prefix and year(s) and
        matched against the make's known models; titles whose make, year, or
        model cannot be resolved are skipped.
        """
        print("\n🔎 Building automobile evidence (trims/years/engines/transmissions)...")
        brand_lookup = {b.get("id"): self.get_canonical_make_name(b.get("name", "")) for b in self.brands_data}
        # Quick index: automobile_id -> engine records
        engines_by_auto: Dict[int, List[Dict]] = defaultdict(list)
        for engine in self.engines_data:
            auto_id = engine.get("automobile_id")
            if auto_id is not None:
                engines_by_auto[auto_id].append(engine)
        for auto in self.automobiles_data:
            make = brand_lookup.get(auto.get("brand_id"))
            if not make:
                continue
            name = auto.get("name") or ""
            year_range = self.parse_year_range_from_name(name)
            if not year_range:
                continue
            year_start, year_end = year_range
            if year_end < self.min_year or year_start > self.max_year:
                continue
            known_models = self.known_models_by_make.get(make, set())
            if not known_models:
                continue
            # Remove make prefix, then every year token; collapse the gaps the
            # removal leaves so trims don't carry doubled spaces.
            name_clean = re.sub(rf"^{re.escape(make)}\s+", "", name, flags=re.IGNORECASE)
            name_clean = " ".join(self._YEAR_PATTERN.sub("", name_clean).split())
            model, trim = self.split_model_and_trim(make, self.format_model_name(name_clean), known_models)
            if not model:
                continue
            trim = self.format_model_name(trim or "Base")
            engine_displays, transmission_displays, fuel_labels = self._summarize_engine_records(
                engines_by_auto.get(auto.get("id"), [])
            )
            self.evidence_by_model[(make, model)].append(
                {
                    "year_start": year_start,
                    "year_end": year_end,
                    "trim": trim,
                    "engines": engine_displays,
                    "transmissions": transmission_displays,
                    "fuel_labels": fuel_labels,
                }
            )
        total_entries = sum(len(v) for v in self.evidence_by_model.values())
        print(f" ✓ Evidence entries: {total_entries:,}")

    # ------------------------------------------------------------------
    # Phase 3: Build vehicle records combining baseline + evidence
    # ------------------------------------------------------------------
    def _vehicle_rows_for_baseline(self, baseline: Dict) -> List[Dict]:
        """Expand one baseline (year, make, model) record into vehicle option rows."""
        year = baseline["year"]
        make = baseline["make"]
        model = baseline["model"]
        evidence_entries = self.evidence_by_model.get((make, model), [])
        # Trims with evidence for any year; such a trim with no evidence for
        # *this* year is skipped (avoids impossible year/trim combos).
        trims_with_any_evidence = {e["trim"].lower() for e in evidence_entries}

        # Evidence covering this year, merged per lower-cased trim key.
        evidence_by_trim: Dict[str, Dict[str, Set[str]]] = {}
        evidence_display: Dict[str, str] = {}
        for entry in evidence_entries:
            if not entry["year_start"] <= year <= entry["year_end"]:
                continue
            key = entry["trim"].lower()
            evidence_display.setdefault(key, entry["trim"])
            bucket = evidence_by_trim.setdefault(
                key, {"engines": set(), "transmissions": set(), "fuel_labels": set()}
            )
            bucket["engines"].update(entry["engines"])
            bucket["transmissions"].update(entry["transmissions"])
            bucket["fuel_labels"].update(entry["fuel_labels"])

        # Makes-filter trims: first spelling wins for display; engines are unioned.
        baseline_display: Dict[str, str] = {}
        baseline_engines: Dict[str, Set[str]] = defaultdict(set)
        for trim in baseline["trims"]:
            key = trim["trim"].lower()
            baseline_display.setdefault(key, trim["trim"])
            for eng in trim.get("engines", []):
                norm = self.normalize_engine_string(eng)
                if norm:
                    baseline_engines[key].add(norm)

        # Include evidence-only trims not present in makes-filter (if any)
        trim_keys = set(baseline_display) | set(evidence_by_trim)
        if not trim_keys:
            trim_keys.add("base")

        rows: List[Dict] = []
        for trim_key in sorted(trim_keys):
            evidence = evidence_by_trim.get(trim_key)
            if evidence is None and trim_key in trims_with_any_evidence:
                continue
            trim_display = baseline_display.get(trim_key) or evidence_display.get(trim_key) or "Base"
            engines = set(baseline_engines.get(trim_key, ()))
            fuel_labels: Set[str] = set()
            transmissions: Set[str] = set()
            if evidence is not None:
                engines |= evidence["engines"]
                fuel_labels = evidence["fuel_labels"]
                transmissions = evidence["transmissions"]
            if engines:
                engine_names = sorted(engines)
            else:
                # No engine data: use the most specific fuel label as a placeholder.
                fallback = next((f for f in self._FUEL_FALLBACK_PRIORITY if f in fuel_labels), "Gas")
                engine_names = [fallback]
            trans_names = sorted(transmissions) if transmissions else ["Manual", "Automatic"]
            for engine_name in engine_names:
                for trans_name in trans_names:
                    rows.append(
                        {
                            "year": year,
                            "make": make,
                            "model": model,
                            "trim": trim_display,
                            "engine_name": engine_name,
                            "trans_name": trans_name,
                        }
                    )
        return rows

    def _dedupe_vehicle_rows(self, rows: List[Dict]) -> List[Dict]:
        """Drop rows that repeat (case-insensitively) an earlier row; order is kept."""
        seen = set()
        unique: List[Dict] = []
        for row in rows:
            key = (
                row["year"],
                row["make"].lower(),
                row["model"].lower(),
                row["trim"].lower(),
                row["engine_name"].lower(),
                row["trans_name"].lower(),
            )
            if key not in seen:
                seen.add(key)
                unique.append(row)
        return unique

    def build_vehicle_records(self):
        """Merge baseline records with automobile evidence into fact rows.

        Each surviving trim yields the cross product of its engines and
        transmissions. When nothing is known, engines fall back to one fuel
        label ("Electric" > "Diesel" > "Hybrid" > "Gas") and transmissions to
        ``["Manual", "Automatic"]``. Rows are de-duplicated case-insensitively.
        """
        print("\n🚗 Building vehicle option records...")
        records: List[Dict] = []
        for baseline in self.baseline_records:
            records.extend(self._vehicle_rows_for_baseline(baseline))
        self.vehicle_records = self._dedupe_vehicle_rows(records)
        print(f" ✓ Vehicle records after dedupe: {len(self.vehicle_records):,}")

    # ------------------------------------------------------------------
    # Phase 4: Assign dimension IDs and write SQL
    # ------------------------------------------------------------------
    def assign_dimension_ids(self):
        """Number distinct engine and transmission names 1..N in sorted order."""
        engine_names = sorted({r["engine_name"] for r in self.vehicle_records})
        trans_names = sorted({r["trans_name"] for r in self.vehicle_records})
        self.engine_name_to_id = {name: idx + 1 for idx, name in enumerate(engine_names)}
        self.trans_name_to_id = {name: idx + 1 for idx, name in enumerate(trans_names)}

    def _write_dimension_sql(
        self, path: str, title: str, table: str, column: str, sequence: str, name_to_id: Dict[str, int]
    ):
        """Write a transactional INSERT + ``setval`` script for one dimension table.

        When ``name_to_id`` is empty the INSERT and ``setval`` are omitted:
        ``INSERT ... VALUES ;`` is a syntax error and ``setval(seq, 0)`` is out
        of range for a default sequence.
        """
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(f"-- {title} data import\n-- Generated by ETL script\n\nBEGIN;\n\n")
            if name_to_id:
                values = [
                    f"({idx},'{self.sql_escape_literal(name)}')"
                    for name, idx in sorted(name_to_id.items(), key=lambda item: item[1])
                ]
                f.write(f"INSERT INTO {table} (id, {column}) VALUES\n")
                f.write(",\n".join(values))
                f.write(";\n\n")
                # Advance the sequence past the explicit ids we inserted.
                f.write(f"SELECT setval('{sequence}', {max(name_to_id.values())});\n\n")
            f.write("COMMIT;\n")

    def write_engines_sql(self):
        """Write the engines dimension script to ``engines_sql_file``."""
        self._write_dimension_sql(
            self.engines_sql_file, "Engines", "engines", "name", "engines_id_seq", self.engine_name_to_id
        )

    def write_transmissions_sql(self):
        """Write the transmissions dimension script to ``transmissions_sql_file``."""
        self._write_dimension_sql(
            self.transmissions_sql_file,
            "Transmissions",
            "transmissions",
            "type",
            "transmissions_id_seq",
            self.trans_name_to_id,
        )

    def write_vehicle_options_sql(self):
        """Write vehicle option rows as batched INSERTs (1000 rows each) in one transaction.

        Requires ``assign_dimension_ids`` to have run; an unknown engine or
        transmission name raises ``KeyError``.
        """
        Path(self.vehicles_sql_file).parent.mkdir(parents=True, exist_ok=True)
        with open(self.vehicles_sql_file, "w", encoding="utf-8") as f:
            f.write("-- Vehicle options data import\n-- Generated by ETL script\n\nBEGIN;\n\n")
            batch_size = 1000
            total = len(self.vehicle_records)
            for start in range(0, total, batch_size):
                batch = self.vehicle_records[start:start + batch_size]
                f.write("INSERT INTO vehicle_options (year, make, model, trim, engine_id, transmission_id) VALUES\n")
                values = []
                for record in batch:
                    engine_id = self.engine_name_to_id[record["engine_name"]]
                    trans_id = self.trans_name_to_id[record["trans_name"]]
                    values.append(
                        f"({record['year']},'{self.sql_escape_literal(record['make'])}',"
                        f"'{self.sql_escape_literal(record['model'])}',"
                        f"'{self.sql_escape_literal(record['trim'])}',{engine_id},{trans_id})"
                    )
                f.write(",\n".join(values))
                f.write(";\n\n")
            f.write("COMMIT;\n")

    # ------------------------------------------------------------------
    # Utility
    # ------------------------------------------------------------------
    def sql_escape_literal(self, value: str) -> str:
        """Escape *value* for a single-quoted PostgreSQL string literal.

        Only single quotes are doubled. With ``standard_conforming_strings``
        on (the PostgreSQL default since 9.1) backslashes inside ordinary
        '...' literals are taken literally, so doubling them would store
        doubled backslashes. NOTE(review): assumes the target database keeps
        that default.
        """
        return value.replace("'", "''")

    # ------------------------------------------------------------------
    # Stats
    # ------------------------------------------------------------------
    def generate_stats(self):
        """Write summary counts to ``stats_file`` and echo them to stdout."""
        records = self.vehicle_records
        stats = {
            "min_year": min(r["year"] for r in records) if records else None,
            "max_year": max(r["year"] for r in records) if records else None,
            "vehicle_records": len(records),
            "engines": len(self.engine_name_to_id),
            "transmissions": len(self.trans_name_to_id),
            "makes": len({r["make"] for r in records}),
            "models": len({(r["make"], r["model"]) for r in records}),
        }
        # Counts get thousands separators; years must not ("2,010").
        year_keys = {"min_year", "max_year"}
        lines = []
        for key, value in stats.items():
            formatted = f"{value:,}" if isinstance(value, int) and key not in year_keys else value
            lines.append((key.replace("_", " ").title(), formatted))
        Path(self.stats_file).parent.mkdir(parents=True, exist_ok=True)
        with open(self.stats_file, "w", encoding="utf-8") as f:
            f.write("=" * 60 + "\n")
            f.write("ETL Statistics\n")
            f.write("=" * 60 + "\n\n")
            for label, formatted in lines:
                f.write(f"{label}: {formatted}\n")
        print("\n📊 Statistics:")
        for label, formatted in lines:
            print(f" {label}: {formatted}")

    # ------------------------------------------------------------------
    # Run
    # ------------------------------------------------------------------
    def run(self):
        """Execute the full pipeline; on failure print the traceback and re-raise."""
        try:
            print("=" * 60)
            print("🚀 Automotive Vehicle ETL - SQL Generator")
            print(f" Year Window: {self.min_year}-{self.max_year}")
            print("=" * 60)
            self.load_json_files()
            self.build_known_models()
            self.build_baseline_records()
            self.build_automobile_evidence()
            self.build_vehicle_records()
            self.assign_dimension_ids()
            self.write_engines_sql()
            self.write_transmissions_sql()
            self.write_vehicle_options_sql()
            self.generate_stats()
            print("\n" + "=" * 60)
            print("✅ SQL Files Generated Successfully!")
            print("=" * 60)
            print("\nGenerated files:")
            print(f" - {self.engines_sql_file}")
            print(f" - {self.transmissions_sql_file}")
            print(f" - {self.vehicles_sql_file}")
            print(f" - {self.stats_file}")
            print("\nNext step: Import SQL files into database")
            print(" ./import_data.sh")
        except Exception as e:
            print(f"\n❌ ETL Pipeline Failed: {e}")
            import traceback
            traceback.print_exc()
            raise
if __name__ == "__main__":
    # Run the full ETL pipeline when executed as a script; run() re-raises on
    # failure so the process exits non-zero.
    VehicleSQLGenerator().run()