#!/usr/bin/env python3
"""
Fetches VehAPI data into an offline snapshot (SQLite + meta.json).

Workflow:
1. Walks Year -> Make -> Model -> Trim -> Transmission -> Engine using VehAPI.
2. Persists observed compatibility pairs to snapshot.sqlite (no Cartesian products).
3. Stores request/response cache for resume; obeys rate limits and 429 retry-after.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import random
import re
import sqlite3
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import quote

try:
    import requests
except ImportError:  # pragma: no cover - env guard
    print("[error] Missing dependency 'requests'. Install with `pip install requests`.", file=sys.stderr)
    sys.exit(1)

SCRIPT_VERSION = "vehapi_fetch_snapshot.py@1.1.0"
DEFAULT_MIN_YEAR = 2015
DEFAULT_MAX_YEAR = 2022
DEFAULT_RATE_PER_MIN = 55  # stays under the 60 req/min ceiling
MAX_ATTEMPTS = 5
FALLBACK_TRIMS = ["Base"]
FALLBACK_TRANSMISSIONS = ["Manual", "Automatic"]
DEFAULT_BASE_URL = "https://vehapi.com/api/v1/car-lists/get/car"


def canonicalize(value: str) -> str:
    """Lowercase, trim, collapse spaces, and normalize hyphens for dedupe keys."""
    cleaned = (value or "").strip()
    # Collapse all whitespace runs (including NBSP) to a single space.
    cleaned = re.sub(r"[\s\u00A0]+", " ", cleaned)
    # Normalize every Unicode dash variant run to a single ASCII hyphen.
    cleaned = re.sub(r"[-\u2010-\u2015]+", "-", cleaned)
    return cleaned.lower()


def infer_trans_bucket(trans_str: str) -> str:
    """Coarse-classify a transmission string as "Manual" or "Automatic".

    NOTE(review): substring matching — e.g. "mt" inside a longer token also
    matches; acceptable for bucketing, not for exact classification.
    """
    lowered = (trans_str or "").lower()
    if "manual" in lowered or "mt" in lowered or "m/t" in lowered:
        return "Manual"
    return "Automatic"


def infer_fuel_bucket(engine_str: str, trans_str: str, trim_str: str) -> str:
    """Heuristically bucket the fuel type from engine/transmission/trim text.

    Precedence: Electric > Hybrid > Diesel > Gas (default). Matching is by
    substring over the concatenated, lowercased inputs.
    """
    target = " ".join([engine_str or "", trans_str or "", trim_str or ""]).lower()
    if any(token in target for token in ["electric", "ev", "battery", "motor", "kwh"]):
        return "Electric"
    if any(token in target for token in ["hybrid", "phev", "plug-in", "hev", "e-hybrid"]):
        return "Hybrid"
    if any(token in target for token in ["diesel", "tdi", "dci", "duramax", "power stroke", "cummins"]):
        return "Diesel"
    return "Gas"


def read_text_file(path: Path) -> str:
    """Return the full UTF-8 text content of *path*."""
    with path.open("r", encoding="utf-8") as fh:
        return fh.read()


def read_lines(path: Path) -> List[str]:
    """Return non-blank, stripped lines of *path*."""
    return [line.strip() for line in read_text_file(path).splitlines() if line.strip()]


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of *path*, streaming in 8 KiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def ensure_snapshot_dir(root: Path, custom_dir: Optional[str]) -> Path:
    """Create (if needed) and return the snapshot directory.

    Uses *custom_dir* verbatim when given; otherwise a UTC-dated
    subdirectory of *root* (e.g. root/2024-01-31).
    """
    if custom_dir:
        snapshot_dir = Path(custom_dir)
    else:
        today = datetime.now(timezone.utc).date().isoformat()
        snapshot_dir = root / today
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    return snapshot_dir


class RateLimiter:
    """Fixed delay limiter to stay below the VehAPI threshold (60 req/min)."""

    def __init__(self, max_per_min: int) -> None:
        self.delay = 60.0 / max_per_min  # ~1.09 sec for 55 rpm
        self._last_request = 0.0

    def acquire(self) -> None:
        """Block until at least ``delay`` seconds have passed since the last call."""
        now = time.monotonic()
        elapsed = now - self._last_request
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self._last_request = time.monotonic()


@dataclass
class FetchCounts:
    """Running tallies reported at the end of a fetch run."""

    pairs_inserted: int = 0
    cache_hits: int = 0
    fallback_transmissions: int = 0
    fallback_engines: int = 0


class VehapiFetcher:
    """Walks the VehAPI hierarchy and persists observed pairs to SQLite."""

    def __init__(
        self,
        session: requests.Session,
        base_url: str,
        token: str,
        min_year: int,
        max_year: int,
        allowed_makes: Sequence[str],
        snapshot_path: Path,
        responses_cache: bool = True,
        rate_per_min: int = DEFAULT_RATE_PER_MIN,
    ) -> None:
        self.session = session
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.min_year = min_year
        self.max_year = max_year
        # Map canonical form -> original spelling for case/space-insensitive filtering.
        self.allowed_makes = {canonicalize(m): m for m in allowed_makes}
        self.snapshot_path = snapshot_path
        self.conn = sqlite3.connect(self.snapshot_path)
        # WAL + NORMAL trades a little durability for much faster bulk inserts.
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self._init_schema()
        self.responses_cache = responses_cache
        self.rate_limiter = RateLimiter(rate_per_min)
        self.counts = FetchCounts()

    def close(self) -> None:
        """Flush pending writes and release the SQLite connection."""
        self.conn.commit()
        self.conn.close()

    def _init_schema(self) -> None:
        """Create the pairs/meta/responses tables if they do not exist."""
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS pairs(
                year INT,
                make TEXT,
                model TEXT,
                trim TEXT,
                engine_display TEXT,
                engine_canon TEXT,
                engine_bucket TEXT,
                trans_display TEXT,
                trans_canon TEXT,
                trans_bucket TEXT,
                PRIMARY KEY(year, make, model, trim, engine_canon, trans_canon)
            )
            """
        )
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS meta(
                key TEXT PRIMARY KEY,
                value TEXT
            )
            """
        )
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS responses(
                request_key TEXT PRIMARY KEY,
                url TEXT,
                status INT,
                headers_json TEXT,
                body_json TEXT,
                fetched_at TEXT
            )
            """
        )
        self.conn.commit()

    def _store_meta(self, meta: Dict[str, Any]) -> None:
        """Upsert every *meta* key/value (stringified) into the meta table."""
        rows = [(k, str(v)) for k, v in meta.items()]
        self.conn.executemany("INSERT OR REPLACE INTO meta(key, value) VALUES (?, ?)", rows)
        self.conn.commit()

    def _load_cached_response(self, request_key: str) -> Optional[Any]:
        """Return the cached JSON body for *request_key*, or None on miss/disabled."""
        if not self.responses_cache:
            return None
        cur = self.conn.execute("SELECT body_json FROM responses WHERE request_key = ?", (request_key,))
        row = cur.fetchone()
        if not row:
            return None
        self.counts.cache_hits += 1
        try:
            return json.loads(row[0])
        except (TypeError, ValueError):
            # Corrupt/NULL cache entry: treat as a miss and refetch.
            return None

    def _save_response(self, request_key: str, url: str, status: int, headers: Dict[str, Any], body: Any) -> None:
        """Persist a successful response for resume/cache purposes."""
        self.conn.execute(
            """
            INSERT OR REPLACE INTO responses(request_key, url, status, headers_json, body_json, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                request_key,
                url,
                status,
                json.dumps(dict(headers), default=str),
                json.dumps(body, default=str),
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        self.conn.commit()

    def _request_json(self, path_parts: Sequence[str], label: str) -> Any:
        """GET ``base_url/<path_parts>`` as JSON with cache, rate limit, and retries.

        Honors 429 Retry-After, retries 5xx/network errors with capped
        exponential backoff, and returns [] on any non-retryable failure so
        callers can treat "no data" uniformly.
        """
        path_parts = [str(p) for p in path_parts]
        request_key = "/".join(path_parts)
        cached = self._load_cached_response(request_key)
        if cached is not None:
            return cached
        url = f"{self.base_url}/" + "/".join(quote(p, safe="") for p in path_parts)
        attempts = 0
        backoff = 1.0
        while attempts < MAX_ATTEMPTS:
            attempts += 1
            self.rate_limiter.acquire()
            try:
                resp = self.session.get(url, headers={"Authorization": f"Bearer {self.token}"}, timeout=30)
            except requests.RequestException as exc:
                print(f"[warn] {label}: request error {exc}; retrying...", file=sys.stderr)
                time.sleep(backoff + random.uniform(0, 0.5))
                backoff = min(backoff * 2, 30)
                continue
            if resp.status_code == 429:
                # requests' headers are case-insensitive; the double lookup is
                # kept for safety against non-standard header containers.
                retry_after = resp.headers.get("retry-after") or resp.headers.get("Retry-After")
                try:
                    retry_seconds = float(retry_after)
                except (TypeError, ValueError):
                    retry_seconds = 30.0
                sleep_for = retry_seconds + random.uniform(0, 0.5)
                print(f"[info] {label}: hit 429, sleeping {sleep_for:.1f}s before retry", file=sys.stderr)
                time.sleep(sleep_for)
                backoff = min(backoff * 2, 30)
                continue
            if resp.status_code >= 500:
                print(f"[warn] {label}: server {resp.status_code}, retrying...", file=sys.stderr)
                time.sleep(backoff + random.uniform(0, 0.5))
                backoff = min(backoff * 2, 30)
                continue
            if not resp.ok:
                # 4xx other than 429 will not improve on retry; skip this node.
                print(f"[warn] {label}: HTTP {resp.status_code}, skipping", file=sys.stderr)
                return []
            try:
                body = resp.json()
            except ValueError:
                print(f"[warn] {label}: non-JSON response, skipping", file=sys.stderr)
                return []
            self._save_response(request_key, url, resp.status_code, resp.headers, body)
            return body
        print(f"[error] {label}: exhausted retries", file=sys.stderr)
        return []

    @staticmethod
    def _extract_values(payload: Any, keys: Sequence[str]) -> List[str]:
        """Pull non-empty string values out of a VehAPI payload.

        Accepts either a bare list or a dict wrapping the list under
        "data"/"results"/"items". List items may be plain strings or dicts,
        in which case the first present key from *keys* wins.
        """
        values: List[str] = []
        if isinstance(payload, dict):
            payload = payload.get("data") or payload.get("results") or payload.get("items") or payload
        if not payload:
            return values
        if isinstance(payload, list):
            for item in payload:
                if isinstance(item, str):
                    if item.strip():
                        values.append(item.strip())
                    continue
                if isinstance(item, dict):
                    for key in keys:
                        val = item.get(key)
                        if val:
                            values.append(str(val).strip())
                            break
        return values

    def _record_pair(
        self,
        year: int,
        make: str,
        model: str,
        trim: str,
        engine_display: str,
        engine_bucket: str,
        trans_display: str,
        trans_bucket: str,
    ) -> None:
        """Insert one engine/transmission pair; dedupe via the canonical PK."""
        engine_canon = canonicalize(engine_display)
        trans_canon = canonicalize(trans_display)
        cur = self.conn.execute(
            """
            INSERT OR IGNORE INTO pairs(
                year, make, model, trim,
                engine_display, engine_canon, engine_bucket,
                trans_display, trans_canon, trans_bucket
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                year,
                make,
                model,
                trim,
                engine_display.strip(),
                engine_canon,
                engine_bucket,
                trans_display.strip(),
                trans_canon,
                trans_bucket,
            ),
        )
        if cur.rowcount:
            self.counts.pairs_inserted += 1

    def _fetch_engines_for_transmission(
        self, year: int, make: str, model: str, trim: str, transmission: str, trans_bucket: str
    ) -> None:
        """Fetch engines for one transmission; record a bucket-named fallback if empty."""
        path = ["engines", str(year), make, model, trim, transmission]
        label = f"engines:{year}/{make}/{model}/{trim}/{transmission}"
        engines_payload = self._request_json(path, label)
        engines = self._extract_values(engines_payload, ["engine"])
        if not engines:
            # No engine list: synthesize one entry named after the fuel bucket
            # so the (trim, transmission) combination is still represented.
            engine_bucket = infer_fuel_bucket("", transmission, trim)
            fallback_engine = engine_bucket
            self._record_pair(year, make, model, trim, fallback_engine, engine_bucket, transmission, trans_bucket)
            self.counts.fallback_engines += 1
            return
        for engine in engines:
            engine_bucket = infer_fuel_bucket(engine, transmission, trim)
            self._record_pair(year, make, model, trim, engine, engine_bucket, transmission, trans_bucket)

    def _fetch_transmissions_for_trim(self, year: int, make: str, model: str, trim: str) -> None:
        """Fetch transmissions for one trim; fall back to Manual/Automatic if empty."""
        path = ["transmissions", str(year), make, model, trim]
        label = f"transmissions:{year}/{make}/{model}/{trim}"
        transmissions_payload = self._request_json(path, label)
        transmissions = self._extract_values(transmissions_payload, ["transmission"])
        if not transmissions:
            for fallback in FALLBACK_TRANSMISSIONS:
                trans_bucket = infer_trans_bucket(fallback)
                engine_bucket = infer_fuel_bucket("", fallback, trim)
                # Both engine display and bucket are the inferred bucket name here.
                self._record_pair(year, make, model, trim, engine_bucket, engine_bucket, fallback, trans_bucket)
                self.counts.fallback_transmissions += 1
                self.counts.fallback_engines += 1
            return
        for trans in transmissions:
            trans_bucket = infer_trans_bucket(trans)
            self._fetch_engines_for_transmission(year, make, model, trim, trans, trans_bucket)

    def _fetch_trims_for_model(self, year: int, make: str, model: str) -> None:
        """Fetch trims for one model (falling back to 'Base') and descend."""
        print(f" -> {year} {make} {model}", file=sys.stderr)
        path = ["trims", str(year), make, model]
        label = f"trims:{year}/{make}/{model}"
        trims_payload = self._request_json(path, label)
        trims = self._extract_values(trims_payload, ["trim"])
        if not trims:
            trims = FALLBACK_TRIMS
        for trim in trims:
            self._fetch_transmissions_for_trim(year, make, model, trim)
        # Commit per model so an interrupted run resumes from the cache cheaply.
        self.conn.commit()

    def _fetch_models_for_make(self, year: int, make: str) -> None:
        """Fetch models for one make and descend into each."""
        path = ["models", str(year), make]
        label = f"models:{year}/{make}"
        models_payload = self._request_json(path, label)
        models = self._extract_values(models_payload, ["model"])
        if not models:
            print(f"[warn] {label}: no models returned", file=sys.stderr)
            return
        for model in models:
            self._fetch_trims_for_model(year, make, model)

    def _fetch_makes_for_year(self, year: int) -> List[str]:
        """Fetch makes for *year*, keeping only those in the allow-list."""
        path = ["makes", str(year)]
        label = f"makes:{year}"
        makes_payload = self._request_json(path, label)
        makes = self._extract_values(makes_payload, ["make"])
        filtered = []
        for make in makes:
            canon = canonicalize(make)
            if canon in self.allowed_makes:
                filtered.append(make)
        return filtered

    def run(self) -> FetchCounts:
        """Walk every year in [min_year, max_year] and return the final tallies."""
        for year in range(self.min_year, self.max_year + 1):
            makes = self._fetch_makes_for_year(year)
            if not makes:
                print(f"[info] {year}: no allowed makes found, skipping", file=sys.stderr)
                continue
            print(f"[info] {year}: {len(makes)} makes", file=sys.stderr)
            for idx, make in enumerate(makes, 1):
                print(f"[{year}] ({idx}/{len(makes)}) {make}", file=sys.stderr)
                self._fetch_models_for_make(year, make)
                print(f" [{self.counts.pairs_inserted} pairs so far]", file=sys.stderr)
            self.conn.commit()
        return self.counts


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; env vars provide defaults where noted."""
    parser = argparse.ArgumentParser(description="Fetch VehAPI snapshot into SQLite.")
    parser.add_argument("--min-year", type=int, default=int(read_env("MIN_YEAR", DEFAULT_MIN_YEAR)),
                        help=f"Inclusive min year (default env MIN_YEAR or {DEFAULT_MIN_YEAR})")
    parser.add_argument("--max-year", type=int, default=int(read_env("MAX_YEAR", DEFAULT_MAX_YEAR)),
                        help=f"Inclusive max year (default env MAX_YEAR or {DEFAULT_MAX_YEAR})")
    parser.add_argument("--snapshot-dir", type=str, help="Target snapshot directory (default snapshots/)")
    parser.add_argument("--base-url", type=str, default=read_env("VEHAPI_BASE_URL", DEFAULT_BASE_URL),
                        help="VehAPI base URL (e.g. https://vehapi.com/api/v1/car-lists/get/car)")
    parser.add_argument("--rate-per-min", type=int, default=int(read_env("VEHAPI_MAX_RPM", DEFAULT_RATE_PER_MIN)),
                        help="Max requests per minute (<=60)")
    parser.add_argument("--makes-file", type=str, default="source-makes.txt", help="Path to source-makes.txt")
    parser.add_argument("--api-key-file", type=str, default="vehapi.key", help="Path to VehAPI bearer token file")
    parser.add_argument("--no-response-cache", action="store_true",
                        help="Disable request cache stored in snapshot.sqlite")
    return parser


def read_env(key: str, default: Any) -> Any:
    """Return the environment value for *key*, or *default* when unset."""
    return os.environ.get(key, default)


def main(argv: Sequence[str]) -> int:
    """CLI entry point: validate inputs, run the fetch, write DB meta + meta.json."""
    parser = build_arg_parser()
    args = parser.parse_args(argv)
    base_dir = Path(__file__).resolve().parent
    snapshot_root = base_dir / "snapshots"
    snapshot_dir = ensure_snapshot_dir(snapshot_root, args.snapshot_dir)
    snapshot_path = snapshot_dir / "snapshot.sqlite"
    meta_path = snapshot_dir / "meta.json"
    makes_file = (base_dir / args.makes_file).resolve()
    api_key_file = (base_dir / args.api_key_file).resolve()
    if not makes_file.exists():
        print(f"[error] makes file not found: {makes_file}", file=sys.stderr)
        return 1
    if not api_key_file.exists():
        print(f"[error] api key file not found: {api_key_file}", file=sys.stderr)
        return 1
    allowed_makes = read_lines(makes_file)
    token = read_text_file(api_key_file).strip()
    if not token:
        print("[error] vehapi.key is empty", file=sys.stderr)
        return 1
    session = requests.Session()
    fetcher = VehapiFetcher(
        session=session,
        base_url=args.base_url,
        token=token,
        min_year=args.min_year,
        max_year=args.max_year,
        allowed_makes=allowed_makes,
        snapshot_path=snapshot_path,
        responses_cache=not args.no_response_cache,
        rate_per_min=args.rate_per_min,
    )
    try:
        started_at = datetime.now(timezone.utc)
        counts = fetcher.run()
        finished_at = datetime.now(timezone.utc)
        meta = {
            "generated_at": finished_at.isoformat(),
            "started_at": started_at.isoformat(),
            "min_year": args.min_year,
            "max_year": args.max_year,
            "script_version": SCRIPT_VERSION,
            "makes_file": str(makes_file),
            "makes_hash": sha256_file(makes_file),
            "api_base_url": args.base_url,
            "snapshot_path": str(snapshot_path),
            "pairs_inserted": counts.pairs_inserted,
            "fallback_transmissions": counts.fallback_transmissions,
            "fallback_engines": counts.fallback_engines,
            "response_cache_hits": counts.cache_hits,
        }
        fetcher._store_meta(meta)
    finally:
        # Always release the SQLite handle so WAL state is flushed.
        fetcher.close()
    with meta_path.open("w", encoding="utf-8") as fh:
        json.dump(meta, fh, indent=2)
    print(
        f"[done] wrote snapshot to {snapshot_path} with {counts.pairs_inserted} pairs "
        f"(fallback trans={counts.fallback_transmissions}, fallback engines={counts.fallback_engines}, cache hits={counts.cache_hits})",
        file=sys.stderr,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))