Files
motovaultpro/data/vehicle-etl/etl_generate_sql.py

242 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
Generate SQL import files from a VehAPI snapshot SQLite database.
Reads observed compatibility pairs from the snapshot (trim-filtered engine<->transmission pairs)
and produces:
- output/01_engines.sql
- output/02_transmissions.sql
- output/03_vehicle_options.sql
No legacy JSON or network calls are used. The snapshot path is provided via CLI flag.
"""
import argparse
import os
import sqlite3
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
# Maximum number of rows emitted per multi-row INSERT statement in the
# generated SQL files (keeps statements within typical server limits).
BATCH_SIZE = 1000
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments: snapshot input path and SQL output directory.

    Returns:
        argparse.Namespace with `snapshot_path` (Path | None) and
        `output_dir` (Path).
    """
    parser = argparse.ArgumentParser(
        description="Generate SQL files from a VehAPI snapshot (SQLite).",
    )
    # BUGFIX: argparse applies type= only to values given on the command line,
    # never to default=. The env-var fallback therefore arrived as a plain str,
    # and str has no .exists(), breaking load_pairs(). Convert it explicitly.
    env_snapshot = os.environ.get("SNAPSHOT_PATH")
    parser.add_argument(
        "--snapshot-path",
        type=Path,
        default=Path(env_snapshot) if env_snapshot else None,
        help="Path to snapshots/<date>/snapshot.sqlite produced by vehapi_fetch_snapshot.py (or env SNAPSHOT_PATH)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Directory to write SQL output files (default: output)",
    )
    return parser.parse_args()
def load_pairs(snapshot_path: Path) -> List[sqlite3.Row]:
    """Read all observed compatibility pair rows from the snapshot database.

    Rows come back in a deterministic order (year, make, model, trim,
    engine_canon, trans_canon) so downstream output is reproducible.

    Raises:
        FileNotFoundError: snapshot file does not exist.
        RuntimeError: any SQLite error while reading the pairs table.
        ValueError: the pairs table is empty.
    """
    if not snapshot_path.exists():
        raise FileNotFoundError(f"Snapshot not found: {snapshot_path}")

    # Immutable URI mode means SQLite never attempts journal/lock writes,
    # which is required when the snapshot lives on a read-only filesystem.
    db_uri = f"file:{snapshot_path.resolve()}?immutable=1"
    query = """
            SELECT
                year,
                make,
                model,
                trim,
                engine_display,
                engine_canon,
                engine_bucket,
                trans_display,
                trans_canon,
                trans_bucket
            FROM pairs
            ORDER BY year, make, model, trim, engine_canon, trans_canon
            """

    connection = sqlite3.connect(db_uri, uri=True)
    connection.row_factory = sqlite3.Row
    try:
        result = connection.execute(query).fetchall()
    except sqlite3.Error as exc:
        raise RuntimeError(f"Failed to read pairs from snapshot: {exc}") from exc
    finally:
        connection.close()

    if not result:
        raise ValueError("Snapshot contains no rows in pairs table.")
    return result
def choose_engine_label(engine_display: str, engine_bucket: str, engine_canon: str) -> str:
    """Pick a human-readable engine name.

    Prefers the VehAPI display string, then the bucket label, and finally the
    canonical key so the result is never empty.
    """
    return engine_display or engine_bucket or engine_canon
def choose_trans_label(trans_display: str, trans_bucket: str, trans_canon: str) -> str:
    """Pick a human-readable transmission type label (display > bucket > canon)."""
    return trans_display or trans_bucket or trans_canon
def build_engine_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the engine dimension keyed by engine_canon.

    The first row observed for each canonical key wins; ids are assigned
    1-based in insertion order. Raises ValueError when a row has no
    engine_canon value.
    """
    dimension: Dict[str, Dict] = {}
    for record in rows:
        key = record["engine_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing engine_canon for row: {dict(record)}")
        if key not in dimension:
            # Same fallback chain as choose_engine_label: display > bucket > canon.
            dimension[key] = {
                "id": len(dimension) + 1,
                "name": record["engine_display"] or record["engine_bucket"] or key,
                "fuel_type": record["engine_bucket"] or None,
            }
    return dimension
def build_transmission_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the transmission dimension keyed by trans_canon.

    First occurrence wins; ids are 1-based in insertion order. Raises
    ValueError when a row has no trans_canon value.
    """
    dimension: Dict[str, Dict] = {}
    for record in rows:
        key = record["trans_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing trans_canon for row: {dict(record)}")
        if key not in dimension:
            # Same fallback chain as choose_trans_label: display > bucket > canon.
            dimension[key] = {
                "id": len(dimension) + 1,
                "type": record["trans_display"] or record["trans_bucket"] or key,
            }
    return dimension
