239 lines
7.1 KiB
Python
239 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Generate SQL import files from a VehAPI snapshot SQLite database.
|
|
|
|
Reads observed compatibility pairs from the snapshot (trim-filtered engine<->transmission pairs)
|
|
and produces:
|
|
- output/01_engines.sql
|
|
- output/02_transmissions.sql
|
|
- output/03_vehicle_options.sql
|
|
|
|
No legacy JSON or network calls are used. The snapshot path is provided via CLI flag.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Sequence
|
|
|
|
BATCH_SIZE = 1000
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns a namespace with:
        snapshot_path: Path | None — from --snapshot-path, falling back to the
            SNAPSHOT_PATH environment variable; None when neither is given
            (main() reports the error).
        output_dir: Path — from --output-dir, default "output".
    """
    parser = argparse.ArgumentParser(
        description="Generate SQL files from a VehAPI snapshot (SQLite).",
    )
    # Coerce the env fallback to Path explicitly instead of relying on
    # argparse re-parsing string defaults; this also treats an *empty*
    # SNAPSHOT_PATH as unset (previously "" became Path('.') and bypassed
    # main()'s clear "snapshot path required" error).
    env_snapshot = os.environ.get("SNAPSHOT_PATH")
    parser.add_argument(
        "--snapshot-path",
        type=Path,
        default=Path(env_snapshot) if env_snapshot else None,
        help="Path to snapshots/<date>/snapshot.sqlite produced by vehapi_fetch_snapshot.py (or env SNAPSHOT_PATH)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Directory to write SQL output files (default: output)",
    )
    return parser.parse_args()
|
|
|
|
|
|
def load_pairs(snapshot_path: Path) -> List[sqlite3.Row]:
    """Read every observed pair row from the snapshot's ``pairs`` table.

    Rows come back sorted by (year, make, model, trim, engine_canon,
    trans_canon), so downstream id assignment is deterministic.

    Raises:
        FileNotFoundError: the snapshot file does not exist.
        RuntimeError: SQLite failed while reading (original error chained).
        ValueError: the pairs table contains no rows.
    """
    if not snapshot_path.exists():
        raise FileNotFoundError(f"Snapshot not found: {snapshot_path}")

    query = """
        SELECT
            year,
            make,
            model,
            trim,
            engine_display,
            engine_canon,
            engine_bucket,
            trans_display,
            trans_canon,
            trans_bucket
        FROM pairs
        ORDER BY year, make, model, trim, engine_canon, trans_canon
        """
    connection = sqlite3.connect(snapshot_path)
    connection.row_factory = sqlite3.Row  # name-based column access for callers
    try:
        result = connection.execute(query).fetchall()
    except sqlite3.Error as exc:
        raise RuntimeError(f"Failed to read pairs from snapshot: {exc}") from exc
    finally:
        connection.close()

    if not result:
        raise ValueError("Snapshot contains no rows in pairs table.")
    return result
|
|
|
|
|
|
def choose_engine_label(engine_display: str, engine_bucket: str, engine_canon: str) -> str:
    """Return the best available engine name.

    Preference order: VehAPI display string, then bucket label, then the
    canonical key — so the result is never empty when a canon key exists.
    """
    return engine_display or engine_bucket or engine_canon
|
|
|
|
|
|
def choose_trans_label(trans_display: str, trans_bucket: str, trans_canon: str) -> str:
    """Return the best available transmission label: display, else bucket, else canon."""
    return trans_display or trans_bucket or trans_canon
|
|
|
|
|
|
def build_engine_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the engine dimension: one record per distinct engine_canon.

    Ids are 1-based and assigned in first-seen order, which is deterministic
    because the snapshot query sorts its rows.

    Raises ValueError when any row is missing engine_canon.
    """
    engines: Dict[str, Dict] = {}
    for record in rows:
        key = record["engine_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing engine_canon for row: {dict(record)}")
        if key not in engines:
            engines[key] = {
                "id": len(engines) + 1,
                # Prefer display text, then the bucket label, then the canon key.
                "name": record["engine_display"] or record["engine_bucket"] or key,
                "fuel_type": record["engine_bucket"] or None,
            }
    return engines
