Add comprehensive Matrix alerting system with Grafana integration

- Implement custom Python alerting system (src/alerting.py) with water level monitoring, data freshness checks, and Matrix notifications
- Add complete Grafana Matrix alerting setup guide (docs/GRAFANA_MATRIX_SETUP.md) with webhook configuration, alert rules, and notification policies
- Create Matrix quick start guide (docs/MATRIX_QUICK_START.md) for rapid deployment
- Integrate alerting commands into main application (--alert-check, --alert-test)
- Add Matrix configuration to environment variables (.env.example)
- Update Makefile with alerting targets (alert-check, alert-test)
- Enhance status command to show Matrix notification status
- Support station-specific water level thresholds and escalation rules
- Provide dual alerting approach: native Grafana alerts and custom Python system

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-26 16:18:02 +07:00
parent 6c7c128b4d
commit ca730e484b
7 changed files with 1062 additions and 9 deletions

334
src/alerting.py Normal file
View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python3
"""
Water Level Alerting System with Matrix Integration
"""
import os
import json
import requests
import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
try:
from .config import Config
from .database_adapters import create_database_adapter
from .logging_config import get_logger
except ImportError:
from config import Config
from database_adapters import create_database_adapter
import logging
def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class WaterAlert:
station_code: str
station_name: str
alert_type: str
level: AlertLevel
water_level: float
threshold: float
discharge: Optional[float] = None
timestamp: Optional[datetime.datetime] = None
message: Optional[str] = None
class MatrixNotifier:
def __init__(self, homeserver: str, access_token: str, room_id: str):
self.homeserver = homeserver.rstrip('/')
self.access_token = access_token
self.room_id = room_id
self.session = requests.Session()
def send_message(self, message: str, msgtype: str = "m.text") -> bool:
"""Send message to Matrix room"""
try:
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
data = {
"msgtype": msgtype,
"body": message
}
# Add transaction ID to prevent duplicates
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
url += f"/{txn_id}"
response = self.session.post(url, headers=headers, json=data, timeout=10)
response.raise_for_status()
logger.info(f"Matrix message sent successfully: {response.json().get('event_id')}")
return True
except Exception as e:
logger.error(f"Failed to send Matrix message: {e}")
return False
def send_alert(self, alert: WaterAlert) -> bool:
"""Send formatted water alert to Matrix"""
emoji_map = {
AlertLevel.INFO: "",
AlertLevel.WARNING: "⚠️",
AlertLevel.CRITICAL: "🚨",
AlertLevel.EMERGENCY: "🆘"
}
emoji = emoji_map.get(alert.level, "📊")
message = f"""{emoji} **WATER LEVEL ALERT**
**Station:** {alert.station_code} ({alert.station_name})
**Alert Type:** {alert.alert_type}
**Severity:** {alert.level.value.upper()}
**Current Level:** {alert.water_level:.2f}m
**Threshold:** {alert.threshold:.2f}m
**Difference:** {(alert.water_level - alert.threshold):+.2f}m
"""
if alert.discharge:
message += f"**Discharge:** {alert.discharge:.1f} cms\n"
if alert.timestamp:
message += f"**Time:** {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
if alert.message:
message += f"\n**Details:** {alert.message}\n"
message += f"\n📈 View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}"
return self.send_message(message)
class WaterLevelAlertSystem:
def __init__(self):
self.db_adapter = None
self.matrix_notifier = None
self.thresholds = self._load_thresholds()
# Matrix configuration from environment
matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org')
matrix_token = os.getenv('MATRIX_ACCESS_TOKEN')
matrix_room = os.getenv('MATRIX_ROOM_ID')
if matrix_token and matrix_room:
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
logger.info("Matrix notifications enabled")
else:
logger.warning("Matrix configuration missing - notifications disabled")
def _load_thresholds(self) -> Dict[str, Dict[str, float]]:
"""Load alert thresholds from config or database"""
# Default thresholds for Northern Thailand stations
return {
"P.1": {"warning": 5.0, "critical": 6.5, "emergency": 8.0},
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
"P.21": {"warning": 4.0, "critical": 5.5, "emergency": 7.0},
"P.67": {"warning": 6.0, "critical": 8.0, "emergency": 10.0},
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
# Default for unknown stations
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}
}
def connect_database(self):
"""Initialize database connection"""
try:
db_config = Config.get_database_config()
self.db_adapter = create_database_adapter(
db_config['type'],
connection_string=db_config['connection_string']
)
if self.db_adapter.connect():
logger.info("Database connection established for alerting")
return True
else:
logger.error("Failed to connect to database")
return False
except Exception as e:
logger.error(f"Database connection error: {e}")
return False
def check_water_levels(self) -> List[WaterAlert]:
"""Check current water levels against thresholds"""
alerts = []
if not self.db_adapter:
logger.error("Database not connected")
return alerts
try:
# Get latest measurements
measurements = self.db_adapter.get_latest_measurements(limit=50)
for measurement in measurements:
station_code = measurement.get('station_code', 'UNKNOWN')
water_level = measurement.get('water_level')
if not water_level:
continue
# Get thresholds for this station
station_thresholds = self.thresholds.get(station_code, self.thresholds['default'])
# Check each threshold level
alert_level = None
threshold_value = None
alert_type = None
if water_level >= station_thresholds['emergency']:
alert_level = AlertLevel.EMERGENCY
threshold_value = station_thresholds['emergency']
alert_type = "Emergency Water Level"
elif water_level >= station_thresholds['critical']:
alert_level = AlertLevel.CRITICAL
threshold_value = station_thresholds['critical']
alert_type = "Critical Water Level"
elif water_level >= station_thresholds['warning']:
alert_level = AlertLevel.WARNING
threshold_value = station_thresholds['warning']
alert_type = "High Water Level"
if alert_level:
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
alert_type=alert_type,
level=alert_level,
water_level=water_level,
threshold=threshold_value,
discharge=measurement.get('discharge'),
timestamp=measurement.get('timestamp')
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking water levels: {e}")
return alerts
def check_data_freshness(self, max_age_hours: int = 2) -> List[WaterAlert]:
"""Check if data is fresh enough"""
alerts = []
if not self.db_adapter:
return alerts
try:
measurements = self.db_adapter.get_latest_measurements(limit=20)
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
for measurement in measurements:
timestamp = measurement.get('timestamp')
if timestamp and timestamp < cutoff_time:
station_code = measurement.get('station_code', 'UNKNOWN')
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
alert_type="Stale Data",
level=AlertLevel.WARNING,
water_level=measurement.get('water_level', 0),
threshold=max_age_hours,
timestamp=timestamp,
message=f"No fresh data for {age_hours:.1f} hours"
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking data freshness: {e}")
return alerts
def send_alerts(self, alerts: List[WaterAlert]) -> int:
"""Send alerts via configured channels"""
sent_count = 0
if not alerts:
return sent_count
if self.matrix_notifier:
for alert in alerts:
if self.matrix_notifier.send_alert(alert):
sent_count += 1
# Could add other notification channels here:
# - Email
# - Discord
# - Telegram
# - SMS
return sent_count
def run_alert_check(self) -> Dict[str, int]:
"""Run complete alert check cycle"""
if not self.connect_database():
return {"error": 1}
# Check water levels
water_alerts = self.check_water_levels()
# Check data freshness
data_alerts = self.check_data_freshness()
# Combine alerts
all_alerts = water_alerts + data_alerts
# Send alerts
sent_count = self.send_alerts(all_alerts)
logger.info(f"Alert check complete: {len(all_alerts)} alerts, {sent_count} sent")
return {
"water_alerts": len(water_alerts),
"data_alerts": len(data_alerts),
"total_alerts": len(all_alerts),
"sent": sent_count
}
def main():
"""Standalone alerting check"""
import argparse
parser = argparse.ArgumentParser(description="Water Level Alert System")
parser.add_argument("--check", action="store_true", help="Run alert check")
parser.add_argument("--test", action="store_true", help="Send test message")
args = parser.parse_args()
alerting = WaterLevelAlertSystem()
if args.test:
if alerting.matrix_notifier:
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!"
success = alerting.matrix_notifier.send_message(test_message)
print(f"Test message sent: {success}")
else:
print("Matrix notifier not configured")
elif args.check:
results = alerting.run_alert_check()
print(f"Alert check results: {results}")
else:
print("Use --check or --test")
if __name__ == "__main__":
main()

