5 Commits

Author SHA1 Message Date
c57e46ae21 Filter alerts to upstream stations only and add Grafana dashboard link
Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled
- Alerts now only sent for stations upstream of Chiang Mai:
  P.20, P.75, P.92, P.4A, P.67, P.21, P.103, and P.1
- Added Grafana dashboard link to all alert messages
- Dashboard URL: https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water
- Fixed flake8 linting issues (line length, undefined variable, unused import)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-16 17:25:27 +07:00
6a76a88f32 Update pyproject.toml to use dependency-groups instead of tool.uv.dev-dependencies
Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled
- Replaced deprecated tool.uv.dev-dependencies with dependency-groups.dev
- Follows new uv standard for dependency group declaration

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-09 11:56:54 +07:00
e62a20022e Add pre-commit configuration
- Added Black for code formatting (line-length 120)
- Added isort for import sorting
- Added flake8 for linting
- Added standard pre-commit hooks for file checks

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-09 11:52:17 +07:00
58cc60ba19 Integrate automatic alerting into continuous monitoring
Some checks failed
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Production (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Performance Test (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Cleanup (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.10) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.11) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.12) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Test Suite (3.9) (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Code Quality (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Build Docker Image (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Integration Test with Services (push) Has been cancelled
CI/CD Pipeline - Northern Thailand Ping River Monitor / Deploy to Staging (push) Has been cancelled
Security & Dependency Updates / Dependency Security Scan (push) Has been cancelled
Security & Dependency Updates / License Compliance (push) Has been cancelled
Security & Dependency Updates / Check for Dependency Updates (push) Has been cancelled
Security & Dependency Updates / Code Quality Metrics (push) Has been cancelled
Security & Dependency Updates / Security Summary (push) Has been cancelled
- Alerts now run automatically after every successful new data fetch
- Works for both hourly fetches and retry mode exits
- Alert check runs when fresh data is saved to database
- Logs alert results (total generated and sent count)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:44:57 +07:00
cc007f0e0c Add comprehensive alerting system tests
- Created test suite for zone-based water level alerts (9 test cases)
- Created test suite for rate-of-change alerts (5 test cases)
- Created combined alert scenario test
- Fixed rate-of-change detection to use station_code instead of station_id
- All 3 test suites passing (14 total test cases)

Test coverage:
  - Zone alerts: P.1 zones 1-8 with INFO/WARNING/CRITICAL/EMERGENCY levels
  - Rate-of-change: 0.15/0.25/0.40 m/h thresholds for WARNING/CRITICAL/EMERGENCY
  - Combined: Simultaneous zone and rate-of-change alert triggering

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:35:34 +07:00
6 changed files with 640 additions and 105 deletions

87
.env.back Normal file
View File

@@ -0,0 +1,87 @@
# Northern Thailand Ping River Monitor Configuration
# Copy this file to .env and customize for your environment
# Database Configuration
DB_TYPE=postgresql
# Options: sqlite, mysql, postgresql, influxdb, victoriametrics
# SQLite Configuration (default)
WATER_DB_PATH=water_levels.db
# VictoriaMetrics Configuration
VM_HOST=localhost
VM_PORT=8428
VM_URL=
# InfluxDB Configuration
INFLUX_HOST=localhost
INFLUX_PORT=8086
INFLUX_DATABASE=ping_river_monitoring
INFLUX_USERNAME=
INFLUX_PASSWORD=
# PostgreSQL Configuration (Remote Server)
# Option 1: Full connection string (URL encode special characters in password)
#POSTGRES_CONNECTION_STRING=postgresql://username:url_encoded_password@your-postgres-host:5432/water_monitoring
# Option 2: Individual components (password will be automatically URL encoded)
POSTGRES_HOST=10.0.10.201
POSTGRES_PORT=5432
POSTGRES_DB=ping_river
POSTGRES_USER=ping_river
POSTGRES_PASSWORD=3_%m]k:+16"rx?M#`swIA
# Examples for connection string:
# - Local: postgresql://postgres:password@localhost:5432/water_monitoring
# - Remote: postgresql://user:pass@192.168.1.100:5432/water_monitoring
# - With special chars: postgresql://user:my%3Apass%40word@host:5432/db
# - With SSL: postgresql://user:pass@host:port/db?sslmode=require
# - Connection pooling: postgresql://user:pass@host:port/db?pool_size=20&max_overflow=0
# Special character URL encoding:
# : → %3A @ → %40 # → %23 ? → %3F & → %26 / → %2F % → %25
# MySQL Configuration
MYSQL_CONNECTION_STRING=mysql://user:password@localhost:3306/ping_river_monitoring
# API Configuration
API_HOST=0.0.0.0
API_PORT=8000
API_WORKERS=1
# Data Collection Settings
SCRAPING_INTERVAL_HOURS=1
REQUEST_TIMEOUT=30
MAX_RETRIES=3
RETRY_DELAY_SECONDS=60
# Data Retention
DATA_RETENTION_DAYS=365
# Logging Configuration
LOG_LEVEL=INFO
LOG_FILE=water_monitor.log
# Security (for production)
SECRET_KEY=your-secret-key-here
API_KEY=your-api-key-here
# Monitoring
ENABLE_METRICS=true
ENABLE_HEALTH_CHECKS=true
# Geographic Settings
TIMEZONE=Asia/Bangkok
DEFAULT_LATITUDE=18.7875
DEFAULT_LONGITUDE=99.0045
# External Services
NOTIFICATION_EMAIL=
SMTP_SERVER=
SMTP_PORT=587
SMTP_USERNAME=
SMTP_PASSWORD=
# Development Settings
DEBUG=false
DEVELOPMENT_MODE=false

40
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,40 @@
# Pre-commit hooks for Northern Thailand Ping River Monitor
# See https://pre-commit.com for more information
repos:
# General file checks
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-json
- id: check-toml
- id: check-added-large-files
args: ['--maxkb=1000']
- id: check-merge-conflict
- id: check-case-conflict
- id: mixed-line-ending
# Python code formatting with Black
- repo: https://github.com/psf/black
rev: 23.11.0
hooks:
- id: black
language_version: python3
args: ['--line-length=120']
# Import sorting with isort
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ['--profile', 'black', '--line-length', '120']
# Linting with flake8
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
args: ['--max-line-length=120', '--extend-ignore=E203,W503']

View File

@@ -96,8 +96,8 @@ Repository = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor"
Issues = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/issues"
Documentation = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/wiki"
[tool.uv]
dev-dependencies = [
[dependency-groups]
dev = [
# Testing
"pytest==7.4.3",
"pytest-cov==4.1.0",

View File

@@ -3,33 +3,38 @@
Water Level Alerting System with Matrix Integration
"""
import os
import json
import requests
import datetime
from typing import List, Dict, Optional
import os
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import requests
try:
from .config import Config
from .database_adapters import create_database_adapter
from .logging_config import get_logger
except ImportError:
import logging
from config import Config
from database_adapters import create_database_adapter
import logging
def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class WaterAlert:
station_code: str
@@ -42,9 +47,10 @@ class WaterAlert:
timestamp: Optional[datetime.datetime] = None
message: Optional[str] = None
class MatrixNotifier:
def __init__(self, homeserver: str, access_token: str, room_id: str):
self.homeserver = homeserver.rstrip('/')
self.homeserver = homeserver.rstrip("/")
self.access_token = access_token
self.room_id = room_id
self.session = requests.Session()
@@ -56,15 +62,9 @@ class MatrixNotifier:
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
headers = {"Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json"}
data = {
"msgtype": msgtype,
"body": message
}
data = {"msgtype": msgtype, "body": message}
# Matrix API requires PUT when transaction ID is in the URL path
response = self.session.put(url, headers=headers, json=data, timeout=10)
@@ -83,7 +83,7 @@ class MatrixNotifier:
AlertLevel.INFO: "",
AlertLevel.WARNING: "⚠️",
AlertLevel.CRITICAL: "🚨",
AlertLevel.EMERGENCY: "🆘"
AlertLevel.EMERGENCY: "🆘",
}
emoji = emoji_map.get(alert.level, "📊")
@@ -108,20 +108,38 @@ class MatrixNotifier:
if alert.message:
message += f"\n**Details:** {alert.message}\n"
message += f"\n📈 View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}"
# Add Grafana dashboard link
grafana_url = (
"https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water"
"?orgId=1&from=now-30d&to=now&timezone=browser"
)
message += f"\n📈 **View Dashboard:** {grafana_url}"
return self.send_message(message)
class WaterLevelAlertSystem:
# Stations upstream of Chiang Mai (and CNX itself) to monitor
UPSTREAM_STATIONS = {
"P.20", # Ban Chiang Dao
"P.75", # Ban Chai Lat
"P.92", # Ban Muang Aut
"P.4A", # Ban Mae Taeng
"P.67", # Ban Tae
"P.21", # Ban Rim Tai
"P.103", # Ring Bridge 3
"P.1", # Nawarat Bridge (Chiang Mai)
}
def __init__(self):
self.db_adapter = None
self.matrix_notifier = None
self.thresholds = self._load_thresholds()
# Matrix configuration from environment
matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org')
matrix_token = os.getenv('MATRIX_ACCESS_TOKEN')
matrix_room = os.getenv('MATRIX_ROOM_ID')
matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
matrix_room = os.getenv("MATRIX_ROOM_ID")
if matrix_token and matrix_room:
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
@@ -147,7 +165,7 @@ class WaterLevelAlertSystem:
# Keep legacy thresholds for compatibility
"warning": 3.7,
"critical": 4.3,
"emergency": 4.8
"emergency": 4.8,
},
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
@@ -156,7 +174,7 @@ class WaterLevelAlertSystem:
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
# Default for unknown stations
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0},
}
def connect_database(self):
@@ -164,8 +182,7 @@ class WaterLevelAlertSystem:
try:
db_config = Config.get_database_config()
self.db_adapter = create_database_adapter(
db_config['type'],
connection_string=db_config['connection_string']
db_config["type"], connection_string=db_config["connection_string"]
)
if self.db_adapter.connect():
@@ -192,14 +209,18 @@ class WaterLevelAlertSystem:
measurements = self.db_adapter.get_latest_measurements(limit=50)
for measurement in measurements:
station_code = measurement.get('station_code', 'UNKNOWN')
water_level = measurement.get('water_level')
station_code = measurement.get("station_code", "UNKNOWN")
water_level = measurement.get("water_level")
if not water_level:
continue
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
# Get thresholds for this station
station_thresholds = self.thresholds.get(station_code, self.thresholds['default'])
station_thresholds = self.thresholds.get(station_code, self.thresholds["default"])
# Check each threshold level
alert_level = None
@@ -207,7 +228,7 @@ class WaterLevelAlertSystem:
alert_type = None
# Special handling for P.1 with zone-based thresholds
if station_code == "P.1" and 'zone_1' in station_thresholds:
if station_code == "P.1" and "zone_1" in station_thresholds:
# Check all zones in reverse order (highest to lowest)
zones = [
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
@@ -230,29 +251,29 @@ class WaterLevelAlertSystem:
else:
# Standard threshold checking for other stations
if water_level >= station_thresholds.get('emergency', float('inf')):
if water_level >= station_thresholds.get("emergency", float("inf")):
alert_level = AlertLevel.EMERGENCY
threshold_value = station_thresholds['emergency']
threshold_value = station_thresholds["emergency"]
alert_type = "Emergency Water Level"
elif water_level >= station_thresholds.get('critical', float('inf')):
elif water_level >= station_thresholds.get("critical", float("inf")):
alert_level = AlertLevel.CRITICAL
threshold_value = station_thresholds['critical']
threshold_value = station_thresholds["critical"]
alert_type = "Critical Water Level"
elif water_level >= station_thresholds.get('warning', float('inf')):
elif water_level >= station_thresholds.get("warning", float("inf")):
alert_level = AlertLevel.WARNING
threshold_value = station_thresholds['warning']
threshold_value = station_thresholds["warning"]
alert_type = "High Water Level"
if alert_level:
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type=alert_type,
level=alert_level,
water_level=water_level,
threshold=threshold_value,
discharge=measurement.get('discharge'),
timestamp=measurement.get('timestamp')
discharge=measurement.get("discharge"),
timestamp=measurement.get("timestamp"),
)
alerts.append(alert)
@@ -261,7 +282,7 @@ class WaterLevelAlertSystem:
return alerts
def check_data_freshness(self, max_age_hours: int = 2) -> List[WaterAlert]:
def check_data_freshness(self, max_age_hours: int = 4) -> List[WaterAlert]:
"""Check if data is fresh enough"""
alerts = []
@@ -273,21 +294,21 @@ class WaterLevelAlertSystem:
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
for measurement in measurements:
timestamp = measurement.get('timestamp')
timestamp = measurement.get("timestamp")
if timestamp and timestamp < cutoff_time:
station_code = measurement.get('station_code', 'UNKNOWN')
station_code = measurement.get("station_code", "UNKNOWN")
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
station_name=measurement.get("station_name_th", f"Station {station_code}"),
alert_type="Stale Data",
level=AlertLevel.WARNING,
water_level=measurement.get('water_level', 0),
water_level=measurement.get("water_level", 0),
threshold=max_age_hours,
timestamp=timestamp,
message=f"No fresh data for {age_hours:.1f} hours"
message=f"No fresh data for {age_hours:.1f} hours",
)
alerts.append(alert)
@@ -307,15 +328,11 @@ class WaterLevelAlertSystem:
# Define rate-of-change thresholds (meters per hour)
rate_thresholds = {
"P.1": {
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40 # 40cm/hour - very rapid rise
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40, # 40cm/hour - very rapid rise
},
"default": {
"warning": 0.20,
"critical": 0.35,
"emergency": 0.50
}
"default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50},
}
# Get recent measurements for each station
@@ -323,48 +340,34 @@ class WaterLevelAlertSystem:
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
station_codes = set(m.get("station_code") for m in latest if m.get("station_code"))
for station_id in station_ids:
for station_code in station_codes:
try:
# Get measurements for this station in the time window using database adapter
# Try direct connection for SQLite/PostgreSQL
results = []
# Only alert for upstream stations and Chiang Mai
if station_code not in self.UPSTREAM_STATIONS:
continue
try:
import sqlite3
import psycopg2
# Get measurements for this station in the time window
current_time = datetime.datetime.now()
measurements = self.db_adapter.get_measurements_by_timerange(
start_time=cutoff_time, end_time=current_time, station_codes=[station_code]
)
# Check if we have a connection object (SQLite or PostgreSQL)
if hasattr(self.db_adapter, 'conn') and self.db_adapter.conn:
# SQLite/PostgreSQL style query
query = """
SELECT timestamp, water_level, station_id
FROM water_measurements
WHERE station_id = ? AND timestamp >= ?
ORDER BY timestamp ASC
"""
cursor = self.db_adapter.conn.cursor()
cursor.execute(query, (station_id, cutoff_time))
results = cursor.fetchall()
except:
# Fallback: use get_latest_measurements and filter
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
results = []
for m in all_measurements:
if m.get('station_id') == station_id and m.get('timestamp') and m.get('timestamp') >= cutoff_time:
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
if len(results) < 2:
if len(measurements) < 2:
continue # Need at least 2 points to calculate rate
# Get oldest and newest measurements
oldest = results[0]
newest = results[-1]
# Sort by timestamp
measurements = sorted(measurements, key=lambda m: m.get("timestamp"))
oldest_time, oldest_level, _ = oldest
newest_time, newest_level, _ = newest
# Get oldest and newest measurements
oldest = measurements[0]
newest = measurements[-1]
oldest_time = oldest.get("timestamp")
oldest_level = oldest.get("water_level")
newest_time = newest.get("timestamp")
newest_level = newest.get("water_level")
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
@@ -385,47 +388,49 @@ class WaterLevelAlertSystem:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
station_code = station_info.get('station_code', f'Station {station_id}')
station_name = station_info.get('station_name_th', station_code)
station_info = next((m for m in latest if m.get("station_code") == station_code), {})
station_name = station_info.get("station_name_th", station_code)
# Get thresholds for this station
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default'])
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds["default"])
alert_level = None
threshold_value = None
alert_type = None
if rate_per_hour >= station_rate_threshold['emergency']:
if rate_per_hour >= station_rate_threshold["emergency"]:
alert_level = AlertLevel.EMERGENCY
threshold_value = station_rate_threshold['emergency']
threshold_value = station_rate_threshold["emergency"]
alert_type = "Very Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['critical']:
elif rate_per_hour >= station_rate_threshold["critical"]:
alert_level = AlertLevel.CRITICAL
threshold_value = station_rate_threshold['critical']
threshold_value = station_rate_threshold["critical"]
alert_type = "Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['warning']:
elif rate_per_hour >= station_rate_threshold["warning"]:
alert_level = AlertLevel.WARNING
threshold_value = station_rate_threshold['warning']
threshold_value = station_rate_threshold["warning"]
alert_type = "Moderate Water Level Rise"
if alert_level:
message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)"
message = (
f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h "
f"(change: {level_change:+.2f}m)"
)
alert = WaterAlert(
station_code=station_code,
station_name=station_name or f'Station {station_code}',
station_name=station_name or f"Station {station_code}",
alert_type=alert_type,
level=alert_level,
water_level=newest_level,
threshold=threshold_value,
timestamp=newest_time,
message=message
message=message,
)
alerts.append(alert)
except Exception as station_error:
logger.debug(f"Error checking rate of change for station {station_id}: {station_error}")
logger.debug(f"Error checking rate of change for station {station_code}: {station_error}")
continue
except Exception as e:
@@ -480,9 +485,10 @@ class WaterLevelAlertSystem:
"data_alerts": len(data_alerts),
"rate_alerts": len(rate_alerts),
"total_alerts": len(all_alerts),
"sent": sent_count
"sent": sent_count,
}
def main():
"""Standalone alerting check"""
import argparse
@@ -496,7 +502,10 @@ def main():
if args.test:
if alerting.matrix_notifier:
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!"
test_message = (
"🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\n"
"If you received this, Matrix notifications are working correctly!"
)
success = alerting.matrix_notifier.send_message(test_message)
print(f"Test message sent: {success}")
else:
@@ -509,5 +518,6 @@ def main():
else:
print("Use --check or --test")
if __name__ == "__main__":
main()
main()

View File

@@ -64,7 +64,7 @@ def run_test_cycle():
return False
def run_continuous_monitoring():
"""Run continuous monitoring with adaptive scheduling"""
"""Run continuous monitoring with adaptive scheduling and alerting"""
logger.info("Starting continuous monitoring...")
try:
@@ -75,11 +75,16 @@ def run_continuous_monitoring():
db_config = Config.get_database_config()
scraper = EnhancedWaterMonitorScraper(db_config)
# Initialize alerting system
from .alerting import WaterLevelAlertSystem
alerting = WaterLevelAlertSystem()
# Setup signal handlers
setup_signal_handlers(scraper)
logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
logger.info("Alerts: automatic check after each successful data fetch")
logger.info("Press Ctrl+C to stop")
# Run initial cycle
@@ -109,6 +114,16 @@ def run_continuous_monitoring():
if success:
last_successful_fetch = current_time
# Run alert check after every successful new data fetch
logger.info("Running alert check...")
try:
alert_results = alerting.run_alert_check()
if alert_results.get('total_alerts', 0) > 0:
logger.info(f"Alerts: {alert_results['total_alerts']} generated, {alert_results['sent']} sent")
except Exception as e:
logger.error(f"Alert check failed: {e}")
if retry_mode:
logger.info("✅ Data fetch successful - switching back to hourly schedule")
retry_mode = False

383
tests/test_alerting.py Normal file
View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
Comprehensive tests for the alerting system
Tests both zone-based and rate-of-change alerts
"""
import sys
import os
import datetime
import sqlite3
import time
import gc
# Add src directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.alerting import WaterLevelAlertSystem, AlertLevel
from src.database_adapters import create_database_adapter
def setup_test_database(test_name='default'):
"""Create a test database with sample data"""
db_path = f'test_alerts_{test_name}.db'
# Remove existing test database
if os.path.exists(db_path):
try:
os.remove(db_path)
except PermissionError:
# If locked, use a different name with timestamp
import random
db_path = f'test_alerts_{test_name}_{random.randint(1000, 9999)}.db'
# Create new database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create stations table
cursor.execute("""
CREATE TABLE stations (
id INTEGER PRIMARY KEY,
station_code TEXT NOT NULL UNIQUE,
english_name TEXT,
thai_name TEXT,
latitude REAL,
longitude REAL,
basin TEXT,
province TEXT,
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create water_measurements table
cursor.execute("""
CREATE TABLE water_measurements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
station_id INTEGER NOT NULL,
water_level REAL NOT NULL,
discharge REAL,
discharge_percent REAL,
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (station_id) REFERENCES stations (id)
)
""")
# Insert P.1 station (id=8 to match existing data)
cursor.execute("""
INSERT INTO stations (id, station_code, english_name, thai_name, basin, province)
VALUES (8, 'P.1', 'Nawarat Bridge', 'สะพานนวรัฐ', 'Ping', 'Chiang Mai')
""")
conn.commit()
conn.close()
return db_path
def test_zone_level_alerts():
"""Test that zone-based alerts trigger correctly"""
print("="*70)
print("TEST 1: Zone-Based Water Level Alerts")
print("="*70)
db_path = setup_test_database('zone_tests')
# Test cases for P.1 zone thresholds
test_cases = [
(2.5, None, "Below all zones"),
(3.7, AlertLevel.INFO, "Zone 1"),
(3.9, AlertLevel.INFO, "Zone 2"),
(4.0, AlertLevel.WARNING, "Zone 3"),
(4.2, AlertLevel.WARNING, "Zone 5"),
(4.3, AlertLevel.CRITICAL, "Zone 6"),
(4.6, AlertLevel.CRITICAL, "Zone 7"),
(4.8, AlertLevel.EMERGENCY, "Zone 8/NewEdge"),
(5.0, AlertLevel.EMERGENCY, "Above all zones"),
]
print("\nTesting P.1 (Nawarat Bridge) zone thresholds:")
print("-" * 70)
passed = 0
failed = 0
for water_level, expected_level, zone_description in test_cases:
# Insert test data
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM water_measurements")
current_time = datetime.datetime.now()
cursor.execute("""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
VALUES (?, 8, ?, 350.0)
""", (current_time, water_level))
conn.commit()
conn.close()
# Check alerts
alerting = WaterLevelAlertSystem()
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
alerting.db_adapter.connect()
alerts = alerting.check_water_levels()
# Verify result
if expected_level is None:
# Should not trigger any alert
if len(alerts) == 0:
print(f"[PASS] {water_level:.1f}m: {zone_description} - No alert")
passed += 1
else:
print(f"[FAIL] {water_level:.1f}m: {zone_description} - Unexpected alert")
failed += 1
else:
# Should trigger alert with specific level
if len(alerts) > 0 and alerts[0].level == expected_level:
print(f"[PASS] {water_level:.1f}m: {zone_description} - {expected_level.value.upper()} alert")
passed += 1
elif len(alerts) == 0:
print(f"[FAIL] {water_level:.1f}m: {zone_description} - No alert triggered")
failed += 1
else:
print(f"[FAIL] {water_level:.1f}m: {zone_description} - Wrong alert level: {alerts[0].level.value}")
failed += 1
print("-" * 70)
print(f"Zone Alert Tests: {passed} passed, {failed} failed")
# Cleanup - force garbage collection and wait briefly before removing file
gc.collect()
time.sleep(0.5)
try:
os.remove(db_path)
except PermissionError:
print(f"Warning: Could not remove test database {db_path}")
return failed == 0
def test_rate_of_change_alerts():
"""Test that rate-of-change alerts trigger correctly"""
print("\n" + "="*70)
print("TEST 2: Rate-of-Change Water Level Alerts")
print("="*70)
db_path = setup_test_database('rate_tests')
# Test cases: (initial_level, final_level, hours_elapsed, expected_alert_level, description)
test_cases = [
(3.0, 3.1, 3.0, None, "Slow rise (0.03m/h)"),
(3.0, 3.5, 3.0, AlertLevel.WARNING, "Moderate rise (0.17m/h)"),
(3.0, 3.8, 3.0, AlertLevel.CRITICAL, "Rapid rise (0.27m/h)"),
(3.0, 4.2, 3.0, AlertLevel.EMERGENCY, "Very rapid rise (0.40m/h)"),
(4.0, 3.5, 3.0, None, "Falling water (negative rate)"),
]
print("\nTesting P.1 rate-of-change thresholds:")
print(" Warning: 0.15 m/h (15 cm/h)")
print(" Critical: 0.25 m/h (25 cm/h)")
print(" Emergency: 0.40 m/h (40 cm/h)")
print("-" * 70)
passed = 0
failed = 0
for initial_level, final_level, hours, expected_level, description in test_cases:
# Insert test data simulating water level change over time
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM water_measurements")
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(hours=hours)
# Insert initial measurement
cursor.execute("""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
VALUES (?, 8, ?, 350.0)
""", (start_time, initial_level))
# Insert final measurement
cursor.execute("""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
VALUES (?, 8, ?, 380.0)
""", (current_time, final_level))
conn.commit()
conn.close()
# Check rate-of-change alerts
alerting = WaterLevelAlertSystem()
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
alerting.db_adapter.connect()
rate_alerts = alerting.check_rate_of_change(lookback_hours=int(hours) + 1)
# Calculate actual rate for display
level_change = final_level - initial_level
rate = level_change / hours if hours > 0 else 0
# Verify result
if expected_level is None:
# Should not trigger any alert
if len(rate_alerts) == 0:
print(f"[PASS] {rate:+.2f}m/h: {description} - No alert")
passed += 1
else:
print(f"[FAIL] {rate:+.2f}m/h: {description} - Unexpected alert")
print(f" Alert: {rate_alerts[0].alert_type} - {rate_alerts[0].level.value}")
failed += 1
else:
# Should trigger alert with specific level
if len(rate_alerts) > 0 and rate_alerts[0].level == expected_level:
print(f"[PASS] {rate:+.2f}m/h: {description} - {expected_level.value.upper()} alert")
print(f" Message: {rate_alerts[0].message}")
passed += 1
elif len(rate_alerts) == 0:
print(f"[FAIL] {rate:+.2f}m/h: {description} - No alert triggered")
failed += 1
else:
print(f"[FAIL] {rate:+.2f}m/h: {description} - Wrong alert level: {rate_alerts[0].level.value}")
failed += 1
print("-" * 70)
print(f"Rate-of-Change Tests: {passed} passed, {failed} failed")
# Cleanup - force garbage collection and wait briefly before removing file
gc.collect()
time.sleep(0.5)
try:
os.remove(db_path)
except PermissionError:
print(f"Warning: Could not remove test database {db_path}")
return failed == 0
def test_combined_alerts():
"""Test scenario where both zone and rate-of-change alerts trigger"""
print("\n" + "="*70)
print("TEST 3: Combined Zone + Rate-of-Change Alerts")
print("="*70)
db_path = setup_test_database('combined_tests')
print("\nScenario: Water rising rapidly from 3.5m to 4.5m over 3 hours")
print(" Expected: Both Zone 7 alert AND Critical rate-of-change alert")
print("-" * 70)
# Insert test data
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(hours=3)
# Water rising from 3.5m to 4.5m over 3 hours (0.33 m/h - Critical rate)
cursor.execute("""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
VALUES (?, 8, 3.5, 350.0)
""", (start_time,))
cursor.execute("""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
VALUES (?, 8, 4.5, 450.0)
""", (current_time,))
conn.commit()
conn.close()
# Check both types of alerts
alerting = WaterLevelAlertSystem()
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
alerting.db_adapter.connect()
zone_alerts = alerting.check_water_levels()
rate_alerts = alerting.check_rate_of_change(lookback_hours=4)
all_alerts = zone_alerts + rate_alerts
print(f"\nTotal alerts triggered: {len(all_alerts)}")
zone_alert_found = False
rate_alert_found = False
for alert in all_alerts:
print(f"\n Alert Type: {alert.alert_type}")
print(f" Severity: {alert.level.value.upper()}")
print(f" Water Level: {alert.water_level:.2f}m")
if alert.message:
print(f" Details: {alert.message}")
if "Zone" in alert.alert_type:
zone_alert_found = True
if "Rise" in alert.alert_type or "rate" in alert.alert_type.lower():
rate_alert_found = True
print("-" * 70)
if zone_alert_found and rate_alert_found:
print("[PASS] Combined Alert Test - Both alert types triggered")
success = True
else:
print("[FAIL] Combined Alert Test")
if not zone_alert_found:
print(" Missing: Zone-based alert")
if not rate_alert_found:
print(" Missing: Rate-of-change alert")
success = False
# Cleanup - force garbage collection and wait briefly before removing file
gc.collect()
time.sleep(0.5)
try:
os.remove(db_path)
except PermissionError:
print(f"Warning: Could not remove test database {db_path}")
return success
def main():
"""Run all alert tests"""
print("\n" + "="*70)
print("WATER LEVEL ALERTING SYSTEM - COMPREHENSIVE TESTS")
print("="*70)
results = []
# Run tests
results.append(("Zone-Based Alerts", test_zone_level_alerts()))
results.append(("Rate-of-Change Alerts", test_rate_of_change_alerts()))
results.append(("Combined Alerts", test_combined_alerts()))
# Summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
all_passed = True
for test_name, passed in results:
status = "PASS" if passed else "FAIL"
print(f"{test_name}: [{status}]")
if not passed:
all_passed = False
print("="*70)
if all_passed:
print("\nAll tests PASSED!")
return 0
else:
print("\nSome tests FAILED!")
return 1
if __name__ == "__main__":
sys.exit(main())