Possible working ETL

This commit is contained in:
Eric Gullickson
2025-12-15 18:19:55 -06:00
parent 1fc69b7779
commit 1e599e334f
110 changed files with 4843 additions and 2078706 deletions

View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Generate SQL import files from a VehAPI snapshot SQLite database.
Reads observed compatibility pairs from the snapshot (trim-filtered engine<->transmission pairs)
and produces:
- output/01_engines.sql
- output/02_transmissions.sql
- output/03_vehicle_options.sql
No legacy JSON or network calls are used. The snapshot path is provided via CLI flag.
"""
import argparse
import os
import sqlite3
from pathlib import Path
from typing import Dict, Iterable, List, Sequence
BATCH_SIZE = 1000
def parse_args() -> argparse.Namespace:
    """Parse command-line flags for the ETL run.

    ``--snapshot-path`` falls back to the SNAPSHOT_PATH environment variable;
    argparse applies ``type=Path`` to string defaults, so the env value is
    converted to a Path just like a CLI value would be.
    """
    cli = argparse.ArgumentParser(
        description="Generate SQL files from a VehAPI snapshot (SQLite).",
    )
    cli.add_argument(
        "--snapshot-path",
        type=Path,
        default=os.environ.get("SNAPSHOT_PATH"),
        help="Path to snapshots/<date>/snapshot.sqlite produced by vehapi_fetch_snapshot.py (or env SNAPSHOT_PATH)",
    )
    cli.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Directory to write SQL output files (default: output)",
    )
    return cli.parse_args()
def load_pairs(snapshot_path: Path) -> List[sqlite3.Row]:
    """Read all observed pair rows from the snapshot's ``pairs`` table.

    Rows are returned in a deterministic order (year, make, model, trim,
    engine_canon, trans_canon) so downstream ID assignment is stable.

    Raises:
        FileNotFoundError: the snapshot file does not exist.
        RuntimeError: the SQLite read failed (chained from sqlite3.Error).
        ValueError: the pairs table is empty.
    """
    if not snapshot_path.exists():
        raise FileNotFoundError(f"Snapshot not found: {snapshot_path}")
    connection = sqlite3.connect(snapshot_path)
    # Row factory gives name-based column access to every caller downstream.
    connection.row_factory = sqlite3.Row
    query = """
        SELECT
            year,
            make,
            model,
            trim,
            engine_display,
            engine_canon,
            engine_bucket,
            trans_display,
            trans_canon,
            trans_bucket
        FROM pairs
        ORDER BY year, make, model, trim, engine_canon, trans_canon
    """
    try:
        fetched = connection.execute(query).fetchall()
    except sqlite3.Error as exc:
        raise RuntimeError(f"Failed to read pairs from snapshot: {exc}") from exc
    finally:
        # Always release the connection, even when the read blows up.
        connection.close()
    if not fetched:
        raise ValueError("Snapshot contains no rows in pairs table.")
    return fetched
def choose_engine_label(engine_display: str, engine_bucket: str, engine_canon: str) -> str:
    """Pick the best human-readable engine name.

    Preference order: VehAPI display string, then bucket label, then the
    canonical key (so the name is never empty when a canon exists).
    """
    return engine_display or engine_bucket or engine_canon
def choose_trans_label(trans_display: str, trans_bucket: str, trans_canon: str) -> str:
    """Pick the best transmission label: display, else bucket, else canon key."""
    return trans_display or trans_bucket or trans_canon
def build_engine_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the engine dimension keyed by canonical engine string.

    IDs are assigned 1-based in first-seen order; rows in deterministic
    order therefore yield stable IDs across runs.

    Raises:
        ValueError: a row has a missing/empty engine_canon.
    """
    dimension: Dict[str, Dict] = {}
    next_id = 1
    for row in rows:
        key = row["engine_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing engine_canon for row: {dict(row)}")
        if key not in dimension:
            dimension[key] = {
                "id": next_id,
                # Label preference: display string, then bucket, then canon key.
                "name": row["engine_display"] or row["engine_bucket"] or key,
                # Empty bucket strings are normalized to NULL downstream.
                "fuel_type": row["engine_bucket"] or None,
            }
            next_id += 1
    return dimension
def build_transmission_dimension(rows: Sequence[sqlite3.Row]) -> Dict[str, Dict]:
    """Build the transmission dimension keyed by canonical transmission string.

    IDs are assigned 1-based in first-seen order.

    Raises:
        ValueError: a row has a missing/empty trans_canon.
    """
    dimension: Dict[str, Dict] = {}
    next_id = 1
    for row in rows:
        key = row["trans_canon"]
        if key is None or key == "":
            raise ValueError(f"Missing trans_canon for row: {dict(row)}")
        if key not in dimension:
            dimension[key] = {
                "id": next_id,
                # Label preference: display string, then bucket, then canon key.
                "type": row["trans_display"] or row["trans_bucket"] or key,
            }
            next_id += 1
    return dimension
