Files
motovaultpro/mvp-platform-services/vehicles/etl/pipeline.py
Eric Gullickson a052040e3a Initial Commit
2025-09-17 16:09:15 -05:00

93 lines
4.1 KiB
Python

#!/usr/bin/env python3
import logging
from datetime import datetime
from .config import config
from .builders.normalized_vehicle_builder import NormalizedVehicleBuilder
from .utils.make_filter import MakeFilter
from .connections import test_connections
from .downloaders.nhtsa_downloader import NHTSADownloader
from .loaders.mssql_loader import MSSQLLoader
from .extractors.vin_proc_extractor import VinProcExtractor
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _inspect_vin_proc(sample_vin='1G1YU3D64H5602799'):
    """Best-effort inspection of the MSSQL VIN decode stored procedure.

    Looks up the procedure, logs its definition (truncated) and the column
    names returned by a sample execution. Every failure is logged as a
    warning and swallowed: this step is diagnostic only and must never
    abort the pipeline.

    Args:
        sample_vin: VIN passed to the sample execution.
    """
    try:
        logger.info("Step 2b: Inspecting MSSQL VIN decode stored procedure for parity")
        extractor = VinProcExtractor()
        meta = extractor.find_proc()
        if not meta:
            logger.warning("VIN decode proc not found by pattern; continuing with catalog build")
            return

        logger.info(f"VIN proc found: {meta['schema_name']}.{meta['object_name']} ({meta['type_desc']})")

        definition = extractor.get_definition(meta['schema_name'], meta['object_name'])
        # get_definition's contract is not visible from this module; guard
        # against a None result so that a missing definition does not raise
        # while slicing and thereby skip the sample execution below.
        if definition is not None:
            logger.debug(f"VIN proc definition (first 500 chars): {definition[:500]}")
        else:
            logger.debug("VIN proc definition unavailable")

        sample = extractor.sample_execute(sample_vin)
        if sample is not None:
            # An empty (but non-None) result is reported as 'no rows'.
            columns = list(sample[0].keys()) if sample else 'no rows'
            logger.info(f"VIN proc sample output columns: {columns}")
    except Exception as e:
        # Deliberately broad: inspection is non-fatal by design.
        logger.warning(f"VIN proc inspection failed (non-fatal): {e}")


def run_etl_pipeline():
    """Complete ETL pipeline execution including download and database loading.

    Steps, in order:
        1.  Obtain the NHTSA vPIC database backup file.
        2.  Test the MSSQL connection, restore the backup and verify content.
        2b. Inspect the VIN decode stored procedure (best-effort, non-fatal).
        3.  Run the shared connection test (logged as MSSQL + PostgreSQL).
        4.  Build the normalized PostgreSQL vehicle schema with make filtering.

    Returns:
        bool: True when every mandatory step succeeds, False otherwise.
        Exceptions raised inside the steps are logged with a traceback and
        converted to a False return; they are not propagated to the caller.
    """
    logger.info("Starting complete ETL pipeline")
    start_time = datetime.now()

    try:
        # Step 1: Download NHTSA database file
        logger.info("Step 1: Downloading NHTSA vPIC database")
        downloader = NHTSADownloader()
        bak_file = downloader.ensure_database_file(force_download=False)
        if not bak_file:
            logger.error("Failed to obtain NHTSA database file")
            return False

        db_info = downloader.get_database_info(bak_file)
        logger.info(f"Using database file: {db_info['name']} ({db_info['size_mb']} MB)")

        # Step 2: Load database into MSSQL
        logger.info("Step 2: Loading database into MSSQL Server")
        mssql_loader = MSSQLLoader()
        if not mssql_loader.test_connection():
            logger.error("MSSQL connection test failed")
            return False
        if not mssql_loader.restore_database(bak_file):
            logger.error("Failed to restore database to MSSQL")
            return False

        # Verify MSSQL database content
        content_info = mssql_loader.verify_database_content()
        logger.info(f"MSSQL database loaded with tables: {content_info}")

        # Step 2b: Research stored procedure definition/output for parity
        _inspect_vin_proc()

        # Step 3: Test all connections (MSSQL + PostgreSQL)
        logger.info("Step 3: Testing all database connections")
        if not test_connections():
            logger.error("Connection test failed after database loading")
            return False

        # Step 4: Build normalized PostgreSQL schema from MSSQL with make filtering
        logger.info("Step 4: Building normalized PostgreSQL vehicle schema from MSSQL with make filtering")
        builder = NormalizedVehicleBuilder(MakeFilter())
        success = builder.build()

        elapsed = datetime.now() - start_time
        if not success:
            logger.error(f"ETL pipeline failed during normalized schema building after {elapsed}")
            return False

        logger.info(f"Complete ETL pipeline finished successfully in {elapsed}")
        logger.info("✅ ETL Summary:")
        logger.info(f" - Downloaded: {db_info['name']} ({db_info['size_mb']} MB)")
        logger.info(f" - MSSQL Tables: {content_info}")
        logger.info(" - PostgreSQL normalized schema: Built successfully")
        return True
    except Exception as e:
        # Top-level boundary: report the crash with traceback and signal
        # failure through the return value instead of raising.
        elapsed = datetime.now() - start_time
        logger.exception(f"ETL pipeline crashed after {elapsed}: {e}")
        return False