Implement adaptive scheduler with intelligent retry logic
- Replace fixed hourly schedule with adaptive scheduling system
- Switch to 1-minute retries when no data is available from the API
- Return to hourly schedule once data is successfully fetched
- Fix data fetching to use yesterday's date (API has 1-day delay)
- Add comprehensive logging for scheduler mode changes
- Improve resilience against API data availability issues

The scheduler now intelligently adapts to data availability:
- Normal mode: hourly runs at the top of each hour
- Retry mode: minute-based retries until data is available
- Automatic mode switching based on fetch success/failure

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
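Stripped of logging and configuration detail, the mechanism the diff below introduces is a two-state loop. A minimal sketch, assuming a `fetch_once` callable that stands in for `scraper.run_scraping_cycle()` (the name is hypothetical, not part of the commit):

```python
import time
from datetime import datetime, timedelta
from typing import Callable


def monitor(fetch_once: Callable[[], bool], interval_hours: int = 1) -> None:
    # The initial cycle decides the starting mode: hourly if data arrived,
    # 1-minute retries otherwise.
    retry_mode = not fetch_once()
    if retry_mode:
        next_run = datetime.now() + timedelta(minutes=1)
    else:
        next_run = (datetime.now() + timedelta(hours=1)).replace(
            minute=0, second=0, microsecond=0)

    while True:
        now = datetime.now()
        if now >= next_run:
            if fetch_once():
                if retry_mode:
                    print("data available again - back to hourly schedule")
                retry_mode = False
                # Align the next run to the top of the hour.
                next_run = (now + timedelta(hours=interval_hours)).replace(
                    minute=0, second=0, microsecond=0)
            else:
                if not retry_mode:
                    print("no data - switching to 1-minute retries")
                retry_mode = True
                next_run = now + timedelta(minutes=1)
        time.sleep(10)  # poll the clock every 10 seconds, as the real loop does
```

Keeping `next_run` as plain state and polling the clock is also why the diff can drop the `schedule` library: the next-run decision now lives in one place instead of being split between a library callback and the loop.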
src/main.py
@@ -64,51 +64,79 @@ def run_test_cycle():
         return False
 
 
 def run_continuous_monitoring():
-    """Run continuous monitoring with scheduling"""
+    """Run continuous monitoring with adaptive scheduling"""
     logger.info("Starting continuous monitoring...")
 
     try:
         # Validate configuration
         Config.validate_config()
 
         # Initialize scraper
         db_config = Config.get_database_config()
         scraper = EnhancedWaterMonitorScraper(db_config)
 
         # Setup signal handlers
         setup_signal_handlers(scraper)
 
         logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
+        logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
         logger.info("Press Ctrl+C to stop")
 
         # Run initial cycle
         logger.info("Running initial data collection...")
-        scraper.run_scraping_cycle()
+        initial_success = scraper.run_scraping_cycle()
 
-        # Start scheduled monitoring
-        import schedule
+        # Adaptive scheduling state
         from datetime import datetime, timedelta
+        retry_mode = not initial_success
+        last_successful_fetch = None if not initial_success else datetime.now()
 
-        # Calculate next full hour
-        now = datetime.now()
-        next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
-        minutes_to_wait = (next_hour - now).total_seconds() / 60
+        if retry_mode:
+            logger.warning("No data fetched in initial run - entering retry mode")
+            next_run = datetime.now() + timedelta(minutes=1)
+        else:
+            logger.info("Initial data fetch successful - using hourly schedule")
+            next_run = (datetime.now() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
 
-        logger.info(f"Next scheduled run at {next_hour.strftime('%H:%M')} (waiting {minutes_to_wait:.1f} minutes)")
-
-        # Schedule at the top of each hour
-        schedule.every().hour.at(":00").do(scraper.run_scraping_cycle)
+        logger.info(f"Next run at {next_run.strftime('%H:%M')}")
 
         while True:
-            schedule.run_pending()
-            time.sleep(60)  # Check every minute
+            current_time = datetime.now()
+
+            if current_time >= next_run:
+                logger.info("Running scheduled data collection...")
+                success = scraper.run_scraping_cycle()
+
+                if success:
+                    last_successful_fetch = current_time
+                    if retry_mode:
+                        logger.info("✅ Data fetch successful - switching back to hourly schedule")
+                        retry_mode = False
+                        # Schedule next run at the next full hour
+                        next_run = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
+                    else:
+                        # Continue hourly schedule
+                        next_run = (current_time + timedelta(hours=Config.SCRAPING_INTERVAL_HOURS)).replace(minute=0, second=0, microsecond=0)
+
+                    logger.info(f"Next scheduled run at {next_run.strftime('%H:%M')}")
+                else:
+                    if not retry_mode:
+                        logger.warning("⚠️ No data fetched - switching to retry mode (1-minute intervals)")
+                        retry_mode = True
+
+                    # Schedule retry in 1 minute
+                    next_run = current_time + timedelta(minutes=1)
+                    logger.info(f"Retrying in 1 minute at {next_run.strftime('%H:%M')}")
+
+            # Sleep for 10 seconds and check again
+            time.sleep(10)
 
     except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
    except Exception as e:
        logger.error(f"Monitoring failed: {e}")
        return False
 
     return True
 
 
 def run_gap_filling(days_back: int):
@@ -407,9 +407,9 @@ class EnhancedWaterMonitorScraper:
         return None
 
     def fetch_water_data(self) -> Optional[List[Dict]]:
-        """Fetch water levels and discharge data from API for current date"""
-        current_date = datetime.datetime.now()
-        return self.fetch_water_data_for_date(current_date)
+        """Fetch water levels and discharge data from API for yesterday (API has 1-day delay)"""
+        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
+        return self.fetch_water_data_for_date(yesterday)
 
     def save_to_database(self, water_data: List[Dict], max_retries: int = 3) -> bool:
         """Save water measurements to database with retry logic"""
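Both hunks reduce to small date arithmetic. If the scheduling rule ever needs unit tests, it could be factored into a pure function; a sketch under that assumption (`next_run_time` is a hypothetical helper, not part of this commit):

```python
from datetime import datetime, timedelta


def next_run_time(now: datetime, success: bool, interval_hours: int = 1) -> datetime:
    # Mirrors the commit's rule: top of the next hour after a successful
    # fetch, a 1-minute retry after a miss.
    if success:
        return (now + timedelta(hours=interval_hours)).replace(
            minute=0, second=0, microsecond=0)
    return now + timedelta(minutes=1)


# Example: a failed fetch at 14:37 retries at 14:38; a success waits for 15:00.
assert next_run_time(datetime(2024, 5, 1, 14, 37), False) == datetime(2024, 5, 1, 14, 38)
assert next_run_time(datetime(2024, 5, 1, 14, 37), True) == datetime(2024, 5, 1, 15, 0)
```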