Add comprehensive alerting system tests

- Created test suite for zone-based water level alerts (9 test cases)
- Created test suite for rate-of-change alerts (5 test cases)
- Created combined alert scenario test
- Fixed rate-of-change detection to use station_code instead of station_id
- All 3 test suites passing (14 total test cases)

Test coverage:
  - Zone alerts: P.1 zones 1-8 with INFO/WARNING/CRITICAL/EMERGENCY levels
  - Rate-of-change: 0.15/0.25/0.40 m/h thresholds for WARNING/CRITICAL/EMERGENCY
  - Combined: Simultaneous zone and rate-of-change alert triggering

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-03 16:35:34 +07:00
parent de632cef90
commit cc007f0e0c
2 changed files with 404 additions and 38 deletions

View File

@@ -323,48 +323,32 @@ class WaterLevelAlertSystem:
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
station_codes = set(m.get('station_code') for m in latest if m.get('station_code'))
for station_id in station_ids:
for station_code in station_codes:
try:
# Get measurements for this station in the time window using database adapter
# Try direct connection for SQLite/PostgreSQL
results = []
# Get measurements for this station in the time window
current_time = datetime.datetime.now()
measurements = self.db_adapter.get_measurements_by_timerange(
start_time=cutoff_time,
end_time=current_time,
station_codes=[station_code]
)
try:
import sqlite3
import psycopg2
# Check if we have a connection object (SQLite or PostgreSQL)
if hasattr(self.db_adapter, 'conn') and self.db_adapter.conn:
# SQLite/PostgreSQL style query
query = """
SELECT timestamp, water_level, station_id
FROM water_measurements
WHERE station_id = ? AND timestamp >= ?
ORDER BY timestamp ASC
"""
cursor = self.db_adapter.conn.cursor()
cursor.execute(query, (station_id, cutoff_time))
results = cursor.fetchall()
except:
# Fallback: use get_latest_measurements and filter
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
results = []
for m in all_measurements:
if m.get('station_id') == station_id and m.get('timestamp') and m.get('timestamp') >= cutoff_time:
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
if len(results) < 2:
if len(measurements) < 2:
continue # Need at least 2 points to calculate rate
# Get oldest and newest measurements
oldest = results[0]
newest = results[-1]
# Sort by timestamp
measurements = sorted(measurements, key=lambda m: m.get('timestamp'))
oldest_time, oldest_level, _ = oldest
newest_time, newest_level, _ = newest
# Get oldest and newest measurements
oldest = measurements[0]
newest = measurements[-1]
oldest_time = oldest.get('timestamp')
oldest_level = oldest.get('water_level')
newest_time = newest.get('timestamp')
newest_level = newest.get('water_level')
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
@@ -385,8 +369,7 @@ class WaterLevelAlertSystem:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
station_code = station_info.get('station_code', f'Station {station_id}')
station_info = next((m for m in latest if m.get('station_code') == station_code), {})
station_name = station_info.get('station_name_th', station_code)
# Get thresholds for this station