# Phase 4: Scheduled ETL Implementation ## Overview This phase implements automated weekly ETL processing using a cron-based scheduler within the existing ETL container. The ETL process extracts data from the MSSQL source database, transforms it for optimal query performance, and loads it into the MVP Platform database. ## Prerequisites - Phase 3 API migration completed successfully - ETL scheduler container built and functional - MSSQL source database with NHTSA data restored - MVP Platform database accessible - ETL Python code functional in vehicle-etl directory ## Scheduled ETL Architecture **Container**: `etl-scheduler` (already defined in Phase 1) **Schedule**: Weekly on Sunday at 2 AM (configurable) **Runtime**: Python 3.11 with cron daemon **Dependencies**: Both MSSQL and MVP Platform databases must be healthy ## Tasks ### Task 4.1: Create ETL Scheduler Dockerfile **Location**: `vehicle-etl/docker/Dockerfile.etl` **Action**: Create Dockerfile with cron daemon and ETL dependencies: ```dockerfile FROM python:3.11-slim # Install system dependencies including cron RUN apt-get update && apt-get install -y \ cron \ procps \ curl \ && rm -rf /var/lib/apt/lists/* # Create app directory WORKDIR /app # Copy requirements and install Python dependencies COPY requirements-etl.txt . RUN pip install --no-cache-dir -r requirements-etl.txt # Copy ETL source code COPY etl/ ./etl/ COPY sql/ ./sql/ COPY scripts/ ./scripts/ # Create logs directory RUN mkdir -p /app/logs # Copy cron configuration script COPY docker/setup-cron.sh /setup-cron.sh RUN chmod +x /setup-cron.sh # Copy entrypoint script COPY docker/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh # Set up cron job RUN /setup-cron.sh # Health check HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ CMD python -c "import sys; from etl.connections import test_connections; sys.exit(0 if test_connections() else 1)" ENTRYPOINT ["/entrypoint.sh"] ``` ### Task 4.2: Create Cron Setup Script **Location**: `vehicle-etl/docker/setup-cron.sh` **Action**: Create script to configure cron job: ```bash #!/bin/bash # Create cron job from environment variable or default ETL_SCHEDULE=${ETL_SCHEDULE:-"0 2 * * 0"} # Create cron job that runs the ETL process echo "$ETL_SCHEDULE cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1" > /etc/cron.d/etl-job # Set permissions chmod 0644 /etc/cron.d/etl-job # Apply cron job crontab /etc/cron.d/etl-job echo "ETL cron job configured with schedule: $ETL_SCHEDULE" ``` ### Task 4.3: Create Container Entrypoint **Location**: `vehicle-etl/docker/entrypoint.sh` **Action**: Create entrypoint script that starts cron daemon: ```bash #!/bin/bash set -e # Start cron daemon in the background cron -f & CRON_PID=$! # Function to handle shutdown shutdown() { echo "Shutting down ETL scheduler..." kill $CRON_PID exit 0 } # Trap SIGTERM and SIGINT trap shutdown SIGTERM SIGINT # Run initial ETL if requested if [ "$RUN_INITIAL_ETL" = "true" ]; then echo "Running initial ETL process..." cd /app && python -m etl.main build-catalog fi # Log startup echo "ETL scheduler started with schedule: ${ETL_SCHEDULE:-0 2 * * 0}" echo "Cron daemon PID: $CRON_PID" # Keep container running wait $CRON_PID ``` ### Task 4.4: Update ETL Main Module **Location**: `vehicle-etl/etl/main.py` **Action**: Ensure ETL main module supports build-catalog command: ```python #!/usr/bin/env python3 """ ETL Main Module - Vehicle Catalog Builder """ import sys import argparse import logging from datetime import datetime import traceback from etl.utils.logging import setup_logging from etl.builders.vehicle_catalog_builder import VehicleCatalogBuilder from etl.connections import test_connections def build_catalog(): """Run the complete ETL pipeline to build vehicle catalog""" try: setup_logging() logger = logging.getLogger(__name__) start_time = datetime.now() logger.info(f"Starting ETL pipeline at {start_time}") # Test all connections first if not test_connections(): logger.error("Connection tests failed - aborting ETL") return False # Initialize catalog builder builder = VehicleCatalogBuilder() # Run ETL pipeline steps logger.info("Step 1: Extracting data from MSSQL source...") extract_success = builder.extract_source_data() if not extract_success: logger.error("Data extraction failed") return False logger.info("Step 2: Transforming data for catalog...") transform_success = builder.transform_catalog_data() if not transform_success: logger.error("Data transformation failed") return False logger.info("Step 3: Loading data to MVP Platform database...") load_success = builder.load_catalog_data() if not load_success: logger.error("Data loading failed") return False # Generate completion report end_time = datetime.now() duration = end_time - start_time logger.info(f"ETL pipeline completed successfully in {duration}") # Write completion marker with open('/app/logs/etl-last-run.txt', 'w') as f: f.write(f"{end_time.isoformat()}\n") f.write(f"Duration: {duration}\n") f.write("Status: SUCCESS\n") return True except Exception as e: logger.error(f"ETL pipeline failed: {str(e)}") logger.error(traceback.format_exc()) # Write error marker with open('/app/logs/etl-last-run.txt', 'w') as f: f.write(f"{datetime.now().isoformat()}\n") f.write(f"Status: FAILED\n") f.write(f"Error: {str(e)}\n") return False def main(): """Main entry point""" parser = argparse.ArgumentParser(description='Vehicle ETL Pipeline') parser.add_argument('command', choices=['build-catalog', 'test-connections', 'validate'], help='Command to execute') parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], help='Logging level') args = parser.parse_args() # Setup logging logging.basicConfig( level=getattr(logging, args.log_level), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) if args.command == 'build-catalog': success = build_catalog() sys.exit(0 if success else 1) elif args.command == 'test-connections': success = test_connections() print("All connections successful" if success else "Connection tests failed") sys.exit(0 if success else 1) elif args.command == 'validate': # Add validation logic here print("Validation not yet implemented") sys.exit(1) if __name__ == '__main__': main() ``` ### Task 4.5: Create Connection Testing Module **Location**: `vehicle-etl/etl/connections.py` **Action**: Create connection testing utilities: ```python """ Database connection testing utilities """ import os import logging import pyodbc import psycopg2 import redis logger = logging.getLogger(__name__) def test_mssql_connection(): """Test MSSQL source database connection""" try: connection_string = ( f"DRIVER={{ODBC Driver 17 for SQL Server}};" f"SERVER={os.getenv('MSSQL_HOST', 'localhost')};" f"DATABASE={os.getenv('MSSQL_DATABASE', 'VPICList')};" f"UID={os.getenv('MSSQL_USERNAME', 'sa')};" f"PWD={os.getenv('MSSQL_PASSWORD')};" f"TrustServerCertificate=yes;" ) conn = pyodbc.connect(connection_string) cursor = conn.cursor() cursor.execute("SELECT @@VERSION") version = cursor.fetchone() logger.info(f"MSSQL connection successful: {version[0][:50]}...") cursor.close() conn.close() return True except Exception as e: logger.error(f"MSSQL connection failed: {str(e)}") return False def test_postgres_connection(): """Test PostgreSQL MVP Platform database connection""" try: conn = psycopg2.connect( host=os.getenv('POSTGRES_HOST', 'localhost'), port=int(os.getenv('POSTGRES_PORT', '5432')), database=os.getenv('POSTGRES_DATABASE', 'mvp-platform-vehicles'), user=os.getenv('POSTGRES_USERNAME', 'mvp_platform_user'), password=os.getenv('POSTGRES_PASSWORD') ) cursor = conn.cursor() cursor.execute("SELECT version()") version = cursor.fetchone() logger.info(f"PostgreSQL connection successful: {version[0][:50]}...") cursor.close() conn.close() return True except Exception as e: logger.error(f"PostgreSQL connection failed: {str(e)}") return False def test_redis_connection(): """Test Redis cache connection""" try: r = redis.Redis( host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', '6379')), decode_responses=True ) r.ping() logger.info("Redis connection successful") return True except Exception as e: logger.error(f"Redis connection failed: {str(e)}") return False def test_connections(): """Test all database connections""" logger.info("Testing all database connections...") mssql_ok = test_mssql_connection() postgres_ok = test_postgres_connection() redis_ok = test_redis_connection() all_ok = mssql_ok and postgres_ok and redis_ok if all_ok: logger.info("All database connections successful") else: logger.error("One or more database connections failed") return all_ok ``` ### Task 4.6: Create ETL Monitoring Script **Location**: `vehicle-etl/scripts/check-etl-status.sh` **Action**: Create monitoring script for ETL health: ```bash #!/bin/bash # ETL Status Monitoring Script LOG_FILE="/app/logs/etl-last-run.txt" CRON_LOG="/app/logs/etl-cron.log" echo "=== ETL Status Check ===" echo "Timestamp: $(date)" echo # Check if last run file exists if [ ! -f "$LOG_FILE" ]; then echo "❌ No ETL run detected yet" exit 1 fi # Read last run information echo "📄 Last ETL Run Information:" cat "$LOG_FILE" echo # Check if last run was successful if grep -q "Status: SUCCESS" "$LOG_FILE"; then echo "✅ Last ETL run was successful" EXIT_CODE=0 else echo "❌ Last ETL run failed" EXIT_CODE=1 fi # Show last few lines of cron log echo echo "📋 Recent ETL Log (last 10 lines):" if [ -f "$CRON_LOG" ]; then tail -10 "$CRON_LOG" else echo "No cron log found" fi echo echo "=== End Status Check ===" exit $EXIT_CODE ``` ### Task 4.7: Update Docker Compose Health Checks **Location**: `docker-compose.yml` (update existing etl-scheduler service) **Action**: Update the ETL scheduler service definition with proper health checks: ```yaml etl-scheduler: build: context: ./vehicle-etl dockerfile: docker/Dockerfile.etl container_name: mvp-etl-scheduler environment: # ... existing environment variables ... # Health check configuration - HEALTH_CHECK_ENABLED=true volumes: - ./vehicle-etl/logs:/app/logs - etl_scheduler_data:/app/data depends_on: mssql-source: condition: service_healthy mvp-platform-database: condition: service_healthy redis: condition: service_healthy restart: unless-stopped healthcheck: test: ["CMD", "/app/scripts/check-etl-status.sh"] interval: 60s timeout: 30s retries: 3 start_period: 120s ``` ### Task 4.8: Create ETL Requirements File **Location**: `vehicle-etl/requirements-etl.txt` **Action**: Ensure all required Python packages are listed: ```txt # Database connectivity pyodbc>=4.0.35 psycopg2-binary>=2.9.5 redis>=4.5.1 # Data processing pandas>=1.5.3 numpy>=1.24.2 # Utilities python-dateutil>=2.8.2 tqdm>=4.64.1 # Logging and monitoring structlog>=22.3.0 # Configuration python-decouple>=3.6 # Testing (for validation) pytest>=7.2.1 pytest-asyncio>=0.20.3 ``` ## Validation Steps ### Step 1: Build and Test ETL Container ```bash # Build the ETL scheduler container docker-compose build etl-scheduler # Test container startup docker-compose up etl-scheduler -d # Check container logs docker-compose logs etl-scheduler ``` ### Step 2: Test ETL Connection ```bash # Test database connections docker-compose exec etl-scheduler python -m etl.main test-connections # Should output: "All connections successful" ``` ### Step 3: Test Manual ETL Execution ```bash # Run ETL manually to test functionality docker-compose exec etl-scheduler python -m etl.main build-catalog # Check for success in logs docker-compose exec etl-scheduler cat /app/logs/etl-last-run.txt ``` ### Step 4: Verify Cron Configuration ```bash # Check cron job is configured docker-compose exec etl-scheduler crontab -l # Should show: "0 2 * * 0 cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1" ``` ### Step 5: Test ETL Status Monitoring ```bash # Test status check script docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh # Check health check endpoint curl -f http://localhost:8080/health || echo "Health check failed" ``` ## Error Handling ### Common Issues and Solutions **Issue**: Cron daemon not starting **Solution**: Check entrypoint.sh permissions, verify cron package installation **Issue**: Database connection failures **Solution**: Verify network connectivity, check environment variables, ensure databases are healthy **Issue**: ETL process hanging **Solution**: Add timeout mechanisms, check for deadlocks, increase memory limits **Issue**: Log files not being written **Solution**: Check volume mounts, verify directory permissions ### ETL Failure Recovery **Automatic Recovery**: - Container restart policy: `unless-stopped` - Retry logic in ETL scripts (max 3 retries) - Health check will restart container if ETL consistently fails **Manual Recovery**: ```bash # Check ETL status docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh # Restart ETL container docker-compose restart etl-scheduler # Run ETL manually if needed docker-compose exec etl-scheduler python -m etl.main build-catalog ``` ### Rollback Procedure 1. Stop ETL scheduler: ```bash docker-compose stop etl-scheduler ``` 2. Remove ETL-related files if needed: ```bash rm -rf vehicle-etl/docker/ ``` 3. Remove ETL scheduler from docker-compose.yml 4. Restart remaining services: ```bash docker-compose up -d ``` ## Next Steps After successful completion of Phase 4: 1. Proceed to [Phase 5: Testing & Validation](./phase-05-testing.md) 2. Monitor ETL execution for first few runs 3. Set up alerting for ETL failures 4. Document ETL maintenance procedures ## Dependencies for Next Phase - ETL scheduler running successfully - Cron job configured and functional - First ETL run completed successfully - MVP Platform database populated with vehicle data - ETL monitoring and health checks working