#!/usr/bin/env python3
|
|
import logging
|
|
import sys
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import click
|
|
from .config import config
|
|
from .utils.logging import setup_logging
|
|
from .scheduler import start_etl_scheduler
|
|
from .pipeline import run_etl_pipeline
|
|
from .connections import test_connections
|
|
|
|
# Optional manual-JSON-processing components.
# If any of their dependencies are missing, the classic ETL commands keep
# working and only the JSON commands (load-manual / validate-json) are
# disabled at runtime.
logger = logging.getLogger(__name__)

try:
    from .pipelines.manual_json_pipeline import ManualJsonPipeline, PipelineConfig, default_progress_callback
    from .loaders.json_manual_loader import LoadMode
    from .utils.make_name_mapper import MakeNameMapper
    from .utils.engine_spec_parser import EngineSpecParser
    from .extractors.json_extractor import JsonExtractor
except ImportError as e:
    # Degrade gracefully: define EVERY optional name as None so the
    # availability checks below (`if ManualJsonPipeline is None`,
    # `if JsonExtractor is None`) work instead of raising NameError.
    # The original only reset ManualJsonPipeline, leaving the other names
    # undefined on import failure.
    ManualJsonPipeline = None
    PipelineConfig = None
    default_progress_callback = None
    LoadMode = None
    MakeNameMapper = None
    EngineSpecParser = None
    JsonExtractor = None
    logger.warning(f"Manual JSON processing components not available: {e}")
|
|
|
|
@click.group()
def cli():
    """MVP Platform Vehicles ETL Tool"""
    # Group-level callback: runs before any subcommand, so logging is
    # configured once for every command using the configured LOG_LEVEL.
    setup_logging(config.LOG_LEVEL)
|
|
|
|
@cli.command()
def build_catalog():
    """Build vehicle catalog from source database"""
    # Propagate failure via a non-zero exit code so cron/CI callers notice.
    if not run_etl_pipeline():
        sys.exit(1)
|
|
|
|
@cli.command()
def schedule():
    """Start ETL scheduler (default mode)"""
    # Blocking call: hands control to the long-running scheduler loop.
    start_etl_scheduler()
|
|
|
|
@cli.command()
@click.option('--full', is_flag=True, help='Full reload instead of incremental')
def update(full):
    """Run ETL update"""
    # NOTE(review): `full` is only logged here — it is not forwarded to
    # run_etl_pipeline(); confirm whether the pipeline should receive it.
    logger.info(f"Starting ETL update (full={full})")
    if not run_etl_pipeline():
        sys.exit(1)
|
|
|
|
@cli.command()
def test():
    """Test database connections"""
    # Success path first; failures exit non-zero for scripted callers.
    if test_connections():
        logger.info("All connections tested successfully")
    else:
        logger.error("Connection test failed")
        sys.exit(1)
|
|
|
|
@cli.command()
@click.option('--sources-dir', '-s', default='sources/makes',
              help='Directory containing JSON make files (default: sources/makes)')
@click.option('--mode', '-m', type=click.Choice(['clear', 'append']), default='append',
              help='Loading mode: clear (destructive) or append (safe, default)')
@click.option('--progress/--no-progress', default=True,
              help='Show progress tracking (default: enabled)')
@click.option('--validate/--no-validate', default=True,
              help='Validate referential integrity after loading (default: enabled)')
@click.option('--batch-size', '-b', type=int, default=1000,
              help='Database batch size for inserts (default: 1000)')
@click.option('--dry-run', is_flag=True,
              help='Extract and validate data without loading to database')
@click.option('--verbose', '-v', is_flag=True,
              help='Enable verbose output')
