Vehicle ETL Process fixed. Admin settings fixed.
This commit is contained in:
@@ -32,7 +32,7 @@ except ImportError: # pragma: no cover - env guard
|
||||
SCRIPT_VERSION = "vehapi_fetch_snapshot.py@1.1.0"
|
||||
DEFAULT_MIN_YEAR = 2015
|
||||
DEFAULT_MAX_YEAR = 2022
|
||||
DEFAULT_RATE_PER_SEC = 55 # stays under the 60 req/sec ceiling
|
||||
DEFAULT_RATE_PER_MIN = 55 # stays under the 60 req/min ceiling
|
||||
MAX_ATTEMPTS = 5
|
||||
FALLBACK_TRIMS = ["Base"]
|
||||
FALLBACK_TRANSMISSIONS = ["Manual", "Automatic"]
|
||||
@@ -95,22 +95,18 @@ def ensure_snapshot_dir(root: Path, custom_dir: Optional[str]) -> Path:
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""Simple leaky bucket limiter to stay below the VehAPI threshold."""
|
||||
"""Fixed delay limiter to stay below the VehAPI threshold (60 req/min)."""
|
||||
|
||||
def __init__(self, max_per_sec: int) -> None:
|
||||
self.max_per_sec = max_per_sec
|
||||
self._history: List[float] = []
|
||||
def __init__(self, max_per_min: int) -> None:
|
||||
self.delay = 60.0 / max_per_min # ~1.09 sec for 55 rpm
|
||||
self._last_request = 0.0
|
||||
|
||||
def acquire(self) -> None:
|
||||
while True:
|
||||
now = time.monotonic()
|
||||
window_start = now - 1
|
||||
self._history = [ts for ts in self._history if ts >= window_start]
|
||||
if len(self._history) < self.max_per_sec:
|
||||
break
|
||||
sleep_for = max(self._history[0] - window_start, 0.001)
|
||||
time.sleep(sleep_for)
|
||||
self._history.append(time.monotonic())
|
||||
now = time.monotonic()
|
||||
elapsed = now - self._last_request
|
||||
if elapsed < self.delay:
|
||||
time.sleep(self.delay - elapsed)
|
||||
self._last_request = time.monotonic()
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -132,7 +128,7 @@ class VehapiFetcher:
|
||||
allowed_makes: Sequence[str],
|
||||
snapshot_path: Path,
|
||||
responses_cache: bool = True,
|
||||
rate_per_sec: int = DEFAULT_RATE_PER_SEC,
|
||||
rate_per_min: int = DEFAULT_RATE_PER_MIN,
|
||||
) -> None:
|
||||
self.session = session
|
||||
self.base_url = base_url.rstrip("/")
|
||||
@@ -146,7 +142,7 @@ class VehapiFetcher:
|
||||
self.conn.execute("PRAGMA synchronous=NORMAL;")
|
||||
self._init_schema()
|
||||
self.responses_cache = responses_cache
|
||||
self.rate_limiter = RateLimiter(rate_per_sec)
|
||||
self.rate_limiter = RateLimiter(rate_per_min)
|
||||
self.counts = FetchCounts()
|
||||
|
||||
def _init_schema(self) -> None:
|
||||
@@ -251,7 +247,7 @@ class VehapiFetcher:
|
||||
retry_seconds = float(retry_after)
|
||||
except (TypeError, ValueError):
|
||||
retry_seconds = 30.0
|
||||
sleep_for = retry_seconds + random.uniform(0, 3)
|
||||
sleep_for = retry_seconds + random.uniform(0, 0.5)
|
||||
print(f"[info] {label}: hit 429, sleeping {sleep_for:.1f}s before retry", file=sys.stderr)
|
||||
time.sleep(sleep_for)
|
||||
backoff = min(backoff * 2, 30)
|
||||
@@ -374,6 +370,7 @@ class VehapiFetcher:
|
||||
self._fetch_engines_for_transmission(year, make, model, trim, trans, trans_bucket)
|
||||
|
||||
def _fetch_trims_for_model(self, year: int, make: str, model: str) -> None:
|
||||
print(f" -> {year} {make} {model}", file=sys.stderr)
|
||||
path = ["trims", str(year), make, model]
|
||||
label = f"trims:{year}/{make}/{model}"
|
||||
trims_payload = self._request_json(path, label)
|
||||
@@ -416,9 +413,10 @@ class VehapiFetcher:
|
||||
print(f"[info] {year}: no allowed makes found, skipping", file=sys.stderr)
|
||||
continue
|
||||
print(f"[info] {year}: {len(makes)} makes", file=sys.stderr)
|
||||
for make in makes:
|
||||
print(f"[info] {year} {make}: fetching models", file=sys.stderr)
|
||||
for idx, make in enumerate(makes, 1):
|
||||
print(f"[{year}] ({idx}/{len(makes)}) {make}", file=sys.stderr)
|
||||
self._fetch_models_for_make(year, make)
|
||||
print(f" [{self.counts.pairs_inserted} pairs so far]", file=sys.stderr)
|
||||
self.conn.commit()
|
||||
return self.counts
|
||||
|
||||
@@ -429,7 +427,7 @@ def build_arg_parser() -> argparse.ArgumentParser:
|
||||
parser.add_argument("--max-year", type=int, default=int(read_env("MAX_YEAR", DEFAULT_MAX_YEAR)), help="Inclusive max year (default env MAX_YEAR or 2026)")
|
||||
parser.add_argument("--snapshot-dir", type=str, help="Target snapshot directory (default snapshots/<today>)")
|
||||
parser.add_argument("--base-url", type=str, default=read_env("VEHAPI_BASE_URL", DEFAULT_BASE_URL), help="VehAPI base URL (e.g. https://vehapi.com/api/v1/car-lists/get/car)")
|
||||
parser.add_argument("--rate-per-sec", type=int, default=int(read_env("VEHAPI_MAX_RPS", DEFAULT_RATE_PER_SEC)), help="Max requests per second (<=60)")
|
||||
parser.add_argument("--rate-per-min", type=int, default=int(read_env("VEHAPI_MAX_RPM", DEFAULT_RATE_PER_MIN)), help="Max requests per minute (<=60)")
|
||||
parser.add_argument("--makes-file", type=str, default="source-makes.txt", help="Path to source-makes.txt")
|
||||
parser.add_argument("--api-key-file", type=str, default="vehapi.key", help="Path to VehAPI bearer token file")
|
||||
parser.add_argument("--no-response-cache", action="store_true", help="Disable request cache stored in snapshot.sqlite")
|
||||
@@ -477,7 +475,7 @@ def main(argv: Sequence[str]) -> int:
|
||||
allowed_makes=allowed_makes,
|
||||
snapshot_path=snapshot_path,
|
||||
responses_cache=not args.no_response_cache,
|
||||
rate_per_sec=args.rate_per_sec,
|
||||
rate_per_min=args.rate_per_min,
|
||||
)
|
||||
|
||||
started_at = datetime.now(timezone.utc)
|
||||
|
||||
Reference in New Issue
Block a user