Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled
- Alerts now run automatically after every successful new data fetch - Works for both hourly fetches and retry mode exits - Alert check runs when fresh data is saved to database - Logs alert results (total generated and sent count) Co-Authored-By: Claude <noreply@anthropic.com>
538 lines
18 KiB
Python
538 lines
18 KiB
Python
#!/usr/bin/env python3
"""
Main entry point for the Thailand Water Monitor system
"""
import argparse
import asyncio
import signal
import sys
import time
from datetime import datetime, timedelta
from typing import Optional

from .config import Config
from .exceptions import ConfigurationError, DatabaseConnectionError
from .logging_config import setup_logging, get_logger
from .metrics import get_metrics_collector
from .water_scraper_v3 import EnhancedWaterMonitorScraper
logger = get_logger(__name__)
|
|
|
|
def setup_signal_handlers(scraper: Optional[EnhancedWaterMonitorScraper] = None):
    """Install SIGINT/SIGTERM handlers that log and exit the process cleanly."""

    def _handle(signum, frame):
        logger.info(f"Received signal {signum}, shutting down gracefully...")
        if scraper:
            logger.info("Stopping scraper...")
        sys.exit(0)

    # Register the same handler for both interactive and service-style stops.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _handle)
|
|
|
|
def run_test_cycle():
    """Execute a single scraping cycle and report the outcome.

    Returns:
        True when the cycle ran without raising, False otherwise.
    """
    logger.info("Running test cycle...")

    try:
        Config.validate_config()

        scraper = EnhancedWaterMonitorScraper(Config.get_database_config())

        if scraper.run_scraping_cycle():
            logger.info("✅ Test cycle completed successfully")

            # Surface a small sample of the freshest rows as a sanity check.
            latest_data = scraper.get_latest_data(5)
            if latest_data:
                logger.info(f"Latest data points: {len(latest_data)}")
                for data in latest_data[:3]:  # show first 3 only
                    logger.info(f" • {data['station_code']}: {data['water_level']:.2f}m, {data['discharge']:.1f} cms")
        else:
            logger.warning("⚠️ Test cycle completed but no new data was found")

        return True

    except Exception as e:
        logger.error(f"❌ Test cycle failed: {e}")
        return False
