#!/usr/bin/env python3
"""
NHTSA vPIC Database Downloader

Downloads and prepares the NHTSA vPIC database file for ETL processing.
"""
import os
import logging
import requests
import zipfile
from pathlib import Path
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)

# (connect, read) timeouts for the download. The .bak archive is large, so
# the read timeout is generous, but a dead connection still fails fast
# instead of hanging the ETL forever.
DOWNLOAD_TIMEOUT = (10, 300)

# Emit a progress log line roughly every 10 MB downloaded.
PROGRESS_LOG_INTERVAL = 10 * 1024 * 1024


class NHTSADownloader:
    """Downloads and manages NHTSA vPIC database files"""

    def __init__(self, download_dir: str = "/app/data"):
        """
        Args:
            download_dir: Directory for downloaded/extracted files.
                Created if missing, including any missing parents.
        """
        self.download_dir = Path(download_dir)
        # parents=True: mkdir(exist_ok=True) alone raises FileNotFoundError
        # when a parent directory does not exist yet.
        self.download_dir.mkdir(parents=True, exist_ok=True)

    def get_latest_database_url(self) -> str:
        """
        Get the latest NHTSA vPIC database URL

        Uses July 2025 version as specified
        """
        return "https://vpic.nhtsa.dot.gov/api/vPICList_lite_2025_07.bak.zip"

    def download_database(self, url: Optional[str] = None) -> Optional[Path]:
        """
        Download NHTSA vPIC database file and extract its .bak payload.

        Args:
            url: Database URL (defaults to latest)

        Returns:
            Path to downloaded .bak file or None if failed
        """
        if url is None:
            url = self.get_latest_database_url()

        logger.info(f"Starting download of NHTSA vPIC database from: {url}")

        try:
            # Derive the local filename from the last URL path segment.
            zip_filename = url.split('/')[-1]
            zip_path = self.download_dir / zip_filename

            # Stream in chunks; the context manager releases the connection,
            # and the timeout prevents an indefinite hang on a stalled peer
            # (the original call had no timeout at all).
            with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()

                # content-length may be absent; 0 disables percentage logging.
                total_size = int(response.headers.get('content-length', 0))
                logger.info(f"Downloading {zip_filename} ({total_size:,} bytes)")

                with open(zip_path, 'wb') as f:
                    downloaded = 0
                    # Threshold-based progress logging. The original used
                    # `downloaded % 10MB == 0`, which only fires when the byte
                    # counter lands exactly on a multiple — one short chunk
                    # silences all further progress output.
                    next_log_at = PROGRESS_LOG_INTERVAL
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if downloaded >= next_log_at:
                                next_log_at += PROGRESS_LOG_INTERVAL
                                if total_size > 0:
                                    progress = (downloaded / total_size) * 100
                                    logger.info(
                                        f"Download progress: {progress:.1f}% "
                                        f"({downloaded:,}/{total_size:,} bytes)"
                                    )

            logger.info(f"Successfully downloaded: {zip_path}")

            # Extract the .bak file
            bak_path = self.extract_bak_file(zip_path)

            # The archive is redundant once the .bak is extracted.
            zip_path.unlink()
            logger.info(f"Cleaned up zip file: {zip_path}")

            return bak_path

        except Exception as e:
            # Top-level boundary: callers treat None as "download failed",
            # so log and swallow rather than propagate.
            logger.error(f"Failed to download database: {e}")
            return None

    def extract_bak_file(self, zip_path: Path) -> Path:
        """
        Extract .bak file from zip archive

        Args:
            zip_path: Path to zip file

        Returns:
            Path to extracted .bak file

        Raises:
            ValueError: If the archive contains no .bak file, or if the
                member name would resolve outside the download directory
                (zip-slip protection).
        """
        logger.info(f"Extracting .bak file from: {zip_path}")

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Find the .bak file
            bak_files = [name for name in zip_ref.namelist() if name.endswith('.bak')]

            if not bak_files:
                raise ValueError("No .bak file found in zip archive")

            if len(bak_files) > 1:
                logger.warning(f"Multiple .bak files found, using first: {bak_files}")

            bak_filename = bak_files[0]
            logger.info(f"Extracting: {bak_filename}")

            # Zip-slip guard: refuse member names (e.g. "../x.bak") that
            # would escape the download directory when extracted.
            bak_path = self.download_dir / bak_filename
            root = self.download_dir.resolve()
            if root not in bak_path.resolve().parents:
                raise ValueError(f"Unsafe archive member name: {bak_filename}")

            # Extract to download directory
            zip_ref.extract(bak_filename, self.download_dir)

        logger.info(f"Successfully extracted: {bak_path}")
        return bak_path

    def get_existing_bak_file(self) -> Optional[Path]:
        """
        Find an existing .bak file in preferred locations.

        Searches both the shared mount (/app/shared) and local download
        dir (/app/data).

        Returns:
            Path to most recent .bak file or None
        """
        search_dirs = [Path("/app/shared"), self.download_dir]
        candidates = []
        for d in search_dirs:
            try:
                if d.exists():
                    candidates.extend(list(d.glob("*.bak")))
            except Exception as e:
                # Best-effort: an unreadable mount should not abort the scan.
                logger.debug(f"Skipping directory {d}: {e}")

        if candidates:
            # Most recently modified wins across both directories.
            latest_bak = max(candidates, key=lambda p: p.stat().st_mtime)
            logger.info(f"Found existing .bak file: {latest_bak}")
            return latest_bak

        return None

    def ensure_database_file(self, force_download: bool = False) -> Optional[Path]:
        """
        Ensure we have a database file - download if needed

        Args:
            force_download: Force download even if file exists

        Returns:
            Path to .bak file or None if failed
        """
        if not force_download:
            existing_file = self.get_existing_bak_file()
            if existing_file:
                logger.info(f"Using existing database file: {existing_file}")
                return existing_file

        logger.info("Downloading fresh database file...")
        return self.download_database()

    def get_database_info(self, bak_path: Path) -> dict:
        """
        Get information about the database file

        Args:
            bak_path: Path to .bak file

        Returns:
            Dictionary with file info (exists flag; and when present:
            path, size_mb, last-modified ISO timestamp, file name)
        """
        if not bak_path.exists():
            return {"exists": False}

        stat = bak_path.stat()
        return {
            "exists": True,
            "path": str(bak_path),
            "size_mb": round(stat.st_size / (1024 * 1024), 1),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "name": bak_path.name
        }