#!/usr/bin/env python3
"""
Populate the normalized vehicles schema (make/model/model_year/trim/engine)
from the JSON sources in data/make-models.

Example:
    PGPASSWORD=$(cat secrets/app/postgres-password.txt) \\
    python3 scripts/load_vehicle_data.py \\
        --db-user postgres --db-name motovaultpro --db-host 127.0.0.1
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Tuple
|
|
|
|
try:
|
|
import psycopg
|
|
except ImportError as exc: # pragma: no cover - ease troubleshooting
|
|
sys.stderr.write(
|
|
"Error: psycopg is required. Install with `pip install psycopg[binary]`.\n",
|
|
)
|
|
raise
|
|
|
|
|
|
# Repo-relative default source directory: <repo>/data/make-models
# (this script lives one level below the repo root).
DEFAULT_DATA_DIR = Path(__file__).resolve().parents[1] / "data" / "make-models"
# Transmission labels seeded when --transmissions yields nothing usable.
DEFAULT_TRANSMISSIONS = ("Automatic", "Manual")
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the loader's command-line options.

    Connection flags each honor the conventional PG* environment variable
    as their default; --database-url overrides them entirely.
    """
    env = os.environ
    arg_parser = argparse.ArgumentParser(description="Load vehicle dropdown data into Postgres.")
    arg_parser.add_argument(
        "--data-dir",
        default=str(DEFAULT_DATA_DIR),
        help=f"Directory with make JSON files (default: {DEFAULT_DATA_DIR})",
    )
    arg_parser.add_argument(
        "--database-url",
        help="Full postgres URL. Falls back to PG* environment variables if omitted.",
    )
    # Individual connection pieces, used only when --database-url is absent.
    arg_parser.add_argument("--db-host", default=env.get("PGHOST", "127.0.0.1"))
    arg_parser.add_argument("--db-port", type=int, default=int(env.get("PGPORT", 5432)))
    arg_parser.add_argument("--db-name", default=env.get("PGDATABASE", "motovaultpro"))
    arg_parser.add_argument("--db-user", default=env.get("PGUSER", "postgres"))
    arg_parser.add_argument("--db-password", default=env.get("PGPASSWORD"))
    arg_parser.add_argument(
        "--transmissions",
        default=",".join(DEFAULT_TRANSMISSIONS),
        help="Comma-separated list of transmission labels (default: Automatic,Manual)",
    )
    arg_parser.add_argument(
        "--skip-truncate",
        action="store_true",
        help="Do not truncate lookup tables before loading (useful for incremental testing).",
    )
    return arg_parser.parse_args()
|
|
|
|
|
|
def build_conninfo(args: argparse.Namespace) -> str:
    """Build a libpq conninfo string from the parsed CLI arguments.

    A full --database-url wins outright; otherwise assemble key=value
    pairs, appending the password only when one was provided.
    """
    if args.database_url:
        return args.database_url

    pairs = {
        "host": args.db_host,
        "port": args.db_port,
        "dbname": args.db_name,
        "user": args.db_user,
    }
    if args.db_password:
        pairs["password"] = args.db_password
    return " ".join(f"{key}={value}" for key, value in pairs.items())
|
|
|
|
|
|
def load_json_documents(data_dir: Path) -> List[Tuple[str, dict]]:
    """Read every *.json file in *data_dir*, sorted by filename.

    Returns (filename, parsed_payload) pairs. Raises FileNotFoundError when
    the directory is missing, ValueError on malformed JSON, and RuntimeError
    when no JSON files are present.
    """
    # is_dir() is False for nonexistent paths, so one check covers both cases.
    if not data_dir.is_dir():
        raise FileNotFoundError(f"Data directory not found: {data_dir}")

    docs: List[Tuple[str, dict]] = []
    for path in sorted(data_dir.glob("*.json")):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
        docs.append((path.name, payload))
    if not docs:
        raise RuntimeError(f"No JSON files found under {data_dir}")
    return docs
|
|
|
|
|
|
def clean_label(value: str) -> str:
    """Normalize a display label: underscores become spaces, whitespace runs
    collapse to one space, and the ends are trimmed. None/empty yield ""."""
    return re.sub(r"\s+", " ", str(value or "").replace("_", " ").strip())
|
|
|
|
|
|
def normalize_key(value: str) -> str:
    """Case-insensitive dedup/cache key: the cleaned label, lowercased."""
    return clean_label(value).lower()
|
|
|
|
|
|
def unique_labels(values: Iterable[str]) -> List[str]:
    """Clean each value and keep only the first occurrence per
    case-insensitive key, preserving input order. Empty labels are dropped."""
    kept: List[str] = []
    seen_keys = set()
    for raw in values:
        label = clean_label(raw)
        if not label:
            continue
        dedup_key = normalize_key(label)
        if dedup_key not in seen_keys:
            seen_keys.add(dedup_key)
            kept.append(label)
    return kept
