#!/usr/bin/env python3
"""
Fetches VehAPI data into an offline snapshot (SQLite + meta.json).
Workflow:
1. Walks Year -> Make -> Model -> Trim -> Transmission -> Engine using VehAPI.
2. Persists observed compatibility pairs to snapshot.sqlite (no Cartesian products).
3. Stores request/response cache for resume; obeys rate limits and 429 retry-after.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import random
import re
import sqlite3
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import quote
try:
import requests
except ImportError: # pragma: no cover - env guard
print("[error] Missing dependency 'requests'. Install with `pip install requests`.", file=sys.stderr)
sys.exit(1)
SCRIPT_VERSION = "vehapi_fetch_snapshot.py@1.1.0"
DEFAULT_MIN_YEAR = 2015
DEFAULT_MAX_YEAR = 2022
DEFAULT_RATE_PER_MIN = 55 # stays under the 60 req/min ceiling
MAX_ATTEMPTS = 5
FALLBACK_TRIMS = ["Base"]
FALLBACK_TRANSMISSIONS = ["Manual", "Automatic"]
DEFAULT_BASE_URL = "https://vehapi.com/api/v1/car-lists/get/car"
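# _request_json() appends URL path segments under this base, e.g. "makes/<year>",
# "models/<year>/<make>", ... down to "engines/<year>/<make>/<model>/<trim>/<transmission>".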
def canonicalize(value: str) -> str:
"""Lowercase, trim, collapse spaces, and normalize hyphens for dedupe keys."""
cleaned = (value or "").strip()
cleaned = re.sub(r"[\s\u00A0]+", " ", cleaned)
cleaned = re.sub(r"[-\u2010-\u2015]+", "-", cleaned)
return cleaned.lower()
def infer_trans_bucket(trans_str: str) -> str:
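    """Bucket a free-form transmission string: Manual if it mentions manual/MT/M/T, else Automatic."""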
lowered = (trans_str or "").lower()
if "manual" in lowered or "mt" in lowered or "m/t" in lowered:
return "Manual"
return "Automatic"
def infer_fuel_bucket(engine_str: str, trans_str: str, trim_str: str) -> str:
    """Heuristically bucket fuel type from engine/transmission/trim text.
    Precedence: Electric > Hybrid > Diesel > Gas (the default)."""
    target = " ".join([engine_str or "", trans_str or "", trim_str or ""]).lower()
    # Short tokens are matched on word boundaries (with common b/m/p prefixes) so
    # that "phev" no longer trips the Electric branch and words like "level"
    # cannot trigger false positives.
    if any(token in target for token in ["electric", "battery", "motor", "kwh"]) or re.search(r"\bb?ev\b", target):
        return "Electric"
    if any(token in target for token in ["hybrid", "plug-in", "e-hybrid"]) or re.search(r"\b[mp]?hev\b", target):
        return "Hybrid"
    if any(token in target for token in ["diesel", "tdi", "dci", "duramax", "power stroke", "cummins"]):
        return "Diesel"
    return "Gas"
def read_text_file(path: Path) -> str:
with path.open("r", encoding="utf-8") as fh:
return fh.read()
def read_lines(path: Path) -> List[str]:
return [line.strip() for line in read_text_file(path).splitlines() if line.strip()]
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as fh:
for chunk in iter(lambda: fh.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def ensure_snapshot_dir(root: Path, custom_dir: Optional[str]) -> Path:
if custom_dir:
snapshot_dir = Path(custom_dir)
else:
today = datetime.now(timezone.utc).date().isoformat()
snapshot_dir = root / today
snapshot_dir.mkdir(parents=True, exist_ok=True)
return snapshot_dir
class RateLimiter:
"""Fixed delay limiter to stay below the VehAPI threshold (60 req/min)."""
def __init__(self, max_per_min: int) -> None:
        self.delay = 60.0 / max(1, max_per_min)  # ~1.09 s between requests at 55 rpm; guards against 0
self._last_request = 0.0
def acquire(self) -> None:
now = time.monotonic()
elapsed = now - self._last_request
if elapsed < self.delay:
time.sleep(self.delay - elapsed)
self._last_request = time.monotonic()
@dataclass
class FetchCounts:
pairs_inserted: int = 0
cache_hits: int = 0
fallback_transmissions: int = 0
fallback_engines: int = 0
class VehapiFetcher:
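    """Walks the VehAPI Year -> Make -> Model -> Trim -> Transmission -> Engine
    hierarchy and persists observed pairs plus a response cache to snapshot.sqlite."""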
def __init__(
self,
session: requests.Session,
base_url: str,
token: str,
min_year: int,
max_year: int,
allowed_makes: Sequence[str],
snapshot_path: Path,
responses_cache: bool = True,
rate_per_min: int = DEFAULT_RATE_PER_MIN,
) -> None:
self.session = session
self.base_url = base_url.rstrip("/")
self.token = token
self.min_year = min_year
self.max_year = max_year
self.allowed_makes = {canonicalize(m): m for m in allowed_makes}
self.snapshot_path = snapshot_path
self.conn = sqlite3.connect(self.snapshot_path)
self.conn.execute("PRAGMA journal_mode=WAL;")
self.conn.execute("PRAGMA synchronous=NORMAL;")
self._init_schema()
self.responses_cache = responses_cache
self.rate_limiter = RateLimiter(rate_per_min)
self.counts = FetchCounts()
def _init_schema(self) -> None:
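        # Three tables: pairs (observed combinations, deduplicated through the
        # canonicalized engine/trans columns in the primary key), meta (snapshot
        # provenance key/values), and responses (raw response cache for resume).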
self.conn.execute(
"""
CREATE TABLE IF NOT EXISTS pairs(
year INT,
make TEXT,
model TEXT,
trim TEXT,
engine_display TEXT,
engine_canon TEXT,
engine_bucket TEXT,
trans_display TEXT,
trans_canon TEXT,
trans_bucket TEXT,
PRIMARY KEY(year, make, model, trim, engine_canon, trans_canon)
)
"""
)
self.conn.execute(
"""
CREATE TABLE IF NOT EXISTS meta(
key TEXT PRIMARY KEY,
value TEXT
)
"""
)
self.conn.execute(
"""
CREATE TABLE IF NOT EXISTS responses(
request_key TEXT PRIMARY KEY,
url TEXT,
status INT,
headers_json TEXT,
body_json TEXT,
fetched_at TEXT
)
"""
)
self.conn.commit()
def _store_meta(self, meta: Dict[str, Any]) -> None:
rows = [(k, str(v)) for k, v in meta.items()]
self.conn.executemany("INSERT OR REPLACE INTO meta(key, value) VALUES (?, ?)", rows)
self.conn.commit()
def _load_cached_response(self, request_key: str) -> Optional[Any]:
if not self.responses_cache:
return None
cur = self.conn.execute("SELECT body_json FROM responses WHERE request_key = ?", (request_key,))
row = cur.fetchone()
if not row:
return None
        try:
            body = json.loads(row[0])
        except Exception:
            return None
        # Count a hit only once the cached body parses; otherwise fall through to a refetch.
        self.counts.cache_hits += 1
        return body
def _save_response(self, request_key: str, url: str, status: int, headers: Dict[str, Any], body: Any) -> None:
self.conn.execute(
"""
INSERT OR REPLACE INTO responses(request_key, url, status, headers_json, body_json, fetched_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
request_key,
url,
status,
json.dumps(dict(headers), default=str),
json.dumps(body, default=str),
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
def _request_json(self, path_parts: Sequence[str], label: str) -> Any:
path_parts = [str(p) for p in path_parts]
request_key = "/".join(path_parts)
cached = self._load_cached_response(request_key)
if cached is not None:
return cached
url = f"{self.base_url}/" + "/".join(quote(p, safe="") for p in path_parts)
attempts = 0
backoff = 1.0
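        # Exponential backoff (doubling, capped at 30 s) with a little jitter;
        # 429 responses honor Retry-After and fall back to 30 s when unparseable.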
while attempts < MAX_ATTEMPTS:
attempts += 1
self.rate_limiter.acquire()
try:
resp = self.session.get(url, headers={"Authorization": f"Bearer {self.token}"}, timeout=30)
except requests.RequestException as exc:
print(f"[warn] {label}: request error {exc}; retrying...", file=sys.stderr)
time.sleep(backoff + random.uniform(0, 0.5))
backoff = min(backoff * 2, 30)
continue
if resp.status_code == 429:
                retry_after = resp.headers.get("Retry-After")  # header lookup is case-insensitive in requests
try:
retry_seconds = float(retry_after)
except (TypeError, ValueError):
retry_seconds = 30.0
sleep_for = retry_seconds + random.uniform(0, 0.5)
print(f"[info] {label}: hit 429, sleeping {sleep_for:.1f}s before retry", file=sys.stderr)
time.sleep(sleep_for)
backoff = min(backoff * 2, 30)
continue
if resp.status_code >= 500:
print(f"[warn] {label}: server {resp.status_code}, retrying...", file=sys.stderr)
time.sleep(backoff + random.uniform(0, 0.5))
backoff = min(backoff * 2, 30)
continue
if not resp.ok:
print(f"[warn] {label}: HTTP {resp.status_code}, skipping", file=sys.stderr)
return []
try:
body = resp.json()
except ValueError:
print(f"[warn] {label}: non-JSON response, skipping", file=sys.stderr)
return []
self._save_response(request_key, url, resp.status_code, resp.headers, body)
return body
print(f"[error] {label}: exhausted retries", file=sys.stderr)
return []
@staticmethod
def _extract_values(payload: Any, keys: Sequence[str]) -> List[str]:
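        """Extract string values from a VehAPI payload shaped as a bare list of
        strings, a list of dicts keyed by one of `keys`, or either wrapped under
        data/results/items."""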
values: List[str] = []
if isinstance(payload, dict):
payload = payload.get("data") or payload.get("results") or payload.get("items") or payload
if not payload:
return values
if isinstance(payload, list):
for item in payload:
if isinstance(item, str):
if item.strip():
values.append(item.strip())
continue
if isinstance(item, dict):
for key in keys:
val = item.get(key)
if val:
values.append(str(val).strip())
break
return values
def _record_pair(
self,
year: int,
make: str,
model: str,
trim: str,
engine_display: str,
engine_bucket: str,
trans_display: str,
trans_bucket: str,
) -> None:
engine_canon = canonicalize(engine_display)
trans_canon = canonicalize(trans_display)
cur = self.conn.execute(
"""
INSERT OR IGNORE INTO pairs(
year, make, model, trim,
engine_display, engine_canon, engine_bucket,
trans_display, trans_canon, trans_bucket
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
year,
make,
model,
trim,
engine_display.strip(),
engine_canon,
engine_bucket,
trans_display.strip(),
trans_canon,
trans_bucket,
),
)
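        # INSERT OR IGNORE reports rowcount 1 only for genuinely new rows, so
        # duplicate pairs do not inflate the counter.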
if cur.rowcount:
self.counts.pairs_inserted += 1
def _fetch_engines_for_transmission(
self, year: int, make: str, model: str, trim: str, transmission: str, trans_bucket: str
) -> None:
path = ["engines", str(year), make, model, trim, transmission]
label = f"engines:{year}/{make}/{model}/{trim}/{transmission}"
engines_payload = self._request_json(path, label)
engines = self._extract_values(engines_payload, ["engine"])
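        # No engines listed: record one synthetic pair that reuses the inferred
        # fuel bucket as the display value, so the trim still lands in the snapshot.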
if not engines:
engine_bucket = infer_fuel_bucket("", transmission, trim)
fallback_engine = engine_bucket
self._record_pair(year, make, model, trim, fallback_engine, engine_bucket, transmission, trans_bucket)
self.counts.fallback_engines += 1
return
for engine in engines:
engine_bucket = infer_fuel_bucket(engine, transmission, trim)
self._record_pair(year, make, model, trim, engine, engine_bucket, transmission, trans_bucket)
def _fetch_transmissions_for_trim(self, year: int, make: str, model: str, trim: str) -> None:
path = ["transmissions", str(year), make, model, trim]
label = f"transmissions:{year}/{make}/{model}/{trim}"
transmissions_payload = self._request_json(path, label)
transmissions = self._extract_values(transmissions_payload, ["transmission"])
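        # No transmissions listed: fall back to Manual/Automatic placeholders so
        # downstream lookups can still resolve the trim.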
if not transmissions:
for fallback in FALLBACK_TRANSMISSIONS:
trans_bucket = infer_trans_bucket(fallback)
engine_bucket = infer_fuel_bucket("", fallback, trim)
self._record_pair(year, make, model, trim, engine_bucket, engine_bucket, fallback, trans_bucket)
self.counts.fallback_transmissions += 1
self.counts.fallback_engines += 1
return
for trans in transmissions:
trans_bucket = infer_trans_bucket(trans)
self._fetch_engines_for_transmission(year, make, model, trim, trans, trans_bucket)
def _fetch_trims_for_model(self, year: int, make: str, model: str) -> None:
print(f" -> {year} {make} {model}", file=sys.stderr)
path = ["trims", str(year), make, model]
label = f"trims:{year}/{make}/{model}"
trims_payload = self._request_json(path, label)
trims = self._extract_values(trims_payload, ["trim"])
if not trims:
trims = FALLBACK_TRIMS
for trim in trims:
self._fetch_transmissions_for_trim(year, make, model, trim)
self.conn.commit()
def _fetch_models_for_make(self, year: int, make: str) -> None:
path = ["models", str(year), make]
label = f"models:{year}/{make}"
models_payload = self._request_json(path, label)
models = self._extract_values(models_payload, ["model"])
if not models:
print(f"[warn] {label}: no models returned", file=sys.stderr)
return
for model in models:
self._fetch_trims_for_model(year, make, model)
def _fetch_makes_for_year(self, year: int) -> List[str]:
path = ["makes", str(year)]
label = f"makes:{year}"
makes_payload = self._request_json(path, label)
makes = self._extract_values(makes_payload, ["make"])
filtered = []
for make in makes:
canon = canonicalize(make)
if canon in self.allowed_makes:
filtered.append(make)
return filtered
def run(self) -> FetchCounts:
for year in range(self.min_year, self.max_year + 1):
makes = self._fetch_makes_for_year(year)
if not makes:
print(f"[info] {year}: no allowed makes found, skipping", file=sys.stderr)
continue
print(f"[info] {year}: {len(makes)} makes", file=sys.stderr)
for idx, make in enumerate(makes, 1):
print(f"[{year}] ({idx}/{len(makes)}) {make}", file=sys.stderr)
self._fetch_models_for_make(year, make)
print(f" [{self.counts.pairs_inserted} pairs so far]", file=sys.stderr)
self.conn.commit()
return self.counts
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Fetch VehAPI snapshot into SQLite.")
parser.add_argument("--min-year", type=int, default=int(read_env("MIN_YEAR", DEFAULT_MIN_YEAR)), help="Inclusive min year (default env MIN_YEAR or 2017)")
parser.add_argument("--max-year", type=int, default=int(read_env("MAX_YEAR", DEFAULT_MAX_YEAR)), help="Inclusive max year (default env MAX_YEAR or 2026)")
parser.add_argument("--snapshot-dir", type=str, help="Target snapshot directory (default snapshots/<today>)")
parser.add_argument("--base-url", type=str, default=read_env("VEHAPI_BASE_URL", DEFAULT_BASE_URL), help="VehAPI base URL (e.g. https://vehapi.com/api/v1/car-lists/get/car)")
parser.add_argument("--rate-per-min", type=int, default=int(read_env("VEHAPI_MAX_RPM", DEFAULT_RATE_PER_MIN)), help="Max requests per minute (<=60)")
parser.add_argument("--makes-file", type=str, default="source-makes.txt", help="Path to source-makes.txt")
parser.add_argument("--api-key-file", type=str, default="vehapi.key", help="Path to VehAPI bearer token file")
parser.add_argument("--no-response-cache", action="store_true", help="Disable request cache stored in snapshot.sqlite")
return parser
def read_env(key: str, default: Any) -> Any:
return os.environ.get(key, default)
def main(argv: Sequence[str]) -> int:
parser = build_arg_parser()
args = parser.parse_args(argv)
base_dir = Path(__file__).resolve().parent
snapshot_root = base_dir / "snapshots"
snapshot_dir = ensure_snapshot_dir(snapshot_root, args.snapshot_dir)
snapshot_path = snapshot_dir / "snapshot.sqlite"
meta_path = snapshot_dir / "meta.json"
makes_file = (base_dir / args.makes_file).resolve()
api_key_file = (base_dir / args.api_key_file).resolve()
if not makes_file.exists():
print(f"[error] makes file not found: {makes_file}", file=sys.stderr)
return 1
if not api_key_file.exists():
print(f"[error] api key file not found: {api_key_file}", file=sys.stderr)
return 1
allowed_makes = read_lines(makes_file)
token = read_text_file(api_key_file).strip()
if not token:
print("[error] vehapi.key is empty", file=sys.stderr)
return 1
session = requests.Session()
fetcher = VehapiFetcher(
session=session,
base_url=args.base_url,
token=token,
min_year=args.min_year,
max_year=args.max_year,
allowed_makes=allowed_makes,
snapshot_path=snapshot_path,
responses_cache=not args.no_response_cache,
rate_per_min=args.rate_per_min,
)
started_at = datetime.now(timezone.utc)
counts = fetcher.run()
finished_at = datetime.now(timezone.utc)
meta = {
"generated_at": finished_at.isoformat(),
"started_at": started_at.isoformat(),
"min_year": args.min_year,
"max_year": args.max_year,
"script_version": SCRIPT_VERSION,
"makes_file": str(makes_file),
"makes_hash": sha256_file(makes_file),
"api_base_url": args.base_url,
"snapshot_path": str(snapshot_path),
"pairs_inserted": counts.pairs_inserted,
"fallback_transmissions": counts.fallback_transmissions,
"fallback_engines": counts.fallback_engines,
"response_cache_hits": counts.cache_hits,
}
fetcher._store_meta(meta)
with meta_path.open("w", encoding="utf-8") as fh:
json.dump(meta, fh, indent=2)
print(
f"[done] wrote snapshot to {snapshot_path} with {counts.pairs_inserted} pairs "
f"(fallback trans={counts.fallback_transmissions}, fallback engines={counts.fallback_engines}, cache hits={counts.cache_hits})",
file=sys.stderr,
)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))