View File

@@ -180,22 +180,81 @@ def run_web_api():
logger.error(f"Web API failed: {e}")
return False
def run_alert_check():
"""Run water level alert check"""
logger.info("Running water level alert check...")
try:
from .alerting import WaterLevelAlertSystem
# Initialize alerting system
alerting = WaterLevelAlertSystem()
# Run alert check
results = alerting.run_alert_check()
if 'error' in results:
logger.error("❌ Alert check failed due to database connection")
return False
logger.info(f"✅ Alert check completed:")
logger.info(f" • Water level alerts: {results['water_alerts']}")
logger.info(f" • Data freshness alerts: {results['data_alerts']}")
logger.info(f" • Total alerts generated: {results['total_alerts']}")
logger.info(f" • Alerts sent: {results['sent']}")
return True
except Exception as e:
logger.error(f"❌ Alert check failed: {e}")
return False
def run_alert_test():
"""Send test alert message"""
logger.info("Sending test alert message...")
try:
from .alerting import WaterLevelAlertSystem
# Initialize alerting system
alerting = WaterLevelAlertSystem()
if not alerting.matrix_notifier:
logger.error("❌ Matrix notifier not configured")
logger.info("Please set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in your .env file")
return False
# Send test message
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Northern Thailand Ping River Monitor.\n\nIf you received this, Matrix notifications are working correctly!"
success = alerting.matrix_notifier.send_message(test_message)
if success:
logger.info("✅ Test alert message sent successfully")
else:
logger.error("❌ Test alert message failed to send")
return success
except Exception as e:
logger.error(f"❌ Test alert failed: {e}")
return False
def show_status():
"""Show current system status"""
logger.info("=== Northern Thailand Ping River Monitor Status ===")
try:
# Show configuration
Config.print_settings()
# Test database connection
logger.info("\n=== Database Connection Test ===")
db_config = Config.get_database_config()
scraper = EnhancedWaterMonitorScraper(db_config)
if scraper.db_adapter:
logger.info("✅ Database connection successful")
# Show latest data
latest_data = scraper.get_latest_data(3)
if latest_data:
@@ -209,19 +268,33 @@ def show_status():
logger.info("No data found in database")
else:
logger.error("❌ Database connection failed")
# Test alerting system
logger.info("\n=== Alerting System Status ===")
try:
from .alerting import WaterLevelAlertSystem
alerting = WaterLevelAlertSystem()
if alerting.matrix_notifier:
logger.info("✅ Matrix notifications configured")
else:
logger.warning("⚠️ Matrix notifications not configured")
logger.info("Set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in .env file")
except Exception as e:
logger.error(f"❌ Alerting system error: {e}")
# Show metrics if available
metrics_collector = get_metrics_collector()
metrics = metrics_collector.get_all_metrics()
if any(metrics.values()):
logger.info("\n=== Metrics Summary ===")
for metric_type, values in metrics.items():
if values:
logger.info(f"{metric_type.title()}: {len(values)} metrics")
return True
except Exception as e:
logger.error(f"Status check failed: {e}")
return False
@@ -239,6 +312,8 @@ Examples:
%(prog)s --fill-gaps 7 # Fill missing data for last 7 days
%(prog)s --update-data 2 # Update existing data for last 2 days
%(prog)s --status # Show system status
%(prog)s --alert-check # Check water levels and send alerts
%(prog)s --alert-test # Send test Matrix message
"""
)
@@ -273,7 +348,19 @@ Examples:
action="store_true",
help="Show current system status"
)
parser.add_argument(
"--alert-check",
action="store_true",
help="Run water level alert check"
)
parser.add_argument(
"--alert-test",
action="store_true",
help="Send test alert message to Matrix"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
@@ -314,6 +401,10 @@ Examples:
success = run_data_update(args.update_data)
elif args.status:
success = show_status()
elif args.alert_check:
success = run_alert_check()
elif args.alert_test:
success = run_alert_test()
else:
success = run_continuous_monitoring()