|
|
|
|
|
|
class LoaderCaches:
    """In-memory id caches so repeated upserts of the same row skip the
    database round-trip. Keys mirror each table's natural uniqueness."""

    def __init__(self) -> None:
        # normalized make name -> vehicles.make.id
        self.makes: Dict[str, int] = {}
        # (make_id, normalized model name) -> vehicles.model.id
        self.models: Dict[Tuple[int, str], int] = {}
        # (model_id, year) -> vehicles.model_year.id
        self.model_years: Dict[Tuple[int, int], int] = {}
        # (model_year_id, normalized trim name) -> vehicles.trim.id
        self.trims: Dict[Tuple[int, str], int] = {}
        # normalized engine name -> vehicles.engine.id
        self.engines: Dict[str, int] = {}
|
|
|
|
|
|
class LoaderStats:
    """Thin wrapper around a Counter that tallies loader events by name."""

    def __init__(self) -> None:
        # event name -> number of occurrences
        self.counter = Counter()

    def bump(self, key: str, amount: int = 1) -> None:
        """Increase the tally for *key* by *amount* (default 1)."""
        self.counter.update({key: amount})

    def as_dict(self) -> Dict[str, int]:
        """Return a plain-dict snapshot of all tallies."""
        return dict(self.counter)
|
|
|
|
|
|
def truncate_lookup_tables(cur: psycopg.Cursor) -> None:
    """Empty every vehicle lookup table, link tables first, then the
    hierarchy, then the shared engine/transmission lookups."""
    statements = (
        "TRUNCATE vehicles.trim_engine, vehicles.trim_transmission RESTART IDENTITY CASCADE",
        "TRUNCATE vehicles.trim, vehicles.model_year, vehicles.model, vehicles.make RESTART IDENTITY CASCADE",
        "TRUNCATE vehicles.engine, vehicles.transmission RESTART IDENTITY CASCADE",
    )
    for statement in statements:
        cur.execute(statement)
|
|
|
|
|
|
def ensure_transmissions(cur: psycopg.Cursor, names: Iterable[str]) -> None:
    """Insert each distinct transmission label, skipping rows that exist."""
    sql = """
        INSERT INTO vehicles.transmission (name)
        VALUES (%s)
        ON CONFLICT (name) DO NOTHING
        """
    for label in unique_labels(names):
        cur.execute(sql, (label,))
