Files
motovaultpro/mvp-platform-services/vehicles/etl/pipelines/manual_json_pipeline.py
Eric Gullickson a052040e3a Initial Commit
2025-09-17 16:09:15 -05:00

465 lines
18 KiB
Python

"""
Manual JSON Pipeline for Vehicle Data Processing
Coordinates end-to-end processing of JSON vehicle data:
1. Extract data from JSON files
2. Load data into PostgreSQL database
3. Progress tracking and comprehensive reporting
Key Features:
- Full extraction→loading workflow coordination
- Clear/append mode support
- Progress tracking with detailed statistics
- Comprehensive error handling and reporting
- Performance monitoring and optimization
- Referential integrity validation
Usage:
pipeline = ManualJsonPipeline(sources_dir="sources/makes")
result = pipeline.run(mode=LoadMode.APPEND, progress_callback=print_progress)
"""
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
# Import our components (handle both relative and direct imports)
try:
from ..extractors.json_extractor import JsonExtractor, ExtractionResult
from ..loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
from ..utils.make_name_mapper import MakeNameMapper
from ..utils.engine_spec_parser import EngineSpecParser
except ImportError:
# Fallback for direct execution
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from extractors.json_extractor import JsonExtractor, ExtractionResult
from loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
from utils.make_name_mapper import MakeNameMapper
from utils.engine_spec_parser import EngineSpecParser
logger = logging.getLogger(__name__)
@dataclass
class PipelineConfig:
    """Pipeline configuration options.

    Validation happens eagerly in ``__post_init__`` so an invalid
    configuration fails fast at construction time instead of surfacing
    later inside the extraction/loading phases.
    """
    sources_directory: str                 # Directory containing JSON files
    load_mode: LoadMode = LoadMode.APPEND  # Loading mode (append vs. clear)
    enable_progress_tracking: bool = True  # Enable progress callbacks
    validate_integrity: bool = True        # Validate referential integrity after loading
    batch_size: int = 1000                 # Database batch size
    log_level: str = "INFO"                # Logging level

    def __post_init__(self):
        """Validate configuration; raises ValueError on any invalid field."""
        if not self.sources_directory:
            raise ValueError("sources_directory is required")
        if not Path(self.sources_directory).exists():
            raise ValueError(f"Sources directory does not exist: {self.sources_directory}")
        # A non-positive batch size was previously accepted silently and
        # would only fail (or degenerate) deep inside the loader.
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
@dataclass
class PipelineResult:
    """Complete pipeline execution result.

    Bundles the configuration used for the run, wall-clock timing, the
    raw phase results, derived throughput/quality metrics, and any
    referential-integrity issues detected after loading.
    """
    # Configuration the run was executed with
    config: PipelineConfig
    # Wall-clock timestamps (time.time() values)
    start_time: float
    end_time: float
    # Raw extraction-phase result
    extraction_result: ExtractionResult
    # Raw loading-phase result
    load_result: LoadResult
    # Performance metrics
    total_files_processed: int
    total_records_loaded: int
    files_per_second: float
    records_per_second: float
    # Quality metrics (fractions in [0, 1])
    extraction_success_rate: float
    loading_success_rate: float
    overall_success_rate: float
    # Referential-integrity problems found post-load (empty list = clean)
    integrity_issues: List[str]

    @property
    def duration_seconds(self) -> float:
        """Total wall-clock runtime in seconds."""
        elapsed = self.end_time - self.start_time
        return elapsed

    @property
    def duration_minutes(self) -> float:
        """Total wall-clock runtime in minutes."""
        return self.duration_seconds / 60.0

    @property
    def was_successful(self) -> bool:
        """True if pipeline completed without critical errors."""
        # Guard-clause form: any failed phase short-circuits to False.
        if self.extraction_result.failed_extractions != 0:
            return False
        if self.load_result.failed_makes:
            return False
        return not self.integrity_issues
