71 lines
2.3 KiB
Python
71 lines
2.3 KiB
Python
import schedule
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
# The pipeline module is imported inside scheduled_etl_job() rather than here to avoid a circular import.
|
|
import importlib
|
|
from .config import config
|
|
|
|
# Module-level logger named after this module (``__name__``); used by every
# function below.
logger = logging.getLogger(__name__)
|
|
|
|
def scheduled_etl_job():
    """Run the ETL pipeline once and log the outcome.

    Logs the start time, calls ``run_etl_pipeline()`` and then logs one of:

    * success (truthy return value) at INFO level,
    * failure (falsy return value) at ERROR level,
    * crash (any ``Exception``) at ERROR level, including the traceback.

    Each outcome message carries the elapsed time. Exceptions are caught and
    logged, never propagated to the caller.

    Returns:
        None
    """
    # Function-scope import: the module only imports ``datetime`` from the
    # datetime package.
    from datetime import timedelta

    start_time = datetime.now()
    # Elapsed time is measured on the monotonic clock. The job is scheduled
    # for 02:00 local time, where DST shifts (or any clock adjustment) would
    # skew a difference of two naive wall-clock datetimes.
    started = time.monotonic()
    logger.info(f"Starting scheduled ETL job at {start_time}")

    def _elapsed():
        """Return the time elapsed since the job started as a timedelta."""
        return timedelta(seconds=time.monotonic() - started)

    try:
        # Import dynamically to avoid circular import
        from .pipeline import run_etl_pipeline

        success = run_etl_pipeline()
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback,
        # which a plain logger.error call would discard.
        logger.exception(f"ETL job crashed after {_elapsed()}: {e}")
        return

    duration = _elapsed()
    if success:
        logger.info(f"ETL job completed successfully in {duration}")
    else:
        logger.error(f"ETL job failed after {duration}")
|
|
|
|
def start_etl_scheduler():
    """Register the ETL job with ``schedule`` and run the scheduler loop.

    The schedule is chosen from ``config.ETL_SCHEDULE``. Only the cron
    pattern ``"0 2 * * 0"`` (every Sunday at 02:00) is recognised; any other
    value falls back to a daily run at 02:00 and logs a warning.

    This call blocks: it calls ``schedule.run_pending()`` once a minute,
    waits five minutes after an unexpected error, and returns only when
    interrupted with ``KeyboardInterrupt`` (Ctrl-C).

    Returns:
        None
    """
    logger.info(f"Starting ETL scheduler with cron pattern: {config.ETL_SCHEDULE}")

    # Simplified cron handling (not a real cron parser).
    # Field order: minute hour day-of-month month day-of-week,
    # so "0 2 * * 0" means every Sunday at 2:00 AM.
    if config.ETL_SCHEDULE == "0 2 * * 0":
        schedule.every().sunday.at("02:00").do(scheduled_etl_job)
        logger.info("Scheduled ETL to run every Sunday at 2:00 AM")
    else:
        # Default fallback - run once daily at 2 AM
        schedule.every().day.at("02:00").do(scheduled_etl_job)
        logger.warning(f"Unknown cron pattern {config.ETL_SCHEDULE}, defaulting to daily at 2:00 AM")

    logger.info("ETL scheduler started")

    poll_interval = 60  # seconds between checks for due jobs
    error_backoff = 300  # seconds to wait after a scheduler error

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is
    # also handled cleanly while sleeping in the error back-off; a handler
    # attached only to the inner try would miss an interrupt raised there.
    try:
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler alive, but record the traceback.
                logger.exception(f"ETL scheduler error: {e}")
                time.sleep(error_backoff)
            else:
                time.sleep(poll_interval)
    except KeyboardInterrupt:
        logger.info("ETL scheduler stopped by user")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: set up root logging, then hand over to the scheduler.
    # The configured level name is upper-cased and looked up on the logging
    # module to obtain the matching level constant.
    log_level = getattr(logging, config.LOG_LEVEL.upper())
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=log_level, format=log_format)

    # Runs the scheduler loop; returns only once that loop exits.
    start_etl_scheduler()