|
|
|
|
|
|
def upsert_make(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int:
    """Insert-or-fetch a make row, memoizing its id by normalized name.

    The DO UPDATE no-op exists only so RETURNING id also fires on conflict.
    """
    cache_key = normalize_key(name)
    try:
        return caches.makes[cache_key]
    except KeyError:
        pass
    cur.execute(
        """
        INSERT INTO vehicles.make (name)
        VALUES (%s)
        ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """,
        (name,),
    )
    (make_id,) = cur.fetchone()
    caches.makes[cache_key] = make_id
    return make_id
|
|
|
|
|
|
def upsert_model(cur: psycopg.Cursor, caches: LoaderCaches, make_id: int, name: str) -> int:
    """Insert-or-fetch a model row under *make_id*, memoizing its id.

    The DO UPDATE no-op exists only so RETURNING id also fires on conflict.
    """
    cache_key = (make_id, normalize_key(name))
    try:
        return caches.models[cache_key]
    except KeyError:
        pass
    cur.execute(
        """
        INSERT INTO vehicles.model (make_id, name)
        VALUES (%s, %s)
        ON CONFLICT (make_id, name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """,
        (make_id, name),
    )
    (model_id,) = cur.fetchone()
    caches.models[cache_key] = model_id
    return model_id
|
|
|
|
|
|
def upsert_model_year(cur: psycopg.Cursor, caches: LoaderCaches, model_id: int, year: int) -> int:
    """Insert-or-fetch a model_year row, memoized per (model_id, year).

    The DO UPDATE no-op exists only so RETURNING id also fires on conflict.
    """
    cache_key = (model_id, year)
    try:
        return caches.model_years[cache_key]
    except KeyError:
        pass
    cur.execute(
        """
        INSERT INTO vehicles.model_year (model_id, year)
        VALUES (%s, %s)
        ON CONFLICT (model_id, year) DO UPDATE SET year = EXCLUDED.year
        RETURNING id
        """,
        (model_id, year),
    )
    (model_year_id,) = cur.fetchone()
    caches.model_years[cache_key] = model_year_id
    return model_year_id
|
|
|
|
|
|
def upsert_trim(cur: psycopg.Cursor, caches: LoaderCaches, model_year_id: int, name: str) -> int:
    """Insert-or-fetch a trim row under *model_year_id*, memoizing its id.

    The DO UPDATE no-op exists only so RETURNING id also fires on conflict.
    """
    cache_key = (model_year_id, normalize_key(name))
    try:
        return caches.trims[cache_key]
    except KeyError:
        pass
    cur.execute(
        """
        INSERT INTO vehicles.trim (model_year_id, name)
        VALUES (%s, %s)
        ON CONFLICT (model_year_id, name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """,
        (model_year_id, name),
    )
    (trim_id,) = cur.fetchone()
    caches.trims[cache_key] = trim_id
    return trim_id
|
|
|
|
|
|
def upsert_engine(cur: psycopg.Cursor, caches: LoaderCaches, name: str) -> int:
    """Insert-or-fetch an engine row, memoizing its id by normalized name.

    The DO UPDATE no-op exists only so RETURNING id also fires on conflict.
    """
    cache_key = normalize_key(name)
    try:
        return caches.engines[cache_key]
    except KeyError:
        pass
    cur.execute(
        """
        INSERT INTO vehicles.engine (name)
        VALUES (%s)
        ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """,
        (name,),
    )
    (engine_id,) = cur.fetchone()
    caches.engines[cache_key] = engine_id
    return engine_id
|
|
|
|
|
|
def link_trim_engine(cur: psycopg.Cursor, trim_id: int, engine_id: int) -> None:
    """Associate an engine with a trim; duplicate links are ignored."""
    sql = """
        INSERT INTO vehicles.trim_engine (trim_id, engine_id)
        VALUES (%s, %s)
        ON CONFLICT (trim_id, engine_id) DO NOTHING
        """
    cur.execute(sql, (trim_id, engine_id))
|
|
|
|
|
|
def process_documents(cur: psycopg.Cursor, documents: List[Tuple[str, dict]], stats: LoaderStats) -> None:
    """Walk each make JSON document and upsert makes/models/years/trims/engines.

    Expected payload shape per file: {make_name: [{"year": int, "models":
    [{"name": str, "engines": [str, ...], "submodels": [str, ...]}]}]}.

    NOTE: counters in *stats* tally processed occurrences, not distinct DB
    rows — a make repeated across files bumps "makes" each time even though
    the cached id is reused.
    """
    caches = LoaderCaches()

    for filename, payload in documents:
        if not isinstance(payload, dict):
            stats.bump("skipped_files_invalid_root")
            # Bug fix: this warning previously hard-coded "(unknown)" and
            # never used the filename it unpacked.
            print(f"[WARN] Skipping {filename}: root is not an object")
            continue

        for make_key, year_entries in payload.items():
            make_name = clean_label(make_key)
            if not make_name:
                stats.bump("skipped_makes_invalid_name")
                continue
            make_id = upsert_make(cur, caches, make_name)
            stats.bump("makes")

            for year_entry in year_entries or []:
                year_raw = year_entry.get("year")
                try:
                    year = int(year_raw)
                except (TypeError, ValueError):
                    stats.bump("skipped_years_invalid")
                    continue

                models = year_entry.get("models") or []
                for model in models:
                    model_name = clean_label(model.get("name", ""))
                    if not model_name:
                        stats.bump("skipped_models_invalid_name")
                        continue

                    # Models without any engine are dropped entirely.
                    engine_names = unique_labels(model.get("engines") or [])
                    if not engine_names:
                        stats.bump("skipped_models_missing_engines")
                        continue

                    trim_names = unique_labels(model.get("submodels") or [])
                    if not trim_names:
                        # No explicit submodels: the model itself is its trim.
                        trim_names = [model_name]

                    model_id = upsert_model(cur, caches, make_id, model_name)
                    model_year_id = upsert_model_year(cur, caches, model_id, year)
                    stats.bump("model_years")

                    trim_ids: List[int] = []
                    for trim_name in trim_names:
                        trim_id = upsert_trim(cur, caches, model_year_id, trim_name)
                        trim_ids.append(trim_id)
                        stats.bump("trims")

                    # Every engine is linked to every trim of this model year.
                    for engine_name in engine_names:
                        engine_id = upsert_engine(cur, caches, engine_name)
                        stats.bump("engines")
                        for trim_id in trim_ids:
                            link_trim_engine(cur, trim_id, engine_id)
                            stats.bump("trim_engine_links")
|
|
|
|
|
|
def main() -> None:
    """Entry point: parse the CLI, read the JSON sources, load them inside a
    single connection/transaction, then print the collected statistics."""
    args = parse_args()
    data_dir = Path(args.data_dir).expanduser().resolve()
    documents = load_json_documents(data_dir)
    conninfo = build_conninfo(args)
    transmissions = unique_labels(args.transmissions.split(","))

    with psycopg.connect(conninfo) as conn, conn.cursor() as cur:
        if not args.skip_truncate:
            truncate_lookup_tables(cur)
        # Fall back to the defaults if --transmissions cleaned down to nothing.
        ensure_transmissions(cur, transmissions or DEFAULT_TRANSMISSIONS)
        stats = LoaderStats()
        process_documents(cur, documents, stats)

    print("\nVehicle lookup data load completed.")
    for stat_name, count in sorted(stats.as_dict().items()):
        print(f" {stat_name}: {count}")
    print(f"\nProcessed directory: {data_dir}")
|
|
|
|
|
|
# Run the loader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|