Files
motovaultpro/scripts/load_vehicle_data.py
2025-11-07 13:51:47 -06:00

343 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Populate the normalized vehicles schema (make/model/model_year/trim/engine)
from the JSON sources in data/make-models.
Example:
PGPASSWORD=$(cat secrets/app/postgres-password.txt) \\
python3 scripts/load_vehicle_data.py \\
--db-user postgres --db-name motovaultpro --db-host 127.0.0.1
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
try:
import psycopg
except ImportError as exc: # pragma: no cover - ease troubleshooting
sys.stderr.write(
"Error: psycopg is required. Install with `pip install psycopg[binary]`.\n",
)
raise
# Default JSON source directory: "data/make-models" under the parent of the
# directory that contains this script.
DEFAULT_DATA_DIR = Path(__file__).resolve().parents[1] / "data" / "make-models"
# Transmission labels used when --transmissions is omitted or yields no labels.
DEFAULT_TRANSMISSIONS = ("Automatic", "Manual")
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    The individual ``--db-*`` options take their defaults from the ``PGHOST``,
    ``PGPORT``, ``PGDATABASE``, ``PGUSER`` and ``PGPASSWORD`` environment
    variables, falling back to hard-coded local defaults (no default for the
    password).

    Returns:
        Namespace with ``data_dir``, ``database_url``, ``db_host``,
        ``db_port``, ``db_name``, ``db_user``, ``db_password``,
        ``transmissions`` and ``skip_truncate``.

    Raises:
        ValueError: If ``PGPORT`` is set to something that is not an integer.
    """
    env = os.environ
    cli = argparse.ArgumentParser(description="Load vehicle dropdown data into Postgres.")

    cli.add_argument(
        "--data-dir",
        default=str(DEFAULT_DATA_DIR),
        help=f"Directory with make JSON files (default: {DEFAULT_DATA_DIR})",
    )
    cli.add_argument(
        "--database-url",
        help="Full postgres URL. Falls back to PG* environment variables if omitted.",
    )

    # Discrete connection settings, registered in the order they appear in --help.
    connection_options = (
        ("--db-host", {"default": env.get("PGHOST", "127.0.0.1")}),
        ("--db-port", {"type": int, "default": int(env.get("PGPORT", 5432))}),
        ("--db-name", {"default": env.get("PGDATABASE", "motovaultpro")}),
        ("--db-user", {"default": env.get("PGUSER", "postgres")}),
        ("--db-password", {"default": env.get("PGPASSWORD")}),
    )
    for flag, options in connection_options:
        cli.add_argument(flag, **options)

    cli.add_argument(
        "--transmissions",
        default=",".join(DEFAULT_TRANSMISSIONS),
        help="Comma-separated list of transmission labels (default: Automatic,Manual)",
    )
    cli.add_argument(
        "--skip-truncate",
        action="store_true",
        help="Do not truncate lookup tables before loading (useful for incremental testing).",
    )
    return cli.parse_args()
def build_conninfo(args: argparse.Namespace) -> str:
    """Return the connection string for the parsed command-line arguments.

    ``args.database_url`` wins when set and is returned untouched.  Otherwise
    a libpq ``keyword=value`` string is assembled from the ``db_*`` options.
    Values that are empty or contain whitespace, a single quote or a backslash
    are wrapped in single quotes with ``\\`` and ``'`` backslash-escaped, as the
    libpq connection-string syntax requires; plain values are emitted
    verbatim, so the output for ordinary settings is unchanged.

    Args:
        args: Namespace carrying ``database_url``, ``db_host``, ``db_port``,
            ``db_name``, ``db_user`` and ``db_password``.

    Returns:
        The URL, or a space-separated ``keyword=value`` connection string.
        The password is omitted when it is empty or ``None``.
    """
    if args.database_url:
        return args.database_url

    def quote(value: object) -> str:
        # Without quoting, a space would end the value early and the rest
        # would be parsed as another (bogus) keyword.
        text = str(value)
        needs_quoting = not text or any(ch.isspace() or ch in "'\\" for ch in text)
        if not needs_quoting:
            return text
        # Escape backslashes first so the ones added for quotes stay intact.
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    settings = [
        ("host", args.db_host),
        ("port", args.db_port),
        ("dbname", args.db_name),
        ("user", args.db_user),
    ]
    if args.db_password:
        settings.append(("password", args.db_password))
    return " ".join(f"{keyword}={quote(value)}" for keyword, value in settings)
def load_json_documents(data_dir: Path) -> List[Tuple[str, dict]]:
    """Read and parse every ``*.json`` file directly inside *data_dir*.

    Files are processed in sorted path order.  The parsed root value is
    returned as-is; it is not checked to be a JSON object here.

    Args:
        data_dir: Directory holding the JSON files.

    Returns:
        A list of ``(file name, parsed JSON)`` pairs.

    Raises:
        FileNotFoundError: If *data_dir* is missing or is not a directory.
        ValueError: If a file does not contain valid JSON.
        RuntimeError: If the directory has no ``*.json`` files.
    """
    if not (data_dir.exists() and data_dir.is_dir()):
        raise FileNotFoundError(f"Data directory not found: {data_dir}")

    parsed: List[Tuple[str, dict]] = []
    for json_path in sorted(data_dir.glob("*.json")):
        raw_text = json_path.read_text(encoding="utf-8")
        try:
            payload = json.loads(raw_text)
        except json.JSONDecodeError as exc:
            # Re-raise with the offending path so the bad file is easy to find.
            raise ValueError(f"Invalid JSON in {json_path}: {exc}") from exc
        parsed.append((json_path.name, payload))

    if parsed:
        return parsed
    raise RuntimeError(f"No JSON files found under {data_dir}")
def clean_label(value: str) -> str:
    """Turn a raw value into a display label.

    The value is converted with ``str()``, underscores become spaces,
    surrounding whitespace is removed and every internal whitespace run is
    collapsed to a single space.  Any falsy value (``None``, ``""``, ``0``)
    yields the empty string.
    """
    raw = str(value) if value else ""
    spaced = raw.replace("_", " ")
    return re.sub(r"\s+", " ", spaced.strip())
def normalize_key(value: str) -> str:
    """Return the lower-cased ``clean_label`` form of *value*, for use as a lookup key."""
    return clean_label(value).lower()
def unique_labels(values: Iterable[str]) -> List[str]:
    """Clean *values* and drop blanks and case-insensitive duplicates.

    Each value goes through ``clean_label``; labels that come out empty are
    discarded.  Two labels are duplicates when their ``normalize_key`` forms
    match, in which case the first spelling encountered is kept.  The result
    preserves first-seen order.
    """
    # dicts keep insertion order, so this both de-duplicates and orders.
    first_by_key: Dict[str, str] = {}
    for raw in values:
        label = clean_label(raw)
        if label:
            first_by_key.setdefault(normalize_key(label), label)
    return list(first_by_key.values())
class LoaderCaches:
    """In-memory maps from lookup keys to row ids, filled by the upsert helpers."""

    def __init__(self) -> None:
        """Start with every cache empty."""
        # normalized make name -> make id
        self.makes: Dict[str, int] = {}
        # (make id, normalized model name) -> model id
        self.models: Dict[Tuple[int, str], int] = {}
        # (model id, year) -> model_year id
        self.model_years: Dict[Tuple[int, int], int] = {}
        # (model_year id, normalized trim name) -> trim id
        self.trims: Dict[Tuple[int, str], int] = {}
        # normalized engine name -> engine id
        self.engines: Dict[str, int] = {}
class LoaderStats:
    """Named integer counters accumulated during a load."""

    def __init__(self) -> None:
        """Start with no counters recorded."""
        self.counter = Counter()

    def as_dict(self) -> Dict[str, int]:
        """Return a plain ``dict`` copy of the counters."""
        return {name: total for name, total in self.counter.items()}

    def bump(self, key: str, amount: int = 1) -> None:
        """Add *amount* (default 1) to the counter named *key*."""
        self.counter.update({key: amount})
def truncate_lookup_tables(cur: psycopg.Cursor) -> None:
    """Empty every vehicle lookup table and reset its identity sequence.

    Three ``TRUNCATE ... RESTART IDENTITY CASCADE`` statements are issued in
    order: the trim link tables, then trim/model_year/model/make, then the
    engine and transmission tables.
    """
    statements = (
        "TRUNCATE vehicles.trim_engine, vehicles.trim_transmission RESTART IDENTITY CASCADE",
        "TRUNCATE vehicles.trim, vehicles.model_year, vehicles.model, vehicles.make RESTART IDENTITY CASCADE",
        "TRUNCATE vehicles.engine, vehicles.transmission RESTART IDENTITY CASCADE",
    )
    for statement in statements:
        cur.execute(statement)
def ensure_transmissions(cur: psycopg.Cursor, names: Iterable[str]) -> None:
    """Insert each transmission label that is not already present.

    *names* is cleaned and de-duplicated with ``unique_labels`` first; rows
    whose name already exists are left untouched (``ON CONFLICT DO NOTHING``).
    """
    insert_sql = (
        "\n"
        "INSERT INTO vehicles.transmission (name)\n"
        "VALUES (%s)\n"
        "ON CONFLICT (name) DO NOTHING\n"
    )
    for label in unique_labels(names):
        cur.execute(insert_sql, (label,))
def upsert_make(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int:
    """Return the id of the make called *name*, inserting it if necessary.

    Ids are memoized in ``caches.makes`` under ``normalize_key(name)``, so the
    database is only queried the first time a make is seen.
    """
    cache_key = normalize_key(name)
    try:
        return caches.makes[cache_key]
    except KeyError:
        pass  # not seen yet in this run; ask the database

    # The no-op DO UPDATE makes RETURNING yield the id for an existing row
    # too (DO NOTHING would return no row on conflict).
    upsert_sql = (
        "\n"
        "INSERT INTO vehicles.make (name)\n"
        "VALUES (%s)\n"
        "ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name\n"
        "RETURNING id\n"
    )
    cur.execute(upsert_sql, (name,))
    row_id = cur.fetchone()[0]
    caches.makes[cache_key] = row_id
    return row_id
def upsert_model(cur: psycopg.Cursor, caches: LoaderCaches, make_id: int, name: str) -> int:
    """Return the id of model *name* under *make_id*, inserting it if necessary.

    Ids are memoized in ``caches.models`` under
    ``(make_id, normalize_key(name))``.
    """
    cache_key = (make_id, normalize_key(name))
    try:
        return caches.models[cache_key]
    except KeyError:
        pass  # not seen yet in this run; ask the database

    # The no-op DO UPDATE makes RETURNING yield the id for an existing row
    # too (DO NOTHING would return no row on conflict).
    upsert_sql = (
        "\n"
        "INSERT INTO vehicles.model (make_id, name)\n"
        "VALUES (%s, %s)\n"
        "ON CONFLICT (make_id, name) DO UPDATE SET name = EXCLUDED.name\n"
        "RETURNING id\n"
    )
    cur.execute(upsert_sql, (make_id, name))
    row_id = cur.fetchone()[0]
    caches.models[cache_key] = row_id
    return row_id
def upsert_model_year(cur: psycopg.Cursor, caches: LoaderCaches, model_id: int, year: int) -> int:
    """Return the id of the (*model_id*, *year*) row, inserting it if necessary.

    Ids are memoized in ``caches.model_years`` under ``(model_id, year)``.
    """
    cache_key = (model_id, year)
    try:
        return caches.model_years[cache_key]
    except KeyError:
        pass  # not seen yet in this run; ask the database

    # The no-op DO UPDATE makes RETURNING yield the id for an existing row
    # too (DO NOTHING would return no row on conflict).
    upsert_sql = (
        "\n"
        "INSERT INTO vehicles.model_year (model_id, year)\n"
        "VALUES (%s, %s)\n"
        "ON CONFLICT (model_id, year) DO UPDATE SET year = EXCLUDED.year\n"
        "RETURNING id\n"
    )
    cur.execute(upsert_sql, (model_id, year))
    row_id = cur.fetchone()[0]
    caches.model_years[cache_key] = row_id
    return row_id
def upsert_trim(cur: psycopg.Cursor, caches: LoaderCaches, model_year_id: int, name: str) -> int:
    """Return the id of trim *name* under *model_year_id*, inserting it if necessary.

    Ids are memoized in ``caches.trims`` under
    ``(model_year_id, normalize_key(name))``.
    """
    cache_key = (model_year_id, normalize_key(name))
    try:
        return caches.trims[cache_key]
    except KeyError:
        pass  # not seen yet in this run; ask the database

    # The no-op DO UPDATE makes RETURNING yield the id for an existing row
    # too (DO NOTHING would return no row on conflict).
    upsert_sql = (
        "\n"
        "INSERT INTO vehicles.trim (model_year_id, name)\n"
        "VALUES (%s, %s)\n"
        "ON CONFLICT (model_year_id, name) DO UPDATE SET name = EXCLUDED.name\n"
        "RETURNING id\n"
    )
    cur.execute(upsert_sql, (model_year_id, name))
    row_id = cur.fetchone()[0]
    caches.trims[cache_key] = row_id
    return row_id
def upsert_engine(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int:
    """Return the id of the engine called *name*, inserting it if necessary.

    Ids are memoized in ``caches.engines`` under ``normalize_key(name)``.
    """
    cache_key = normalize_key(name)
    try:
        return caches.engines[cache_key]
    except KeyError:
        pass  # not seen yet in this run; ask the database

    # The no-op DO UPDATE makes RETURNING yield the id for an existing row
    # too (DO NOTHING would return no row on conflict).
    upsert_sql = (
        "\n"
        "INSERT INTO vehicles.engine (name)\n"
        "VALUES (%s)\n"
        "ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name\n"
        "RETURNING id\n"
    )
    cur.execute(upsert_sql, (name,))
    row_id = cur.fetchone()[0]
    caches.engines[cache_key] = row_id
    return row_id
def link_trim_engine(cur: psycopg.Cursor, trim_id: int, engine_id: int) -> None:
    """Record that *trim_id* is available with *engine_id*.

    An existing (trim, engine) pair is left as it is
    (``ON CONFLICT DO NOTHING``).
    """
    link_sql = (
        "\n"
        "INSERT INTO vehicles.trim_engine (trim_id, engine_id)\n"
        "VALUES (%s, %s)\n"
        "ON CONFLICT (trim_id, engine_id) DO NOTHING\n"
    )
    cur.execute(link_sql, (trim_id, engine_id))
def _load_model(
    cur: psycopg.Cursor,
    caches: LoaderCaches,
    stats: LoaderStats,
    make_id: int,
    year: int,
    model: dict,
) -> None:
    """Insert one model entry: model, model year, trims, engines and their links."""
    model_name = clean_label(model.get("name", ""))
    if not model_name:
        stats.bump("skipped_models_invalid_name")
        return

    engine_names = unique_labels(model.get("engines") or [])
    if not engine_names:
        stats.bump("skipped_models_missing_engines")
        return

    # A model that lists no submodels gets one trim named after the model.
    trim_names = unique_labels(model.get("submodels") or []) or [model_name]

    model_id = upsert_model(cur, caches, make_id, model_name)
    model_year_id = upsert_model_year(cur, caches, model_id, year)
    stats.bump("model_years")

    trim_ids: List[int] = []
    for trim_name in trim_names:
        trim_ids.append(upsert_trim(cur, caches, model_year_id, trim_name))
        stats.bump("trims")

    # Every engine of the model is linked to every trim of the model year.
    for engine_name in engine_names:
        engine_id = upsert_engine(cur, caches, engine_name)
        stats.bump("engines")
        for trim_id in trim_ids:
            link_trim_engine(cur, trim_id, engine_id)
            stats.bump("trim_engine_links")


def process_documents(cur: psycopg.Cursor, documents: List[Tuple[str, dict]], stats: LoaderStats) -> None:
    """Load every parsed JSON document into the vehicles schema.

    Each document root is read as ``{make name: [year entry, ...]}``, each
    year entry as a dict with ``"year"`` and ``"models"``, and each model as
    a dict with ``"name"``, ``"engines"`` and optional ``"submodels"``.

    Malformed pieces are skipped and counted instead of aborting the load:
    a non-dict root (``skipped_files_invalid_root``, with a warning), a blank
    make name (``skipped_makes_invalid_name``), a year collection that is not
    a list (``skipped_makes_invalid_years``), a year entry that is not a dict
    or whose year is not an integer (``skipped_years_invalid``), a model that
    is not a dict (``skipped_models_invalid``), a blank model name
    (``skipped_models_invalid_name``) and a model without engines
    (``skipped_models_missing_engines``).

    Args:
        cur: Open cursor used for all inserts.
        documents: ``(file name, parsed JSON)`` pairs.
        stats: Counter sink; the ``makes``, ``model_years``, ``trims``,
            ``engines`` and ``trim_engine_links`` counters count processed
            occurrences, not distinct rows.
    """
    caches = LoaderCaches()
    for filename, payload in documents:
        if not isinstance(payload, dict):
            stats.bump("skipped_files_invalid_root")
            print(f"[WARN] Skipping {filename}: root is not an object")
            continue

        for make_key, year_entries in payload.items():
            make_name = clean_label(make_key)
            if not make_name:
                stats.bump("skipped_makes_invalid_name")
                continue
            make_id = upsert_make(cur, caches, make_name)
            stats.bump("makes")

            if not isinstance(year_entries, list):
                # null/empty simply means "no years"; any other non-list
                # value is malformed and would otherwise crash below.
                if year_entries:
                    stats.bump("skipped_makes_invalid_years")
                continue

            for year_entry in year_entries:
                if not isinstance(year_entry, dict):
                    stats.bump("skipped_years_invalid")
                    continue
                try:
                    year = int(year_entry.get("year"))
                except (TypeError, ValueError, OverflowError):
                    # OverflowError: json accepts Infinity, int() does not.
                    stats.bump("skipped_years_invalid")
                    continue

                for model in year_entry.get("models") or []:
                    if not isinstance(model, dict):
                        stats.bump("skipped_models_invalid")
                        continue
                    _load_model(cur, caches, stats, make_id, year, model)
def main() -> None:
    """Run the loader end to end: parse options, read JSON, load, report.

    All database work happens inside one ``psycopg.connect`` context, which
    commits on a clean exit and rolls back if an exception escapes.  Unless
    ``--skip-truncate`` is given the lookup tables are emptied first.  The
    collected counters are printed once the connection block has exited.
    """
    args = parse_args()
    source_dir = Path(args.data_dir).expanduser().resolve()
    # Parse all files before connecting so bad input fails without touching the DB.
    documents = load_json_documents(source_dir)
    requested_transmissions = unique_labels(args.transmissions.split(","))
    stats = LoaderStats()

    with psycopg.connect(build_conninfo(args)) as conn, conn.cursor() as cur:
        if not args.skip_truncate:
            truncate_lookup_tables(cur)
        # Fall back to the defaults when --transmissions has no usable labels.
        ensure_transmissions(cur, requested_transmissions or DEFAULT_TRANSMISSIONS)
        process_documents(cur, documents, stats)

    print("\nVehicle lookup data load completed.")
    for name, total in sorted(stats.as_dict().items()):
        print(f" {name}: {total}")
    print(f"\nProcessed directory: {source_dir}")
# Run the loader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()