Add rate-of-change alerting for sudden water level increases
- Implement check_rate_of_change() to detect rapid water level rises - Monitor water level changes over configurable lookback period (default 3 hours) - Define rate-of-change thresholds for P.1 and other stations - Alert on moderate (15cm/h), rapid (25cm/h), and very rapid (40cm/h) rises - Only alert on rising water levels (positive rate of change) - Integrate rate-of-change checks into run_alert_check() cycle - Support both SQLite and PostgreSQL database adapters with fallback Rate thresholds for P.1 (Nawarat Bridge): - Warning: 0.15 m/h (15 cm/hour) - moderate rise - Critical: 0.25 m/h (25 cm/hour) - rapid rise - Emergency: 0.40 m/h (40 cm/hour) - very rapid rise Default thresholds for other stations: - Warning: 0.20 m/h, Critical: 0.35 m/h, Emergency: 0.50 m/h Alert messages include: - Rate of change in m/h and cm/h - Total level change over period - Time period analyzed This early warning system detects dangerous trends before absolute thresholds are reached, allowing for earlier response to flooding events. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
143
src/alerting.py
143
src/alerting.py
@@ -296,6 +296,143 @@ class WaterLevelAlertSystem:
|
||||
|
||||
return alerts
|
||||
|
||||
def check_rate_of_change(self, lookback_hours: int = 3) -> List[WaterAlert]:
|
||||
"""Check for rapid water level changes over recent hours"""
|
||||
alerts = []
|
||||
|
||||
if not self.db_adapter:
|
||||
return alerts
|
||||
|
||||
try:
|
||||
# Define rate-of-change thresholds (meters per hour)
|
||||
rate_thresholds = {
|
||||
"P.1": {
|
||||
"warning": 0.15, # 15cm/hour - moderate rise
|
||||
"critical": 0.25, # 25cm/hour - rapid rise
|
||||
"emergency": 0.40 # 40cm/hour - very rapid rise
|
||||
},
|
||||
"default": {
|
||||
"warning": 0.20,
|
||||
"critical": 0.35,
|
||||
"emergency": 0.50
|
||||
}
|
||||
}
|
||||
|
||||
# Get recent measurements for each station
|
||||
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=lookback_hours)
|
||||
|
||||
# Get unique stations from latest data
|
||||
latest = self.db_adapter.get_latest_measurements(limit=20)
|
||||
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
|
||||
|
||||
for station_id in station_ids:
|
||||
try:
|
||||
# Get measurements for this station in the time window using database adapter
|
||||
# Try direct connection for SQLite/PostgreSQL
|
||||
results = []
|
||||
|
||||
try:
|
||||
import sqlite3
|
||||
import psycopg2
|
||||
|
||||
# Check if we have a connection object (SQLite or PostgreSQL)
|
||||
if hasattr(self.db_adapter, 'conn') and self.db_adapter.conn:
|
||||
# SQLite/PostgreSQL style query
|
||||
query = """
|
||||
SELECT timestamp, water_level, station_id
|
||||
FROM water_measurements
|
||||
WHERE station_id = ? AND timestamp >= ?
|
||||
ORDER BY timestamp ASC
|
||||
"""
|
||||
|
||||
cursor = self.db_adapter.conn.cursor()
|
||||
cursor.execute(query, (station_id, cutoff_time))
|
||||
results = cursor.fetchall()
|
||||
except:
|
||||
# Fallback: use get_latest_measurements and filter
|
||||
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
|
||||
results = []
|
||||
for m in all_measurements:
|
||||
if m.get('station_id') == station_id and m.get('timestamp') and m.get('timestamp') >= cutoff_time:
|
||||
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
|
||||
|
||||
if len(results) < 2:
|
||||
continue # Need at least 2 points to calculate rate
|
||||
|
||||
# Get oldest and newest measurements
|
||||
oldest = results[0]
|
||||
newest = results[-1]
|
||||
|
||||
oldest_time, oldest_level, _ = oldest
|
||||
newest_time, newest_level, _ = newest
|
||||
|
||||
# Convert timestamp strings to datetime if needed
|
||||
if isinstance(oldest_time, str):
|
||||
oldest_time = datetime.datetime.fromisoformat(oldest_time)
|
||||
if isinstance(newest_time, str):
|
||||
newest_time = datetime.datetime.fromisoformat(newest_time)
|
||||
|
||||
# Calculate rate of change
|
||||
time_diff_hours = (newest_time - oldest_time).total_seconds() / 3600
|
||||
if time_diff_hours == 0:
|
||||
continue
|
||||
|
||||
level_change = newest_level - oldest_level
|
||||
rate_per_hour = level_change / time_diff_hours
|
||||
|
||||
# Only alert on rising water (positive rate)
|
||||
if rate_per_hour <= 0:
|
||||
continue
|
||||
|
||||
# Get station info from latest data
|
||||
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
|
||||
station_code = station_info.get('station_code', f'Station {station_id}')
|
||||
station_name = station_info.get('station_name_th', station_code)
|
||||
|
||||
# Get thresholds for this station
|
||||
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default'])
|
||||
|
||||
alert_level = None
|
||||
threshold_value = None
|
||||
alert_type = None
|
||||
|
||||
if rate_per_hour >= station_rate_threshold['emergency']:
|
||||
alert_level = AlertLevel.EMERGENCY
|
||||
threshold_value = station_rate_threshold['emergency']
|
||||
alert_type = "Very Rapid Water Level Rise"
|
||||
elif rate_per_hour >= station_rate_threshold['critical']:
|
||||
alert_level = AlertLevel.CRITICAL
|
||||
threshold_value = station_rate_threshold['critical']
|
||||
alert_type = "Rapid Water Level Rise"
|
||||
elif rate_per_hour >= station_rate_threshold['warning']:
|
||||
alert_level = AlertLevel.WARNING
|
||||
threshold_value = station_rate_threshold['warning']
|
||||
alert_type = "Moderate Water Level Rise"
|
||||
|
||||
if alert_level:
|
||||
message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)"
|
||||
|
||||
alert = WaterAlert(
|
||||
station_code=station_code,
|
||||
station_name=station_name or f'Station {station_code}',
|
||||
alert_type=alert_type,
|
||||
level=alert_level,
|
||||
water_level=newest_level,
|
||||
threshold=threshold_value,
|
||||
timestamp=newest_time,
|
||||
message=message
|
||||
)
|
||||
alerts.append(alert)
|
||||
|
||||
except Exception as station_error:
|
||||
logger.debug(f"Error checking rate of change for station {station_id}: {station_error}")
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking rate of change: {e}")
|
||||
|
||||
return alerts
|
||||
|
||||
def send_alerts(self, alerts: List[WaterAlert]) -> int:
|
||||
"""Send alerts via configured channels"""
|
||||
sent_count = 0
|
||||
@@ -327,8 +464,11 @@ class WaterLevelAlertSystem:
|
||||
# Check data freshness
|
||||
data_alerts = self.check_data_freshness()
|
||||
|
||||
# Check rate of change (rapid rises)
|
||||
rate_alerts = self.check_rate_of_change()
|
||||
|
||||
# Combine alerts
|
||||
all_alerts = water_alerts + data_alerts
|
||||
all_alerts = water_alerts + data_alerts + rate_alerts
|
||||
|
||||
# Send alerts
|
||||
sent_count = self.send_alerts(all_alerts)
|
||||
@@ -338,6 +478,7 @@ class WaterLevelAlertSystem:
|
||||
return {
|
||||
"water_alerts": len(water_alerts),
|
||||
"data_alerts": len(data_alerts),
|
||||
"rate_alerts": len(rate_alerts),
|
||||
"total_alerts": len(all_alerts),
|
||||
"sent": sent_count
|
||||
}
|
||||
|
Reference in New Issue
Block a user