"""
Manual JSON Pipeline for Vehicle Data Processing

Coordinates end-to-end processing of JSON vehicle data:
1. Extract data from JSON files
2. Load data into PostgreSQL database
3. Progress tracking and comprehensive reporting

Key Features:
- Full extraction→loading workflow coordination
- Clear/append mode support
- Progress tracking with detailed statistics
- Comprehensive error handling and reporting
- Performance monitoring and optimization
- Referential integrity validation

Usage:
    pipeline = ManualJsonPipeline(sources_dir="sources/makes")
    result = pipeline.run(mode=LoadMode.APPEND, progress_callback=print_progress)
"""

import logging
import time
from typing import Any, List, Dict, Optional, Callable, Tuple
from dataclasses import dataclass, replace
from pathlib import Path

# Import our components (handle both relative and direct imports)
try:
    from ..extractors.json_extractor import JsonExtractor, ExtractionResult
    from ..loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
    from ..utils.make_name_mapper import MakeNameMapper
    from ..utils.engine_spec_parser import EngineSpecParser
except ImportError:
    # Fallback for direct execution: put the parent package directory on
    # sys.path so the sibling packages resolve as top-level modules.
    import sys
    import os
    sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
    from extractors.json_extractor import JsonExtractor, ExtractionResult
    from loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
    from utils.make_name_mapper import MakeNameMapper
    from utils.engine_spec_parser import EngineSpecParser

logger = logging.getLogger(__name__)


@dataclass
class PipelineConfig:
    """Pipeline configuration options.

    Raises:
        ValueError: If ``sources_directory`` is empty or does not exist.
    """

    sources_directory: str  # Directory containing JSON files
    load_mode: LoadMode = LoadMode.APPEND  # Loading mode
    enable_progress_tracking: bool = True  # Enable progress callbacks
    validate_integrity: bool = True  # Validate referential integrity after loading
    # NOTE(review): batch_size and log_level are stored and reported but are
    # not passed to the extractor/loader anywhere in this module.
    batch_size: int = 1000  # Database batch size
    log_level: str = "INFO"  # Logging level

    def __post_init__(self):
        """Validate configuration."""
        if not self.sources_directory:
            raise ValueError("sources_directory is required")

        if not Path(self.sources_directory).exists():
            raise ValueError(f"Sources directory does not exist: {self.sources_directory}")


@dataclass
class PipelineResult:
    """Complete pipeline execution result."""

    # Configuration that was in effect for the run
    config: PipelineConfig

    # Timing (time.time() timestamps)
    start_time: float
    end_time: float

    # Extraction results
    extraction_result: ExtractionResult

    # Loading results
    load_result: LoadResult

    # Performance metrics
    total_files_processed: int
    total_records_loaded: int
    files_per_second: float
    records_per_second: float

    # Quality metrics
    extraction_success_rate: float
    loading_success_rate: float
    overall_success_rate: float

    # Validation results
    integrity_issues: List[str]

    @property
    def duration_seconds(self) -> float:
        """Wall-clock duration of the run in seconds."""
        return self.end_time - self.start_time

    @property
    def duration_minutes(self) -> float:
        """Wall-clock duration of the run in minutes."""
        return self.duration_seconds / 60.0

    @property
    def was_successful(self) -> bool:
        """True if pipeline completed without critical errors."""
        return (
            self.extraction_result.failed_extractions == 0
            and len(self.load_result.failed_makes) == 0
            and len(self.integrity_issues) == 0
        )