def load_manual(sources_dir, mode, progress, validate, batch_size, dry_run, verbose):
    """Load vehicle data from JSON files manually

    This command processes JSON files in the specified directory and loads
    vehicle data into the PostgreSQL database. It supports two modes:

    • APPEND mode (default): Safely add new data with duplicate detection
    • CLEAR mode: Remove all existing data and reload (destructive)

    Examples:
        python -m etl load-manual
        python -m etl load-manual --mode clear --sources-dir custom/path
        python -m etl load-manual --dry-run --verbose
    """
    # Optional components may have failed to import at module load time.
    if ManualJsonPipeline is None:
        click.echo("❌ Manual JSON processing components are not available", err=True)
        click.echo("   Please check your installation and dependencies", err=True)
        sys.exit(1)

    # Validate the sources directory before touching the database.
    sources_path = Path(sources_dir)
    if not sources_path.exists():
        click.echo(f"❌ Sources directory not found: {sources_dir}", err=True)
        click.echo("   Please specify a valid directory with --sources-dir", err=True)
        sys.exit(1)

    # Require at least one *.json input file.
    json_files = list(sources_path.glob("*.json"))
    if not json_files:
        click.echo(f"❌ No JSON files found in: {sources_dir}", err=True)
        click.echo("   Please ensure the directory contains *.json files", err=True)
        sys.exit(1)

    # Raise root-logger verbosity for this run only.
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Build the pipeline configuration. Named `pipeline_config` (not
    # `config`) so the module-level `from .config import config` is not
    # shadowed inside this function.
    load_mode_enum = LoadMode.CLEAR if mode == 'clear' else LoadMode.APPEND
    pipeline_config = PipelineConfig(
        sources_directory=str(sources_path),
        load_mode=load_mode_enum,
        enable_progress_tracking=progress,
        validate_integrity=validate,
        batch_size=batch_size
    )

    click.echo("🚀 Manual JSON Processing Pipeline")
    click.echo(f"   Sources: {sources_dir}")
    click.echo(f"   Files: {len(json_files)} JSON files")
    click.echo(f"   Mode: {mode.upper()}")
    if dry_run:
        click.echo("   Dry run: Validation only (no database changes)")

    try:
        pipeline = ManualJsonPipeline(str(sources_path), pipeline_config)

        def cli_progress_callback(progress_info):
            # One progress line per callback; suppressed when --no-progress.
            if progress:
                percentage = progress_info['percentage']
                phase = progress_info['phase']
                files = f"{progress_info['files_completed']}/{progress_info['total_files']}"

                # Show rate/ETA only once a throughput estimate exists.
                if progress_info['files_per_second'] > 0:
                    rate = f"({progress_info['files_per_second']:.1f} files/sec)"
                    eta_min = progress_info['eta_seconds'] / 60
                    eta = f"ETA: {eta_min:.1f}min" if eta_min > 0 else ""
                    click.echo(f"[{percentage:5.1f}%] {phase}: {files} {rate} {eta}")
                else:
                    click.echo(f"[{percentage:5.1f}%] {phase}: {files}")

        if dry_run:
            # Extraction only — no database writes.
            click.echo("\n📋 Running extraction validation...")
            extraction_result = pipeline.run_extraction_only()

            click.echo("\n✅ Extraction Validation Complete")
            click.echo(f"   Files processed: {extraction_result.total_files_processed}")
            click.echo(f"   Success rate: {extraction_result.success_rate:.1%}")
            click.echo(f"   Models extracted: {extraction_result.total_models:,}")
            click.echo(f"   Engines extracted: {extraction_result.total_engines:,}")
            click.echo(f"   Electric models: {extraction_result.total_electric_models:,}")

            if extraction_result.failed_extractions > 0:
                click.echo(f"   ⚠️ Failed extractions: {extraction_result.failed_extractions}")
                sys.exit(1)
        else:
            # CLEAR mode is destructive — require explicit confirmation.
            if mode == 'clear':
                click.echo("\n⚠️ WARNING: CLEAR mode will delete all existing vehicle data!")
                if not click.confirm("Are you sure you want to continue?", default=False):
                    click.echo("Operation cancelled")
                    return

            click.echo("\n🔄 Running pipeline...")
            result = pipeline.run(progress_callback=cli_progress_callback)

            # Comprehensive post-run report.
            click.echo("\n" + "=" * 60)
            click.echo("📊 PIPELINE EXECUTION REPORT")
            click.echo("=" * 60)

            click.echo("\n⏱️ PERFORMANCE")
            click.echo(f"   Duration: {result.duration_seconds:.1f} seconds ({result.duration_minutes:.1f} minutes)")
            click.echo(f"   Processing rate: {result.files_per_second:.1f} files/sec")
            click.echo(f"   Loading rate: {result.records_per_second:,.0f} records/sec")

            click.echo("\n📈 SUCCESS RATES")
            click.echo(f"   Extraction: {result.extraction_success_rate:.1%}")
            click.echo(f"   Loading: {result.loading_success_rate:.1%}")
            click.echo(f"   Overall: {result.overall_success_rate:.1%}")

            click.echo("\n💾 DATA LOADED")
            click.echo(f"   Makes: {result.load_result.total_makes}")
            click.echo(f"   Models: {result.load_result.total_models}")
            click.echo(f"   Engines: {result.load_result.total_engines}")
            click.echo(f"   Trims: {result.load_result.total_trims}")
            click.echo(f"   Total records: {result.total_records_loaded:,}")

            # Per-make failures and referential-integrity findings.
            if result.load_result.failed_makes:
                click.echo(f"\n⚠️ FAILED MAKES ({len(result.load_result.failed_makes)}):")
                for make in result.load_result.failed_makes:
                    click.echo(f"   • {make}")

            if result.integrity_issues:
                click.echo(f"\n❌ INTEGRITY ISSUES ({len(result.integrity_issues)}):")
                for issue in result.integrity_issues:
                    click.echo(f"   • {issue}")
            else:
                click.echo("\n✅ REFERENTIAL INTEGRITY: PASSED")

            # Final status; failure exits non-zero for scripted callers.
            if result.was_successful:
                click.echo("\n🎉 PIPELINE COMPLETED SUCCESSFULLY")
                if verbose:
                    # Post-load table counts straight from the database.
                    db_stats = pipeline.loader.get_database_statistics()
                    click.echo("\n📋 DATABASE STATISTICS:")
                    for table, count in db_stats.items():
                        click.echo(f"   {table}: {count:,} records")
            else:
                click.echo("\n⚠️ PIPELINE COMPLETED WITH ISSUES")
                sys.exit(1)

    except KeyboardInterrupt:
        click.echo("\n⏸️ Pipeline interrupted by user")
        sys.exit(1)
    except Exception as e:
        click.echo(f"\n❌ Pipeline failed: {str(e)}", err=True)
        if verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)
