# Phase 4: Scheduled ETL Implementation

## Overview

This phase implements automated weekly ETL processing using a cron-based scheduler within the existing ETL container. The ETL process extracts data from the MSSQL source database, transforms it for optimal query performance, and loads it into the MVP Platform database.

## Prerequisites

- Phase 3 API migration completed successfully
- ETL scheduler container built and functional
- MSSQL source database with NHTSA data restored
- MVP Platform database accessible
- ETL Python code functional in the vehicle-etl directory

## Scheduled ETL Architecture

**Container**: `etl-scheduler` (already defined in Phase 1)

**Schedule**: Weekly on Sunday at 2 AM (configurable)

**Runtime**: Python 3.11 with cron daemon

**Dependencies**: Both the MSSQL and MVP Platform databases must be healthy
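For reference, the default schedule `0 2 * * 0` reads as: minute 0, hour 2, any day of month, any month, weekday 0 (Sunday). The following sketch is illustration only (it is not used by the container): a minimal matcher for five-field cron expressions with numeric fields or `*`, using cron's convention that weekday 0 is Sunday:

```python
from datetime import datetime


def matches_schedule(dt: datetime, schedule: str = "0 2 * * 0") -> bool:
    """Check whether a datetime matches a simple cron expression.

    Supports only numeric fields and '*'; cron weekday 0 = Sunday.
    """
    fields = schedule.split()  # minute, hour, day-of-month, month, day-of-week
    # Python's weekday() has Monday=0..Sunday=6; cron uses Sunday=0
    values = [dt.minute, dt.hour, dt.day, dt.month, (dt.weekday() + 1) % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))


# 2025-01-05 is a Sunday, so 02:00 that day matches the default schedule
print(matches_schedule(datetime(2025, 1, 5, 2, 0)))   # True
print(matches_schedule(datetime(2025, 1, 6, 2, 0)))   # False (Monday)
```
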
## Tasks

### Task 4.1: Create ETL Scheduler Dockerfile

**Location**: `vehicle-etl/docker/Dockerfile.etl`

**Action**: Create a Dockerfile with the cron daemon and ETL dependencies:

```dockerfile
FROM python:3.11-slim

# Install system dependencies, including cron
RUN apt-get update && apt-get install -y \
    cron \
    procps \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements-etl.txt .
RUN pip install --no-cache-dir -r requirements-etl.txt

# Copy ETL source code
COPY etl/ ./etl/
COPY sql/ ./sql/
COPY scripts/ ./scripts/

# Create logs directory
RUN mkdir -p /app/logs

# Copy cron configuration script
COPY docker/setup-cron.sh /setup-cron.sh
RUN chmod +x /setup-cron.sh

# Copy entrypoint script
COPY docker/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

# Set up the default cron job
RUN /setup-cron.sh

# Health check: verify all database connections work
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD python -c "import sys; from etl.connections import test_connections; sys.exit(0 if test_connections() else 1)"

ENTRYPOINT ["/entrypoint.sh"]
```

### Task 4.2: Create Cron Setup Script

**Location**: `vehicle-etl/docker/setup-cron.sh`

**Action**: Create a script that configures the cron job:

```bash
#!/bin/bash
set -e

# Schedule from environment variable, defaulting to Sunday at 2 AM
ETL_SCHEDULE=${ETL_SCHEDULE:-"0 2 * * 0"}

# Build the crontab in a temporary file and install it with crontab(1).
# (Files dropped into /etc/cron.d use a different format with an extra
# user field, so the five-field crontab format is installed directly.)
# cron runs jobs with a minimal environment, so any database settings
# present when this script runs are captured as crontab assignments.
{
    printenv | grep -E '^(MSSQL_|POSTGRES_|REDIS_)' || true
    echo "$ETL_SCHEDULE cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1"
} > /tmp/etl-crontab

crontab /tmp/etl-crontab
rm -f /tmp/etl-crontab

echo "ETL cron job configured with schedule: $ETL_SCHEDULE"
```

### Task 4.3: Create Container Entrypoint

**Location**: `vehicle-etl/docker/entrypoint.sh`

**Action**: Create an entrypoint script that starts the cron daemon:

```bash
#!/bin/bash
set -e

# Re-apply the cron configuration so a runtime ETL_SCHEDULE value
# (e.g. from docker-compose) takes effect, not just the build-time default
/setup-cron.sh

# Start cron daemon in the background
cron -f &
CRON_PID=$!

# Function to handle shutdown
shutdown() {
    echo "Shutting down ETL scheduler..."
    kill $CRON_PID
    exit 0
}

# Trap SIGTERM and SIGINT
trap shutdown SIGTERM SIGINT

# Run initial ETL if requested; a failure is logged but does not
# prevent the scheduler from starting
if [ "$RUN_INITIAL_ETL" = "true" ]; then
    echo "Running initial ETL process..."
    (cd /app && python -m etl.main build-catalog) || echo "Initial ETL run failed - check /app/logs"
fi

# Log startup
echo "ETL scheduler started with schedule: ${ETL_SCHEDULE:-0 2 * * 0}"
echo "Cron daemon PID: $CRON_PID"

# Keep the container running while cron runs
wait $CRON_PID
```

### Task 4.4: Update ETL Main Module

**Location**: `vehicle-etl/etl/main.py`

**Action**: Ensure the ETL main module supports the `build-catalog` command:

```python
#!/usr/bin/env python3
"""
ETL Main Module - Vehicle Catalog Builder
"""

import sys
import argparse
import logging
import traceback
from datetime import datetime

from etl.utils.logging import setup_logging
from etl.builders.vehicle_catalog_builder import VehicleCatalogBuilder
from etl.connections import test_connections


def build_catalog():
    """Run the complete ETL pipeline to build the vehicle catalog"""
    # Configure logging before the try block so the except handler
    # always has a usable logger
    setup_logging()
    logger = logging.getLogger(__name__)

    try:
        start_time = datetime.now()
        logger.info(f"Starting ETL pipeline at {start_time}")

        # Test all connections first
        if not test_connections():
            logger.error("Connection tests failed - aborting ETL")
            return False

        # Initialize catalog builder
        builder = VehicleCatalogBuilder()

        # Run ETL pipeline steps
        logger.info("Step 1: Extracting data from MSSQL source...")
        if not builder.extract_source_data():
            logger.error("Data extraction failed")
            return False

        logger.info("Step 2: Transforming data for catalog...")
        if not builder.transform_catalog_data():
            logger.error("Data transformation failed")
            return False

        logger.info("Step 3: Loading data to MVP Platform database...")
        if not builder.load_catalog_data():
            logger.error("Data loading failed")
            return False

        # Generate completion report
        end_time = datetime.now()
        duration = end_time - start_time
        logger.info(f"ETL pipeline completed successfully in {duration}")

        # Write completion marker
        with open('/app/logs/etl-last-run.txt', 'w') as f:
            f.write(f"{end_time.isoformat()}\n")
            f.write(f"Duration: {duration}\n")
            f.write("Status: SUCCESS\n")

        return True

    except Exception as e:
        logger.error(f"ETL pipeline failed: {e}")
        logger.error(traceback.format_exc())

        # Write error marker
        with open('/app/logs/etl-last-run.txt', 'w') as f:
            f.write(f"{datetime.now().isoformat()}\n")
            f.write("Status: FAILED\n")
            f.write(f"Error: {e}\n")

        return False


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description='Vehicle ETL Pipeline')
    parser.add_argument('command', choices=['build-catalog', 'test-connections', 'validate'],
                        help='Command to execute')
    parser.add_argument('--log-level', default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help='Logging level')

    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    if args.command == 'build-catalog':
        sys.exit(0 if build_catalog() else 1)

    elif args.command == 'test-connections':
        success = test_connections()
        print("All connections successful" if success else "Connection tests failed")
        sys.exit(0 if success else 1)

    elif args.command == 'validate':
        # Validation logic to be added in a later phase
        print("Validation not yet implemented")
        sys.exit(1)


if __name__ == '__main__':
    main()
```

### Task 4.5: Create Connection Testing Module

**Location**: `vehicle-etl/etl/connections.py`

**Action**: Create connection testing utilities:

```python
"""
Database connection testing utilities
"""

import os
import logging

import pyodbc
import psycopg2
import redis

logger = logging.getLogger(__name__)


def test_mssql_connection():
    """Test the MSSQL source database connection"""
    try:
        connection_string = (
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={os.getenv('MSSQL_HOST', 'localhost')};"
            f"DATABASE={os.getenv('MSSQL_DATABASE', 'VPICList')};"
            f"UID={os.getenv('MSSQL_USERNAME', 'sa')};"
            f"PWD={os.getenv('MSSQL_PASSWORD')};"
            f"TrustServerCertificate=yes;"
        )

        conn = pyodbc.connect(connection_string)
        cursor = conn.cursor()
        cursor.execute("SELECT @@VERSION")
        version = cursor.fetchone()
        logger.info(f"MSSQL connection successful: {version[0][:50]}...")
        cursor.close()
        conn.close()
        return True

    except Exception as e:
        logger.error(f"MSSQL connection failed: {e}")
        return False


def test_postgres_connection():
    """Test the PostgreSQL MVP Platform database connection"""
    try:
        conn = psycopg2.connect(
            host=os.getenv('POSTGRES_HOST', 'localhost'),
            port=int(os.getenv('POSTGRES_PORT', '5432')),
            database=os.getenv('POSTGRES_DATABASE', 'mvp-platform-vehicles'),
            user=os.getenv('POSTGRES_USERNAME', 'mvp_platform_user'),
            password=os.getenv('POSTGRES_PASSWORD')
        )

        cursor = conn.cursor()
        cursor.execute("SELECT version()")
        version = cursor.fetchone()
        logger.info(f"PostgreSQL connection successful: {version[0][:50]}...")
        cursor.close()
        conn.close()
        return True

    except Exception as e:
        logger.error(f"PostgreSQL connection failed: {e}")
        return False


def test_redis_connection():
    """Test the Redis cache connection"""
    try:
        r = redis.Redis(
            host=os.getenv('REDIS_HOST', 'localhost'),
            port=int(os.getenv('REDIS_PORT', '6379')),
            decode_responses=True
        )

        r.ping()
        logger.info("Redis connection successful")
        return True

    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        return False


def test_connections():
    """Test all database connections"""
    logger.info("Testing all database connections...")

    mssql_ok = test_mssql_connection()
    postgres_ok = test_postgres_connection()
    redis_ok = test_redis_connection()

    all_ok = mssql_ok and postgres_ok and redis_ok

    if all_ok:
        logger.info("All database connections successful")
    else:
        logger.error("One or more database connections failed")

    return all_ok
```

### Task 4.6: Create ETL Monitoring Script

**Location**: `vehicle-etl/scripts/check-etl-status.sh`

**Action**: Create a monitoring script for ETL health:

```bash
#!/bin/bash

# ETL Status Monitoring Script

LOG_FILE="/app/logs/etl-last-run.txt"
CRON_LOG="/app/logs/etl-cron.log"

echo "=== ETL Status Check ==="
echo "Timestamp: $(date)"
echo

# A missing marker means no run has happened yet. The first scheduled
# run may be up to a week away, so report healthy rather than failing
# (failing here would make the container health check restart it repeatedly)
if [ ! -f "$LOG_FILE" ]; then
    echo "⚠️ No ETL run recorded yet"
    exit 0
fi

# Read last run information
echo "📄 Last ETL Run Information:"
cat "$LOG_FILE"
echo

# Check whether the last run was successful
if grep -q "Status: SUCCESS" "$LOG_FILE"; then
    echo "✅ Last ETL run was successful"
    EXIT_CODE=0
else
    echo "❌ Last ETL run failed"
    EXIT_CODE=1
fi

# Show the last few lines of the cron log
echo
echo "📋 Recent ETL Log (last 10 lines):"
if [ -f "$CRON_LOG" ]; then
    tail -10 "$CRON_LOG"
else
    echo "No cron log found"
fi

echo
echo "=== End Status Check ==="

exit $EXIT_CODE
```

### Task 4.7: Update Docker Compose Health Checks

**Location**: `docker-compose.yml` (update existing etl-scheduler service)

**Action**: Update the ETL scheduler service definition with proper health checks:

```yaml
etl-scheduler:
  build:
    context: ./vehicle-etl
    dockerfile: docker/Dockerfile.etl
  container_name: mvp-etl-scheduler
  environment:
    # ... existing environment variables ...
    # Health check configuration
    - HEALTH_CHECK_ENABLED=true
  volumes:
    - ./vehicle-etl/logs:/app/logs
    - etl_scheduler_data:/app/data
  depends_on:
    mssql-source:
      condition: service_healthy
    mvp-platform-database:
      condition: service_healthy
    redis:
      condition: service_healthy
  restart: unless-stopped
  healthcheck:
    test: ["CMD", "/app/scripts/check-etl-status.sh"]
    interval: 60s
    timeout: 30s
    retries: 3
    start_period: 120s
```

### Task 4.8: Create ETL Requirements File

**Location**: `vehicle-etl/requirements-etl.txt`

**Action**: Ensure all required Python packages are listed:

```txt
# Database connectivity
pyodbc>=4.0.35
psycopg2-binary>=2.9.5
redis>=4.5.1

# Data processing
pandas>=1.5.3
numpy>=1.24.2

# Utilities
python-dateutil>=2.8.2
tqdm>=4.64.1

# Logging and monitoring
structlog>=22.3.0

# Configuration
python-decouple>=3.6

# Testing (for validation)
pytest>=7.2.1
pytest-asyncio>=0.20.3
```

## Validation Steps

### Step 1: Build and Test ETL Container

```bash
# Build the ETL scheduler container
docker-compose build etl-scheduler

# Start the container in the background
docker-compose up -d etl-scheduler

# Check container logs
docker-compose logs etl-scheduler
```

### Step 2: Test ETL Connections

```bash
# Test database connections
docker-compose exec etl-scheduler python -m etl.main test-connections

# Should output: "All connections successful"
```

### Step 3: Test Manual ETL Execution

```bash
# Run the ETL manually to test functionality
docker-compose exec etl-scheduler python -m etl.main build-catalog

# Check for success in logs
docker-compose exec etl-scheduler cat /app/logs/etl-last-run.txt
```

### Step 4: Verify Cron Configuration

```bash
# Check that the cron job is configured
docker-compose exec etl-scheduler crontab -l

# Output should include the line:
# "0 2 * * 0 cd /app && python -m etl.main build-catalog >> /app/logs/etl-cron.log 2>&1"
```

### Step 5: Test ETL Status Monitoring

```bash
# Test the status check script
docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh

# Check the health status Docker reports for the container
docker inspect --format='{{.State.Health.Status}}' mvp-etl-scheduler
```

## Error Handling

### Common Issues and Solutions

**Issue**: Cron daemon not starting

**Solution**: Check `entrypoint.sh` permissions and verify the cron package installation

**Issue**: Database connection failures

**Solution**: Verify network connectivity, check environment variables, and ensure the databases are healthy

**Issue**: ETL process hanging

**Solution**: Add timeout mechanisms, check for deadlocks, and increase memory limits

**Issue**: Log files not being written

**Solution**: Check volume mounts and verify directory permissions
### ETL Failure Recovery

**Automatic Recovery**:

- Container restart policy: `unless-stopped`
- Retry logic in ETL scripts (max 3 retries)
- The health check marks the container unhealthy if the ETL consistently fails
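The retry behavior referenced above might look like the following sketch (a generic helper shown for illustration; the actual retry code in the ETL scripts may differ):

```python
import time
import logging

logger = logging.getLogger(__name__)


def with_retries(func, max_retries=3, delay_seconds=60):
    """Call func(), retrying on exception up to max_retries attempts.

    Re-raises the last exception once the retry budget is exhausted.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, max_retries, exc)
            if attempt == max_retries:
                raise
            time.sleep(delay_seconds)
```

A step such as `builder.extract_source_data` could then be wrapped as `with_retries(builder.extract_source_data)`.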
**Manual Recovery**:

```bash
# Check ETL status
docker-compose exec etl-scheduler /app/scripts/check-etl-status.sh

# Restart the ETL container
docker-compose restart etl-scheduler

# Run the ETL manually if needed
docker-compose exec etl-scheduler python -m etl.main build-catalog
```

### Rollback Procedure

1. Stop the ETL scheduler:

   ```bash
   docker-compose stop etl-scheduler
   ```

2. Remove ETL-related files if needed:

   ```bash
   rm -rf vehicle-etl/docker/
   ```

3. Remove the ETL scheduler service from `docker-compose.yml`

4. Restart the remaining services:

   ```bash
   docker-compose up -d
   ```

## Next Steps

After successful completion of Phase 4:

1. Proceed to [Phase 5: Testing & Validation](./phase-05-testing.md)
2. Monitor ETL execution for the first few runs
3. Set up alerting for ETL failures
4. Document ETL maintenance procedures
## Dependencies for Next Phase

- ETL scheduler running successfully
- Cron job configured and functional
- First ETL run completed successfully
- MVP Platform database populated with vehicle data
- ETL monitoring and health checks working