#!/usr/bin/env python3 """ Populate the normalized vehicles schema (make/model/model_year/trim/engine) from the JSON sources in data/make-models. Example: PGPASSWORD=$(cat secrets/app/postgres-password.txt) \\ python3 scripts/load_vehicle_data.py \\ --db-user postgres --db-name motovaultpro --db-host 127.0.0.1 """ from __future__ import annotations import argparse import json import os import re import sys from collections import Counter from pathlib import Path from typing import Dict, Iterable, List, Tuple try: import psycopg except ImportError as exc: # pragma: no cover - ease troubleshooting sys.stderr.write( "Error: psycopg is required. Install with `pip install psycopg[binary]`.\n", ) raise DEFAULT_DATA_DIR = Path(__file__).resolve().parents[1] / "data" / "make-models" DEFAULT_TRANSMISSIONS = ("Automatic", "Manual") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Load vehicle dropdown data into Postgres.") parser.add_argument( "--data-dir", default=str(DEFAULT_DATA_DIR), help=f"Directory with make JSON files (default: {DEFAULT_DATA_DIR})", ) parser.add_argument( "--database-url", help="Full postgres URL. Falls back to PG* environment variables if omitted.", ) parser.add_argument("--db-host", default=os.environ.get("PGHOST", "127.0.0.1")) parser.add_argument("--db-port", type=int, default=int(os.environ.get("PGPORT", 5432))) parser.add_argument("--db-name", default=os.environ.get("PGDATABASE", "motovaultpro")) parser.add_argument("--db-user", default=os.environ.get("PGUSER", "postgres")) parser.add_argument("--db-password", default=os.environ.get("PGPASSWORD")) parser.add_argument( "--transmissions", default=",".join(DEFAULT_TRANSMISSIONS), help="Comma-separated list of transmission labels (default: Automatic,Manual)", ) parser.add_argument( "--skip-truncate", action="store_true", help="Do not truncate lookup tables before loading (useful for incremental testing).", ) return parser.parse_args() def build_conninfo(args: argparse.Namespace) -> str: if args.database_url: return args.database_url parts = [ f"host={args.db_host}", f"port={args.db_port}", f"dbname={args.db_name}", f"user={args.db_user}", ] if args.db_password: parts.append(f"password={args.db_password}") return " ".join(parts) def load_json_documents(data_dir: Path) -> List[Tuple[str, dict]]: if not data_dir.exists() or not data_dir.is_dir(): raise FileNotFoundError(f"Data directory not found: {data_dir}") documents: List[Tuple[str, dict]] = [] for file_path in sorted(data_dir.glob("*.json")): with file_path.open("r", encoding="utf-8") as handle: try: documents.append((file_path.name, json.load(handle))) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in {file_path}: {exc}") from exc if not documents: raise RuntimeError(f"No JSON files found under {data_dir}") return documents def clean_label(value: str) -> str: text = str(value or "").replace("_", " ").strip() text = re.sub(r"\s+", " ", text) return text def normalize_key(value: str) -> str: text = clean_label(value).lower() return text def unique_labels(values: Iterable[str]) -> List[str]: seen = set() result: List[str] = [] for value in values: label = clean_label(value) if not label: continue key = normalize_key(label) if key in seen: continue seen.add(key) result.append(label) return result class LoaderCaches: def __init__(self) -> None: self.makes: Dict[str, int] = {} self.models: Dict[Tuple[int, str], int] = {} self.model_years: Dict[Tuple[int, int], int] = {} self.trims: Dict[Tuple[int, str], int] = {} self.engines: Dict[str, int] = {} class LoaderStats: def __init__(self) -> None: self.counter = Counter() def as_dict(self) -> Dict[str, int]: return dict(self.counter) def bump(self, key: str, amount: int = 1) -> None: self.counter[key] += amount def truncate_lookup_tables(cur: psycopg.Cursor) -> None: cur.execute("TRUNCATE vehicles.trim_engine, vehicles.trim_transmission RESTART IDENTITY CASCADE") cur.execute("TRUNCATE vehicles.trim, vehicles.model_year, vehicles.model, vehicles.make RESTART IDENTITY CASCADE") cur.execute("TRUNCATE vehicles.engine, vehicles.transmission RESTART IDENTITY CASCADE") def ensure_transmissions(cur: psycopg.Cursor, names: Iterable[str]) -> None: for name in unique_labels(names): cur.execute( """ INSERT INTO vehicles.transmission (name) VALUES (%s) ON CONFLICT (name) DO NOTHING """, (name,), ) def upsert_make(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int: key = normalize_key(name) if key in caches.makes: return caches.makes[key] cur.execute( """ INSERT INTO vehicles.make (name) VALUES (%s) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (name,), ) make_id = cur.fetchone()[0] caches.makes[key] = make_id return make_id def upsert_model(cur: psycopg.Cursor, caches: LoaderCaches, make_id: int, name: str) -> int: key = (make_id, normalize_key(name)) if key in caches.models: return caches.models[key] cur.execute( """ INSERT INTO vehicles.model (make_id, name) VALUES (%s, %s) ON CONFLICT (make_id, name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (make_id, name), ) model_id = cur.fetchone()[0] caches.models[key] = model_id return model_id def upsert_model_year(cur: psycopg.Cursor, caches: LoaderCaches, model_id: int, year: int) -> int: key = (model_id, year) if key in caches.model_years: return caches.model_years[key] cur.execute( """ INSERT INTO vehicles.model_year (model_id, year) VALUES (%s, %s) ON CONFLICT (model_id, year) DO UPDATE SET year = EXCLUDED.year RETURNING id """, (model_id, year), ) model_year_id = cur.fetchone()[0] caches.model_years[key] = model_year_id return model_year_id def upsert_trim(cur: psycopg.Cursor, caches: LoaderCaches, model_year_id: int, name: str) -> int: key = (model_year_id, normalize_key(name)) if key in caches.trims: return caches.trims[key] cur.execute( """ INSERT INTO vehicles.trim (model_year_id, name) VALUES (%s, %s) ON CONFLICT (model_year_id, name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (model_year_id, name), ) trim_id = cur.fetchone()[0] caches.trims[key] = trim_id return trim_id def upsert_engine(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int: key = normalize_key(name) if key in caches.engines: return caches.engines[key] cur.execute( """ INSERT INTO vehicles.engine (name) VALUES (%s) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (name,), ) engine_id = cur.fetchone()[0] caches.engines[key] = engine_id return engine_id def link_trim_engine(cur: psycopg.Cursor, trim_id: int, engine_id: int) -> None: cur.execute( """ INSERT INTO vehicles.trim_engine (trim_id, engine_id) VALUES (%s, %s) ON CONFLICT (trim_id, engine_id) DO NOTHING """, (trim_id, engine_id), ) def process_documents(cur: psycopg.Cursor, documents: List[Tuple[str, dict]], stats: LoaderStats) -> None: caches = LoaderCaches() for filename, payload in documents: if not isinstance(payload, dict): stats.bump("skipped_files_invalid_root") print(f"[WARN] Skipping {filename}: root is not an object") continue for make_key, year_entries in payload.items(): make_name = clean_label(make_key) if not make_name: stats.bump("skipped_makes_invalid_name") continue make_id = upsert_make(cur, caches, make_name) stats.bump("makes") for year_entry in year_entries or []: year_raw = year_entry.get("year") try: year = int(year_raw) except (TypeError, ValueError): stats.bump("skipped_years_invalid") continue models = year_entry.get("models") or [] for model in models: model_name = clean_label(model.get("name", "")) if not model_name: stats.bump("skipped_models_invalid_name") continue engine_names = unique_labels(model.get("engines") or []) if not engine_names: stats.bump("skipped_models_missing_engines") continue trim_names = unique_labels(model.get("submodels") or []) if not trim_names: trim_names = [model_name] model_id = upsert_model(cur, caches, make_id, model_name) model_year_id = upsert_model_year(cur, caches, model_id, year) stats.bump("model_years") trim_ids: List[int] = [] for trim_name in trim_names: trim_id = upsert_trim(cur, caches, model_year_id, trim_name) trim_ids.append(trim_id) stats.bump("trims") for engine_name in engine_names: engine_id = upsert_engine(cur, caches, engine_name) stats.bump("engines") for trim_id in trim_ids: link_trim_engine(cur, trim_id, engine_id) stats.bump("trim_engine_links") def main() -> None: args = parse_args() data_dir = Path(args.data_dir).expanduser().resolve() documents = load_json_documents(data_dir) conninfo = build_conninfo(args) transmissions = unique_labels(args.transmissions.split(",")) with psycopg.connect(conninfo) as conn: with conn.cursor() as cur: if not args.skip_truncate: truncate_lookup_tables(cur) ensure_transmissions(cur, transmissions or DEFAULT_TRANSMISSIONS) stats = LoaderStats() process_documents(cur, documents, stats) print("\nVehicle lookup data load completed.") for key, value in sorted(stats.as_dict().items()): print(f" {key}: {value}") print(f"\nProcessed directory: {data_dir}") if __name__ == "__main__": main()