|
|
|
|
@cli.command()
@click.option('--sources-dir', '-s', default='sources/makes',
              help='Directory containing JSON make files (default: sources/makes)')
@click.option('--verbose', '-v', is_flag=True,
              help='Enable verbose output with detailed statistics')
def validate_json(sources_dir, verbose):
    """Validate JSON files and show extraction statistics

    This command validates the structure and content of JSON files
    without loading data into the database. Useful for:

    • Checking data quality before loading
    • Debugging extraction issues
    • Getting statistics about available data

    Examples:
        python -m etl validate-json
        python -m etl validate-json --sources-dir custom/path --verbose
    """
    # Optional components may have failed to import at module load time.
    if JsonExtractor is None:
        click.echo("❌ JSON validation components are not available", err=True)
        sys.exit(1)

    # Validate the sources directory.
    sources_path = Path(sources_dir)
    if not sources_path.exists():
        click.echo(f"❌ Sources directory not found: {sources_dir}", err=True)
        sys.exit(1)

    # Require at least one *.json input file.
    json_files = list(sources_path.glob("*.json"))
    if not json_files:
        click.echo(f"❌ No JSON files found in: {sources_dir}", err=True)
        sys.exit(1)

    click.echo("🔍 JSON File Validation")
    click.echo(f"   Directory: {sources_dir}")
    click.echo(f"   Files: {len(json_files)} JSON files")

    try:
        # Wire up the extraction components (no database connection needed).
        make_mapper = MakeNameMapper()
        engine_parser = EngineSpecParser()
        extractor = JsonExtractor(make_mapper, engine_parser)

        click.echo("\n📋 Validating JSON structure and content...")
        result = extractor.extract_all_makes(str(sources_path))

        click.echo("\n✅ Validation Complete")
        click.echo(f"   Files processed: {result.total_files_processed}")
        click.echo(f"   Success rate: {result.success_rate:.1%}")
        click.echo(f"   Models found: {result.total_models:,}")
        click.echo(f"   Engines found: {result.total_engines:,}")
        click.echo(f"   Electric models: {result.total_electric_models:,}")

        if result.failed_extractions > 0:
            click.echo(f"   ⚠️ Failed extractions: {result.failed_extractions}")

        # Detailed leaderboard only with --verbose.
        if verbose and result.makes:
            click.echo("\n🏆 Top Makes by Model Count:")
            top_makes = sorted(result.makes, key=lambda m: m.total_models, reverse=True)[:10]
            for i, make in enumerate(top_makes, 1):
                click.echo(f"   {i:2d}. {make.name}: {make.total_models} models, {make.total_engines} engines")

        # Makes that reported processing errors (first five shown).
        error_makes = [make for make in result.makes if make.processing_errors]
        if error_makes:
            click.echo(f"\n⚠️ Makes with Processing Errors ({len(error_makes)}):")
            for make in error_makes[:5]:
                click.echo(f"   • {make.name}: {len(make.processing_errors)} errors")
            if len(error_makes) > 5:
                click.echo(f"   ... and {len(error_makes) - 5} more")

        # Data-quality insight: distribution of engine configurations.
        click.echo("\n📊 Data Quality Insights:")
        config_counts = {}
        for make in result.makes:
            for model in make.models:
                for engine in model.engines:
                    config_counts[engine.configuration] = config_counts.get(engine.configuration, 0) + 1

        if config_counts:
            click.echo("   Engine configurations:")
            # Guard against a zero total (would otherwise divide by zero if
            # the result's aggregate count disagrees with the raw data).
            engine_total = result.total_engines or 1
            # Loop variable renamed from `config` so the module-level
            # `config` import is not shadowed.
            for engine_config, count in sorted(config_counts.items(), key=lambda x: x[1], reverse=True):
                percentage = count / engine_total * 100
                click.echo(f"     {engine_config}: {count:,} ({percentage:.1f}%)")

        # Non-zero exit when any extraction failed.
        if result.failed_extractions > 0:
            sys.exit(1)

    except Exception as e:
        click.echo(f"❌ Validation failed: {str(e)}", err=True)
        if verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)
|
|
|
|
if __name__ == "__main__":
    # With no CLI arguments, fall back to the long-running scheduler;
    # otherwise dispatch to the requested click subcommand.
    if len(sys.argv) > 1:
        cli()
    else:
        start_etl_scheduler()