|
|
|
|
|
|
def build_transmission_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the transmission dimension: one record per distinct trans_canon.

    Ids are 1-based in first-seen order (deterministic via the sorted query).

    Raises ValueError when any row is missing trans_canon.
    """
    transmissions: Dict[str, Dict] = {}
    for record in rows:
        key = record["trans_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing trans_canon for row: {dict(record)}")
        if key not in transmissions:
            transmissions[key] = {
                "id": len(transmissions) + 1,
                # Prefer display text, then the bucket label, then the canon key.
                "type": record["trans_display"] or record["trans_bucket"] or key,
            }
    return transmissions
|
|
|
|
|
|
def build_vehicle_options(
    rows: Sequence[sqlite3.Row],
    engine_map: Dict[str, Dict],
    trans_map: Dict[str, Dict],
) -> List[Dict]:
    """Materialize one fact record per observed pair, resolving dimension ids.

    Raises KeyError if a row references a canon key absent from the maps
    (cannot happen when the maps were built from the same rows).
    """
    return [
        {
            "year": int(row["year"]),
            "make": row["make"],
            "model": row["model"],
            "trim": row["trim"],
            "engine_id": engine_map[row["engine_canon"]]["id"],
            "transmission_id": trans_map[row["trans_canon"]]["id"],
        }
        for row in rows
    ]
|
|
|
|
|
|
def sql_value(value):
    """Render a Python value as a SQL literal.

    None becomes NULL; strings are single-quoted with embedded quotes doubled
    (standard SQL escaping); everything else uses its str() form unquoted.
    """
    if value is None:
        return "NULL"
    if not isinstance(value, str):
        return str(value)
    escaped = value.replace("'", "''")
    return f"'{escaped}'"
|
|
|
|
|
|
def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
    """Yield successive lists of at most *size* items from *seq*.

    The final chunk may be shorter; an empty input yields nothing.
    """
    iterator = iter(seq)
    while True:
        block: List[Dict] = []
        for item in iterator:
            block.append(item)
            if len(block) >= size:
                break
        if block:
            yield block
        if len(block) < size:
            # Iterator exhausted before filling a chunk — we're done.
            return
|
|
|
|
|
|
def write_insert_file(
    path: Path,
    table: str,
    columns: Sequence[str],
    rows: Iterable[Dict],
):
    """Write batched multi-row INSERT statements for *rows* into *path*.

    Parent directories are created as needed. When *rows* is empty, the file
    contains only a comment noting there is nothing to insert; otherwise one
    INSERT statement is emitted per BATCH_SIZE chunk.

    Note: values are rendered via sql_value (strings escaped by quote
    doubling); *rows* is annotated Iterable because callers pass dict_values
    as well as lists — the emptiness check assumes a sized container, which
    all current callers provide.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        # Fixed: this literal had a spurious f-prefix with no placeholders.
        f.write("-- Auto-generated by etl_generate_sql.py\n")
        if not rows:
            f.write(f"-- No rows for {table}\n")
            return

        for batch in chunked(rows, BATCH_SIZE):
            values_sql = ",\n".join(
                "(" + ",".join(sql_value(row[col]) for col in columns) + ")"
                for row in batch
            )
            f.write(f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values_sql};\n\n")
|
|
|
|
|
|
def main():
    """CLI entry point: load snapshot pairs, build dimensions, write SQL files."""
    args = parse_args()
    snapshot_path: Path = args.snapshot_path
    output_dir: Path = args.output_dir
    if snapshot_path is None:
        raise SystemExit("Snapshot path is required. Pass --snapshot-path or set SNAPSHOT_PATH.")

    print(f"Reading snapshot: {snapshot_path}")
    rows = load_pairs(snapshot_path)
    years = sorted({int(r["year"]) for r in rows})
    print(f" Loaded {len(rows):,} observed engine<->transmission pairs across {len(years)} years")

    engines = build_engine_dimension(rows)
    transmissions = build_transmission_dimension(rows)
    vehicle_options = build_vehicle_options(rows, engines, transmissions)

    print(f"Engines: {len(engines):,}")
    print(f"Transmissions: {len(transmissions):,}")
    print(f"Vehicle options (observed pairs): {len(vehicle_options):,}")

    # (filename, table, columns, rows) for each output file, in write order.
    outputs = [
        ("01_engines.sql", "engines", ["id", "name", "fuel_type"], list(engines.values())),
        ("02_transmissions.sql", "transmissions", ["id", "type"], list(transmissions.values())),
        (
            "03_vehicle_options.sql",
            "vehicle_options",
            ["year", "make", "model", "trim", "engine_id", "transmission_id"],
            vehicle_options,
        ),
    ]
    for filename, table, columns, table_rows in outputs:
        write_insert_file(output_dir / filename, table, columns, table_rows)

    print("\nSQL files generated:")
    for filename, _, _, _ in outputs:
        print(f" - {output_dir / filename}")
    print(f"\nYear coverage: {years[0]}-{years[-1]}")
|
|
|
|
|
|
# Run the generator only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|