Northern-Thailand-Ping-Rive…/src/main.py
grabowski 0ff58ecb13 Add historical data import functionality
- Add import_historical_data() method to EnhancedWaterMonitorScraper
- Support date range imports with Buddhist calendar API format (see the sketch below)
- Add CLI arguments --import-historical and --force-overwrite
- Include API rate limiting and skip existing data option
- Enable importing years of historical water level data

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 14:46:54 +07:00
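
For context on the "Buddhist calendar API format" bullet above: Thai government data APIs commonly express years in the Buddhist Era (BE = CE + 543). Below is a minimal, hypothetical sketch of the kind of date conversion import_historical_data() presumably performs when building its requests; the helper name and the payload field names are assumptions for illustration, not the repository's actual code.

    from datetime import datetime

    def to_buddhist_era_params(day: datetime) -> dict:
        """Hypothetical helper: format a Gregorian date for an API that
        expects Buddhist Era years (BE = CE + 543). Field names are assumed."""
        return {
            "day": day.day,
            "month": day.month,
            "year": day.year + 543,  # e.g. 2024-01-15 -> BE 2567
        }

    # Example: a request for 2024-01-15 would carry year 2567.
    print(to_buddhist_era_params(datetime(2024, 1, 15)))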


#!/usr/bin/env python3
"""
Main entry point for the Thailand Water Monitor system
"""
import argparse
import asyncio
import sys
import signal
import time
from datetime import datetime
from typing import Optional

from .config import Config
from .water_scraper_v3 import EnhancedWaterMonitorScraper
from .logging_config import setup_logging, get_logger
from .exceptions import ConfigurationError, DatabaseConnectionError
from .metrics import get_metrics_collector

logger = get_logger(__name__)


def setup_signal_handlers(scraper: Optional[EnhancedWaterMonitorScraper] = None):
    """Setup signal handlers for graceful shutdown"""
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down gracefully...")
        if scraper:
            logger.info("Stopping scraper...")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def run_test_cycle():
    """Run a single test cycle"""
    logger.info("Running test cycle...")
    try:
        # Validate configuration
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Run single scraping cycle
        result = scraper.run_scraping_cycle()

        if result:
            logger.info("✅ Test cycle completed successfully")

            # Show some statistics
            latest_data = scraper.get_latest_data(5)
            if latest_data:
                logger.info(f"Latest data points: {len(latest_data)}")
                for data in latest_data[:3]:  # Show first 3
                    logger.info(f"{data['station_code']}: {data['water_level']:.2f}m, {data['discharge']:.1f} cms")
        else:
            logger.warning("⚠️ Test cycle completed but no new data was found")

        return True

    except Exception as e:
        logger.error(f"❌ Test cycle failed: {e}")
        return False


def run_continuous_monitoring():
    """Run continuous monitoring with scheduling"""
    logger.info("Starting continuous monitoring...")
    try:
        # Validate configuration
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Setup signal handlers
        setup_signal_handlers(scraper)

        logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
        logger.info("Press Ctrl+C to stop")

        # Run initial cycle
        logger.info("Running initial data collection...")
        scraper.run_scraping_cycle()

        # Start scheduled monitoring
        import schedule
        from datetime import datetime, timedelta

        # Calculate next full hour
        now = datetime.now()
        next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        minutes_to_wait = (next_hour - now).total_seconds() / 60
        logger.info(f"Next scheduled run at {next_hour.strftime('%H:%M')} (waiting {minutes_to_wait:.1f} minutes)")

        # Schedule at the top of each hour
        schedule.every().hour.at(":00").do(scraper.run_scraping_cycle)

        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

    except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
    except Exception as e:
        logger.error(f"Monitoring failed: {e}")
        return False
    return True


def run_gap_filling(days_back: int):
    """Run gap filling for missing data"""
    logger.info(f"Checking for data gaps in the last {days_back} days...")
    try:
        # Validate configuration
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Fill gaps
        filled_count = scraper.fill_data_gaps(days_back)

        if filled_count > 0:
            logger.info(f"✅ Filled {filled_count} missing data points")
        else:
            logger.info("✅ No data gaps found")

        return True

    except Exception as e:
        logger.error(f"❌ Gap filling failed: {e}")
        return False


def run_data_update(days_back: int):
    """Update existing data with latest values"""
    logger.info(f"Updating existing data for the last {days_back} days...")
    try:
        # Validate configuration
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Update data
        updated_count = scraper.update_existing_data(days_back)

        if updated_count > 0:
            logger.info(f"✅ Updated {updated_count} data points")
        else:
            logger.info("✅ No data updates needed")

        return True

    except Exception as e:
        logger.error(f"❌ Data update failed: {e}")
        return False


def run_historical_import(start_date_str: str, end_date_str: str, skip_existing: bool = True):
    """Import historical data for a date range"""
    try:
        # Parse dates
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

        if start_date > end_date:
            logger.error("Start date must be before or equal to end date")
            return False

        logger.info(f"Importing historical data from {start_date.date()} to {end_date.date()}")
        if skip_existing:
            logger.info("Skipping dates that already have data")

        # Validate configuration
        Config.validate_config()

        # Initialize scraper
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        # Import historical data
        imported_count = scraper.import_historical_data(start_date, end_date, skip_existing)

        if imported_count > 0:
            logger.info(f"✅ Imported {imported_count} historical data points")
        else:
            logger.info("✅ No new data imported")

        return True

    except ValueError as e:
        logger.error(f"❌ Invalid date format. Use YYYY-MM-DD: {e}")
        return False
    except Exception as e:
        logger.error(f"❌ Historical import failed: {e}")
        return False


def run_web_api():
    """Run the FastAPI web interface"""
    logger.info("Starting web API server...")
    try:
        import uvicorn
        from .web_api import app

        # Validate configuration
        Config.validate_config()

        # Run the server
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=8000,
            log_config=None  # Use our custom logging
        )
        # uvicorn.run() blocks; reaching this point means a clean shutdown
        return True
    except ImportError:
        logger.error("FastAPI not installed. Run: pip install fastapi uvicorn")
        return False
    except Exception as e:
        logger.error(f"Web API failed: {e}")
        return False


def run_alert_check():
    """Run water level alert check"""
    logger.info("Running water level alert check...")
    try:
        from .alerting import WaterLevelAlertSystem

        # Initialize alerting system
        alerting = WaterLevelAlertSystem()

        # Run alert check
        results = alerting.run_alert_check()

        if 'error' in results:
            logger.error("❌ Alert check failed due to database connection")
            return False

        logger.info("✅ Alert check completed:")
        logger.info(f" • Water level alerts: {results['water_alerts']}")
        logger.info(f" • Data freshness alerts: {results['data_alerts']}")
        logger.info(f" • Total alerts generated: {results['total_alerts']}")
        logger.info(f" • Alerts sent: {results['sent']}")

        return True

    except Exception as e:
        logger.error(f"❌ Alert check failed: {e}")
        return False


def run_alert_test():
    """Send test alert message"""
    logger.info("Sending test alert message...")
    try:
        from .alerting import WaterLevelAlertSystem

        # Initialize alerting system
        alerting = WaterLevelAlertSystem()

        if not alerting.matrix_notifier:
            logger.error("❌ Matrix notifier not configured")
            logger.info("Please set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in your .env file")
            return False

        # Send test message
        test_message = (
            "🧪 **Test Alert**\n\n"
            "This is a test message from the Northern Thailand Ping River Monitor.\n\n"
            "If you received this, Matrix notifications are working correctly!"
        )
        success = alerting.matrix_notifier.send_message(test_message)

        if success:
            logger.info("✅ Test alert message sent successfully")
        else:
            logger.error("❌ Test alert message failed to send")

        return success

    except Exception as e:
        logger.error(f"❌ Test alert failed: {e}")
        return False


def show_status():
    """Show current system status"""
    logger.info("=== Northern Thailand Ping River Monitor Status ===")
    try:
        # Show configuration
        Config.print_settings()

        # Test database connection
        logger.info("\n=== Database Connection Test ===")
        db_config = Config.get_database_config()
        scraper = EnhancedWaterMonitorScraper(db_config)

        if scraper.db_adapter:
            logger.info("✅ Database connection successful")

            # Show latest data
            latest_data = scraper.get_latest_data(3)
            if latest_data:
                logger.info(f"\n=== Latest Data ({len(latest_data)} points) ===")
                for data in latest_data:
                    timestamp = data['timestamp']
                    if isinstance(timestamp, str):
                        timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                    logger.info(f"{data['station_code']} ({timestamp}): {data['water_level']:.2f}m")
            else:
                logger.info("No data found in database")
        else:
            logger.error("❌ Database connection failed")

        # Test alerting system
        logger.info("\n=== Alerting System Status ===")
        try:
            from .alerting import WaterLevelAlertSystem
            alerting = WaterLevelAlertSystem()
            if alerting.matrix_notifier:
                logger.info("✅ Matrix notifications configured")
            else:
                logger.warning("⚠️ Matrix notifications not configured")
                logger.info("Set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in .env file")
        except Exception as e:
            logger.error(f"❌ Alerting system error: {e}")

        # Show metrics if available
        metrics_collector = get_metrics_collector()
        metrics = metrics_collector.get_all_metrics()
        if any(metrics.values()):
            logger.info("\n=== Metrics Summary ===")
            for metric_type, values in metrics.items():
                if values:
                    logger.info(f"{metric_type.title()}: {len(values)} metrics")

        return True

    except Exception as e:
        logger.error(f"Status check failed: {e}")
        return False


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="Northern Thailand Ping River Monitor",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --test                                       # Run single test cycle
  %(prog)s                                              # Run continuous monitoring
  %(prog)s --web-api                                    # Start web API server
  %(prog)s --fill-gaps 7                                # Fill missing data for last 7 days
  %(prog)s --update-data 2                              # Update existing data for last 2 days
  %(prog)s --import-historical 2024-01-01 2024-01-31    # Import historical data
  %(prog)s --status                                     # Show system status
  %(prog)s --alert-check                                # Check water levels and send alerts
  %(prog)s --alert-test                                 # Send test Matrix message
        """
    )

    parser.add_argument(
        "--test",
        action="store_true",
        help="Run a single test cycle"
    )
    parser.add_argument(
        "--web-api",
        action="store_true",
        help="Start the web API server"
    )
    parser.add_argument(
        "--fill-gaps",
        type=int,
        metavar="DAYS",
        help="Fill missing data gaps for the specified number of days back"
    )
    parser.add_argument(
        "--update-data",
        type=int,
        metavar="DAYS",
        help="Update existing data for the specified number of days back"
    )
    parser.add_argument(
        "--import-historical",
        nargs=2,
        metavar=("START_DATE", "END_DATE"),
        help="Import historical data for date range (YYYY-MM-DD format)"
    )
    parser.add_argument(
        "--force-overwrite",
        action="store_true",
        help="Overwrite existing data when importing historical data"
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Show current system status"
    )
    parser.add_argument(
        "--alert-check",
        action="store_true",
        help="Run water level alert check"
    )
    parser.add_argument(
        "--alert-test",
        action="store_true",
        help="Send test alert message to Matrix"
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=Config.LOG_LEVEL,
        help="Set logging level"
    )
    parser.add_argument(
        "--log-file",
        default=Config.LOG_FILE,
        help="Log file path"
    )

    args = parser.parse_args()

    # Setup logging
    setup_logging(
        log_level=args.log_level,
        log_file=args.log_file,
        enable_console=True,
        enable_colors=True
    )

    logger.info("🏔️ Northern Thailand Ping River Monitor starting...")
    logger.info("Version: 3.1.3")
    logger.info(f"Log level: {args.log_level}")

    try:
        success = False

        if args.test:
            success = run_test_cycle()
        elif args.web_api:
            success = run_web_api()
        elif args.fill_gaps is not None:
            success = run_gap_filling(args.fill_gaps)
        elif args.update_data is not None:
            success = run_data_update(args.update_data)
        elif args.import_historical is not None:
            start_date, end_date = args.import_historical
            skip_existing = not args.force_overwrite
            success = run_historical_import(start_date, end_date, skip_existing)
        elif args.status:
            success = show_status()
        elif args.alert_check:
            success = run_alert_check()
        elif args.alert_test:
            success = run_alert_test()
        else:
            success = run_continuous_monitoring()

        if success:
            logger.info("✅ Operation completed successfully")
            sys.exit(0)
        else:
            logger.error("❌ Operation failed")
            sys.exit(1)

    except ConfigurationError as e:
        logger.error(f"Configuration error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Operation cancelled by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()