class PipelineProgress:
    """Progress tracking for pipeline execution."""

    def __init__(self, total_files: int):
        self.total_files = total_files
        self.current_file = 0
        self.current_phase = "Starting"
        self.start_time = time.time()
        self.phase_start_time = time.time()

    def update_phase(self, phase: str) -> None:
        """Record a new phase name and restart the per-phase timer."""
        self.current_phase = phase
        self.phase_start_time = time.time()

    def update_file_progress(self, files_completed: int) -> None:
        """Record how many files have been completed so far."""
        self.current_file = files_completed

    def get_progress_info(self) -> Dict[str, Any]:
        """Get current progress information.

        Returns:
            Dictionary with keys ``phase``, ``files_completed``,
            ``total_files``, ``percentage``, ``elapsed_seconds``,
            ``phase_elapsed_seconds``, ``files_per_second`` and
            ``eta_seconds``.
        """
        now = time.time()
        elapsed = now - self.start_time
        phase_elapsed = now - self.phase_start_time

        # Guard on elapsed as well: a coarse clock can report zero elapsed
        # time, which would otherwise raise ZeroDivisionError.
        if self.current_file > 0 and elapsed > 0:
            files_per_second = self.current_file / elapsed
            # Clamp so the ETA never goes negative when more files were
            # processed than were counted up front.
            remaining = max(self.total_files - self.current_file, 0)
            eta_seconds = remaining / files_per_second
        else:
            files_per_second = 0
            eta_seconds = 0

        return {
            'phase': self.current_phase,
            'files_completed': self.current_file,
            'total_files': self.total_files,
            'percentage': (self.current_file / self.total_files * 100) if self.total_files > 0 else 0,
            'elapsed_seconds': elapsed,
            'phase_elapsed_seconds': phase_elapsed,
            'files_per_second': files_per_second,
            'eta_seconds': eta_seconds
        }


