Files
motovaultpro/mvp-platform-services/vehicles/etl/downloaders/nhtsa_downloader.py
Eric Gullickson a052040e3a Initial Commit
2025-09-17 16:09:15 -05:00

181 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""
NHTSA vPIC Database Downloader
Downloads and prepares the NHTSA vPIC database file for ETL processing
"""
import os
import logging
import requests
import zipfile
from pathlib import Path
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)


class NHTSADownloader:
    """Downloads and manages NHTSA vPIC database (.bak) files.

    The vPIC database is published as a zip archive containing a SQL Server
    backup (.bak) file. This class downloads the archive, extracts the .bak
    file into ``download_dir``, and can locate previously extracted files so
    repeated ETL runs do not re-download unnecessarily.
    """

    # Emit a progress log line roughly every 10 MiB of downloaded data.
    _PROGRESS_INTERVAL = 10 * 1024 * 1024

    def __init__(self, download_dir: str = "/app/data"):
        """Create a downloader rooted at *download_dir*.

        Args:
            download_dir: Directory where archives are downloaded and .bak
                files are extracted. Created (including missing parents) if
                it does not exist.
        """
        self.download_dir = Path(download_dir)
        # parents=True: a fresh container volume may not have the parent
        # directory yet; without it mkdir raises FileNotFoundError.
        self.download_dir.mkdir(parents=True, exist_ok=True)

    def get_latest_database_url(self) -> str:
        """
        Get the latest NHTSA vPIC database URL
        Uses July 2025 version as specified
        """
        return "https://vpic.nhtsa.dot.gov/api/vPICList_lite_2025_07.bak.zip"

    def download_database(self, url: Optional[str] = None) -> Optional[Path]:
        """
        Download NHTSA vPIC database file

        Args:
            url: Database URL (defaults to latest)
        Returns:
            Path to downloaded .bak file or None if failed
        """
        if url is None:
            url = self.get_latest_database_url()
        logger.info(f"Starting download of NHTSA vPIC database from: {url}")
        try:
            # Extract filename from URL
            zip_filename = url.split('/')[-1]
            zip_path = self.download_dir / zip_filename
            # stream=True downloads in chunks; timeout=(connect, read) so a
            # dead or stalled server cannot hang the ETL run indefinitely.
            response = requests.get(url, stream=True, timeout=(10, 300))
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            logger.info(f"Downloading {zip_filename} ({total_size:,} bytes)")
            with open(zip_path, 'wb') as f:
                downloaded = 0
                next_log = self._PROGRESS_INTERVAL
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        # Threshold check instead of a modulo test: iter_content
                        # does not guarantee fixed-size chunks, so an exact
                        # "downloaded % 10MiB == 0" match can never fire again
                        # once a single odd-sized chunk arrives.
                        if total_size > 0 and downloaded >= next_log:
                            progress = (downloaded / total_size) * 100
                            logger.info(f"Download progress: {progress:.1f}% ({downloaded:,}/{total_size:,} bytes)")
                            next_log += self._PROGRESS_INTERVAL
            logger.info(f"Successfully downloaded: {zip_path}")
            # Extract the .bak file
            bak_path = self.extract_bak_file(zip_path)
            # Clean up zip file
            zip_path.unlink()
            logger.info(f"Cleaned up zip file: {zip_path}")
            return bak_path
        except Exception as e:
            # Broad catch is deliberate: callers treat None as "download
            # failed" and decide whether to retry or fall back to an
            # existing file.
            logger.error(f"Failed to download database: {e}")
            return None

    def extract_bak_file(self, zip_path: Path) -> Path:
        """
        Extract .bak file from zip archive

        Args:
            zip_path: Path to zip file
        Returns:
            Path to extracted .bak file
        Raises:
            ValueError: If the archive contains no .bak file.
        """
        logger.info(f"Extracting .bak file from: {zip_path}")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Find the .bak file
            bak_files = [name for name in zip_ref.namelist() if name.endswith('.bak')]
            if not bak_files:
                raise ValueError("No .bak file found in zip archive")
            if len(bak_files) > 1:
                logger.warning(f"Multiple .bak files found, using first: {bak_files}")
            bak_filename = bak_files[0]
            logger.info(f"Extracting: {bak_filename}")
            # extract() recreates any archive subdirectories under
            # download_dir, so joining the member name yields the real path.
            zip_ref.extract(bak_filename, self.download_dir)
            bak_path = self.download_dir / bak_filename
            logger.info(f"Successfully extracted: {bak_path}")
            return bak_path

    def get_existing_bak_file(self) -> Optional[Path]:
        """
        Find an existing .bak file in preferred locations.
        Searches both the shared mount (/app/shared) and local download dir (/app/data).

        Returns:
            Path to most recent .bak file or None
        """
        search_dirs = [Path("/app/shared"), self.download_dir]
        candidates = []
        for d in search_dirs:
            try:
                if d.exists():
                    candidates.extend(list(d.glob("*.bak")))
            except Exception as e:
                # Best-effort search: an unreadable mount should not abort
                # the lookup of the other directories.
                logger.debug(f"Skipping directory {d}: {e}")
        if candidates:
            # Most recently modified file wins across all search dirs.
            latest_bak = max(candidates, key=lambda p: p.stat().st_mtime)
            logger.info(f"Found existing .bak file: {latest_bak}")
            return latest_bak
        return None

    def ensure_database_file(self, force_download: bool = False) -> Optional[Path]:
        """
        Ensure we have a database file - download if needed

        Args:
            force_download: Force download even if file exists
        Returns:
            Path to .bak file or None if failed
        """
        if not force_download:
            existing_file = self.get_existing_bak_file()
            if existing_file:
                logger.info(f"Using existing database file: {existing_file}")
                return existing_file
        logger.info("Downloading fresh database file...")
        return self.download_database()

    def get_database_info(self, bak_path: Path) -> dict:
        """
        Get information about the database file

        Args:
            bak_path: Path to .bak file
        Returns:
            Dictionary with file info: ``exists`` always; ``path``,
            ``size_mb``, ``modified`` (local-time ISO string), and ``name``
            only when the file exists.
        """
        if not bak_path.exists():
            return {"exists": False}
        stat = bak_path.stat()
        return {
            "exists": True,
            "path": str(bak_path),
            "size_mb": round(stat.st_size / (1024 * 1024), 1),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "name": bak_path.name
        }