def build_vehicle_options(
    rows: Sequence[sqlite3.Row],
    engine_map: Dict[str, Dict],
    trans_map: Dict[str, Dict],
) -> List[Dict]:
    """Translate each observed pair row into a vehicle-option fact record.

    Engine/transmission canon keys are resolved to the numeric IDs assigned
    by the dimension builders; a KeyError here would indicate a row whose
    canon key never made it into the dimension maps.
    """
    return [
        {
            "year": int(row["year"]),
            "make": row["make"],
            "model": row["model"],
            "trim": row["trim"],
            "engine_id": engine_map[row["engine_canon"]]["id"],
            "transmission_id": trans_map[row["trans_canon"]]["id"],
        }
        for row in rows
    ]
def sql_value(value) -> str:
    """Render a Python scalar as a SQL literal.

    Handles None -> NULL, str -> single-quoted with '' escaping, bool -> 1/0,
    and everything else (ints, floats) via str().

    Bug fix: bool is a subclass of int, so the old generic fallthrough turned
    True/False into the tokens ``True``/``False`` -- invalid SQL in most
    dialects. Bools must be checked before the generic path.
    """
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        # Portable integer boolean literals; checked before int handling.
        return "1" if value else "0"
    if isinstance(value, str):
        # Standard SQL escaping: double any embedded single quote.
        return "'" + value.replace("'", "''") + "'"
    return str(value)
def chunked(seq: Iterable[Dict], size: int) -> Iterable[List[Dict]]:
    """Yield lists of at most ``size`` consecutive items from ``seq``.

    The final list may be shorter; an empty input yields nothing.
    """
    pending: List[Dict] = []
    for element in seq:
        pending.append(element)
        if len(pending) >= size:
            yield pending
            pending = []
    # Flush whatever is left after the loop.
    if pending:
        yield pending
def write_insert_file(
    path: Path,
    table: str,
    columns: Sequence[str],
    rows: Iterable[Dict],
) -> None:
    """Write batched INSERT statements for ``rows`` into ``path``.

    Args:
        path: Output .sql file; parent directories are created as needed.
        table: Target table name (interpolated verbatim -- trusted input only).
        columns: Column names in insert order; each row dict must supply them.
        rows: Row dicts. Widened from ``Sequence`` to ``Iterable`` because
            callers pass ``dict.values()`` views, which are not Sequences.

    Rows are emitted in batches of BATCH_SIZE values per INSERT statement.
    """
    # Materialize once so the emptiness check below is valid for any
    # iterable (a bare `not rows` is wrong for generators).
    row_list = list(rows)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        # Constant string: the original f-prefix had no placeholders.
        f.write("-- Auto-generated by etl_generate_sql.py\n")
        if not row_list:
            f.write(f"-- No rows for {table}\n")
            return
        for batch in chunked(row_list, BATCH_SIZE):
            values_sql = ",\n".join(
                "(" + ",".join(sql_value(row[col]) for col in columns) + ")"
                for row in batch
            )
            f.write(f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values_sql};\n\n")
def main() -> None:
    """CLI entry point: read the snapshot, build dimensions, write SQL files."""
    args = parse_args()
    snapshot_path: Path = args.snapshot_path
    output_dir: Path = args.output_dir
    if snapshot_path is None:
        raise SystemExit("Snapshot path is required. Pass --snapshot-path or set SNAPSHOT_PATH.")

    print(f"Reading snapshot: {snapshot_path}")
    rows = load_pairs(snapshot_path)
    years = sorted({int(row["year"]) for row in rows})
    print(f" Loaded {len(rows):,} observed engine<->transmission pairs across {len(years)} years")

    engines = build_engine_dimension(rows)
    transmissions = build_transmission_dimension(rows)
    vehicle_options = build_vehicle_options(rows, engines, transmissions)
    print(f"Engines: {len(engines):,}")
    print(f"Transmissions: {len(transmissions):,}")
    print(f"Vehicle options (observed pairs): {len(vehicle_options):,}")

    # One (filename, table, columns, rows) spec per output artifact,
    # numbered so the files import in dependency order.
    outputs = (
        ("01_engines.sql", "engines", ["id", "name", "fuel_type"], engines.values()),
        ("02_transmissions.sql", "transmissions", ["id", "type"], transmissions.values()),
        (
            "03_vehicle_options.sql",
            "vehicle_options",
            ["year", "make", "model", "trim", "engine_id", "transmission_id"],
            vehicle_options,
        ),
    )
    for filename, table, columns, data in outputs:
        write_insert_file(output_dir / filename, table, columns, data)

    print("\nSQL files generated:")
    for filename, _, _, _ in outputs:
        print(f" - {output_dir / filename}")
    print(f"\nYear coverage: {years[0]}-{years[-1]}")


if __name__ == "__main__":
    main()