class ManualJsonPipeline:
    """End-to-end JSON processing pipeline."""

    def __init__(self, sources_dir: str, config: Optional[PipelineConfig] = None):
        """
        Initialize pipeline

        Args:
            sources_dir: Directory containing JSON files
            config: Pipeline configuration (optional); a default
                configuration for ``sources_dir`` is built when omitted

        Raises:
            ValueError: If no config is given and ``sources_dir`` is empty
                or does not exist (raised by PipelineConfig validation)
        """
        self.sources_dir = sources_dir
        self.config = config or PipelineConfig(sources_directory=sources_dir)

        # Initialize components
        self.make_mapper = MakeNameMapper()
        self.engine_parser = EngineSpecParser()
        self.extractor = JsonExtractor(self.make_mapper, self.engine_parser)
        self.loader = JsonManualLoader()

        # Progress tracking
        self.progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None

        logger.info(f"ManualJsonPipeline initialized for {sources_dir}")

    def set_progress_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        """
        Set progress callback function

        Args:
            callback: Function to call with progress updates
        """
        self.progress_callback = callback

    def _update_progress(self, progress: PipelineProgress) -> None:
        """Update progress via callback if configured and tracking is enabled."""
        if self.progress_callback and self.config.enable_progress_tracking:
            progress_info = progress.get_progress_info()
            self.progress_callback(progress_info)

    def run(self, mode: Optional[LoadMode] = None,
            progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> PipelineResult:
        """
        Execute complete pipeline

        Args:
            mode: Loading mode (overrides config)
            progress_callback: Progress callback function; when given it
                replaces any previously set callback

        Returns:
            PipelineResult with complete execution details

        Raises:
            ValueError: If no JSON files are found in the sources directory
            Exception: Any error raised by extraction, loading or validation
                is logged and re-raised unchanged
        """
        start_time = time.time()

        # Override config if specified. Test against None explicitly rather
        # than relying on the truthiness of the enum member.
        load_mode = mode if mode is not None else self.config.load_mode
        if progress_callback:
            self.set_progress_callback(progress_callback)

        logger.info(f"Starting manual JSON pipeline in {load_mode.value} mode")
        logger.info(f"Processing directory: {self.sources_dir}")

        try:
            # Count files for progress tracking
            json_files = list(Path(self.sources_dir).glob("*.json"))
            total_files = len(json_files)

            if total_files == 0:
                raise ValueError(f"No JSON files found in {self.sources_dir}")

            progress = PipelineProgress(total_files)

            # Phase 1: Extract data from JSON files
            progress.update_phase("Extracting data from JSON files")
            self._update_progress(progress)

            logger.info(f"Phase 1: Extracting data from {total_files} JSON files")
            extraction_result = self.extractor.extract_all_makes(self.sources_dir)

            progress.update_file_progress(extraction_result.total_files_processed)
            self._update_progress(progress)

            if extraction_result.failed_extractions > 0:
                logger.warning(f"Extraction completed with {extraction_result.failed_extractions} failures")
            else:
                logger.info(f"Extraction completed successfully: {extraction_result.total_models} models, {extraction_result.total_engines} engines")

            # Phase 2: Load data into database
            progress.update_phase("Loading data into database")
            self._update_progress(progress)

            logger.info(f"Phase 2: Loading {len(extraction_result.makes)} makes into database ({load_mode.value} mode)")
            load_result = self.loader.load_all_makes(extraction_result.makes, load_mode)

            if len(load_result.failed_makes) > 0:
                logger.warning(f"Loading completed with {len(load_result.failed_makes)} failures")
            else:
                logger.info(f"Loading completed successfully: {load_result.success_count} makes loaded")

            # Phase 3: Validate referential integrity (if enabled)
            integrity_issues = []
            if self.config.validate_integrity:
                progress.update_phase("Validating referential integrity")
                self._update_progress(progress)

                logger.info("Phase 3: Validating referential integrity")
                integrity_issues = self.loader.validate_referential_integrity()

                if integrity_issues:
                    logger.warning(f"Referential integrity issues found: {len(integrity_issues)}")
                else:
                    logger.info("Referential integrity validation passed")

            # Calculate performance metrics
            end_time = time.time()
            duration = end_time - start_time

            files_per_second = total_files / duration if duration > 0 else 0
            total_records = (load_result.total_models + load_result.total_engines +
                             load_result.total_trims + load_result.total_trim_engine_mappings)
            records_per_second = total_records / duration if duration > 0 else 0

            # Calculate success rates
            extraction_success_rate = extraction_result.success_rate
            loading_success_rate = load_result.success_rate
            overall_success_rate = min(extraction_success_rate, loading_success_rate)

            # Report the mode that was actually used: when `mode` overrides
            # the configured load mode, hand the result a config copy that
            # reflects the override so reports do not show a stale value.
            if load_mode == self.config.load_mode:
                effective_config = self.config
            else:
                effective_config = replace(self.config, load_mode=load_mode)

            # Create result
            result = PipelineResult(
                config=effective_config,
                start_time=start_time,
                end_time=end_time,
                extraction_result=extraction_result,
                load_result=load_result,
                total_files_processed=total_files,
                total_records_loaded=total_records,
                files_per_second=files_per_second,
                records_per_second=records_per_second,
                extraction_success_rate=extraction_success_rate,
                loading_success_rate=loading_success_rate,
                overall_success_rate=overall_success_rate,
                integrity_issues=integrity_issues
            )

            progress.update_phase("Pipeline complete")
            self._update_progress(progress)

            logger.info(f"Pipeline completed in {result.duration_seconds:.1f} seconds")
            logger.info(f"Performance: {files_per_second:.1f} files/sec, {records_per_second:.0f} records/sec")
            logger.info(f"Overall success rate: {overall_success_rate:.1%}")

            return result

        except Exception as e:
            # Top-level boundary: log how long the run lasted, then re-raise
            # so the caller still sees the original exception.
            end_time = time.time()
            logger.error(f"Pipeline failed after {end_time - start_time:.1f} seconds: {str(e)}")
            raise

    def run_extraction_only(self) -> ExtractionResult:
        """
        Run extraction phase only (for testing/validation)

        Returns:
            ExtractionResult with extracted data
        """
        logger.info("Running extraction-only pipeline")

        result = self.extractor.extract_all_makes(self.sources_dir)

        logger.info(f"Extraction complete: {result.total_models} models, {result.total_engines} engines")
        logger.info(f"Success rate: {result.success_rate:.1%}")

        return result

    def get_source_statistics(self) -> Dict[str, Any]:
        """
        Get statistics about source JSON files

        Returns:
            Dictionary with source file statistics: ``total_files``,
            ``total_size_bytes``, ``total_size_mb``,
            ``average_file_size_kb`` and ``directory``
        """
        json_files = list(Path(self.sources_dir).glob("*.json"))

        total_size_bytes = sum(f.stat().st_size for f in json_files)

        return {
            'total_files': len(json_files),
            'total_size_bytes': total_size_bytes,
            'total_size_mb': total_size_bytes / (1024 * 1024),
            'average_file_size_kb': (total_size_bytes / len(json_files) / 1024) if json_files else 0,
            'directory': str(self.sources_dir)
        }

    def print_pipeline_report(self, result: PipelineResult) -> None:
        """
        Print comprehensive pipeline execution report

        Args:
            result: PipelineResult from pipeline execution
        """
        print("🚀 MANUAL JSON PIPELINE EXECUTION REPORT")
        print("=" * 60)

        # Configuration
        print("\n⚙️ CONFIGURATION")
        print(f" Sources directory: {result.config.sources_directory}")
        print(f" Load mode: {result.config.load_mode.value.upper()}")
        print(f" Batch size: {result.config.batch_size}")
        print(f" Integrity validation: {'Enabled' if result.config.validate_integrity else 'Disabled'}")

        # Performance
        print("\n⏱️ PERFORMANCE")
        print(f" Total duration: {result.duration_seconds:.1f} seconds ({result.duration_minutes:.1f} minutes)")
        print(f" Files processed: {result.total_files_processed}")
        print(f" Records loaded: {result.total_records_loaded:,}")
        print(f" Processing rate: {result.files_per_second:.1f} files/sec")
        print(f" Loading rate: {result.records_per_second:,.0f} records/sec")

        # Success rates
        print("\n📊 SUCCESS RATES")
        print(f" Extraction: {result.extraction_success_rate:.1%}")
        print(f" Loading: {result.loading_success_rate:.1%}")
        print(f" Overall: {result.overall_success_rate:.1%}")

        # Data summary
        print("\n📈 DATA PROCESSED")
        print(f" Makes: {result.load_result.total_makes}")
        print(f" Models: {result.load_result.total_models}")
        print(f" Model years: {result.load_result.total_model_years}")
        print(f" Trims: {result.load_result.total_trims}")
        print(f" Engines: {result.load_result.total_engines}")
        print(f" Trim-engine mappings: {result.load_result.total_trim_engine_mappings}")

        # Issues
        if result.load_result.failed_makes:
            print(f"\n⚠️ FAILED MAKES ({len(result.load_result.failed_makes)}):")
            for make in result.load_result.failed_makes:
                print(f" {make}")

        if result.integrity_issues:
            print(f"\n❌ REFERENTIAL INTEGRITY ISSUES ({len(result.integrity_issues)}):")
            for issue in result.integrity_issues:
                print(f" {issue}")
        else:
            print("\n✅ REFERENTIAL INTEGRITY: PASSED")

        # Final status
        print(f"\n🎯 PIPELINE STATUS: {'SUCCESS' if result.was_successful else 'COMPLETED WITH ISSUES'}")


def default_progress_callback(progress_info: Dict[str, Any]) -> None:
    """Default progress callback that prints to console.

    Args:
        progress_info: Progress dictionary as produced by
            ``PipelineProgress.get_progress_info``
    """
    percentage = progress_info['percentage']
    phase = progress_info['phase']
    files_completed = progress_info['files_completed']
    total_files = progress_info['total_files']

    # Only show throughput and ETA once a rate is available
    if progress_info['files_per_second'] > 0:
        eta_minutes = progress_info['eta_seconds'] / 60
        print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files "
              f"({progress_info['files_per_second']:.1f} files/sec, ETA: {eta_minutes:.1f}min)")
    else:
        print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files")


