From c57e46ae21b3ffca8cb908db054e210830a46e9b Mon Sep 17 00:00:00 2001 From: grabowski Date: Thu, 16 Oct 2025 17:25:27 +0700 Subject: [PATCH] Filter alerts to upstream stations only and add Grafana dashboard link - Alerts now only sent for stations upstream of Chiang Mai: P.20, P.75, P.92, P.4A, P.67, P.21, P.103, and P.1 - Added Grafana dashboard link to all alert messages - Dashboard URL: https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water - Fixed flake8 linting issues (line length, undefined variable, unused import) Co-Authored-By: Claude --- src/alerting.py | 175 ++++++++++++++++++++++++++++-------------------- 1 file changed, 101 insertions(+), 74 deletions(-) diff --git a/src/alerting.py b/src/alerting.py index 305f4d3..f33360e 100644 --- a/src/alerting.py +++ b/src/alerting.py @@ -3,33 +3,38 @@ Water Level Alerting System with Matrix Integration """ -import os -import json -import requests import datetime -from typing import List, Dict, Optional +import os from dataclasses import dataclass from enum import Enum +from typing import Dict, List, Optional + +import requests try: from .config import Config from .database_adapters import create_database_adapter from .logging_config import get_logger except ImportError: + import logging + from config import Config from database_adapters import create_database_adapter - import logging + def get_logger(name): return logging.getLogger(name) + logger = get_logger(__name__) + class AlertLevel(Enum): INFO = "info" WARNING = "warning" CRITICAL = "critical" EMERGENCY = "emergency" + @dataclass class WaterAlert: station_code: str @@ -42,9 +47,10 @@ class WaterAlert: timestamp: Optional[datetime.datetime] = None message: Optional[str] = None + class MatrixNotifier: def __init__(self, homeserver: str, access_token: str, room_id: str): - self.homeserver = homeserver.rstrip('/') + self.homeserver = homeserver.rstrip("/") self.access_token = access_token self.room_id = room_id self.session = requests.Session() @@ -56,15 +62,9 @@ class MatrixNotifier: txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}" - headers = { - "Authorization": f"Bearer {self.access_token}", - "Content-Type": "application/json" - } + headers = {"Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json"} - data = { - "msgtype": msgtype, - "body": message - } + data = {"msgtype": msgtype, "body": message} # Matrix API requires PUT when transaction ID is in the URL path response = self.session.put(url, headers=headers, json=data, timeout=10) @@ -83,7 +83,7 @@ class MatrixNotifier: AlertLevel.INFO: "โ„น๏ธ", AlertLevel.WARNING: "โš ๏ธ", AlertLevel.CRITICAL: "๐Ÿšจ", - AlertLevel.EMERGENCY: "๐Ÿ†˜" + AlertLevel.EMERGENCY: "๐Ÿ†˜", } emoji = emoji_map.get(alert.level, "๐Ÿ“Š") @@ -108,20 +108,38 @@ class MatrixNotifier: if alert.message: message += f"\n**Details:** {alert.message}\n" - message += f"\n๐Ÿ“ˆ View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}" + # Add Grafana dashboard link + grafana_url = ( + "https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water" + "?orgId=1&from=now-30d&to=now&timezone=browser" + ) + message += f"\n๐Ÿ“ˆ **View Dashboard:** {grafana_url}" return self.send_message(message) + class WaterLevelAlertSystem: + # Stations upstream of Chiang Mai (and CNX itself) to monitor + UPSTREAM_STATIONS = { + "P.20", # Ban Chiang Dao + "P.75", # Ban Chai Lat + "P.92", # Ban Muang Aut + "P.4A", # Ban Mae Taeng + "P.67", # Ban Tae + "P.21", # Ban Rim Tai + "P.103", # Ring Bridge 3 + "P.1", # Nawarat Bridge (Chiang Mai) + } + def __init__(self): self.db_adapter = None self.matrix_notifier = None self.thresholds = self._load_thresholds() # Matrix configuration from environment - matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org') - matrix_token = os.getenv('MATRIX_ACCESS_TOKEN') - matrix_room = os.getenv('MATRIX_ROOM_ID') + matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org") + matrix_token = os.getenv("MATRIX_ACCESS_TOKEN") + matrix_room = os.getenv("MATRIX_ROOM_ID") if matrix_token and matrix_room: self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room) @@ -147,7 +165,7 @@ class WaterLevelAlertSystem: # Keep legacy thresholds for compatibility "warning": 3.7, "critical": 4.3, - "emergency": 4.8 + "emergency": 4.8, }, "P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5}, "P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0}, @@ -156,7 +174,7 @@ class WaterLevelAlertSystem: "P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5}, "P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0}, # Default for unknown stations - "default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0} + "default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}, } def connect_database(self): @@ -164,8 +182,7 @@ class WaterLevelAlertSystem: try: db_config = Config.get_database_config() self.db_adapter = create_database_adapter( - db_config['type'], - connection_string=db_config['connection_string'] + db_config["type"], connection_string=db_config["connection_string"] ) if self.db_adapter.connect(): @@ -192,14 +209,18 @@ class WaterLevelAlertSystem: measurements = self.db_adapter.get_latest_measurements(limit=50) for measurement in measurements: - station_code = measurement.get('station_code', 'UNKNOWN') - water_level = measurement.get('water_level') + station_code = measurement.get("station_code", "UNKNOWN") + water_level = measurement.get("water_level") if not water_level: continue + # Only alert for upstream stations and Chiang Mai + if station_code not in self.UPSTREAM_STATIONS: + continue + # Get thresholds for this station - station_thresholds = self.thresholds.get(station_code, self.thresholds['default']) + station_thresholds = self.thresholds.get(station_code, self.thresholds["default"]) # Check each threshold level alert_level = None @@ -207,7 +228,7 @@ class WaterLevelAlertSystem: alert_type = None # Special handling for P.1 with zone-based thresholds - if station_code == "P.1" and 'zone_1' in station_thresholds: + if station_code == "P.1" and "zone_1" in station_thresholds: # Check all zones in reverse order (highest to lowest) zones = [ ("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"), @@ -230,29 +251,29 @@ class WaterLevelAlertSystem: else: # Standard threshold checking for other stations - if water_level >= station_thresholds.get('emergency', float('inf')): + if water_level >= station_thresholds.get("emergency", float("inf")): alert_level = AlertLevel.EMERGENCY - threshold_value = station_thresholds['emergency'] + threshold_value = station_thresholds["emergency"] alert_type = "Emergency Water Level" - elif water_level >= station_thresholds.get('critical', float('inf')): + elif water_level >= station_thresholds.get("critical", float("inf")): alert_level = AlertLevel.CRITICAL - threshold_value = station_thresholds['critical'] + threshold_value = station_thresholds["critical"] alert_type = "Critical Water Level" - elif water_level >= station_thresholds.get('warning', float('inf')): + elif water_level >= station_thresholds.get("warning", float("inf")): alert_level = AlertLevel.WARNING - threshold_value = station_thresholds['warning'] + threshold_value = station_thresholds["warning"] alert_type = "High Water Level" if alert_level: alert = WaterAlert( station_code=station_code, - station_name=measurement.get('station_name_th', f'Station {station_code}'), + station_name=measurement.get("station_name_th", f"Station {station_code}"), alert_type=alert_type, level=alert_level, water_level=water_level, threshold=threshold_value, - discharge=measurement.get('discharge'), - timestamp=measurement.get('timestamp') + discharge=measurement.get("discharge"), + timestamp=measurement.get("timestamp"), ) alerts.append(alert) @@ -273,21 +294,21 @@ class WaterLevelAlertSystem: cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours) for measurement in measurements: - timestamp = measurement.get('timestamp') + timestamp = measurement.get("timestamp") if timestamp and timestamp < cutoff_time: - station_code = measurement.get('station_code', 'UNKNOWN') + station_code = measurement.get("station_code", "UNKNOWN") age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600 alert = WaterAlert( station_code=station_code, - station_name=measurement.get('station_name_th', f'Station {station_code}'), + station_name=measurement.get("station_name_th", f"Station {station_code}"), alert_type="Stale Data", level=AlertLevel.WARNING, - water_level=measurement.get('water_level', 0), + water_level=measurement.get("water_level", 0), threshold=max_age_hours, timestamp=timestamp, - message=f"No fresh data for {age_hours:.1f} hours" + message=f"No fresh data for {age_hours:.1f} hours", ) alerts.append(alert) @@ -307,15 +328,11 @@ class WaterLevelAlertSystem: # Define rate-of-change thresholds (meters per hour) rate_thresholds = { "P.1": { - "warning": 0.15, # 15cm/hour - moderate rise - "critical": 0.25, # 25cm/hour - rapid rise - "emergency": 0.40 # 40cm/hour - very rapid rise + "warning": 0.15, # 15cm/hour - moderate rise + "critical": 0.25, # 25cm/hour - rapid rise + "emergency": 0.40, # 40cm/hour - very rapid rise }, - "default": { - "warning": 0.20, - "critical": 0.35, - "emergency": 0.50 - } + "default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50}, } # Get recent measurements for each station @@ -323,32 +340,34 @@ class WaterLevelAlertSystem: # Get unique stations from latest data latest = self.db_adapter.get_latest_measurements(limit=20) - station_codes = set(m.get('station_code') for m in latest if m.get('station_code')) + station_codes = set(m.get("station_code") for m in latest if m.get("station_code")) for station_code in station_codes: try: + # Only alert for upstream stations and Chiang Mai + if station_code not in self.UPSTREAM_STATIONS: + continue + # Get measurements for this station in the time window current_time = datetime.datetime.now() measurements = self.db_adapter.get_measurements_by_timerange( - start_time=cutoff_time, - end_time=current_time, - station_codes=[station_code] + start_time=cutoff_time, end_time=current_time, station_codes=[station_code] ) if len(measurements) < 2: continue # Need at least 2 points to calculate rate # Sort by timestamp - measurements = sorted(measurements, key=lambda m: m.get('timestamp')) + measurements = sorted(measurements, key=lambda m: m.get("timestamp")) # Get oldest and newest measurements oldest = measurements[0] newest = measurements[-1] - oldest_time = oldest.get('timestamp') - oldest_level = oldest.get('water_level') - newest_time = newest.get('timestamp') - newest_level = newest.get('water_level') + oldest_time = oldest.get("timestamp") + oldest_level = oldest.get("water_level") + newest_time = newest.get("timestamp") + newest_level = newest.get("water_level") # Convert timestamp strings to datetime if needed if isinstance(oldest_time, str): @@ -369,46 +388,49 @@ class WaterLevelAlertSystem: continue # Get station info from latest data - station_info = next((m for m in latest if m.get('station_code') == station_code), {}) - station_name = station_info.get('station_name_th', station_code) + station_info = next((m for m in latest if m.get("station_code") == station_code), {}) + station_name = station_info.get("station_name_th", station_code) # Get thresholds for this station - station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default']) + station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds["default"]) alert_level = None threshold_value = None alert_type = None - if rate_per_hour >= station_rate_threshold['emergency']: + if rate_per_hour >= station_rate_threshold["emergency"]: alert_level = AlertLevel.EMERGENCY - threshold_value = station_rate_threshold['emergency'] + threshold_value = station_rate_threshold["emergency"] alert_type = "Very Rapid Water Level Rise" - elif rate_per_hour >= station_rate_threshold['critical']: + elif rate_per_hour >= station_rate_threshold["critical"]: alert_level = AlertLevel.CRITICAL - threshold_value = station_rate_threshold['critical'] + threshold_value = station_rate_threshold["critical"] alert_type = "Rapid Water Level Rise" - elif rate_per_hour >= station_rate_threshold['warning']: + elif rate_per_hour >= station_rate_threshold["warning"]: alert_level = AlertLevel.WARNING - threshold_value = station_rate_threshold['warning'] + threshold_value = station_rate_threshold["warning"] alert_type = "Moderate Water Level Rise" if alert_level: - message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)" + message = ( + f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h " + f"(change: {level_change:+.2f}m)" + ) alert = WaterAlert( station_code=station_code, - station_name=station_name or f'Station {station_code}', + station_name=station_name or f"Station {station_code}", alert_type=alert_type, level=alert_level, water_level=newest_level, threshold=threshold_value, timestamp=newest_time, - message=message + message=message, ) alerts.append(alert) except Exception as station_error: - logger.debug(f"Error checking rate of change for station {station_id}: {station_error}") + logger.debug(f"Error checking rate of change for station {station_code}: {station_error}") continue except Exception as e: @@ -463,9 +485,10 @@ class WaterLevelAlertSystem: "data_alerts": len(data_alerts), "rate_alerts": len(rate_alerts), "total_alerts": len(all_alerts), - "sent": sent_count + "sent": sent_count, } + def main(): """Standalone alerting check""" import argparse @@ -479,7 +502,10 @@ def main(): if args.test: if alerting.matrix_notifier: - test_message = "๐Ÿงช **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!" + test_message = ( + "๐Ÿงช **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\n" + "If you received this, Matrix notifications are working correctly!" + ) success = alerting.matrix_notifier.send_message(test_message) print(f"Test message sent: {success}") else: @@ -492,5 +518,6 @@ def main(): else: print("Use --check or --test") + if __name__ == "__main__": - main() \ No newline at end of file + main()