From 4cc792157fa2348bfe10329508fe785e27afb9dd Mon Sep 17 00:00:00 2001
From: grabowski
Date: Sun, 28 Sep 2025 18:24:33 +0700
Subject: [PATCH] Implement adaptive scheduler with intelligent retry logic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Replace fixed hourly schedule with adaptive scheduling system
- Switch to 1-minute retries when no data is available from API
- Return to hourly schedule once data is successfully fetched
- Fix data fetching to use yesterday's date (API has 1-day delay)
- Add comprehensive logging for scheduler mode changes
- Improve resilience against API data availability issues

The scheduler now intelligently adapts to data availability:
- Normal mode: hourly runs at top of each hour
- Retry mode: minute-based retries until data is available
- Automatic mode switching based on fetch success/failure

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
---
 src/main.py             | 70 ++++++++++++++++++++++++++++-------------
 src/water_scraper_v3.py |  6 ++--
 2 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/src/main.py b/src/main.py
index a82d51c..9473d25 100644
--- a/src/main.py
+++ b/src/main.py
@@ -64,51 +64,79 @@ def run_test_cycle():
     return False
 
 def run_continuous_monitoring():
-    """Run continuous monitoring with scheduling"""
+    """Run continuous monitoring with adaptive scheduling"""
     logger.info("Starting continuous monitoring...")
-    
+
     try:
         # Validate configuration
         Config.validate_config()
-        
+
         # Initialize scraper
         db_config = Config.get_database_config()
         scraper = EnhancedWaterMonitorScraper(db_config)
-        
+
         # Setup signal handlers
         setup_signal_handlers(scraper)
-        
+
         logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
+        logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
         logger.info("Press Ctrl+C to stop")
-        
+
         # Run initial cycle
         logger.info("Running initial data collection...")
-        scraper.run_scraping_cycle()
+        initial_success = scraper.run_scraping_cycle()
 
-        # Start scheduled monitoring
-        import schedule
+        # Adaptive scheduling state
         from datetime import datetime, timedelta
+        retry_mode = not initial_success
+        last_successful_fetch = None if not initial_success else datetime.now()
 
-        # Calculate next full hour
-        now = datetime.now()
-        next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
-        minutes_to_wait = (next_hour - now).total_seconds() / 60
+        if retry_mode:
+            logger.warning("No data fetched in initial run - entering retry mode")
+            next_run = datetime.now() + timedelta(minutes=1)
+        else:
+            logger.info("Initial data fetch successful - using hourly schedule")
+            next_run = (datetime.now() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
 
-        logger.info(f"Next scheduled run at {next_hour.strftime('%H:%M')} (waiting {minutes_to_wait:.1f} minutes)")
-
-        # Schedule at the top of each hour
-        schedule.every().hour.at(":00").do(scraper.run_scraping_cycle)
+        logger.info(f"Next run at {next_run.strftime('%H:%M')}")
 
         while True:
-            schedule.run_pending()
-            time.sleep(60)  # Check every minute
-
+            current_time = datetime.now()
+
+            if current_time >= next_run:
+                logger.info("Running scheduled data collection...")
+                success = scraper.run_scraping_cycle()
+
+                if success:
+                    last_successful_fetch = current_time
+                    if retry_mode:
+                        logger.info("✅ Data fetch successful - switching back to hourly schedule")
+                        retry_mode = False
+                        # Schedule next run at the next full hour
+                        next_run = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
+                    else:
+                        # Continue hourly schedule
+                        next_run = (current_time + timedelta(hours=Config.SCRAPING_INTERVAL_HOURS)).replace(minute=0, second=0, microsecond=0)
+
+                    logger.info(f"Next scheduled run at {next_run.strftime('%H:%M')}")
+                else:
+                    if not retry_mode:
+                        logger.warning("⚠️ No data fetched - switching to retry mode (1-minute intervals)")
+                        retry_mode = True
+
+                    # Schedule retry in 1 minute
+                    next_run = current_time + timedelta(minutes=1)
+                    logger.info(f"Retrying in 1 minute at {next_run.strftime('%H:%M')}")
+
+            # Sleep for 10 seconds and check again
+            time.sleep(10)
+
     except KeyboardInterrupt:
         logger.info("Monitoring stopped by user")
     except Exception as e:
         logger.error(f"Monitoring failed: {e}")
         return False
-    
+
     return True
 
 def run_gap_filling(days_back: int):
diff --git a/src/water_scraper_v3.py b/src/water_scraper_v3.py
index 658a37f..2e8652a 100644
--- a/src/water_scraper_v3.py
+++ b/src/water_scraper_v3.py
@@ -407,9 +407,9 @@ class EnhancedWaterMonitorScraper:
         return None
 
     def fetch_water_data(self) -> Optional[List[Dict]]:
-        """Fetch water levels and discharge data from API for current date"""
-        current_date = datetime.datetime.now()
-        return self.fetch_water_data_for_date(current_date)
+        """Fetch water levels and discharge data from API for yesterday (API has 1-day delay)"""
+        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
+        return self.fetch_water_data_for_date(yesterday)
 
     def save_to_database(self, water_data: List[Dict], max_retries: int = 3) -> bool:
         """Save water measurements to database with retry logic"""
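
Note (outside the patch, ignored by git apply): the schedule/retry state machine
above can be exercised in isolation. Below is a minimal sketch of the same
logic, assuming a hypothetical `run_cycle` callable that stands in for
`scraper.run_scraping_cycle()` and returns True on a successful fetch; the
1-hour/1-minute/10-second constants mirror the patch's defaults.

    import time
    from datetime import datetime, timedelta

    def next_full_hour(now, hours=1):
        """Top of the next hour (or N hours ahead), as the patch computes it."""
        return (now + timedelta(hours=hours)).replace(minute=0, second=0, microsecond=0)

    def adaptive_loop(run_cycle, normal_hours=1, retry_minutes=1, poll_seconds=10):
        """Run run_cycle hourly; fall back to short retries while it fails."""
        ok = run_cycle()  # the initial cycle decides the starting mode
        now = datetime.now()
        next_run = next_full_hour(now, normal_hours) if ok else now + timedelta(minutes=retry_minutes)
        while True:
            now = datetime.now()
            if now >= next_run:
                if run_cycle():
                    next_run = next_full_hour(now, normal_hours)  # hourly cadence
                else:
                    next_run = now + timedelta(minutes=retry_minutes)  # retry soon
            time.sleep(poll_seconds)  # coarse poll, as in the patch's 10 s sleep

    if __name__ == "__main__":
        import random
        adaptive_loop(lambda: random.random() > 0.3)  # fake fetch, ~70% success

The coarse 10-second poll keeps the loop structure identical to the patch; a
single sleep until `next_run` would also work here, since nothing else mutates
the schedule between cycles.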