class PipelineProgress:
    """Progress tracking for pipeline execution.

    Tracks the current phase label and file counts, and derives
    throughput/ETA figures from wall-clock time for progress callbacks.
    """

    def __init__(self, total_files: int):
        self.total_files = total_files       # Files expected overall
        self.current_file = 0                # Files completed so far
        self.current_phase = "Starting"      # Human-readable phase label
        self.start_time = time.time()        # Overall start timestamp
        self.phase_start_time = time.time()  # Current phase start timestamp

    def update_phase(self, phase: str) -> None:
        """Switch to a new phase and restart the per-phase timer."""
        self.current_phase = phase
        self.phase_start_time = time.time()

    def update_file_progress(self, files_completed: int) -> None:
        """Record the number of files completed so far."""
        self.current_file = files_completed

    def get_progress_info(self) -> Dict[str, Any]:
        """Get current progress information.

        Returns:
            Dict with phase, counts, completion percentage, elapsed times,
            throughput, and an ETA estimate (0 when throughput is unknown).
        """
        elapsed = time.time() - self.start_time
        phase_elapsed = time.time() - self.phase_start_time
        # Guard the division: on coarse clocks `elapsed` can be exactly 0
        # even after files complete, which previously raised
        # ZeroDivisionError. Throughput is also unknown before any file
        # finishes.
        if self.current_file > 0 and elapsed > 0:
            files_per_second = self.current_file / elapsed
            eta_seconds = (self.total_files - self.current_file) / files_per_second if files_per_second > 0 else 0
        else:
            files_per_second = 0
            eta_seconds = 0
        return {
            'phase': self.current_phase,
            'files_completed': self.current_file,
            'total_files': self.total_files,
            'percentage': (self.current_file / self.total_files * 100) if self.total_files > 0 else 0,
            'elapsed_seconds': elapsed,
            'phase_elapsed_seconds': phase_elapsed,
            'files_per_second': files_per_second,
            'eta_seconds': eta_seconds
        }