|
|
|
|
def run_continuous_monitoring():
    """Run continuous monitoring with adaptive scheduling and alerting.

    Fetches data every ``Config.SCRAPING_INTERVAL_HOURS`` hours, aligned to the
    top of the hour. When a fetch yields no data the loop drops into retry mode
    (1-minute intervals) until a fetch succeeds again. After every successful
    fetch the alerting system is invoked.

    Fixes over the previous version:
      * the next-run time previously hardcoded ``hours=1`` in two of three
        places while using ``Config.SCRAPING_INTERVAL_HOURS`` in the third —
        the configured interval is now used consistently;
      * a mid-function ``from datetime import datetime, timedelta`` shadowed
        the module-level import and is removed (``timedelta`` comes from the
        top-of-file import);
      * the never-read ``last_successful_fetch`` local is removed.

    Returns:
        True on a clean shutdown (Ctrl+C), False on an unrecoverable error.
    """
    logger.info("Starting continuous monitoring...")

    def _next_aligned_run(from_time):
        # Next run at the top of the hour, the configured interval ahead.
        return (from_time + timedelta(hours=Config.SCRAPING_INTERVAL_HOURS)).replace(
            minute=0, second=0, microsecond=0
        )

    try:
        # Validate configuration before touching the database.
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Initialize alerting system (lazy import keeps alert-free commands
        # from requiring the alerting dependencies).
        from .alerting import WaterLevelAlertSystem
        alerting = WaterLevelAlertSystem()

        # Setup signal handlers for graceful shutdown
        setup_signal_handlers(scraper)

        logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
        logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
        logger.info("Alerts: automatic check after each successful data fetch")
        logger.info("Press Ctrl+C to stop")

        # Run initial cycle
        logger.info("Running initial data collection...")
        initial_success = scraper.run_scraping_cycle()

        retry_mode = not initial_success
        if retry_mode:
            logger.warning("No data fetched in initial run - entering retry mode")
            next_run = datetime.now() + timedelta(minutes=1)
        else:
            logger.info("Initial data fetch successful - using hourly schedule")
            next_run = _next_aligned_run(datetime.now())

        logger.info(f"Next run at {next_run.strftime('%H:%M')}")

        while True:
            current_time = datetime.now()

            if current_time >= next_run:
                logger.info("Running scheduled data collection...")
                success = scraper.run_scraping_cycle()

                if success:
                    # Run alert check after every successful new data fetch;
                    # alerting failures must not break the monitoring loop.
                    logger.info("Running alert check...")
                    try:
                        alert_results = alerting.run_alert_check()
                        if alert_results.get('total_alerts', 0) > 0:
                            logger.info(f"Alerts: {alert_results['total_alerts']} generated, {alert_results['sent']} sent")
                    except Exception as e:
                        logger.error(f"Alert check failed: {e}")

                    if retry_mode:
                        logger.info("✅ Data fetch successful - switching back to hourly schedule")
                        retry_mode = False
                    next_run = _next_aligned_run(current_time)
                    logger.info(f"Next scheduled run at {next_run.strftime('%H:%M')}")
                else:
                    if not retry_mode:
                        logger.warning("⚠️ No data fetched - switching to retry mode (1-minute intervals)")
                        retry_mode = True
                    # Schedule retry in 1 minute
                    next_run = current_time + timedelta(minutes=1)
                    logger.info(f"Retrying in 1 minute at {next_run.strftime('%H:%M')}")

            # Poll the schedule every 10 seconds.
            time.sleep(10)

    except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
    except Exception as e:
        logger.error(f"Monitoring failed: {e}")
        return False

    return True
|
|
|
|
def run_gap_filling(days_back: int):
    """Backfill missing data points for the last *days_back* days.

    Returns:
        True when the check ran (regardless of how many gaps were filled),
        False when it raised.
    """
    logger.info(f"Checking for data gaps in the last {days_back} days...")

    try:
        Config.validate_config()

        scraper = EnhancedWaterMonitorScraper(Config.get_database_config())

        filled_count = scraper.fill_data_gaps(days_back)

        if filled_count > 0:
            logger.info(f"✅ Filled {filled_count} missing data points")
        else:
            logger.info("✅ No data gaps found")

        return True

    except Exception as e:
        logger.error(f"❌ Gap filling failed: {e}")
        return False
|
|
|
|
def run_data_update(days_back: int):
    """Refresh already-stored data for the last *days_back* days.

    Returns:
        True when the update ran (even if nothing changed), False on error.
    """
    logger.info(f"Updating existing data for the last {days_back} days...")

    try:
        Config.validate_config()

        scraper = EnhancedWaterMonitorScraper(Config.get_database_config())

        updated_count = scraper.update_existing_data(days_back)

        if updated_count > 0:
            logger.info(f"✅ Updated {updated_count} data points")
        else:
            logger.info("✅ No data updates needed")

        return True

    except Exception as e:
        logger.error(f"❌ Data update failed: {e}")
        return False
|
|
|
|
def run_historical_import(start_date_str: str, end_date_str: str, skip_existing: bool = True):
    """Import historical data between two dates (YYYY-MM-DD, inclusive).

    Args:
        start_date_str: Range start in YYYY-MM-DD form.
        end_date_str: Range end in YYYY-MM-DD form.
        skip_existing: When True, dates that already have data are skipped.

    Returns:
        True when the import ran, False on bad input or failure.
    """
    try:
        # Parse and sanity-check the requested range before doing any work.
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

        if start_date > end_date:
            logger.error("Start date must be before or equal to end date")
            return False

        logger.info(f"Importing historical data from {start_date.date()} to {end_date.date()}")
        if skip_existing:
            logger.info("Skipping dates that already have data")

        Config.validate_config()

        scraper = EnhancedWaterMonitorScraper(Config.get_database_config())

        imported_count = scraper.import_historical_data(start_date, end_date, skip_existing)

        if imported_count > 0:
            logger.info(f"✅ Imported {imported_count} historical data points")
        else:
            logger.info("✅ No new data imported")

        return True

    except ValueError as e:
        # strptime raises ValueError on malformed dates.
        logger.error(f"❌ Invalid date format. Use YYYY-MM-DD: {e}")
        return False
    except Exception as e:
        logger.error(f"❌ Historical import failed: {e}")
        return False
|
|
|
|
def run_web_api():
    """Start the FastAPI web interface on 0.0.0.0:8000 (blocking).

    Returns:
        False when startup fails; on a normal server shutdown the function
        simply falls off the end (uvicorn.run blocks until then).
    """
    logger.info("Starting web API server...")

    try:
        import uvicorn
        from .web_api import app

        Config.validate_config()

        # log_config=None keeps our own logging configuration in charge.
        uvicorn.run(app, host="0.0.0.0", port=8000, log_config=None)

    except ImportError:
        logger.error("FastAPI not installed. Run: pip install fastapi uvicorn")
        return False
    except Exception as e:
        logger.error(f"Web API failed: {e}")
        return False
|
|
|
|
def run_alert_check():
    """Run a water level alert check and log a summary of the results.

    Fix: when the alerting system reports an internal error the previous
    version logged a hardcoded "due to database connection" reason; the actual
    reported error is now logged instead.

    Returns:
        True when the check completed, False when it failed.
    """
    logger.info("Running water level alert check...")

    try:
        from .alerting import WaterLevelAlertSystem

        # Initialize alerting system
        alerting = WaterLevelAlertSystem()

        # Run alert check
        results = alerting.run_alert_check()

        if 'error' in results:
            # Surface the reported reason instead of assuming a DB failure.
            logger.error(f"❌ Alert check failed: {results['error']}")
            return False

        logger.info("✅ Alert check completed:")
        logger.info(f" • Water level alerts: {results['water_alerts']}")
        logger.info(f" • Data freshness alerts: {results['data_alerts']}")
        logger.info(f" • Total alerts generated: {results['total_alerts']}")
        logger.info(f" • Alerts sent: {results['sent']}")

        return True

    except Exception as e:
        logger.error(f"❌ Alert check failed: {e}")
        return False
|
|
|
|
def run_alert_test():
    """Send a test message through the configured Matrix notifier.

    Returns:
        True when the message was sent, False when the notifier is missing
        or sending failed.
    """
    logger.info("Sending test alert message...")

    try:
        from .alerting import WaterLevelAlertSystem

        alerting = WaterLevelAlertSystem()

        # Without a configured notifier there is nothing to test.
        if not alerting.matrix_notifier:
            logger.error("❌ Matrix notifier not configured")
            logger.info("Please set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in your .env file")
            return False

        test_message = (
            "🧪 **Test Alert**\n\nThis is a test message from the Northern Thailand "
            "Ping River Monitor.\n\nIf you received this, Matrix notifications are working correctly!"
        )
        success = alerting.matrix_notifier.send_message(test_message)

        if success:
            logger.info("✅ Test alert message sent successfully")
        else:
            logger.error("❌ Test alert message failed to send")

        return success

    except Exception as e:
        logger.error(f"❌ Test alert failed: {e}")
        return False
|
|
|
|
def show_status():
    """Show current system status.

    Prints configuration, probes the database connection, reports alerting
    configuration, and summarizes collected metrics. Returns True on success,
    False if any step raises.
    """
    logger.info("=== Northern Thailand Ping River Monitor Status ===")

    try:
        # Show configuration
        Config.print_settings()

        # Test database connection by constructing the scraper, which sets up
        # its database adapter.
        logger.info("\n=== Database Connection Test ===")
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        if scraper.db_adapter:
            logger.info("✅ Database connection successful")

            # Show latest data (up to 3 points)
            latest_data = scraper.get_latest_data(3)
            if latest_data:
                logger.info(f"\n=== Latest Data ({len(latest_data)} points) ===")
                for data in latest_data:
                    timestamp = data['timestamp']
                    if isinstance(timestamp, str):
                        # Convert a trailing 'Z' to an explicit UTC offset so
                        # datetime.fromisoformat accepts the string.
                        timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                    logger.info(f" • {data['station_code']} ({timestamp}): {data['water_level']:.2f}m")
            else:
                logger.info("No data found in database")
        else:
            logger.error("❌ Database connection failed")

        # Test alerting system; failures here are reported but do not abort
        # the rest of the status checks.
        logger.info("\n=== Alerting System Status ===")
        try:
            from .alerting import WaterLevelAlertSystem
            alerting = WaterLevelAlertSystem()

            if alerting.matrix_notifier:
                logger.info("✅ Matrix notifications configured")
            else:
                logger.warning("⚠️ Matrix notifications not configured")
                logger.info("Set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in .env file")
        except Exception as e:
            logger.error(f"❌ Alerting system error: {e}")

        # Show metrics if available (only when at least one category has values)
        metrics_collector = get_metrics_collector()
        metrics = metrics_collector.get_all_metrics()

        if any(metrics.values()):
            logger.info("\n=== Metrics Summary ===")
            for metric_type, values in metrics.items():
                if values:
                    logger.info(f"{metric_type.title()}: {len(values)} metrics")

        return True

    except Exception as e:
        logger.error(f"Status check failed: {e}")
        return False
|
|
|
|
def main():
    """Command-line entry point: parse arguments, configure logging, dispatch.

    Exits the process with status 0 on success and 1 on failure or
    configuration error.
    """
    parser = argparse.ArgumentParser(
        description="Northern Thailand Ping River Monitor",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --test                                  # Run single test cycle
  %(prog)s                                         # Run continuous monitoring
  %(prog)s --web-api                               # Start web API server
  %(prog)s --fill-gaps 7                           # Fill missing data for last 7 days
  %(prog)s --update-data 2                         # Update existing data for last 2 days
  %(prog)s --import-historical 2024-01-01 2024-01-31  # Import historical data
  %(prog)s --status                                # Show system status
  %(prog)s --alert-check                           # Check water levels and send alerts
  %(prog)s --alert-test                            # Send test Matrix message
""",
    )

    # Mode flags (first matching one below wins at dispatch time).
    parser.add_argument("--test", action="store_true",
                        help="Run a single test cycle")
    parser.add_argument("--web-api", action="store_true",
                        help="Start the web API server")
    parser.add_argument("--fill-gaps", type=int, metavar="DAYS",
                        help="Fill missing data gaps for the specified number of days back")
    parser.add_argument("--update-data", type=int, metavar="DAYS",
                        help="Update existing data for the specified number of days back")
    parser.add_argument("--import-historical", nargs=2, metavar=("START_DATE", "END_DATE"),
                        help="Import historical data for date range (YYYY-MM-DD format)")
    parser.add_argument("--force-overwrite", action="store_true",
                        help="Overwrite existing data when importing historical data")
    parser.add_argument("--status", action="store_true",
                        help="Show current system status")
    parser.add_argument("--alert-check", action="store_true",
                        help="Run water level alert check")
    parser.add_argument("--alert-test", action="store_true",
                        help="Send test alert message to Matrix")

    # Logging options.
    parser.add_argument("--log-level",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                        default=Config.LOG_LEVEL,
                        help="Set logging level")
    parser.add_argument("--log-file", default=Config.LOG_FILE,
                        help="Log file path")

    args = parser.parse_args()

    # Configure logging before any command runs.
    setup_logging(
        log_level=args.log_level,
        log_file=args.log_file,
        enable_console=True,
        enable_colors=True,
    )

    logger.info("🏔️ Northern Thailand Ping River Monitor starting...")
    logger.info("Version: 3.1.3")
    logger.info(f"Log level: {args.log_level}")

    try:
        # Dispatch in priority order; no mode flag means continuous monitoring.
        if args.test:
            ok = run_test_cycle()
        elif args.web_api:
            ok = run_web_api()
        elif args.fill_gaps is not None:
            ok = run_gap_filling(args.fill_gaps)
        elif args.update_data is not None:
            ok = run_data_update(args.update_data)
        elif args.import_historical is not None:
            start_date, end_date = args.import_historical
            ok = run_historical_import(start_date, end_date, not args.force_overwrite)
        elif args.status:
            ok = show_status()
        elif args.alert_check:
            ok = run_alert_check()
        elif args.alert_test:
            ok = run_alert_test()
        else:
            ok = run_continuous_monitoring()

        if ok:
            logger.info("✅ Operation completed successfully")
            sys.exit(0)
        else:
            logger.error("❌ Operation failed")
            sys.exit(1)

    except ConfigurationError as e:
        logger.error(f"Configuration error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Operation cancelled by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)
|
|
|
|
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()