# Example usage and testing functions
def example_usage():
    """Demonstrate ManualJsonPipeline usage."""
    print("🚀 ManualJsonPipeline Example Usage")
    print("=" * 40)

    sources_dir = "sources/makes"

    if not Path(sources_dir).exists():
        print(f"❌ Sources directory not found: {sources_dir}")
        return

    print("\n💡 Example pipeline execution:")
    print(f"""
    # Create pipeline with configuration
    config = PipelineConfig(
        sources_directory="{sources_dir}",
        load_mode=LoadMode.APPEND,
        enable_progress_tracking=True,
        validate_integrity=True
    )

    pipeline = ManualJsonPipeline("{sources_dir}", config)

    # Run with progress tracking
    result = pipeline.run(progress_callback=default_progress_callback)

    # Print comprehensive report
    pipeline.print_pipeline_report(result)
    """)

    # Show source statistics
    try:
        pipeline = ManualJsonPipeline(sources_dir)
        stats = pipeline.get_source_statistics()

        print("\n📊 Source Directory Statistics:")
        print(f" Files: {stats['total_files']}")
        print(f" Total size: {stats['total_size_mb']:.1f} MB")
        print(f" Average file size: {stats['average_file_size_kb']:.1f} KB")

    except Exception as e:
        # Best-effort demo output: report the problem instead of crashing
        print(f"⚠️ Could not get source statistics: {e}")


if __name__ == "__main__":
    example_usage()