class ManualJsonPipeline:
    """End-to-end JSON processing pipeline.

    Coordinates three phases: extraction (JsonExtractor) -> database
    loading (JsonManualLoader) -> optional referential-integrity
    validation, reporting progress via an optional callback and
    collecting performance metrics into a PipelineResult.
    """

    def __init__(self, sources_dir: str, config: Optional[PipelineConfig] = None):
        """
        Initialize pipeline
        Args:
            sources_dir: Directory containing JSON files
            config: Pipeline configuration (optional); when omitted a
                default PipelineConfig rooted at sources_dir is built,
                which validates that the directory exists
        """
        self.sources_dir = sources_dir
        self.config = config or PipelineConfig(sources_directory=sources_dir)
        # Initialize components (mapper and parser are injected into the extractor)
        self.make_mapper = MakeNameMapper()
        self.engine_parser = EngineSpecParser()
        self.extractor = JsonExtractor(self.make_mapper, self.engine_parser)
        self.loader = JsonManualLoader()
        # Progress tracking: callback is optional, see set_progress_callback()
        self.progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None
        logger.info(f"ManualJsonPipeline initialized for {sources_dir}")

    def set_progress_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        """
        Set progress callback function
        Args:
            callback: Function to call with progress updates; receives the
                dict produced by PipelineProgress.get_progress_info()
        """
        self.progress_callback = callback

    def _update_progress(self, progress: PipelineProgress) -> None:
        """Update progress via callback if configured"""
        # Fires only when a callback is set AND tracking is enabled in config.
        if self.progress_callback and self.config.enable_progress_tracking:
            progress_info = progress.get_progress_info()
            self.progress_callback(progress_info)

    def run(self, mode: Optional[LoadMode] = None, progress_callback: Optional[Callable] = None) -> PipelineResult:
        """
        Execute complete pipeline
        Args:
            mode: Loading mode (overrides config)
            progress_callback: Progress callback function (overrides config)
        Returns:
            PipelineResult with complete execution details
        Raises:
            ValueError: if the sources directory contains no *.json files
            Exception: any phase failure is logged with timing, then re-raised
        """
        start_time = time.time()
        # Override config if specified.
        # NOTE(review): `mode or ...` assumes LoadMode members are truthy
        # (the enum default) — it falls back to config only when mode is None.
        load_mode = mode or self.config.load_mode
        if progress_callback:
            self.set_progress_callback(progress_callback)
        logger.info(f"Starting manual JSON pipeline in {load_mode.value} mode")
        logger.info(f"Processing directory: {self.sources_dir}")
        try:
            # Count files for progress tracking (non-recursive *.json glob)
            json_files = list(Path(self.sources_dir).glob("*.json"))
            total_files = len(json_files)
            if total_files == 0:
                raise ValueError(f"No JSON files found in {self.sources_dir}")
            progress = PipelineProgress(total_files)
            # Phase 1: Extract data from JSON files
            progress.update_phase("Extracting data from JSON files")
            self._update_progress(progress)
            logger.info(f"Phase 1: Extracting data from {total_files} JSON files")
            extraction_result = self.extractor.extract_all_makes(self.sources_dir)
            progress.update_file_progress(extraction_result.total_files_processed)
            self._update_progress(progress)
            # Extraction failures are logged but do NOT abort the run;
            # whatever extracted successfully still proceeds to loading.
            if extraction_result.failed_extractions > 0:
                logger.warning(f"Extraction completed with {extraction_result.failed_extractions} failures")
            else:
                logger.info(f"Extraction completed successfully: {extraction_result.total_models} models, {extraction_result.total_engines} engines")
            # Phase 2: Load data into database
            progress.update_phase("Loading data into database")
            self._update_progress(progress)
            logger.info(f"Phase 2: Loading {len(extraction_result.makes)} makes into database ({load_mode.value} mode)")
            load_result = self.loader.load_all_makes(extraction_result.makes, load_mode)
            if len(load_result.failed_makes) > 0:
                logger.warning(f"Loading completed with {len(load_result.failed_makes)} failures")
            else:
                logger.info(f"Loading completed successfully: {load_result.success_count} makes loaded")
            # Phase 3: Validate referential integrity (if enabled)
            integrity_issues = []
            if self.config.validate_integrity:
                progress.update_phase("Validating referential integrity")
                self._update_progress(progress)
                logger.info("Phase 3: Validating referential integrity")
                integrity_issues = self.loader.validate_referential_integrity()
                if integrity_issues:
                    logger.warning(f"Referential integrity issues found: {len(integrity_issues)}")
                else:
                    logger.info("Referential integrity validation passed")
            # Calculate performance metrics (guard divisions against a
            # zero-length duration on coarse clocks)
            end_time = time.time()
            duration = end_time - start_time
            files_per_second = total_files / duration if duration > 0 else 0
            total_records = (load_result.total_models + load_result.total_engines +
                             load_result.total_trims + load_result.total_trim_engine_mappings)
            records_per_second = total_records / duration if duration > 0 else 0
            # Calculate success rates; "overall" is the weakest phase (min),
            # a conservative summary rather than a compounded product.
            extraction_success_rate = extraction_result.success_rate
            loading_success_rate = load_result.success_rate
            overall_success_rate = min(extraction_success_rate, loading_success_rate)
            # Create result
            result = PipelineResult(
                config=self.config,
                start_time=start_time,
                end_time=end_time,
                extraction_result=extraction_result,
                load_result=load_result,
                total_files_processed=total_files,
                total_records_loaded=total_records,
                files_per_second=files_per_second,
                records_per_second=records_per_second,
                extraction_success_rate=extraction_success_rate,
                loading_success_rate=loading_success_rate,
                overall_success_rate=overall_success_rate,
                integrity_issues=integrity_issues
            )
            progress.update_phase("Pipeline complete")
            self._update_progress(progress)
            logger.info(f"Pipeline completed in {result.duration_seconds:.1f} seconds")
            logger.info(f"Performance: {files_per_second:.1f} files/sec, {records_per_second:.0f} records/sec")
            logger.info(f"Overall success rate: {overall_success_rate:.1%}")
            return result
        except Exception as e:
            # Log time-to-failure for diagnostics, then re-raise unchanged
            # so callers see the original exception type.
            end_time = time.time()
            logger.error(f"Pipeline failed after {end_time - start_time:.1f} seconds: {str(e)}")
            raise

    def run_extraction_only(self) -> ExtractionResult:
        """
        Run extraction phase only (for testing/validation)
        Returns:
            ExtractionResult with extracted data; nothing is written to
            the database
        """
        logger.info("Running extraction-only pipeline")
        result = self.extractor.extract_all_makes(self.sources_dir)
        logger.info(f"Extraction complete: {result.total_models} models, {result.total_engines} engines")
        logger.info(f"Success rate: {result.success_rate:.1%}")
        return result

    def get_source_statistics(self) -> Dict[str, Any]:
        """
        Get statistics about source JSON files
        Returns:
            Dictionary with source file statistics (file count, total and
            average sizes, directory path)
        """
        json_files = list(Path(self.sources_dir).glob("*.json"))
        total_size_bytes = sum(f.stat().st_size for f in json_files)
        return {
            'total_files': len(json_files),
            'total_size_bytes': total_size_bytes,
            'total_size_mb': total_size_bytes / (1024 * 1024),
            # Conditional avoids ZeroDivisionError on an empty directory
            'average_file_size_kb': (total_size_bytes / len(json_files) / 1024) if json_files else 0,
            'directory': str(self.sources_dir)
        }

    def print_pipeline_report(self, result: PipelineResult) -> None:
        """
        Print comprehensive pipeline execution report to stdout
        Args:
            result: PipelineResult from pipeline execution
        """
        print(f"🚀 MANUAL JSON PIPELINE EXECUTION REPORT")
        print(f"=" * 60)
        # Configuration
        print(f"\n⚙️ CONFIGURATION")
        print(f" Sources directory: {result.config.sources_directory}")
        print(f" Load mode: {result.config.load_mode.value.upper()}")
        print(f" Batch size: {result.config.batch_size}")
        print(f" Integrity validation: {'Enabled' if result.config.validate_integrity else 'Disabled'}")
        # Performance
        print(f"\n⏱️ PERFORMANCE")
        print(f" Total duration: {result.duration_seconds:.1f} seconds ({result.duration_minutes:.1f} minutes)")
        print(f" Files processed: {result.total_files_processed}")
        print(f" Records loaded: {result.total_records_loaded:,}")
        print(f" Processing rate: {result.files_per_second:.1f} files/sec")
        print(f" Loading rate: {result.records_per_second:,.0f} records/sec")
        # Success rates
        print(f"\n📊 SUCCESS RATES")
        print(f" Extraction: {result.extraction_success_rate:.1%}")
        print(f" Loading: {result.loading_success_rate:.1%}")
        print(f" Overall: {result.overall_success_rate:.1%}")
        # Data summary
        print(f"\n📈 DATA PROCESSED")
        print(f" Makes: {result.load_result.total_makes}")
        print(f" Models: {result.load_result.total_models}")
        print(f" Model years: {result.load_result.total_model_years}")
        print(f" Trims: {result.load_result.total_trims}")
        print(f" Engines: {result.load_result.total_engines}")
        print(f" Trim-engine mappings: {result.load_result.total_trim_engine_mappings}")
        # Issues (only printed when present)
        if result.load_result.failed_makes:
            print(f"\n⚠️ FAILED MAKES ({len(result.load_result.failed_makes)}):")
            for make in result.load_result.failed_makes:
                print(f" {make}")
        if result.integrity_issues:
            print(f"\n❌ REFERENTIAL INTEGRITY ISSUES ({len(result.integrity_issues)}):")
            for issue in result.integrity_issues:
                print(f" {issue}")
        else:
            print(f"\n✅ REFERENTIAL INTEGRITY: PASSED")
        # Final status
        print(f"\n🎯 PIPELINE STATUS: {'SUCCESS' if result.was_successful else 'COMPLETED WITH ISSUES'}")
