2 Commits

Author SHA1 Message Date
58cc60ba19 Integrate automatic alerting into continuous monitoring
Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled
- Alerts now run automatically after every successful new data fetch
- Works for both hourly fetches and retry mode exits
- Alert check runs when fresh data is saved to database
- Logs alert results (total generated and sent count)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:44:57 +07:00
cc007f0e0c Add comprehensive alerting system tests
- Created test suite for zone-based water level alerts (9 test cases)
- Created test suite for rate-of-change alerts (5 test cases)
- Created combined alert scenario test
- Fixed rate-of-change detection to use station_code instead of station_id
- All 3 test suites passing (14 total test cases)

Test coverage:
  - Zone alerts: P.1 zones 1-8 with INFO/WARNING/CRITICAL/EMERGENCY levels
  - Rate-of-change: 0.15/0.25/0.40 m/h thresholds for WARNING/CRITICAL/EMERGENCY
  - Combined: Simultaneous zone and rate-of-change alert triggering

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:35:34 +07:00
3 changed files with 420 additions and 39 deletions

View File

@@ -323,48 +323,32 @@ class WaterLevelAlertSystem:
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
station_codes = set(m.get('station_code') for m in latest if m.get('station_code'))
for station_id in station_ids:
for station_code in station_codes:
try:
# Get measurements for this station in the time window using database adapter
# Try direct connection for SQLite/PostgreSQL
results = []
# Get measurements for this station in the time window
current_time = datetime.datetime.now()
measurements = self.db_adapter.get_measurements_by_timerange(
start_time=cutoff_time,
end_time=current_time,
station_codes=[station_code]
)
try:
import sqlite3
import psycopg2
# Check if we have a connection object (SQLite or PostgreSQL)
if hasattr(self.db_adapter, 'conn') and self.db_adapter.conn:
# SQLite/PostgreSQL style query
query = """
SELECT timestamp, water_level, station_id
FROM water_measurements
WHERE station_id = ? AND timestamp >= ?
ORDER BY timestamp ASC
"""
cursor = self.db_adapter.conn.cursor()
cursor.execute(query, (station_id, cutoff_time))
results = cursor.fetchall()
except:
# Fallback: use get_latest_measurements and filter
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
results = []
for m in all_measurements:
if m.get('station_id') == station_id and m.get('timestamp') and m.get('timestamp') >= cutoff_time:
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
if len(results) < 2:
if len(measurements) < 2:
continue # Need at least 2 points to calculate rate
# Get oldest and newest measurements
oldest = results[0]
newest = results[-1]
# Sort by timestamp
measurements = sorted(measurements, key=lambda m: m.get('timestamp'))
oldest_time, oldest_level, _ = oldest
newest_time, newest_level, _ = newest
# Get oldest and newest measurements
oldest = measurements[0]
newest = measurements[-1]
oldest_time = oldest.get('timestamp')
oldest_level = oldest.get('water_level')
newest_time = newest.get('timestamp')
newest_level = newest.get('water_level')
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
@@ -385,8 +369,7 @@ class WaterLevelAlertSystem:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
station_code = station_info.get('station_code', f'Station {station_id}')
station_info = next((m for m in latest if m.get('station_code') == station_code), {})
station_name = station_info.get('station_name_th', station_code)
# Get thresholds for this station

View File

@@ -64,7 +64,7 @@ def run_test_cycle():
return False
def run_continuous_monitoring():
"""Run continuous monitoring with adaptive scheduling"""
"""Run continuous monitoring with adaptive scheduling and alerting"""
logger.info("Starting continuous monitoring...")
try:
@@ -75,11 +75,16 @@ def run_continuous_monitoring():
db_config = Config.get_database_config()
scraper = EnhancedWaterMonitorScraper(db_config)
# Initialize alerting system
from .alerting import WaterLevelAlertSystem
alerting = WaterLevelAlertSystem()
# Setup signal handlers
setup_signal_handlers(scraper)
logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
logger.info("Alerts: automatic check after each successful data fetch")
logger.info("Press Ctrl+C to stop")
# Run initial cycle
@@ -109,6 +114,16 @@ def run_continuous_monitoring():
if success:
last_successful_fetch = current_time
# Run alert check after every successful new data fetch
logger.info("Running alert check...")
try:
alert_results = alerting.run_alert_check()
if alert_results.get('total_alerts', 0) > 0:
logger.info(f"Alerts: {alert_results['total_alerts']} generated, {alert_results['sent']} sent")
except Exception as e:
logger.error(f"Alert check failed: {e}")
if retry_mode:
logger.info("✅ Data fetch successful - switching back to hourly schedule")
retry_mode = False

