#!/usr/bin/env python3
"""
Generate SQL import files from a VehAPI snapshot SQLite database.

Reads observed compatibility pairs from the snapshot (trim-filtered
engine<->transmission pairs) and produces:
- output/01_engines.sql
- output/02_transmissions.sql
- output/03_vehicle_options.sql

No legacy JSON or network calls are used. The snapshot path is provided via
CLI flag.
"""
import argparse
import os
import sqlite3
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence

# Rows per INSERT statement; keeps each generated statement a manageable size.
BATCH_SIZE = 1000


def _env_snapshot_path() -> Optional[Path]:
    """Return SNAPSHOT_PATH from the environment as a Path, or None.

    Converting explicitly here (instead of relying on argparse applying
    ``type=`` to a string default) also treats an empty env var as unset;
    ``Path("")`` would otherwise become ``Path(".")``, which exists and
    would silently pass the existence check in load_pairs().
    """
    raw = os.environ.get("SNAPSHOT_PATH")
    return Path(raw) if raw else None


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments: snapshot input path and SQL output directory."""
    parser = argparse.ArgumentParser(
        description="Generate SQL files from a VehAPI snapshot (SQLite).",
    )
    parser.add_argument(
        "--snapshot-path",
        type=Path,
        default=_env_snapshot_path(),
        # NOTE(review): "snapshots//snapshot.sqlite" looks like a garbled
        # placeholder (e.g. "snapshots/<id>/snapshot.sqlite") — confirm intent.
        help="Path to snapshots//snapshot.sqlite produced by vehapi_fetch_snapshot.py (or env SNAPSHOT_PATH)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Directory to write SQL output files (default: output)",
    )
    return parser.parse_args()


def load_pairs(snapshot_path: Path) -> List[sqlite3.Row]:
    """Load every observed engine<->transmission pair from the snapshot.

    The database is opened via an ``immutable=1`` URI so no write (not even
    a journal file) is ever attempted — required on read-only filesystems.

    Raises:
        FileNotFoundError: if the snapshot file does not exist.
        RuntimeError: if the ``pairs`` table cannot be read.
        ValueError: if the ``pairs`` table is empty.
    """
    if not snapshot_path.exists():
        raise FileNotFoundError(f"Snapshot not found: {snapshot_path}")

    # Open in immutable mode to prevent any write attempts on read-only filesystems
    absolute_path = snapshot_path.resolve()
    uri = f"file:{absolute_path}?immutable=1"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.execute(
            """
            SELECT year, make, model, trim,
                   engine_display, engine_canon, engine_bucket,
                   trans_display, trans_canon, trans_bucket
            FROM pairs
            ORDER BY year, make, model, trim, engine_canon, trans_canon
            """
        )
        rows = cursor.fetchall()
    except sqlite3.Error as exc:
        raise RuntimeError(f"Failed to read pairs from snapshot: {exc}") from exc
    finally:
        conn.close()

    if not rows:
        raise ValueError("Snapshot contains no rows in pairs table.")
    return rows


def choose_engine_label(engine_display: str, engine_bucket: str, engine_canon: str) -> str:
    """
    Use VehAPI display string when present, otherwise fall back to the bucket
    label, and finally to the canonical key to avoid empty names.
    """
    if engine_display:
        return engine_display
    if engine_bucket:
        return engine_bucket
    return engine_canon


def choose_trans_label(trans_display: str, trans_bucket: str, trans_canon: str) -> str:
    """Same fallback chain as choose_engine_label, for transmissions."""
    if trans_display:
        return trans_display
    if trans_bucket:
        return trans_bucket
    return trans_canon


def build_engine_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the engine dimension table keyed by canonical engine key.

    IDs are assigned in first-seen order (rows arrive sorted by the query in
    load_pairs, so IDs are deterministic for a given snapshot).

    Raises:
        ValueError: if any row has a missing/empty engine_canon.
    """
    engines: Dict[str, Dict] = {}
    for row in rows:
        canon = row["engine_canon"]
        if canon is None or canon == "":
            raise ValueError(f"Missing engine_canon for row: {dict(row)}")
        if canon in engines:
            continue
        engines[canon] = {
            "id": len(engines) + 1,  # 1-based surrogate key, assignment order
            "name": choose_engine_label(row["engine_display"], row["engine_bucket"], canon),
            "fuel_type": row["engine_bucket"] or None,
        }
    return engines


