Filter alerts to upstream stations only and add Grafana dashboard link
Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled

- Alerts now only sent for stations upstream of Chiang Mai:
  P.20, P.75, P.92, P.4A, P.67, P.21, P.103, and P.1
- Added Grafana dashboard link to all alert messages
- Dashboard URL: https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water
- Fixed flake8 linting issues (line length, undefined variable, unused import)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-16 17:25:27 +07:00
parent 6a76a88f32
commit c57e46ae21

View File

@@ -3,33 +3,38 @@
Water Level Alerting System with Matrix Integration
"""
import os
import json
import requests
import datetime
from typing import List, Dict, Optional
import os
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import requests
try:
from .config import Config
from .database_adapters import create_database_adapter
from .logging_config import get_logger
except ImportError:
import logging
from config import Config
from database_adapters import create_database_adapter
import logging
def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class WaterAlert:
station_code: str
@@ -42,9 +47,10 @@ class WaterAlert:
timestamp: Optional[datetime.datetime] = None
message: Optional[str] = None
class MatrixNotifier:
def __init__(self, homeserver: str, access_token: str, room_id: str):
self.homeserver = homeserver.rstrip('/')
self.homeserver = homeserver.rstrip("/")
self.access_token = access_token
self.room_id = room_id
self.session = requests.Session()
@@ -56,15 +62,9 @@ class MatrixNotifier:
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
headers = {"Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json"}
data = {
"msgtype": msgtype,
"body": message
}
data = {"msgtype": msgtype, "body": message}
# Matrix API requires PUT when transaction ID is in the URL path
response = self.session.put(url, headers=headers, json=data, timeout=10)
@@ -83,7 +83,7 @@ class MatrixNotifier:
AlertLevel.INFO: "",
AlertLevel.WARNING: "⚠️",
AlertLevel.CRITICAL: "🚨",
AlertLevel.EMERGENCY: "🆘"
AlertLevel.EMERGENCY: "🆘",
}
emoji = emoji_map.get(alert.level, "📊")
@@ -108,20 +108,38 @@ class MatrixNotifier:
if alert.message:
message += f"\n**Details:** {alert.message}\n"
message += f"\n📈 View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}"
# Add Grafana dashboard link
grafana_url = (
"https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water"
"?orgId=1&from=now-30d&to=now&timezone=browser"
)
message += f"\n📈 **View Dashboard:** {grafana_url}"
return self.send_message(message)
class WaterLevelAlertSystem:
# Stations upstream of Chiang Mai (and CNX itself) to monitor
UPSTREAM_STATIONS = {
"P.20", # Ban Chiang Dao
"P.75", # Ban Chai Lat
"P.92", # Ban Muang Aut
"P.4A", # Ban Mae Taeng
"P.67", # Ban Tae
"P.21", # Ban Rim Tai
"P.103", # Ring Bridge 3
"P.1", # Nawarat Bridge (Chiang Mai)
}
def __init__(self):
self.db_adapter = None
self.matrix_notifier = None
self.thresholds = self._load_thresholds()
# Matrix configuration from environment
matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org')
matrix_token = os.getenv('MATRIX_ACCESS_TOKEN')
matrix_room = os.getenv('MATRIX_ROOM_ID')
matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
matrix_room = os.getenv("MATRIX_ROOM_ID")
if matrix_token and matrix_room:
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
@@ -147,7 +165,7 @@ class WaterLevelAlertSystem:
# Keep legacy thresholds for compatibility
"warning": 3.7,
"critical": 4.3,
"emergency": 4.8
"emergency": 4.8,
},
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
@@ -156,7 +174,7 @@ class WaterLevelAlertSystem:
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
# Default for unknown stations
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0},
}
def connect_database(self):
@@ -164,8 +182,7 @@ class WaterLevelAlertSystem:
try:
db_config = Config.get_database_config()
self.db_adapter = create_database_adapter(
db_config['type'],
connection_string=db_config['connection_string']
db_config["type"], connection_string=db_config["connection_string"]
)
if self.db_adapter.connect():
@@ -192,14 +209,18 @@ class WaterLevelAlertSystem:
measurements = self.db_adapter.get_latest_measurements(limit=50)
for measurement in measurements:
station_code = measurement.get('station_code', 'UNKNOWN')
water_level = measurement.get('water_level')
station_code = measurement.get("station_code", "UNKNOWN")
water_level = measurement.get("water_level")
if not water_level:
continue
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
# Get thresholds for this station
station_thresholds = self.thresholds.get(station_code, self.thresholds['default'])
station_thresholds = self.thresholds.get(station_code, self.thresholds["default"])
# Check each threshold level
alert_level = None
@@ -207,7 +228,7 @@ class WaterLevelAlertSystem:
alert_type = None
# Special handling for P.1 with zone-based thresholds
if station_code == "P.1" and 'zone_1' in station_thresholds:
if station_code == "P.1" and "zone_1" in station_thresholds:
# Check all zones in reverse order (highest to lowest)
zones = [
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
@@ -230,29 +251,29 @@ class WaterLevelAlertSystem:
else:
# Standard threshold checking for other stations
if water_level >= station_thresholds.get('emergency', float('inf')):
if water_level >= station_thresholds.get("emergency", float("inf")):
alert_level = AlertLevel.EMERGENCY
threshold_value = station_thresholds['emergency']
threshold_value = station_thresholds["emergency"]
alert_type = "Emergency Water Level"
elif water_level >= station_thresholds.get('critical', float('inf')):
elif water_level >= station_thresholds.get("critical", float("inf")):
alert_level = AlertLevel.CRITICAL
threshold_value = station_thresholds['critical']
threshold_value = station_thresholds["critical"]
alert_type = "Critical Water Level"
elif water_level >= station_thresholds.get('warning', float('inf')):
elif water_level >= station_thresholds.get("warning", float("inf")):
alert_level = AlertLevel.WARNING
threshold_value = station_thresholds['warning']
threshold_value = station_thresholds["warning"]
alert_type = "High Water Level"
if alert_level:
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type=alert_type,
level=alert_level,
water_level=water_level,
threshold=threshold_value,
discharge=measurement.get('discharge'),
timestamp=measurement.get('timestamp')
discharge=measurement.get("discharge"),
timestamp=measurement.get("timestamp"),
)
alerts.append(alert)
@@ -273,21 +294,21 @@ class WaterLevelAlertSystem:
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
for measurement in measurements:
timestamp = measurement.get('timestamp')
timestamp = measurement.get("timestamp")
if timestamp and timestamp < cutoff_time:
station_code = measurement.get('station_code', 'UNKNOWN')
station_code = measurement.get("station_code", "UNKNOWN")
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type="Stale Data",
level=AlertLevel.WARNING,
water_level=measurement.get('water_level', 0),
water_level=measurement.get("water_level", 0),
threshold=max_age_hours,
timestamp=timestamp,
message=f"No fresh data for {age_hours:.1f} hours"
message=f"No fresh data for {age_hours:.1f} hours",
)
alerts.append(alert)
@@ -307,15 +328,11 @@ class WaterLevelAlertSystem:
# Define rate-of-change thresholds (meters per hour)
rate_thresholds = {
"P.1": {
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40 # 40cm/hour - very rapid rise
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40, # 40cm/hour - very rapid rise
},
"default": {
"warning": 0.20,
"critical": 0.35,
"emergency": 0.50
}
"default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50},
}
# Get recent measurements for each station
@@ -323,32 +340,34 @@ class WaterLevelAlertSystem:
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_codes = set(m.get('station_code') for m in latest if m.get('station_code'))
station_codes = set(m.get("station_code") for m in latest if m.get("station_code"))
for station_code in station_codes:
try:
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
# Get measurements for this station in the time window
current_time = datetime.datetime.now()
measurements = self.db_adapter.get_measurements_by_timerange(
start_time=cutoff_time,
end_time=current_time,
station_codes=[station_code]
start_time=cutoff_time, end_time=current_time, station_codes=[station_code]
)
if len(measurements) < 2:
continue # Need at least 2 points to calculate rate
# Sort by timestamp
measurements = sorted(measurements, key=lambda m: m.get('timestamp'))
measurements = sorted(measurements, key=lambda m: m.get("timestamp"))
# Get oldest and newest measurements
oldest = measurements[0]
newest = measurements[-1]
oldest_time = oldest.get('timestamp')
oldest_level = oldest.get('water_level')
newest_time = newest.get('timestamp')
newest_level = newest.get('water_level')
oldest_time = oldest.get("timestamp")
oldest_level = oldest.get("water_level")
newest_time = newest.get("timestamp")
newest_level = newest.get("water_level")
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
@@ -369,46 +388,49 @@ class WaterLevelAlertSystem:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get('station_code') == station_code), {})
station_name = station_info.get('station_name_th', station_code)
station_info = next((m for m in latest if m.get("station_code") == station_code), {})
station_name = station_info.get("station_name_th", station_code)
# Get thresholds for this station
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default'])
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds["default"])
alert_level = None
threshold_value = None
alert_type = None
if rate_per_hour >= station_rate_threshold['emergency']:
if rate_per_hour >= station_rate_threshold["emergency"]:
alert_level = AlertLevel.EMERGENCY
threshold_value = station_rate_threshold['emergency']
threshold_value = station_rate_threshold["emergency"]
alert_type = "Very Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['critical']:
elif rate_per_hour >= station_rate_threshold["critical"]:
alert_level = AlertLevel.CRITICAL
threshold_value = station_rate_threshold['critical']
threshold_value = station_rate_threshold["critical"]
alert_type = "Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['warning']:
elif rate_per_hour >= station_rate_threshold["warning"]:
alert_level = AlertLevel.WARNING
threshold_value = station_rate_threshold['warning']
threshold_value = station_rate_threshold["warning"]
alert_type = "Moderate Water Level Rise"
if alert_level:
message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)"
message = (
f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h "
f"(change: {level_change:+.2f}m)"
)
alert = WaterAlert(
station_code=station_code,
station_name=station_name or f'Station {station_code}',
station_name=station_name or f"Station {station_code}",
alert_type=alert_type,
level=alert_level,
water_level=newest_level,
threshold=threshold_value,
timestamp=newest_time,
message=message
message=message,
)
alerts.append(alert)
except Exception as station_error:
logger.debug(f"Error checking rate of change for station {station_id}: {station_error}")
logger.debug(f"Error checking rate of change for station {station_code}: {station_error}")
continue
except Exception as e:
@@ -463,9 +485,10 @@ class WaterLevelAlertSystem:
"data_alerts": len(data_alerts),
"rate_alerts": len(rate_alerts),
"total_alerts": len(all_alerts),
"sent": sent_count
"sent": sent_count,
}
def main():
"""Standalone alerting check"""
import argparse
@@ -479,7 +502,10 @@ def main():
if args.test:
if alerting.matrix_notifier:
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!"
test_message = (
"🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\n"
"If you received this, Matrix notifications are working correctly!"
)
success = alerting.matrix_notifier.send_message(test_message)
print(f"Test message sent: {success}")
else:
@@ -492,5 +518,6 @@ def main():
else:
print("Use --check or --test")
if __name__ == "__main__":
main()
main()