383
tests/test_alerting.py Normal file
View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
Comprehensive tests for the alerting system
Tests both zone-based and rate-of-change alerts
"""
import sys
import os
import datetime
import sqlite3
import time
import gc
# Add src directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.alerting import WaterLevelAlertSystem, AlertLevel
from src.database_adapters import create_database_adapter
def setup_test_database(test_name='default'):
    """Create a fresh SQLite test database seeded with the P.1 station.

    The database file is named ``test_alerts_<test_name>.db`` and is created
    relative to the current working directory. An existing file with that
    name is deleted first; if deletion fails with ``PermissionError`` a
    randomly suffixed, currently unused file name is chosen instead.

    The schema consists of a ``stations`` table and a ``water_measurements``
    table. One station row is inserted (id 8, code ``P.1``); no measurements
    are inserted.

    Args:
        test_name: Label embedded in the file name so that each test can use
            its own database file.

    Returns:
        Path of the database file that was created.
    """
    db_path = f'test_alerts_{test_name}.db'

    # Remove existing test database
    if os.path.exists(db_path):
        try:
            os.remove(db_path)
        except PermissionError:
            # The file could not be removed, so fall back to a randomly
            # suffixed name. Keep drawing until the name is unused: reusing a
            # leftover file would make the CREATE TABLE statements below fail.
            import random
            while os.path.exists(db_path):
                db_path = f'test_alerts_{test_name}_{random.randint(1000, 9999)}.db'

    # Create new database
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Create stations table
        cursor.execute("""
            CREATE TABLE stations (
                id INTEGER PRIMARY KEY,
                station_code TEXT NOT NULL UNIQUE,
                english_name TEXT,
                thai_name TEXT,
                latitude REAL,
                longitude REAL,
                basin TEXT,
                province TEXT,
                status TEXT DEFAULT 'active',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Create water_measurements table
        cursor.execute("""
            CREATE TABLE water_measurements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                station_id INTEGER NOT NULL,
                water_level REAL NOT NULL,
                discharge REAL,
                discharge_percent REAL,
                status TEXT DEFAULT 'active',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (station_id) REFERENCES stations (id)
            )
        """)

        # Insert P.1 station (id=8 to match existing data)
        cursor.execute("""
            INSERT INTO stations (id, station_code, english_name, thai_name, basin, province)
            VALUES (8, 'P.1', 'Nawarat Bridge', 'สะพานนวรัฐ', 'Ping', 'Chiang Mai')
        """)

        conn.commit()
    finally:
        # Release the file handle even when a statement above fails, so the
        # caller is still able to delete the database file afterwards.
        conn.close()

    return db_path
def test_zone_level_alerts():
    """Check that single P.1 readings produce the expected zone alert level.

    For each case a lone measurement for station id 8 is written to a scratch
    SQLite database, a ``WaterLevelAlertSystem`` is pointed at that database,
    and the first alert returned by ``check_water_levels()`` is compared with
    the expected ``AlertLevel`` (or no alert at all when the expectation is
    ``None``). A ``[PASS]``/``[FAIL]`` line is printed per case.

    NOTE(review): the outcome is returned rather than asserted, so a runner
    that ignores return values (pytest, for instance) would not register a
    failure - confirm how CI invokes this file.

    Returns:
        True when every case passed, False otherwise.
    """
    print("="*70)
    print("TEST 1: Zone-Based Water Level Alerts")
    print("="*70)

    db_path = setup_test_database('zone_tests')

    # Test cases for P.1 zone thresholds:
    # (water_level, expected_alert_level, description)
    test_cases = [
        (2.5, None, "Below all zones"),
        (3.7, AlertLevel.INFO, "Zone 1"),
        (3.9, AlertLevel.INFO, "Zone 2"),
        (4.0, AlertLevel.WARNING, "Zone 3"),
        (4.2, AlertLevel.WARNING, "Zone 5"),
        (4.3, AlertLevel.CRITICAL, "Zone 6"),
        (4.6, AlertLevel.CRITICAL, "Zone 7"),
        (4.8, AlertLevel.EMERGENCY, "Zone 8/NewEdge"),
        (5.0, AlertLevel.EMERGENCY, "Above all zones"),
    ]

    print("\nTesting P.1 (Nawarat Bridge) zone thresholds:")
    print("-" * 70)

    passed = 0
    failed = 0

    try:
        for water_level, expected_level, zone_description in test_cases:
            # Replace any previous reading with the one under test.
            conn = sqlite3.connect(db_path)
            try:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM water_measurements")

                # Bind the timestamp as an explicit ISO string: sqlite3's
                # implicit datetime adapter is deprecated since Python 3.12
                # and produced this exact text.
                current_time = datetime.datetime.now().isoformat(sep=' ')
                cursor.execute("""
                    INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
                    VALUES (?, 8, ?, 350.0)
                """, (current_time, water_level))

                conn.commit()
            finally:
                # Never leave a handle open on the scratch file if the insert fails.
                conn.close()

            # Check alerts
            alerting = WaterLevelAlertSystem()
            alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
            alerting.db_adapter.connect()

            alerts = alerting.check_water_levels()

            # Verify result
            if expected_level is None:
                # Should not trigger any alert
                if len(alerts) == 0:
                    print(f"[PASS] {water_level:.1f}m: {zone_description} - No alert")
                    passed += 1
                else:
                    print(f"[FAIL] {water_level:.1f}m: {zone_description} - Unexpected alert")
                    failed += 1
            else:
                # Should trigger alert with specific level
                if len(alerts) > 0 and alerts[0].level == expected_level:
                    print(f"[PASS] {water_level:.1f}m: {zone_description} - {expected_level.value.upper()} alert")
                    passed += 1
                elif len(alerts) == 0:
                    print(f"[FAIL] {water_level:.1f}m: {zone_description} - No alert triggered")
                    failed += 1
                else:
                    print(f"[FAIL] {water_level:.1f}m: {zone_description} - Wrong alert level: {alerts[0].level.value}")
                    failed += 1

        print("-" * 70)
        print(f"Zone Alert Tests: {passed} passed, {failed} failed")
    finally:
        # Cleanup - force garbage collection and wait briefly before removing
        # the file. Done in ``finally`` so the scratch database is removed
        # even when a case raises.
        gc.collect()
        time.sleep(0.5)
        try:
            os.remove(db_path)
        except OSError:
            # Best effort: a cleanup problem must not mask the test outcome.
            print(f"Warning: Could not remove test database {db_path}")

    return failed == 0
def test_rate_of_change_alerts():
    """Check that rising P.1 levels produce the expected rate-of-change alert.

    For each case two measurements for station id 8 are written to a scratch
    SQLite database: the initial level ``hours`` ago and the final level now.
    ``check_rate_of_change()`` is then called with a lookback window one hour
    longer than the elapsed time, and the first returned alert is compared
    with the expected ``AlertLevel`` (or no alert when the expectation is
    ``None``). A ``[PASS]``/``[FAIL]`` line is printed per case.

    NOTE(review): the outcome is returned rather than asserted, so a runner
    that ignores return values (pytest, for instance) would not register a
    failure - confirm how CI invokes this file.

    Returns:
        True when every case passed, False otherwise.
    """
    print("\n" + "="*70)
    print("TEST 2: Rate-of-Change Water Level Alerts")
    print("="*70)

    db_path = setup_test_database('rate_tests')

    # Test cases: (initial_level, final_level, hours_elapsed, expected_alert_level, description)
    test_cases = [
        (3.0, 3.1, 3.0, None, "Slow rise (0.03m/h)"),
        (3.0, 3.5, 3.0, AlertLevel.WARNING, "Moderate rise (0.17m/h)"),
        (3.0, 3.8, 3.0, AlertLevel.CRITICAL, "Rapid rise (0.27m/h)"),
        (3.0, 4.2, 3.0, AlertLevel.EMERGENCY, "Very rapid rise (0.40m/h)"),
        (4.0, 3.5, 3.0, None, "Falling water (negative rate)"),
    ]

    print("\nTesting P.1 rate-of-change thresholds:")
    print(" Warning: 0.15 m/h (15 cm/h)")
    print(" Critical: 0.25 m/h (25 cm/h)")
    print(" Emergency: 0.40 m/h (40 cm/h)")
    print("-" * 70)

    passed = 0
    failed = 0

    try:
        for initial_level, final_level, hours, expected_level, description in test_cases:
            # Insert test data simulating water level change over time
            conn = sqlite3.connect(db_path)
            try:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM water_measurements")

                current_time = datetime.datetime.now()
                start_time = current_time - datetime.timedelta(hours=hours)

                # Timestamps are bound as explicit ISO strings: sqlite3's
                # implicit datetime adapter is deprecated since Python 3.12
                # and produced this exact text.
                # Insert initial measurement
                cursor.execute("""
                    INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
                    VALUES (?, 8, ?, 350.0)
                """, (start_time.isoformat(sep=' '), initial_level))

                # Insert final measurement
                cursor.execute("""
                    INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
                    VALUES (?, 8, ?, 380.0)
                """, (current_time.isoformat(sep=' '), final_level))

                conn.commit()
            finally:
                # Never leave a handle open on the scratch file if an insert fails.
                conn.close()

            # Check rate-of-change alerts
            alerting = WaterLevelAlertSystem()
            alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
            alerting.db_adapter.connect()

            # Look back one hour further than the simulated span so the
            # oldest measurement falls inside the window.
            rate_alerts = alerting.check_rate_of_change(lookback_hours=int(hours) + 1)

            # Calculate actual rate for display
            level_change = final_level - initial_level
            rate = level_change / hours if hours > 0 else 0

            # Verify result
            if expected_level is None:
                # Should not trigger any alert
                if len(rate_alerts) == 0:
                    print(f"[PASS] {rate:+.2f}m/h: {description} - No alert")
                    passed += 1
                else:
                    print(f"[FAIL] {rate:+.2f}m/h: {description} - Unexpected alert")
                    print(f" Alert: {rate_alerts[0].alert_type} - {rate_alerts[0].level.value}")
                    failed += 1
            else:
                # Should trigger alert with specific level
                if len(rate_alerts) > 0 and rate_alerts[0].level == expected_level:
                    print(f"[PASS] {rate:+.2f}m/h: {description} - {expected_level.value.upper()} alert")
                    print(f" Message: {rate_alerts[0].message}")
                    passed += 1
                elif len(rate_alerts) == 0:
                    print(f"[FAIL] {rate:+.2f}m/h: {description} - No alert triggered")
                    failed += 1
                else:
                    print(f"[FAIL] {rate:+.2f}m/h: {description} - Wrong alert level: {rate_alerts[0].level.value}")
                    failed += 1

        print("-" * 70)
        print(f"Rate-of-Change Tests: {passed} passed, {failed} failed")
    finally:
        # Cleanup - force garbage collection and wait briefly before removing
        # the file. Done in ``finally`` so the scratch database is removed
        # even when a case raises.
        gc.collect()
        time.sleep(0.5)
        try:
            os.remove(db_path)
        except OSError:
            # Best effort: a cleanup problem must not mask the test outcome.
            print(f"Warning: Could not remove test database {db_path}")

    return failed == 0
def test_combined_alerts():
    """Check that one scenario raises both a zone and a rate-of-change alert.

    Two measurements for station id 8 are written to a scratch SQLite
    database (3.5 m three hours ago, 4.5 m now). The alerts returned by
    ``check_water_levels()`` and ``check_rate_of_change(lookback_hours=4)``
    are concatenated and classified by their ``alert_type`` text: a type
    containing "Zone" counts as the zone alert, a type containing "Rise" or
    (case-insensitively) "rate" counts as the rate-of-change alert.

    NOTE(review): the outcome is returned rather than asserted, so a runner
    that ignores return values (pytest, for instance) would not register a
    failure - confirm how CI invokes this file.

    Returns:
        True when both kinds of alert were found, False otherwise.
    """
    print("\n" + "="*70)
    print("TEST 3: Combined Zone + Rate-of-Change Alerts")
    print("="*70)

    db_path = setup_test_database('combined_tests')

    print("\nScenario: Water rising rapidly from 3.5m to 4.5m over 3 hours")
    print(" Expected: Both Zone 7 alert AND Critical rate-of-change alert")
    print("-" * 70)

    try:
        # Insert test data
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()

            current_time = datetime.datetime.now()
            start_time = current_time - datetime.timedelta(hours=3)

            # Timestamps are bound as explicit ISO strings: sqlite3's implicit
            # datetime adapter is deprecated since Python 3.12 and produced
            # this exact text.
            # Water rising from 3.5m to 4.5m over 3 hours (0.33 m/h - Critical rate)
            cursor.execute("""
                INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
                VALUES (?, 8, 3.5, 350.0)
            """, (start_time.isoformat(sep=' '),))

            cursor.execute("""
                INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
                VALUES (?, 8, 4.5, 450.0)
            """, (current_time.isoformat(sep=' '),))

            conn.commit()
        finally:
            # Never leave a handle open on the scratch file if an insert fails.
            conn.close()

        # Check both types of alerts
        alerting = WaterLevelAlertSystem()
        alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
        alerting.db_adapter.connect()

        zone_alerts = alerting.check_water_levels()
        rate_alerts = alerting.check_rate_of_change(lookback_hours=4)

        all_alerts = zone_alerts + rate_alerts

        print(f"\nTotal alerts triggered: {len(all_alerts)}")

        zone_alert_found = False
        rate_alert_found = False

        for alert in all_alerts:
            print(f"\n Alert Type: {alert.alert_type}")
            print(f" Severity: {alert.level.value.upper()}")
            print(f" Water Level: {alert.water_level:.2f}m")
            if alert.message:
                print(f" Details: {alert.message}")

            # Classification relies on the wording of alert_type.
            if "Zone" in alert.alert_type:
                zone_alert_found = True
            if "Rise" in alert.alert_type or "rate" in alert.alert_type.lower():
                rate_alert_found = True

        print("-" * 70)

        if zone_alert_found and rate_alert_found:
            print("[PASS] Combined Alert Test - Both alert types triggered")
            success = True
        else:
            print("[FAIL] Combined Alert Test")
            if not zone_alert_found:
                print(" Missing: Zone-based alert")
            if not rate_alert_found:
                print(" Missing: Rate-of-change alert")
            success = False
    finally:
        # Cleanup - force garbage collection and wait briefly before removing
        # the file. Done in ``finally`` so the scratch database is removed
        # even when the scenario raises.
        gc.collect()
        time.sleep(0.5)
        try:
            os.remove(db_path)
        except OSError:
            # Best effort: a cleanup problem must not mask the test outcome.
            print(f"Warning: Could not remove test database {db_path}")

    return success
def main():
    """Run the three alert test suites and print a pass/fail summary.

    The suites run in a fixed order (zone-based, rate-of-change, combined)
    before the summary is printed.

    Returns:
        0 when every suite reported success, 1 otherwise.
    """
    rule = "=" * 70

    print("\n" + rule)
    print("WATER LEVEL ALERTING SYSTEM - COMPREHENSIVE TESTS")
    print(rule)

    # The list literal is evaluated top to bottom, so the suites execute in
    # the listed order.
    outcomes = [
        ("Zone-Based Alerts", test_zone_level_alerts()),
        ("Rate-of-Change Alerts", test_rate_of_change_alerts()),
        ("Combined Alerts", test_combined_alerts()),
    ]

    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)

    failure_count = 0
    for label, ok in outcomes:
        print(f"{label}: [{'PASS' if ok else 'FAIL'}]")
        if not ok:
            failure_count += 1

    print(rule)

    if failure_count:
        print("\nSome tests FAILED!")
        return 1

    print("\nAll tests PASSED!")
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())