def default_progress_callback(progress_info: Dict[str, Any]) -> None:
    """Default progress callback that prints a one-line status to stdout.

    Args:
        progress_info: Dict produced by PipelineProgress.get_progress_info();
            reads 'percentage', 'phase', 'files_completed', 'total_files',
            'files_per_second' and 'eta_seconds'.
    """
    percentage = progress_info['percentage']
    phase = progress_info['phase']
    files_completed = progress_info['files_completed']
    total_files = progress_info['total_files']
    if progress_info['files_per_second'] > 0:
        # Throughput is known: append the processing rate and ETA in minutes.
        eta_minutes = progress_info['eta_seconds'] / 60
        print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files "
              f"({progress_info['files_per_second']:.1f} files/sec, ETA: {eta_minutes:.1f}min)")
    else:
        print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files")
# Example usage and testing functions
def example_usage():
    """Demonstrate ManualJsonPipeline usage.

    Prints a ready-to-paste code sample for running the pipeline, then
    reports statistics about the local sources directory if it exists.
    Purely informational — nothing is extracted or loaded.
    """
    print("🚀 ManualJsonPipeline Example Usage")
    print("=" * 40)
    sources_dir = "sources/makes"
    # Guard clause: without the sources directory there is nothing to show.
    if not Path(sources_dir).exists():
        print(f"❌ Sources directory not found: {sources_dir}")
        return
    print(f"\n💡 Example pipeline execution:")
    # Triple-quoted f-string: only {sources_dir} is interpolated; the rest
    # prints verbatim as sample code for the reader to copy.
    print(f"""
# Create pipeline with configuration
config = PipelineConfig(
sources_directory="{sources_dir}",
load_mode=LoadMode.APPEND,
enable_progress_tracking=True,
validate_integrity=True
)
pipeline = ManualJsonPipeline("{sources_dir}", config)
# Run with progress tracking
result = pipeline.run(progress_callback=default_progress_callback)
# Print comprehensive report
pipeline.print_pipeline_report(result)
""")
    # Show source statistics; best-effort — component construction may fail
    # (e.g. missing DB config), so any exception is reported, not raised.
    try:
        pipeline = ManualJsonPipeline(sources_dir)
        stats = pipeline.get_source_statistics()
        print(f"\n📊 Source Directory Statistics:")
        print(f" Files: {stats['total_files']}")
        print(f" Total size: {stats['total_size_mb']:.1f} MB")
        print(f" Average file size: {stats['average_file_size_kb']:.1f} KB")
    except Exception as e:
        print(f"⚠️ Could not get source statistics: {e}")
# Script entry point: run the usage demo when executed directly.
if __name__ == "__main__":
    example_usage()