def build_transmission_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the transmission dimension table keyed by canonical trans key.

    Raises:
        ValueError: if any row has a missing/empty trans_canon.
    """
    transmissions: Dict[str, Dict] = {}
    for row in rows:
        canon = row["trans_canon"]
        if canon is None or canon == "":
            raise ValueError(f"Missing trans_canon for row: {dict(row)}")
        if canon in transmissions:
            continue
        transmissions[canon] = {
            "id": len(transmissions) + 1,  # 1-based surrogate key
            "type": choose_trans_label(row["trans_display"], row["trans_bucket"], canon),
        }
    return transmissions


def build_vehicle_options(
    rows: Sequence[sqlite3.Row],
    engine_map: Dict[str, Dict],
    trans_map: Dict[str, Dict],
) -> List[Dict]:
    """Build fact rows linking each observed pair to its dimension IDs.

    Assumes engine_map/trans_map were built from the same ``rows`` (KeyError
    otherwise — deliberate, since a miss would indicate corrupt input).
    """
    options: List[Dict] = []
    for row in rows:
        engine_canon = row["engine_canon"]
        trans_canon = row["trans_canon"]
        options.append(
            {
                "year": int(row["year"]),
                "make": row["make"],
                "model": row["model"],
                "trim": row["trim"],
                "engine_id": engine_map[engine_canon]["id"],
                "transmission_id": trans_map[trans_canon]["id"],
            }
        )
    return options


def sql_value(value) -> str:
    """Render a Python value as a SQL literal (NULL, quoted string, or number).

    Strings have embedded single quotes doubled per the SQL standard.
    NOTE: only None/str/int are expected here; a bool would render as
    "True"/"False", which is not portable SQL.
    """
    if value is None:
        return "NULL"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
    """Yield successive lists of at most ``size`` items from ``seq``."""
    chunk: List[Dict] = []
    for item in seq:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def write_insert_file(
    path: Path,
    table: str,
    columns: Sequence[str],
    rows: Sequence[Dict],
):
    """Write multi-row INSERT statements for ``rows`` into ``path``.

    Rows are batched (BATCH_SIZE per statement) to keep each statement small.
    Creates parent directories as needed; an empty ``rows`` produces a file
    containing only a comment.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        # Plain string, not an f-string: no placeholders (lint F541).
        f.write("-- Auto-generated by etl_generate_sql.py\n")
        if not rows:
            f.write(f"-- No rows for {table}\n")
            return
        for batch in chunked(rows, BATCH_SIZE):
            values_sql = ",\n".join(
                "(" + ",".join(sql_value(row[col]) for col in columns) + ")"
                for row in batch
            )
            f.write(f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values_sql};\n\n")


def main():
    """Read the snapshot, build dimensions/facts, and emit the three SQL files."""
    args = parse_args()
    snapshot_path: Path = args.snapshot_path
    output_dir: Path = args.output_dir

    if snapshot_path is None:
        raise SystemExit("Snapshot path is required. Pass --snapshot-path or set SNAPSHOT_PATH.")

    print(f"Reading snapshot: {snapshot_path}")
    rows = load_pairs(snapshot_path)
    years = sorted({int(row["year"]) for row in rows})
    print(f" Loaded {len(rows):,} observed engine<->transmission pairs across {len(years)} years")

    engines = build_engine_dimension(rows)
    transmissions = build_transmission_dimension(rows)
    vehicle_options = build_vehicle_options(rows, engines, transmissions)

    print(f"Engines: {len(engines):,}")
    print(f"Transmissions: {len(transmissions):,}")
    print(f"Vehicle options (observed pairs): {len(vehicle_options):,}")

    # Materialize dict views so write_insert_file's Sequence contract holds.
    write_insert_file(
        output_dir / "01_engines.sql",
        "engines",
        ["id", "name", "fuel_type"],
        list(engines.values()),
    )
    write_insert_file(
        output_dir / "02_transmissions.sql",
        "transmissions",
        ["id", "type"],
        list(transmissions.values()),
    )
    write_insert_file(
        output_dir / "03_vehicle_options.sql",
        "vehicle_options",
        ["year", "make", "model", "trim", "engine_id", "transmission_id"],
        vehicle_options,
    )

    print("\nSQL files generated:")
    print(f" - {output_dir / '01_engines.sql'}")
    print(f" - {output_dir / '02_transmissions.sql'}")
    print(f" - {output_dir / '03_vehicle_options.sql'}")
    print(f"\nYear coverage: {years[0]}-{years[-1]}")


if __name__ == "__main__":
    main()