def build_vehicle_options(
    rows: Sequence[sqlite3.Row],
    engine_map: Dict[str, Dict],
    trans_map: Dict[str, Dict],
) -> List[Dict]:
    """Translate observed pair rows into fact records carrying dimension ids.

    Raises KeyError if a row references a canonical key missing from either
    dimension map (indicates the maps were built from different rows).
    """
    return [
        {
            "year": int(record["year"]),
            "make": record["make"],
            "model": record["model"],
            "trim": record["trim"],
            "engine_id": engine_map[record["engine_canon"]]["id"],
            "transmission_id": trans_map[record["trans_canon"]]["id"],
        }
        for record in rows
    ]
def sql_value(value):
    """Render a Python scalar as a SQL literal.

    None becomes NULL, strings are single-quoted with embedded quotes doubled,
    and everything else falls back to str().
    """
    if value is None:
        return "NULL"
    if not isinstance(value, str):
        return str(value)
    escaped = value.replace("'", "''")
    return f"'{escaped}'"
def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
    """Yield successive lists of at most *size* items from *seq*.

    The final batch may be shorter; nothing is yielded for an empty input.
    """
    batch: List[Dict] = []
    for element in seq:
        batch.append(element)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch
def write_insert_file(
    path: Path,
    table: str,
    columns: Sequence[str],
    rows: Sequence[Dict],
):
    """Write batched multi-row INSERT statements for *table* to *path*.

    Parent directories are created as needed. When *rows* is empty the file
    contains only explanatory comments so downstream loaders see a valid
    (no-op) script.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    column_list = ", ".join(columns)
    with path.open("w", encoding="utf-8") as out:
        out.write("-- Auto-generated by etl_generate_sql.py\n")
        if not rows:
            out.write(f"-- No rows for {table}\n")
            return
        for batch in chunked(rows, BATCH_SIZE):
            # One parenthesized value tuple per row, joined across the batch.
            tuples = [
                "(" + ",".join(sql_value(row[col]) for col in columns) + ")"
                for row in batch
            ]
            joined = ",\n".join(tuples)
            out.write(f"INSERT INTO {table} ({column_list}) VALUES\n{joined};\n\n")
def main():
    """CLI entry point: load the snapshot and emit the three SQL import files.

    Exits via SystemExit when no snapshot path was supplied (flag or env var).
    """
    args = parse_args()
    snapshot_path: Path = args.snapshot_path
    output_dir: Path = args.output_dir
    if snapshot_path is None:
        raise SystemExit("Snapshot path is required. Pass --snapshot-path or set SNAPSHOT_PATH.")

    print(f"Reading snapshot: {snapshot_path}")
    rows = load_pairs(snapshot_path)
    years = sorted({int(row["year"]) for row in rows})
    print(f" Loaded {len(rows):,} observed engine<->transmission pairs across {len(years)} years")

    engines = build_engine_dimension(rows)
    transmissions = build_transmission_dimension(rows)
    vehicle_options = build_vehicle_options(rows, engines, transmissions)
    print(f"Engines: {len(engines):,}")
    print(f"Transmissions: {len(transmissions):,}")
    print(f"Vehicle options (observed pairs): {len(vehicle_options):,}")

    # (file name, table, columns, rows) — order matters: dimensions before facts.
    jobs = [
        ("01_engines.sql", "engines",
         ["id", "name", "fuel_type"], list(engines.values())),
        ("02_transmissions.sql", "transmissions",
         ["id", "type"], list(transmissions.values())),
        ("03_vehicle_options.sql", "vehicle_options",
         ["year", "make", "model", "trim", "engine_id", "transmission_id"],
         vehicle_options),
    ]
    for filename, table, columns, table_rows in jobs:
        write_insert_file(output_dir / filename, table, columns, table_rows)

    print("\nSQL files generated:")
    for filename, _table, _columns, _rows in jobs:
        print(f" - {output_dir / filename}")
    print(f"\nYear coverage: {years[0]}-{years[-1]}")
if __name__ == "__main__":
main()