"""
Manual JSON Pipeline for Vehicle Data Processing

Coordinates end-to-end processing of JSON vehicle data:
1. Extract data from JSON files
2. Load data into PostgreSQL database
3. Progress tracking and comprehensive reporting

Key Features:
- Full extraction→loading workflow coordination
- Clear/append mode support
- Progress tracking with detailed statistics
- Comprehensive error handling and reporting
- Performance monitoring and optimization
- Referential integrity validation

Usage:
    pipeline = ManualJsonPipeline(sources_dir="sources/makes")
    result = pipeline.run(mode=LoadMode.APPEND, progress_callback=print_progress)
"""
|
|
|
|
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
# Import our components (handle both relative and direct imports)
|
|
try:
|
|
from ..extractors.json_extractor import JsonExtractor, ExtractionResult
|
|
from ..loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
|
|
from ..utils.make_name_mapper import MakeNameMapper
|
|
from ..utils.engine_spec_parser import EngineSpecParser
|
|
except ImportError:
|
|
# Fallback for direct execution
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
|
from extractors.json_extractor import JsonExtractor, ExtractionResult
|
|
from loaders.json_manual_loader import JsonManualLoader, LoadMode, LoadResult
|
|
from utils.make_name_mapper import MakeNameMapper
|
|
from utils.engine_spec_parser import EngineSpecParser
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class PipelineConfig:
|
|
"""Pipeline configuration options"""
|
|
sources_directory: str # Directory containing JSON files
|
|
load_mode: LoadMode = LoadMode.APPEND # Loading mode
|
|
enable_progress_tracking: bool = True # Enable progress callbacks
|
|
validate_integrity: bool = True # Validate referential integrity after loading
|
|
batch_size: int = 1000 # Database batch size
|
|
log_level: str = "INFO" # Logging level
|
|
|
|
def __post_init__(self):
|
|
"""Validate configuration"""
|
|
if not self.sources_directory:
|
|
raise ValueError("sources_directory is required")
|
|
|
|
if not Path(self.sources_directory).exists():
|
|
raise ValueError(f"Sources directory does not exist: {self.sources_directory}")
|
|
|
|
|
|
@dataclass
|
|
class PipelineResult:
|
|
"""Complete pipeline execution result"""
|
|
# Configuration
|
|
config: PipelineConfig
|
|
|
|
# Timing
|
|
start_time: float
|
|
end_time: float
|
|
|
|
# Extraction results
|
|
extraction_result: ExtractionResult
|
|
|
|
# Loading results
|
|
load_result: LoadResult
|
|
|
|
# Performance metrics
|
|
total_files_processed: int
|
|
total_records_loaded: int
|
|
files_per_second: float
|
|
records_per_second: float
|
|
|
|
# Quality metrics
|
|
extraction_success_rate: float
|
|
loading_success_rate: float
|
|
overall_success_rate: float
|
|
|
|
# Validation results
|
|
integrity_issues: List[str]
|
|
|
|
@property
|
|
def duration_seconds(self) -> float:
|
|
return self.end_time - self.start_time
|
|
|
|
@property
|
|
def duration_minutes(self) -> float:
|
|
return self.duration_seconds / 60.0
|
|
|
|
@property
|
|
def was_successful(self) -> bool:
|
|
"""True if pipeline completed without critical errors"""
|
|
return (self.extraction_result.failed_extractions == 0 and
|
|
len(self.load_result.failed_makes) == 0 and
|
|
len(self.integrity_issues) == 0)
|
|
|
|
|
|
class PipelineProgress:
|
|
"""Progress tracking for pipeline execution"""
|
|
|
|
def __init__(self, total_files: int):
|
|
self.total_files = total_files
|
|
self.current_file = 0
|
|
self.current_phase = "Starting"
|
|
self.start_time = time.time()
|
|
self.phase_start_time = time.time()
|
|
|
|
def update_phase(self, phase: str) -> None:
|
|
"""Update current phase"""
|
|
self.current_phase = phase
|
|
self.phase_start_time = time.time()
|
|
|
|
def update_file_progress(self, files_completed: int) -> None:
|
|
"""Update file progress"""
|
|
self.current_file = files_completed
|
|
|
|
def get_progress_info(self) -> Dict[str, any]:
|
|
"""Get current progress information"""
|
|
elapsed = time.time() - self.start_time
|
|
phase_elapsed = time.time() - self.phase_start_time
|
|
|
|
if self.current_file > 0:
|
|
files_per_second = self.current_file / elapsed
|
|
eta_seconds = (self.total_files - self.current_file) / files_per_second if files_per_second > 0 else 0
|
|
else:
|
|
files_per_second = 0
|
|
eta_seconds = 0
|
|
|
|
return {
|
|
'phase': self.current_phase,
|
|
'files_completed': self.current_file,
|
|
'total_files': self.total_files,
|
|
'percentage': (self.current_file / self.total_files * 100) if self.total_files > 0 else 0,
|
|
'elapsed_seconds': elapsed,
|
|
'phase_elapsed_seconds': phase_elapsed,
|
|
'files_per_second': files_per_second,
|
|
'eta_seconds': eta_seconds
|
|
}
|
|
|
|
|
|
class ManualJsonPipeline:
|
|
"""End-to-end JSON processing pipeline"""
|
|
|
|
def __init__(self, sources_dir: str, config: Optional[PipelineConfig] = None):
|
|
"""
|
|
Initialize pipeline
|
|
|
|
Args:
|
|
sources_dir: Directory containing JSON files
|
|
config: Pipeline configuration (optional)
|
|
"""
|
|
self.sources_dir = sources_dir
|
|
self.config = config or PipelineConfig(sources_directory=sources_dir)
|
|
|
|
# Initialize components
|
|
self.make_mapper = MakeNameMapper()
|
|
self.engine_parser = EngineSpecParser()
|
|
self.extractor = JsonExtractor(self.make_mapper, self.engine_parser)
|
|
self.loader = JsonManualLoader()
|
|
|
|
# Progress tracking
|
|
self.progress_callback: Optional[Callable[[Dict[str, any]], None]] = None
|
|
|
|
logger.info(f"ManualJsonPipeline initialized for {sources_dir}")
|
|
|
|
def set_progress_callback(self, callback: Callable[[Dict[str, any]], None]) -> None:
|
|
"""
|
|
Set progress callback function
|
|
|
|
Args:
|
|
callback: Function to call with progress updates
|
|
"""
|
|
self.progress_callback = callback
|
|
|
|
def _update_progress(self, progress: PipelineProgress) -> None:
|
|
"""Update progress via callback if configured"""
|
|
if self.progress_callback and self.config.enable_progress_tracking:
|
|
progress_info = progress.get_progress_info()
|
|
self.progress_callback(progress_info)
|
|
|
|
def run(self, mode: Optional[LoadMode] = None, progress_callback: Optional[Callable] = None) -> PipelineResult:
|
|
"""
|
|
Execute complete pipeline
|
|
|
|
Args:
|
|
mode: Loading mode (overrides config)
|
|
progress_callback: Progress callback function (overrides config)
|
|
|
|
Returns:
|
|
PipelineResult with complete execution details
|
|
"""
|
|
start_time = time.time()
|
|
|
|
# Override config if specified
|
|
load_mode = mode or self.config.load_mode
|
|
if progress_callback:
|
|
self.set_progress_callback(progress_callback)
|
|
|
|
logger.info(f"Starting manual JSON pipeline in {load_mode.value} mode")
|
|
logger.info(f"Processing directory: {self.sources_dir}")
|
|
|
|
try:
|
|
# Count files for progress tracking
|
|
json_files = list(Path(self.sources_dir).glob("*.json"))
|
|
total_files = len(json_files)
|
|
|
|
if total_files == 0:
|
|
raise ValueError(f"No JSON files found in {self.sources_dir}")
|
|
|
|
progress = PipelineProgress(total_files)
|
|
|
|
# Phase 1: Extract data from JSON files
|
|
progress.update_phase("Extracting data from JSON files")
|
|
self._update_progress(progress)
|
|
|
|
logger.info(f"Phase 1: Extracting data from {total_files} JSON files")
|
|
extraction_result = self.extractor.extract_all_makes(self.sources_dir)
|
|
|
|
progress.update_file_progress(extraction_result.total_files_processed)
|
|
self._update_progress(progress)
|
|
|
|
if extraction_result.failed_extractions > 0:
|
|
logger.warning(f"Extraction completed with {extraction_result.failed_extractions} failures")
|
|
else:
|
|
logger.info(f"Extraction completed successfully: {extraction_result.total_models} models, {extraction_result.total_engines} engines")
|
|
|
|
# Phase 2: Load data into database
|
|
progress.update_phase("Loading data into database")
|
|
self._update_progress(progress)
|
|
|
|
logger.info(f"Phase 2: Loading {len(extraction_result.makes)} makes into database ({load_mode.value} mode)")
|
|
load_result = self.loader.load_all_makes(extraction_result.makes, load_mode)
|
|
|
|
if len(load_result.failed_makes) > 0:
|
|
logger.warning(f"Loading completed with {len(load_result.failed_makes)} failures")
|
|
else:
|
|
logger.info(f"Loading completed successfully: {load_result.success_count} makes loaded")
|
|
|
|
# Phase 3: Validate referential integrity (if enabled)
|
|
integrity_issues = []
|
|
if self.config.validate_integrity:
|
|
progress.update_phase("Validating referential integrity")
|
|
self._update_progress(progress)
|
|
|
|
logger.info("Phase 3: Validating referential integrity")
|
|
integrity_issues = self.loader.validate_referential_integrity()
|
|
|
|
if integrity_issues:
|
|
logger.warning(f"Referential integrity issues found: {len(integrity_issues)}")
|
|
else:
|
|
logger.info("Referential integrity validation passed")
|
|
|
|
# Calculate performance metrics
|
|
end_time = time.time()
|
|
duration = end_time - start_time
|
|
|
|
files_per_second = total_files / duration if duration > 0 else 0
|
|
total_records = (load_result.total_models + load_result.total_engines +
|
|
load_result.total_trims + load_result.total_trim_engine_mappings)
|
|
records_per_second = total_records / duration if duration > 0 else 0
|
|
|
|
# Calculate success rates
|
|
extraction_success_rate = extraction_result.success_rate
|
|
loading_success_rate = load_result.success_rate
|
|
overall_success_rate = min(extraction_success_rate, loading_success_rate)
|
|
|
|
# Create result
|
|
result = PipelineResult(
|
|
config=self.config,
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
extraction_result=extraction_result,
|
|
load_result=load_result,
|
|
total_files_processed=total_files,
|
|
total_records_loaded=total_records,
|
|
files_per_second=files_per_second,
|
|
records_per_second=records_per_second,
|
|
extraction_success_rate=extraction_success_rate,
|
|
loading_success_rate=loading_success_rate,
|
|
overall_success_rate=overall_success_rate,
|
|
integrity_issues=integrity_issues
|
|
)
|
|
|
|
progress.update_phase("Pipeline complete")
|
|
self._update_progress(progress)
|
|
|
|
logger.info(f"Pipeline completed in {result.duration_seconds:.1f} seconds")
|
|
logger.info(f"Performance: {files_per_second:.1f} files/sec, {records_per_second:.0f} records/sec")
|
|
logger.info(f"Overall success rate: {overall_success_rate:.1%}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
end_time = time.time()
|
|
logger.error(f"Pipeline failed after {end_time - start_time:.1f} seconds: {str(e)}")
|
|
raise
|
|
|
|
def run_extraction_only(self) -> ExtractionResult:
|
|
"""
|
|
Run extraction phase only (for testing/validation)
|
|
|
|
Returns:
|
|
ExtractionResult with extracted data
|
|
"""
|
|
logger.info("Running extraction-only pipeline")
|
|
|
|
result = self.extractor.extract_all_makes(self.sources_dir)
|
|
|
|
logger.info(f"Extraction complete: {result.total_models} models, {result.total_engines} engines")
|
|
logger.info(f"Success rate: {result.success_rate:.1%}")
|
|
|
|
return result
|
|
|
|
def get_source_statistics(self) -> Dict[str, any]:
|
|
"""
|
|
Get statistics about source JSON files
|
|
|
|
Returns:
|
|
Dictionary with source file statistics
|
|
"""
|
|
json_files = list(Path(self.sources_dir).glob("*.json"))
|
|
|
|
total_size_bytes = sum(f.stat().st_size for f in json_files)
|
|
|
|
return {
|
|
'total_files': len(json_files),
|
|
'total_size_bytes': total_size_bytes,
|
|
'total_size_mb': total_size_bytes / (1024 * 1024),
|
|
'average_file_size_kb': (total_size_bytes / len(json_files) / 1024) if json_files else 0,
|
|
'directory': str(self.sources_dir)
|
|
}
|
|
|
|
def print_pipeline_report(self, result: PipelineResult) -> None:
|
|
"""
|
|
Print comprehensive pipeline execution report
|
|
|
|
Args:
|
|
result: PipelineResult from pipeline execution
|
|
"""
|
|
print(f"🚀 MANUAL JSON PIPELINE EXECUTION REPORT")
|
|
print(f"=" * 60)
|
|
|
|
# Configuration
|
|
print(f"\n⚙️ CONFIGURATION")
|
|
print(f" Sources directory: {result.config.sources_directory}")
|
|
print(f" Load mode: {result.config.load_mode.value.upper()}")
|
|
print(f" Batch size: {result.config.batch_size}")
|
|
print(f" Integrity validation: {'Enabled' if result.config.validate_integrity else 'Disabled'}")
|
|
|
|
# Performance
|
|
print(f"\n⏱️ PERFORMANCE")
|
|
print(f" Total duration: {result.duration_seconds:.1f} seconds ({result.duration_minutes:.1f} minutes)")
|
|
print(f" Files processed: {result.total_files_processed}")
|
|
print(f" Records loaded: {result.total_records_loaded:,}")
|
|
print(f" Processing rate: {result.files_per_second:.1f} files/sec")
|
|
print(f" Loading rate: {result.records_per_second:,.0f} records/sec")
|
|
|
|
# Success rates
|
|
print(f"\n📊 SUCCESS RATES")
|
|
print(f" Extraction: {result.extraction_success_rate:.1%}")
|
|
print(f" Loading: {result.loading_success_rate:.1%}")
|
|
print(f" Overall: {result.overall_success_rate:.1%}")
|
|
|
|
# Data summary
|
|
print(f"\n📈 DATA PROCESSED")
|
|
print(f" Makes: {result.load_result.total_makes}")
|
|
print(f" Models: {result.load_result.total_models}")
|
|
print(f" Model years: {result.load_result.total_model_years}")
|
|
print(f" Trims: {result.load_result.total_trims}")
|
|
print(f" Engines: {result.load_result.total_engines}")
|
|
print(f" Trim-engine mappings: {result.load_result.total_trim_engine_mappings}")
|
|
|
|
# Issues
|
|
if result.load_result.failed_makes:
|
|
print(f"\n⚠️ FAILED MAKES ({len(result.load_result.failed_makes)}):")
|
|
for make in result.load_result.failed_makes:
|
|
print(f" {make}")
|
|
|
|
if result.integrity_issues:
|
|
print(f"\n❌ REFERENTIAL INTEGRITY ISSUES ({len(result.integrity_issues)}):")
|
|
for issue in result.integrity_issues:
|
|
print(f" {issue}")
|
|
else:
|
|
print(f"\n✅ REFERENTIAL INTEGRITY: PASSED")
|
|
|
|
# Final status
|
|
print(f"\n🎯 PIPELINE STATUS: {'SUCCESS' if result.was_successful else 'COMPLETED WITH ISSUES'}")
|
|
|
|
|
|
def default_progress_callback(progress_info: Dict[str, any]) -> None:
|
|
"""Default progress callback that prints to console"""
|
|
percentage = progress_info['percentage']
|
|
phase = progress_info['phase']
|
|
files_completed = progress_info['files_completed']
|
|
total_files = progress_info['total_files']
|
|
|
|
if progress_info['files_per_second'] > 0:
|
|
eta_minutes = progress_info['eta_seconds'] / 60
|
|
print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files "
|
|
f"({progress_info['files_per_second']:.1f} files/sec, ETA: {eta_minutes:.1f}min)")
|
|
else:
|
|
print(f"[{percentage:5.1f}%] {phase}: {files_completed}/{total_files} files")
|
|
|
|
|
|
# Example usage and testing functions
|
|
def example_usage():
|
|
"""Demonstrate ManualJsonPipeline usage"""
|
|
print("🚀 ManualJsonPipeline Example Usage")
|
|
print("=" * 40)
|
|
|
|
sources_dir = "sources/makes"
|
|
|
|
if not Path(sources_dir).exists():
|
|
print(f"❌ Sources directory not found: {sources_dir}")
|
|
return
|
|
|
|
print(f"\n💡 Example pipeline execution:")
|
|
print(f"""
|
|
# Create pipeline with configuration
|
|
config = PipelineConfig(
|
|
sources_directory="{sources_dir}",
|
|
load_mode=LoadMode.APPEND,
|
|
enable_progress_tracking=True,
|
|
validate_integrity=True
|
|
)
|
|
|
|
pipeline = ManualJsonPipeline("{sources_dir}", config)
|
|
|
|
# Run with progress tracking
|
|
result = pipeline.run(progress_callback=default_progress_callback)
|
|
|
|
# Print comprehensive report
|
|
pipeline.print_pipeline_report(result)
|
|
""")
|
|
|
|
# Show source statistics
|
|
try:
|
|
pipeline = ManualJsonPipeline(sources_dir)
|
|
stats = pipeline.get_source_statistics()
|
|
|
|
print(f"\n📊 Source Directory Statistics:")
|
|
print(f" Files: {stats['total_files']}")
|
|
print(f" Total size: {stats['total_size_mb']:.1f} MB")
|
|
print(f" Average file size: {stats['average_file_size_kb']:.1f} KB")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Could not get source statistics: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
example_usage() |