15 KiB
Phase 4: Scheduled ETL Implementation
Overview
This phase implements automated weekly ETL processing using a cron-based scheduler within the existing ETL container. The ETL process extracts data from the MSSQL source database, transforms it for optimal query performance, and loads it into the MVP Platform database.
Prerequisites
- Phase 3 API migration completed successfully
- ETL scheduler container built and functional
- MSSQL source database with NHTSA data restored
- MVP Platform database accessible
- ETL Python code functional in vehicle-etl directory
Scheduled ETL Architecture
Container: etl-scheduler (already defined in Phase 1)
Schedule: Weekly on Sunday at 2 AM (configurable)
Runtime: Python 3.11 with cron daemon
Dependencies: Both MSSQL and MVP Platform databases must be healthy
Tasks
Task 4.1: Create ETL Scheduler Dockerfile
Location: vehicle-etl/docker/Dockerfile.etl
Action: Create Dockerfile with cron daemon and ETL dependencies:
FROM python:3.11-slim
# Install system dependencies including cron
RUN apt-get update && apt-get install -y \
cron \
procps \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create app directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements-etl.txt .
RUN pip install --no-cache-dir -r requirements-etl.txt
# Copy ETL source code
COPY etl/ ./etl/
COPY sql/ ./sql/
COPY scripts/ ./scripts/
# Create logs directory
RUN mkdir -p /app/logs
# Copy cron configuration script
COPY docker/setup-cron.sh /setup-cron.sh
RUN chmod +x /setup-cron.sh
# Copy entrypoint script
COPY docker/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
# Set up cron job
RUN /setup-cron.sh
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import sys; from etl.connections import test_connections; sys.exit(0 if test_connections() else 1)"
ENTRYPOINT ["/entrypoint.sh"]
Task 4.2: Create Cron Setup Script
Location: vehicle-etl/docker/setup-cron.sh
Action: Create script to configure cron job:
#!/bin/bash
# Create cron job from environment variable or default
ETL_SCHEDULE=${ETL_SCHEDULE:-"0 2 * * 0"}
# Create cron job that runs the ETL process
echo "$ETL_SCHEDULE cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1" > /etc/cron.d/etl-job
# Set permissions
chmod 0644 /etc/cron.d/etl-job
# Apply cron job
crontab /etc/cron.d/etl-job
echo "ETL cron job configured with schedule: $ETL_SCHEDULE"
Task 4.3: Create Container Entrypoint
Location: vehicle-etl/docker/entrypoint.sh
Action: Create entrypoint script that starts cron daemon:
#!/bin/bash
set -e
# Start cron daemon in the background
cron -f &
CRON_PID=$!
# Function to handle shutdown
shutdown() {
echo "Shutting down ETL scheduler..."
kill $CRON_PID
exit 0
}
# Trap SIGTERM and SIGINT
trap shutdown SIGTERM SIGINT
# Run initial ETL if requested
if [ "$RUN_INITIAL_ETL" = "true" ]; then
echo "Running initial ETL process..."
cd /app && python -m etl.main build-catalog
fi
# Log startup
echo "ETL scheduler started with schedule: ${ETL_SCHEDULE:-0 2 * * 0}"
echo "Cron daemon PID: $CRON_PID"
# Keep container running
wait $CRON_PID
Task 4.4: Update ETL Main Module
Location: vehicle-etl/etl/main.py
Action: Ensure ETL main module supports build-catalog command:
#!/usr/bin/env python3
"""
ETL Main Module - Vehicle Catalog Builder
"""
import sys
import argparse
import logging
from datetime import datetime
import traceback
from etl.utils.logging import setup_logging
from etl.builders.vehicle_catalog_builder import VehicleCatalogBuilder
from etl.connections import test_connections
def build_catalog():
"""Run the complete ETL pipeline to build vehicle catalog"""
try:
setup_logging()
logger = logging.getLogger(__name__)
start_time = datetime.now()
logger.info(f"Starting ETL pipeline at {start_time}")
# Test all connections first
if not test_connections():
logger.error("Connection tests failed - aborting ETL")
return False
# Initialize catalog builder
builder = VehicleCatalogBuilder()
# Run ETL pipeline steps
logger.info("Step 1: Extracting data from MSSQL source...")
extract_success = builder.extract_source_data()
if not extract_success:
logger.error("Data extraction failed")
return False
logger.info("Step 2: Transforming data for catalog...")
transform_success = builder.transform_catalog_data()
if not transform_success:
logger.error("Data transformation failed")
return False
logger.info("Step 3: Loading data to MVP Platform database...")
load_success = builder.load_catalog_data()
if not load_success:
logger.error("Data loading failed")
return False
# Generate completion report
end_time = datetime.now()
duration = end_time - start_time
logger.info(f"ETL pipeline completed successfully in {duration}")
# Write completion marker
with open('/app/logs/etl-last-run.txt', 'w') as f:
f.write(f"{end_time.isoformat()}\n")
f.write(f"Duration: {duration}\n")
f.write("Status: SUCCESS\n")
return True
except Exception as e:
logger.error(f"ETL pipeline failed: {str(e)}")
logger.error(traceback.format_exc())
# Write error marker
with open('/app/logs/etl-last-run.txt', 'w') as f:
f.write(f"{datetime.now().isoformat()}\n")
f.write(f"Status: FAILED\n")
f.write(f"Error: {str(e)}\n")
return False
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description='Vehicle ETL Pipeline')
parser.add_argument('command', choices=['build-catalog', 'test-connections', 'validate'],
help='Command to execute')
parser.add_argument('--log-level', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
help='Logging level')
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=getattr(logging, args.log_level),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
if args.command == 'build-catalog':
success = build_catalog()
sys.exit(0 if success else 1)
elif args.command == 'test-connections':
success = test_connections()
print("All connections successful" if success else "Connection tests failed")
sys.exit(0 if success else 1)
elif args.command == 'validate':
# Add validation logic here
print("Validation not yet implemented")
sys.exit(1)
if __name__ == '__main__':
main()
Task 4.5: Create Connection Testing Module
Location: vehicle-etl/etl/connections.py
Action: Create connection testing utilities:
"""
Database connection testing utilities
"""
import os
import logging
import pyodbc
import psycopg2
import redis
logger = logging.getLogger(__name__)
def test_mssql_connection():
"""Test MSSQL source database connection"""
try:
connection_string = (
f"DRIVER={{ODBC Driver 17 for SQL Server}};"
f"SERVER={os.getenv('MSSQL_HOST', 'localhost')};"
f"DATABASE={os.getenv('MSSQL_DATABASE', 'VPICList')};"
f"UID={os.getenv('MSSQL_USERNAME', 'sa')};"
f"PWD={os.getenv('MSSQL_PASSWORD')};"
f"TrustServerCertificate=yes;"
)
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
cursor.execute("SELECT @@VERSION")
version = cursor.fetchone()
logger.info(f"MSSQL connection successful: {version[0][:50]}...")
cursor.close()
conn.close()
return True
except Exception as e:
logger.error(f"MSSQL connection failed: {str(e)}")
return False
def test_postgres_connection():
"""Test PostgreSQL MVP Platform database connection"""
try:
conn = psycopg2.connect(
host=os.getenv('POSTGRES_HOST', 'localhost'),
port=int(os.getenv('POSTGRES_PORT', '5432')),
database=os.getenv('POSTGRES_DATABASE', 'mvp-platform-vehicles'),
user=os.getenv('POSTGRES_USERNAME', 'mvp_platform_user'),
password=os.getenv('POSTGRES_PASSWORD')
)
cursor = conn.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
logger.info(f"PostgreSQL connection successful: {version[0][:50]}...")
cursor.close()
conn.close()
return True
except Exception as e:
logger.error(f"PostgreSQL connection failed: {str(e)}")
return False
def test_redis_connection():
"""Test Redis cache connection"""
try:
r = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', '6379')),
decode_responses=True
)
r.ping()
logger.info("Redis connection successful")
return True
except Exception as e:
logger.error(f"Redis connection failed: {str(e)}")
return False
def test_connections():
"""Test all database connections"""
logger.info("Testing all database connections...")
mssql_ok = test_mssql_connection()
postgres_ok = test_postgres_connection()
redis_ok = test_redis_connection()
all_ok = mssql_ok and postgres_ok and redis_ok
if all_ok:
logger.info("All database connections successful")
else:
logger.error("One or more database connections failed")
return all_ok
Task 4.6: Create ETL Monitoring Script
Location: vehicle-etl/scripts/check-etl-status.sh
Action: Create monitoring script for ETL health:
#!/bin/bash
# ETL Status Monitoring Script
LOG_FILE="/app/logs/etl-last-run.txt"
CRON_LOG="/app/logs/etl-cron.log"
echo "=== ETL Status Check ==="
echo "Timestamp: $(date)"
echo
# Check if last run file exists
if [ ! -f "$LOG_FILE" ]; then
echo "❌ No ETL run detected yet"
exit 1
fi
# Read last run information
echo "📄 Last ETL Run Information:"
cat "$LOG_FILE"
echo
# Check if last run was successful
if grep -q "Status: SUCCESS" "$LOG_FILE"; then
echo "✅ Last ETL run was successful"
EXIT_CODE=0
else
echo "❌ Last ETL run failed"
EXIT_CODE=1
fi
# Show last few lines of cron log
echo
echo "📋 Recent ETL Log (last 10 lines):"
if [ -f "$CRON_LOG" ]; then
tail -10 "$CRON_LOG"
else
echo "No cron log found"
fi
echo
echo "=== End Status Check ==="
exit $EXIT_CODE
Task 4.7: Update Docker Compose Health Checks
Location: docker-compose.yml (update existing etl-scheduler service)
Action: Update the ETL scheduler service definition with proper health checks:
etl-scheduler:
build:
context: ./vehicle-etl
dockerfile: docker/Dockerfile.etl
container_name: mvp-etl-scheduler
environment:
# ... existing environment variables ...
# Health check configuration
- HEALTH_CHECK_ENABLED=true
volumes:
- ./vehicle-etl/logs:/app/logs
- etl_scheduler_data:/app/data
depends_on:
mssql-source:
condition: service_healthy
mvp-platform-database:
condition: service_healthy
redis:
condition: service_healthy
restart: unless-stopped
healthcheck:
test: ["CMD", "/app/scripts/check-etl-status.sh"]
interval: 60s
timeout: 30s
retries: 3
start_period: 120s
Task 4.8: Create ETL Requirements File
Location: vehicle-etl/requirements-etl.txt
Action: Ensure all required Python packages are listed:
# Database connectivity
pyodbc>=4.0.35
psycopg2-binary>=2.9.5
redis>=4.5.1
# Data processing
pandas>=1.5.3
numpy>=1.24.2
# Utilities
python-dateutil>=2.8.2
tqdm>=4.64.1
# Logging and monitoring
structlog>=22.3.0
# Configuration
python-decouple>=3.6
# Testing (for validation)
pytest>=7.2.1
pytest-asyncio>=0.20.3
Validation Steps
Step 1: Build and Test ETL Container
# Build the ETL scheduler container
docker-compose build etl-scheduler
# Test container startup
docker-compose up etl-scheduler -d
# Check container logs
docker-compose logs etl-scheduler
Step 2: Test ETL Connection
# Test database connections
docker-compose exec etl-scheduler python -m etl.main test-connections
# Should output: "All connections successful"
Step 3: Test Manual ETL Execution
# Run ETL manually to test functionality
docker-compose exec etl-scheduler python -m etl.main build-catalog
# Check for success in logs
docker-compose exec etl-scheduler cat /app/logs/etl-last-run.txt
Step 4: Verify Cron Configuration
# Check cron job is configured
docker-compose exec etl-scheduler crontab -l
# Should show: "0 2 * * 0 cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1"
Step 5: Test ETL Status Monitoring
# Test status check script
docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh
# Check health check endpoint
curl -f http://localhost:8080/health || echo "Health check failed"
Error Handling
Common Issues and Solutions
Issue: Cron daemon not starting Solution: Check entrypoint.sh permissions, verify cron package installation
Issue: Database connection failures Solution: Verify network connectivity, check environment variables, ensure databases are healthy
Issue: ETL process hanging Solution: Add timeout mechanisms, check for deadlocks, increase memory limits
Issue: Log files not being written Solution: Check volume mounts, verify directory permissions
ETL Failure Recovery
Automatic Recovery:
- Container restart policy:
unless-stopped - Retry logic in ETL scripts (max 3 retries)
- Health check will restart container if ETL consistently fails
Manual Recovery:
# Check ETL status
docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh
# Restart ETL container
docker-compose restart etl-scheduler
# Run ETL manually if needed
docker-compose exec etl-scheduler python -m etl.main build-catalog
Rollback Procedure
-
Stop ETL scheduler:
docker-compose stop etl-scheduler -
Remove ETL-related files if needed:
rm -rf vehicle-etl/docker/ -
Remove ETL scheduler from docker-compose.yml
-
Restart remaining services:
docker-compose up -d
Next Steps
After successful completion of Phase 4:
- Proceed to Phase 5: Testing & Validation
- Monitor ETL execution for first few runs
- Set up alerting for ETL failures
- Document ETL maintenance procedures
Dependencies for Next Phase
- ETL scheduler running successfully
- Cron job configured and functional
- First ETL run completed successfully
- MVP Platform database populated with vehicle data
- ETL monitoring and health checks working