Files
Northern-Thailand-Ping-Rive…/src/alerting.py
grabowski a424c50c5e Change stale data alert threshold from 4 to 12 hours
- Stale data alerts now only trigger after 12 hours without new data
- Reduces false alerts during expected data gaps

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-31 11:29:52 +07:00

524 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Water Level Alerting System with Matrix Integration
"""
import datetime
import os
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import requests
try:
from .config import Config
from .database_adapters import create_database_adapter
from .logging_config import get_logger
except ImportError:
import logging
from config import Config
from database_adapters import create_database_adapter
def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class WaterAlert:
station_code: str
station_name: str
alert_type: str
level: AlertLevel
water_level: float
threshold: float
discharge: Optional[float] = None
timestamp: Optional[datetime.datetime] = None
message: Optional[str] = None
class MatrixNotifier:
def __init__(self, homeserver: str, access_token: str, room_id: str):
self.homeserver = homeserver.rstrip("/")
self.access_token = access_token
self.room_id = room_id
self.session = requests.Session()
def send_message(self, message: str, msgtype: str = "m.text") -> bool:
"""Send message to Matrix room"""
try:
# Add transaction ID to prevent duplicates
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
headers = {"Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json"}
data = {"msgtype": msgtype, "body": message}
# Matrix API requires PUT when transaction ID is in the URL path
response = self.session.put(url, headers=headers, json=data, timeout=10)
response.raise_for_status()
logger.info(f"Matrix message sent successfully: {response.json().get('event_id')}")
return True
except Exception as e:
logger.error(f"Failed to send Matrix message: {e}")
return False
def send_alert(self, alert: WaterAlert) -> bool:
"""Send formatted water alert to Matrix"""
emoji_map = {
AlertLevel.INFO: "",
AlertLevel.WARNING: "⚠️",
AlertLevel.CRITICAL: "🚨",
AlertLevel.EMERGENCY: "🆘",
}
emoji = emoji_map.get(alert.level, "📊")
message = f"""{emoji} **WATER LEVEL ALERT**
**Station:** {alert.station_code} ({alert.station_name})
**Alert Type:** {alert.alert_type}
**Severity:** {alert.level.value.upper()}
**Current Level:** {alert.water_level:.2f}m
**Threshold:** {alert.threshold:.2f}m
**Difference:** {(alert.water_level - alert.threshold):+.2f}m
"""
if alert.discharge:
message += f"**Discharge:** {alert.discharge:.1f} cms\n"
if alert.timestamp:
message += f"**Time:** {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
if alert.message:
message += f"\n**Details:** {alert.message}\n"
# Add Grafana dashboard link
grafana_url = (
"https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water"
"?orgId=1&from=now-30d&to=now&timezone=browser"
)
message += f"\n📈 **View Dashboard:** {grafana_url}"
return self.send_message(message)
class WaterLevelAlertSystem:
# Stations upstream of Chiang Mai (and CNX itself) to monitor
UPSTREAM_STATIONS = {
"P.20", # Ban Chiang Dao
"P.75", # Ban Chai Lat
"P.92", # Ban Muang Aut
"P.4A", # Ban Mae Taeng
"P.67", # Ban Tae
"P.21", # Ban Rim Tai
"P.103", # Ring Bridge 3
"P.1", # Nawarat Bridge (Chiang Mai)
}
def __init__(self):
self.db_adapter = None
self.matrix_notifier = None
self.thresholds = self._load_thresholds()
# Matrix configuration from environment
matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
matrix_room = os.getenv("MATRIX_ROOM_ID")
if matrix_token and matrix_room:
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
logger.info("Matrix notifications enabled")
else:
logger.warning("Matrix configuration missing - notifications disabled")
def _load_thresholds(self) -> Dict[str, Dict[str, float]]:
"""Load alert thresholds from config or database"""
# Default thresholds for Northern Thailand stations
return {
"P.1": {
# Zone-based thresholds for Nawarat Bridge (P.1)
"zone_1": 3.7,
"zone_2": 3.9,
"zone_3": 4.0,
"zone_4": 4.1,
"zone_5": 4.2,
"zone_6": 4.3,
"zone_7": 4.6,
"zone_8": 4.8,
"newedge": 4.8, # Same as zone 8 or adjust as needed
# Keep legacy thresholds for compatibility
"warning": 3.7,
"critical": 4.3,
"emergency": 4.8,
},
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
"P.21": {"warning": 4.0, "critical": 5.5, "emergency": 7.0},
"P.67": {"warning": 6.0, "critical": 8.0, "emergency": 10.0},
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
# Default for unknown stations
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0},
}
def connect_database(self):
"""Initialize database connection"""
try:
db_config = Config.get_database_config()
self.db_adapter = create_database_adapter(
db_config["type"], connection_string=db_config["connection_string"]
)
if self.db_adapter.connect():
logger.info("Database connection established for alerting")
return True
else:
logger.error("Failed to connect to database")
return False
except Exception as e:
logger.error(f"Database connection error: {e}")
return False
def check_water_levels(self) -> List[WaterAlert]:
"""Check current water levels against thresholds"""
alerts = []
if not self.db_adapter:
logger.error("Database not connected")
return alerts
try:
# Get latest measurements
measurements = self.db_adapter.get_latest_measurements(limit=50)
for measurement in measurements:
station_code = measurement.get("station_code", "UNKNOWN")
water_level = measurement.get("water_level")
if not water_level:
continue
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
# Get thresholds for this station
station_thresholds = self.thresholds.get(station_code, self.thresholds["default"])
# Check each threshold level
alert_level = None
threshold_value = None
alert_type = None
# Special handling for P.1 with zone-based thresholds
if station_code == "P.1" and "zone_1" in station_thresholds:
# Check all zones in reverse order (highest to lowest)
zones = [
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
("newedge", 4.8, AlertLevel.EMERGENCY, "NewEdge Alert Level"),
("zone_7", 4.6, AlertLevel.CRITICAL, "Zone 7 - Critical"),
("zone_6", 4.3, AlertLevel.CRITICAL, "Zone 6 - Critical"),
("zone_5", 4.2, AlertLevel.WARNING, "Zone 5 - Warning"),
("zone_4", 4.1, AlertLevel.WARNING, "Zone 4 - Warning"),
("zone_3", 4.0, AlertLevel.WARNING, "Zone 3 - Warning"),
("zone_2", 3.9, AlertLevel.INFO, "Zone 2 - Info"),
("zone_1", 3.7, AlertLevel.INFO, "Zone 1 - Info"),
]
for zone_name, zone_threshold, zone_alert_level, zone_description in zones:
if water_level >= zone_threshold:
alert_level = zone_alert_level
threshold_value = zone_threshold
alert_type = zone_description
break
else:
# Standard threshold checking for other stations
if water_level >= station_thresholds.get("emergency", float("inf")):
alert_level = AlertLevel.EMERGENCY
threshold_value = station_thresholds["emergency"]
alert_type = "Emergency Water Level"
elif water_level >= station_thresholds.get("critical", float("inf")):
alert_level = AlertLevel.CRITICAL
threshold_value = station_thresholds["critical"]
alert_type = "Critical Water Level"
elif water_level >= station_thresholds.get("warning", float("inf")):
alert_level = AlertLevel.WARNING
threshold_value = station_thresholds["warning"]
alert_type = "High Water Level"
if alert_level:
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type=alert_type,
level=alert_level,
water_level=water_level,
threshold=threshold_value,
discharge=measurement.get("discharge"),
timestamp=measurement.get("timestamp"),
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking water levels: {e}")
return alerts
def check_data_freshness(self, max_age_hours: int = 12) -> List[WaterAlert]:
"""Check if data is fresh enough"""
alerts = []
if not self.db_adapter:
return alerts
try:
measurements = self.db_adapter.get_latest_measurements(limit=20)
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
for measurement in measurements:
timestamp = measurement.get("timestamp")
if timestamp and timestamp < cutoff_time:
station_code = measurement.get("station_code", "UNKNOWN")
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type="Stale Data",
level=AlertLevel.WARNING,
water_level=measurement.get("water_level", 0),
threshold=max_age_hours,
timestamp=timestamp,
message=f"No fresh data for {age_hours:.1f} hours",
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking data freshness: {e}")
return alerts
def check_rate_of_change(self, lookback_hours: int = 3) -> List[WaterAlert]:
"""Check for rapid water level changes over recent hours"""
alerts = []
if not self.db_adapter:
return alerts
try:
# Define rate-of-change thresholds (meters per hour)
rate_thresholds = {
"P.1": {
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40, # 40cm/hour - very rapid rise
},
"default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50},
}
# Get recent measurements for each station
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=lookback_hours)
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_codes = set(m.get("station_code") for m in latest if m.get("station_code"))
for station_code in station_codes:
try:
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
# Get measurements for this station in the time window
current_time = datetime.datetime.now()
measurements = self.db_adapter.get_measurements_by_timerange(
start_time=cutoff_time, end_time=current_time, station_codes=[station_code]
)
if len(measurements) < 2:
continue # Need at least 2 points to calculate rate
# Sort by timestamp
measurements = sorted(measurements, key=lambda m: m.get("timestamp"))
# Get oldest and newest measurements
oldest = measurements[0]
newest = measurements[-1]
oldest_time = oldest.get("timestamp")
oldest_level = oldest.get("water_level")
newest_time = newest.get("timestamp")
newest_level = newest.get("water_level")
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
oldest_time = datetime.datetime.fromisoformat(oldest_time)
if isinstance(newest_time, str):
newest_time = datetime.datetime.fromisoformat(newest_time)
# Calculate rate of change
time_diff_hours = (newest_time - oldest_time).total_seconds() / 3600
if time_diff_hours == 0:
continue
level_change = newest_level - oldest_level
rate_per_hour = level_change / time_diff_hours
# Only alert on rising water (positive rate)
if rate_per_hour <= 0:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get("station_code") == station_code), {})
station_name = station_info.get("station_name_th", station_code)
# Get thresholds for this station
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds["default"])
alert_level = None
threshold_value = None
alert_type = None
if rate_per_hour >= station_rate_threshold["emergency"]:
alert_level = AlertLevel.EMERGENCY
threshold_value = station_rate_threshold["emergency"]
alert_type = "Very Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold["critical"]:
alert_level = AlertLevel.CRITICAL
threshold_value = station_rate_threshold["critical"]
alert_type = "Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold["warning"]:
alert_level = AlertLevel.WARNING
threshold_value = station_rate_threshold["warning"]
alert_type = "Moderate Water Level Rise"
if alert_level:
message = (
f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h "
f"(change: {level_change:+.2f}m)"
)
alert = WaterAlert(
station_code=station_code,
station_name=station_name or f"Station {station_code}",
alert_type=alert_type,
level=alert_level,
water_level=newest_level,
threshold=threshold_value,
timestamp=newest_time,
message=message,
)
alerts.append(alert)
except Exception as station_error:
logger.debug(f"Error checking rate of change for station {station_code}: {station_error}")
continue
except Exception as e:
logger.error(f"Error checking rate of change: {e}")
return alerts
def send_alerts(self, alerts: List[WaterAlert]) -> int:
"""Send alerts via configured channels"""
sent_count = 0
if not alerts:
return sent_count
if self.matrix_notifier:
for alert in alerts:
if self.matrix_notifier.send_alert(alert):
sent_count += 1
# Could add other notification channels here:
# - Email
# - Discord
# - Telegram
# - SMS
return sent_count
def run_alert_check(self) -> Dict[str, int]:
"""Run complete alert check cycle"""
if not self.connect_database():
return {"error": 1}
# Check water levels
water_alerts = self.check_water_levels()
# Check data freshness
data_alerts = self.check_data_freshness()
# Check rate of change (rapid rises)
rate_alerts = self.check_rate_of_change()
# Combine alerts
all_alerts = water_alerts + data_alerts + rate_alerts
# Send alerts
sent_count = self.send_alerts(all_alerts)
logger.info(f"Alert check complete: {len(all_alerts)} alerts, {sent_count} sent")
return {
"water_alerts": len(water_alerts),
"data_alerts": len(data_alerts),
"rate_alerts": len(rate_alerts),
"total_alerts": len(all_alerts),
"sent": sent_count,
}
def main():
"""Standalone alerting check"""
import argparse
parser = argparse.ArgumentParser(description="Water Level Alert System")
parser.add_argument("--check", action="store_true", help="Run alert check")
parser.add_argument("--test", action="store_true", help="Send test message")
args = parser.parse_args()
alerting = WaterLevelAlertSystem()
if args.test:
if alerting.matrix_notifier:
test_message = (
"🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\n"
"If you received this, Matrix notifications are working correctly!"
)
success = alerting.matrix_notifier.send_message(test_message)
print(f"Test message sent: {success}")
else:
print("Matrix notifier not configured")
elif args.check:
results = alerting.run_alert_check()
print(f"Alert check results: {results}")
else:
print("Use --check or --test")
if __name__ == "__main__":
main()