Compare commits
5 Commits
de632cef90
...
master
Author | SHA1 | Date | |
---|---|---|---|
c57e46ae21 | |||
6a76a88f32 | |||
e62a20022e | |||
58cc60ba19 | |||
cc007f0e0c |
87
.env.back
Normal file
87
.env.back
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
# Northern Thailand Ping River Monitor Configuration
|
||||||
|
# Copy this file to .env and customize for your environment
|
||||||
|
|
||||||
|
# Database Configuration
|
||||||
|
DB_TYPE=postgresql
|
||||||
|
# Options: sqlite, mysql, postgresql, influxdb, victoriametrics
|
||||||
|
|
||||||
|
# SQLite Configuration (default)
|
||||||
|
WATER_DB_PATH=water_levels.db
|
||||||
|
|
||||||
|
# VictoriaMetrics Configuration
|
||||||
|
VM_HOST=localhost
|
||||||
|
VM_PORT=8428
|
||||||
|
VM_URL=
|
||||||
|
|
||||||
|
# InfluxDB Configuration
|
||||||
|
INFLUX_HOST=localhost
|
||||||
|
INFLUX_PORT=8086
|
||||||
|
INFLUX_DATABASE=ping_river_monitoring
|
||||||
|
INFLUX_USERNAME=
|
||||||
|
INFLUX_PASSWORD=
|
||||||
|
|
||||||
|
# PostgreSQL Configuration (Remote Server)
|
||||||
|
# Option 1: Full connection string (URL encode special characters in password)
|
||||||
|
#POSTGRES_CONNECTION_STRING=postgresql://username:url_encoded_password@your-postgres-host:5432/water_monitoring
|
||||||
|
|
||||||
|
# Option 2: Individual components (password will be automatically URL encoded)
|
||||||
|
POSTGRES_HOST=10.0.10.201
|
||||||
|
POSTGRES_PORT=5432
|
||||||
|
POSTGRES_DB=ping_river
|
||||||
|
POSTGRES_USER=ping_river
|
||||||
|
POSTGRES_PASSWORD=3_%m]k:+16"rx?M#`swIA
|
||||||
|
|
||||||
|
# Examples for connection string:
|
||||||
|
# - Local: postgresql://postgres:password@localhost:5432/water_monitoring
|
||||||
|
# - Remote: postgresql://user:pass@192.168.1.100:5432/water_monitoring
|
||||||
|
# - With special chars: postgresql://user:my%3Apass%40word@host:5432/db
|
||||||
|
# - With SSL: postgresql://user:pass@host:port/db?sslmode=require
|
||||||
|
# - Connection pooling: postgresql://user:pass@host:port/db?pool_size=20&max_overflow=0
|
||||||
|
|
||||||
|
# Special character URL encoding:
|
||||||
|
# : → %3A @ → %40 # → %23 ? → %3F & → %26 / → %2F % → %25
|
||||||
|
|
||||||
|
# MySQL Configuration
|
||||||
|
MYSQL_CONNECTION_STRING=mysql://user:password@localhost:3306/ping_river_monitoring
|
||||||
|
|
||||||
|
# API Configuration
|
||||||
|
API_HOST=0.0.0.0
|
||||||
|
API_PORT=8000
|
||||||
|
API_WORKERS=1
|
||||||
|
|
||||||
|
# Data Collection Settings
|
||||||
|
SCRAPING_INTERVAL_HOURS=1
|
||||||
|
REQUEST_TIMEOUT=30
|
||||||
|
MAX_RETRIES=3
|
||||||
|
RETRY_DELAY_SECONDS=60
|
||||||
|
|
||||||
|
# Data Retention
|
||||||
|
DATA_RETENTION_DAYS=365
|
||||||
|
|
||||||
|
# Logging Configuration
|
||||||
|
LOG_LEVEL=INFO
|
||||||
|
LOG_FILE=water_monitor.log
|
||||||
|
|
||||||
|
# Security (for production)
|
||||||
|
SECRET_KEY=your-secret-key-here
|
||||||
|
API_KEY=your-api-key-here
|
||||||
|
|
||||||
|
# Monitoring
|
||||||
|
ENABLE_METRICS=true
|
||||||
|
ENABLE_HEALTH_CHECKS=true
|
||||||
|
|
||||||
|
# Geographic Settings
|
||||||
|
TIMEZONE=Asia/Bangkok
|
||||||
|
DEFAULT_LATITUDE=18.7875
|
||||||
|
DEFAULT_LONGITUDE=99.0045
|
||||||
|
|
||||||
|
# External Services
|
||||||
|
NOTIFICATION_EMAIL=
|
||||||
|
SMTP_SERVER=
|
||||||
|
SMTP_PORT=587
|
||||||
|
SMTP_USERNAME=
|
||||||
|
SMTP_PASSWORD=
|
||||||
|
|
||||||
|
# Development Settings
|
||||||
|
DEBUG=false
|
||||||
|
DEVELOPMENT_MODE=false
|
40
.pre-commit-config.yaml
Normal file
40
.pre-commit-config.yaml
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
# Pre-commit hooks for Northern Thailand Ping River Monitor
|
||||||
|
# See https://pre-commit.com for more information
|
||||||
|
|
||||||
|
repos:
|
||||||
|
# General file checks
|
||||||
|
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||||
|
rev: v4.5.0
|
||||||
|
hooks:
|
||||||
|
- id: trailing-whitespace
|
||||||
|
- id: end-of-file-fixer
|
||||||
|
- id: check-yaml
|
||||||
|
- id: check-json
|
||||||
|
- id: check-toml
|
||||||
|
- id: check-added-large-files
|
||||||
|
args: ['--maxkb=1000']
|
||||||
|
- id: check-merge-conflict
|
||||||
|
- id: check-case-conflict
|
||||||
|
- id: mixed-line-ending
|
||||||
|
|
||||||
|
# Python code formatting with Black
|
||||||
|
- repo: https://github.com/psf/black
|
||||||
|
rev: 23.11.0
|
||||||
|
hooks:
|
||||||
|
- id: black
|
||||||
|
language_version: python3
|
||||||
|
args: ['--line-length=120']
|
||||||
|
|
||||||
|
# Import sorting with isort
|
||||||
|
- repo: https://github.com/pycqa/isort
|
||||||
|
rev: 5.12.0
|
||||||
|
hooks:
|
||||||
|
- id: isort
|
||||||
|
args: ['--profile', 'black', '--line-length', '120']
|
||||||
|
|
||||||
|
# Linting with flake8
|
||||||
|
- repo: https://github.com/pycqa/flake8
|
||||||
|
rev: 6.1.0
|
||||||
|
hooks:
|
||||||
|
- id: flake8
|
||||||
|
args: ['--max-line-length=120', '--extend-ignore=E203,W503']
|
@@ -96,8 +96,8 @@ Repository = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor"
|
|||||||
Issues = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/issues"
|
Issues = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/issues"
|
||||||
Documentation = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/wiki"
|
Documentation = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/wiki"
|
||||||
|
|
||||||
[tool.uv]
|
[dependency-groups]
|
||||||
dev-dependencies = [
|
dev = [
|
||||||
# Testing
|
# Testing
|
||||||
"pytest==7.4.3",
|
"pytest==7.4.3",
|
||||||
"pytest-cov==4.1.0",
|
"pytest-cov==4.1.0",
|
||||||
|
214
src/alerting.py
214
src/alerting.py
@@ -3,33 +3,38 @@
|
|||||||
Water Level Alerting System with Matrix Integration
|
Water Level Alerting System with Matrix Integration
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import requests
|
|
||||||
import datetime
|
import datetime
|
||||||
from typing import List, Dict, Optional
|
import os
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .config import Config
|
from .config import Config
|
||||||
from .database_adapters import create_database_adapter
|
from .database_adapters import create_database_adapter
|
||||||
from .logging_config import get_logger
|
from .logging_config import get_logger
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
import logging
|
||||||
|
|
||||||
from config import Config
|
from config import Config
|
||||||
from database_adapters import create_database_adapter
|
from database_adapters import create_database_adapter
|
||||||
import logging
|
|
||||||
def get_logger(name):
|
def get_logger(name):
|
||||||
return logging.getLogger(name)
|
return logging.getLogger(name)
|
||||||
|
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AlertLevel(Enum):
|
class AlertLevel(Enum):
|
||||||
INFO = "info"
|
INFO = "info"
|
||||||
WARNING = "warning"
|
WARNING = "warning"
|
||||||
CRITICAL = "critical"
|
CRITICAL = "critical"
|
||||||
EMERGENCY = "emergency"
|
EMERGENCY = "emergency"
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class WaterAlert:
|
class WaterAlert:
|
||||||
station_code: str
|
station_code: str
|
||||||
@@ -42,9 +47,10 @@ class WaterAlert:
|
|||||||
timestamp: Optional[datetime.datetime] = None
|
timestamp: Optional[datetime.datetime] = None
|
||||||
message: Optional[str] = None
|
message: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class MatrixNotifier:
|
class MatrixNotifier:
|
||||||
def __init__(self, homeserver: str, access_token: str, room_id: str):
|
def __init__(self, homeserver: str, access_token: str, room_id: str):
|
||||||
self.homeserver = homeserver.rstrip('/')
|
self.homeserver = homeserver.rstrip("/")
|
||||||
self.access_token = access_token
|
self.access_token = access_token
|
||||||
self.room_id = room_id
|
self.room_id = room_id
|
||||||
self.session = requests.Session()
|
self.session = requests.Session()
|
||||||
@@ -56,15 +62,9 @@ class MatrixNotifier:
|
|||||||
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
||||||
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
|
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
|
||||||
|
|
||||||
headers = {
|
headers = {"Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json"}
|
||||||
"Authorization": f"Bearer {self.access_token}",
|
|
||||||
"Content-Type": "application/json"
|
|
||||||
}
|
|
||||||
|
|
||||||
data = {
|
data = {"msgtype": msgtype, "body": message}
|
||||||
"msgtype": msgtype,
|
|
||||||
"body": message
|
|
||||||
}
|
|
||||||
|
|
||||||
# Matrix API requires PUT when transaction ID is in the URL path
|
# Matrix API requires PUT when transaction ID is in the URL path
|
||||||
response = self.session.put(url, headers=headers, json=data, timeout=10)
|
response = self.session.put(url, headers=headers, json=data, timeout=10)
|
||||||
@@ -83,7 +83,7 @@ class MatrixNotifier:
|
|||||||
AlertLevel.INFO: "ℹ️",
|
AlertLevel.INFO: "ℹ️",
|
||||||
AlertLevel.WARNING: "⚠️",
|
AlertLevel.WARNING: "⚠️",
|
||||||
AlertLevel.CRITICAL: "🚨",
|
AlertLevel.CRITICAL: "🚨",
|
||||||
AlertLevel.EMERGENCY: "🆘"
|
AlertLevel.EMERGENCY: "🆘",
|
||||||
}
|
}
|
||||||
|
|
||||||
emoji = emoji_map.get(alert.level, "📊")
|
emoji = emoji_map.get(alert.level, "📊")
|
||||||
@@ -108,20 +108,38 @@ class MatrixNotifier:
|
|||||||
if alert.message:
|
if alert.message:
|
||||||
message += f"\n**Details:** {alert.message}\n"
|
message += f"\n**Details:** {alert.message}\n"
|
||||||
|
|
||||||
message += f"\n📈 View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}"
|
# Add Grafana dashboard link
|
||||||
|
grafana_url = (
|
||||||
|
"https://metrics.b4l.co.th/d/ac9b26b7-d898-49bd-ad8e-32f0496f6741/psql-water"
|
||||||
|
"?orgId=1&from=now-30d&to=now&timezone=browser"
|
||||||
|
)
|
||||||
|
message += f"\n📈 **View Dashboard:** {grafana_url}"
|
||||||
|
|
||||||
return self.send_message(message)
|
return self.send_message(message)
|
||||||
|
|
||||||
|
|
||||||
class WaterLevelAlertSystem:
|
class WaterLevelAlertSystem:
|
||||||
|
# Stations upstream of Chiang Mai (and CNX itself) to monitor
|
||||||
|
UPSTREAM_STATIONS = {
|
||||||
|
"P.20", # Ban Chiang Dao
|
||||||
|
"P.75", # Ban Chai Lat
|
||||||
|
"P.92", # Ban Muang Aut
|
||||||
|
"P.4A", # Ban Mae Taeng
|
||||||
|
"P.67", # Ban Tae
|
||||||
|
"P.21", # Ban Rim Tai
|
||||||
|
"P.103", # Ring Bridge 3
|
||||||
|
"P.1", # Nawarat Bridge (Chiang Mai)
|
||||||
|
}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.db_adapter = None
|
self.db_adapter = None
|
||||||
self.matrix_notifier = None
|
self.matrix_notifier = None
|
||||||
self.thresholds = self._load_thresholds()
|
self.thresholds = self._load_thresholds()
|
||||||
|
|
||||||
# Matrix configuration from environment
|
# Matrix configuration from environment
|
||||||
matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org')
|
matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
|
||||||
matrix_token = os.getenv('MATRIX_ACCESS_TOKEN')
|
matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
|
||||||
matrix_room = os.getenv('MATRIX_ROOM_ID')
|
matrix_room = os.getenv("MATRIX_ROOM_ID")
|
||||||
|
|
||||||
if matrix_token and matrix_room:
|
if matrix_token and matrix_room:
|
||||||
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
|
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
|
||||||
@@ -147,7 +165,7 @@ class WaterLevelAlertSystem:
|
|||||||
# Keep legacy thresholds for compatibility
|
# Keep legacy thresholds for compatibility
|
||||||
"warning": 3.7,
|
"warning": 3.7,
|
||||||
"critical": 4.3,
|
"critical": 4.3,
|
||||||
"emergency": 4.8
|
"emergency": 4.8,
|
||||||
},
|
},
|
||||||
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
|
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
|
||||||
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
|
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
|
||||||
@@ -156,7 +174,7 @@ class WaterLevelAlertSystem:
|
|||||||
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
|
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
|
||||||
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
|
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
|
||||||
# Default for unknown stations
|
# Default for unknown stations
|
||||||
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}
|
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0},
|
||||||
}
|
}
|
||||||
|
|
||||||
def connect_database(self):
|
def connect_database(self):
|
||||||
@@ -164,8 +182,7 @@ class WaterLevelAlertSystem:
|
|||||||
try:
|
try:
|
||||||
db_config = Config.get_database_config()
|
db_config = Config.get_database_config()
|
||||||
self.db_adapter = create_database_adapter(
|
self.db_adapter = create_database_adapter(
|
||||||
db_config['type'],
|
db_config["type"], connection_string=db_config["connection_string"]
|
||||||
connection_string=db_config['connection_string']
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.db_adapter.connect():
|
if self.db_adapter.connect():
|
||||||
@@ -192,14 +209,18 @@ class WaterLevelAlertSystem:
|
|||||||
measurements = self.db_adapter.get_latest_measurements(limit=50)
|
measurements = self.db_adapter.get_latest_measurements(limit=50)
|
||||||
|
|
||||||
for measurement in measurements:
|
for measurement in measurements:
|
||||||
station_code = measurement.get('station_code', 'UNKNOWN')
|
station_code = measurement.get("station_code", "UNKNOWN")
|
||||||
water_level = measurement.get('water_level')
|
water_level = measurement.get("water_level")
|
||||||
|
|
||||||
if not water_level:
|
if not water_level:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Only alert for upstream stations and Chiang Mai
|
||||||
|
if station_code not in self.UPSTREAM_STATIONS:
|
||||||
|
continue
|
||||||
|
|
||||||
# Get thresholds for this station
|
# Get thresholds for this station
|
||||||
station_thresholds = self.thresholds.get(station_code, self.thresholds['default'])
|
station_thresholds = self.thresholds.get(station_code, self.thresholds["default"])
|
||||||
|
|
||||||
# Check each threshold level
|
# Check each threshold level
|
||||||
alert_level = None
|
alert_level = None
|
||||||
@@ -207,7 +228,7 @@ class WaterLevelAlertSystem:
|
|||||||
alert_type = None
|
alert_type = None
|
||||||
|
|
||||||
# Special handling for P.1 with zone-based thresholds
|
# Special handling for P.1 with zone-based thresholds
|
||||||
if station_code == "P.1" and 'zone_1' in station_thresholds:
|
if station_code == "P.1" and "zone_1" in station_thresholds:
|
||||||
# Check all zones in reverse order (highest to lowest)
|
# Check all zones in reverse order (highest to lowest)
|
||||||
zones = [
|
zones = [
|
||||||
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
|
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
|
||||||
@@ -230,29 +251,29 @@ class WaterLevelAlertSystem:
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
# Standard threshold checking for other stations
|
# Standard threshold checking for other stations
|
||||||
if water_level >= station_thresholds.get('emergency', float('inf')):
|
if water_level >= station_thresholds.get("emergency", float("inf")):
|
||||||
alert_level = AlertLevel.EMERGENCY
|
alert_level = AlertLevel.EMERGENCY
|
||||||
threshold_value = station_thresholds['emergency']
|
threshold_value = station_thresholds["emergency"]
|
||||||
alert_type = "Emergency Water Level"
|
alert_type = "Emergency Water Level"
|
||||||
elif water_level >= station_thresholds.get('critical', float('inf')):
|
elif water_level >= station_thresholds.get("critical", float("inf")):
|
||||||
alert_level = AlertLevel.CRITICAL
|
alert_level = AlertLevel.CRITICAL
|
||||||
threshold_value = station_thresholds['critical']
|
threshold_value = station_thresholds["critical"]
|
||||||
alert_type = "Critical Water Level"
|
alert_type = "Critical Water Level"
|
||||||
elif water_level >= station_thresholds.get('warning', float('inf')):
|
elif water_level >= station_thresholds.get("warning", float("inf")):
|
||||||
alert_level = AlertLevel.WARNING
|
alert_level = AlertLevel.WARNING
|
||||||
threshold_value = station_thresholds['warning']
|
threshold_value = station_thresholds["warning"]
|
||||||
alert_type = "High Water Level"
|
alert_type = "High Water Level"
|
||||||
|
|
||||||
if alert_level:
|
if alert_level:
|
||||||
alert = WaterAlert(
|
alert = WaterAlert(
|
||||||
station_code=station_code,
|
station_code=station_code,
|
||||||
station_name=measurement.get('station_name_th', f'Station {station_code}'),
|
station_name=measurement.get("station_name_th", f"Station {station_code}"),
|
||||||
alert_type=alert_type,
|
alert_type=alert_type,
|
||||||
level=alert_level,
|
level=alert_level,
|
||||||
water_level=water_level,
|
water_level=water_level,
|
||||||
threshold=threshold_value,
|
threshold=threshold_value,
|
||||||
discharge=measurement.get('discharge'),
|
discharge=measurement.get("discharge"),
|
||||||
timestamp=measurement.get('timestamp')
|
timestamp=measurement.get("timestamp"),
|
||||||
)
|
)
|
||||||
alerts.append(alert)
|
alerts.append(alert)
|
||||||
|
|
||||||
@@ -261,7 +282,7 @@ class WaterLevelAlertSystem:
|
|||||||
|
|
||||||
return alerts
|
return alerts
|
||||||
|
|
||||||
def check_data_freshness(self, max_age_hours: int = 2) -> List[WaterAlert]:
|
def check_data_freshness(self, max_age_hours: int = 4) -> List[WaterAlert]:
|
||||||
"""Check if data is fresh enough"""
|
"""Check if data is fresh enough"""
|
||||||
alerts = []
|
alerts = []
|
||||||
|
|
||||||
@@ -273,21 +294,21 @@ class WaterLevelAlertSystem:
|
|||||||
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
|
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
|
||||||
|
|
||||||
for measurement in measurements:
|
for measurement in measurements:
|
||||||
timestamp = measurement.get('timestamp')
|
timestamp = measurement.get("timestamp")
|
||||||
if timestamp and timestamp < cutoff_time:
|
if timestamp and timestamp < cutoff_time:
|
||||||
station_code = measurement.get('station_code', 'UNKNOWN')
|
station_code = measurement.get("station_code", "UNKNOWN")
|
||||||
|
|
||||||
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
|
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
|
||||||
|
|
||||||
alert = WaterAlert(
|
alert = WaterAlert(
|
||||||
station_code=station_code,
|
station_code=station_code,
|
||||||
station_name=measurement.get('station_name_th', f'Station {station_code}'),
|
station_name=measurement.get("station_name_th", f"Station {station_code}"),
|
||||||
alert_type="Stale Data",
|
alert_type="Stale Data",
|
||||||
level=AlertLevel.WARNING,
|
level=AlertLevel.WARNING,
|
||||||
water_level=measurement.get('water_level', 0),
|
water_level=measurement.get("water_level", 0),
|
||||||
threshold=max_age_hours,
|
threshold=max_age_hours,
|
||||||
timestamp=timestamp,
|
timestamp=timestamp,
|
||||||
message=f"No fresh data for {age_hours:.1f} hours"
|
message=f"No fresh data for {age_hours:.1f} hours",
|
||||||
)
|
)
|
||||||
alerts.append(alert)
|
alerts.append(alert)
|
||||||
|
|
||||||
@@ -307,15 +328,11 @@ class WaterLevelAlertSystem:
|
|||||||
# Define rate-of-change thresholds (meters per hour)
|
# Define rate-of-change thresholds (meters per hour)
|
||||||
rate_thresholds = {
|
rate_thresholds = {
|
||||||
"P.1": {
|
"P.1": {
|
||||||
"warning": 0.15, # 15cm/hour - moderate rise
|
"warning": 0.15, # 15cm/hour - moderate rise
|
||||||
"critical": 0.25, # 25cm/hour - rapid rise
|
"critical": 0.25, # 25cm/hour - rapid rise
|
||||||
"emergency": 0.40 # 40cm/hour - very rapid rise
|
"emergency": 0.40, # 40cm/hour - very rapid rise
|
||||||
},
|
},
|
||||||
"default": {
|
"default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50},
|
||||||
"warning": 0.20,
|
|
||||||
"critical": 0.35,
|
|
||||||
"emergency": 0.50
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Get recent measurements for each station
|
# Get recent measurements for each station
|
||||||
@@ -323,48 +340,34 @@ class WaterLevelAlertSystem:
|
|||||||
|
|
||||||
# Get unique stations from latest data
|
# Get unique stations from latest data
|
||||||
latest = self.db_adapter.get_latest_measurements(limit=20)
|
latest = self.db_adapter.get_latest_measurements(limit=20)
|
||||||
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
|
station_codes = set(m.get("station_code") for m in latest if m.get("station_code"))
|
||||||
|
|
||||||
for station_id in station_ids:
|
for station_code in station_codes:
|
||||||
try:
|
try:
|
||||||
# Get measurements for this station in the time window using database adapter
|
# Only alert for upstream stations and Chiang Mai
|
||||||
# Try direct connection for SQLite/PostgreSQL
|
if station_code not in self.UPSTREAM_STATIONS:
|
||||||
results = []
|
continue
|
||||||
|
|
||||||
try:
|
# Get measurements for this station in the time window
|
||||||
import sqlite3
|
current_time = datetime.datetime.now()
|
||||||
import psycopg2
|
measurements = self.db_adapter.get_measurements_by_timerange(
|
||||||
|
start_time=cutoff_time, end_time=current_time, station_codes=[station_code]
|
||||||
|
)
|
||||||
|
|
||||||
# Check if we have a connection object (SQLite or PostgreSQL)
|
if len(measurements) < 2:
|
||||||
if hasattr(self.db_adapter, 'conn') and self.db_adapter.conn:
|
|
||||||
# SQLite/PostgreSQL style query
|
|
||||||
query = """
|
|
||||||
SELECT timestamp, water_level, station_id
|
|
||||||
FROM water_measurements
|
|
||||||
WHERE station_id = ? AND timestamp >= ?
|
|
||||||
ORDER BY timestamp ASC
|
|
||||||
"""
|
|
||||||
|
|
||||||
cursor = self.db_adapter.conn.cursor()
|
|
||||||
cursor.execute(query, (station_id, cutoff_time))
|
|
||||||
results = cursor.fetchall()
|
|
||||||
except:
|
|
||||||
# Fallback: use get_latest_measurements and filter
|
|
||||||
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
|
|
||||||
results = []
|
|
||||||
for m in all_measurements:
|
|
||||||
if m.get('station_id') == station_id and m.get('timestamp') and m.get('timestamp') >= cutoff_time:
|
|
||||||
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
|
|
||||||
|
|
||||||
if len(results) < 2:
|
|
||||||
continue # Need at least 2 points to calculate rate
|
continue # Need at least 2 points to calculate rate
|
||||||
|
|
||||||
# Get oldest and newest measurements
|
# Sort by timestamp
|
||||||
oldest = results[0]
|
measurements = sorted(measurements, key=lambda m: m.get("timestamp"))
|
||||||
newest = results[-1]
|
|
||||||
|
|
||||||
oldest_time, oldest_level, _ = oldest
|
# Get oldest and newest measurements
|
||||||
newest_time, newest_level, _ = newest
|
oldest = measurements[0]
|
||||||
|
newest = measurements[-1]
|
||||||
|
|
||||||
|
oldest_time = oldest.get("timestamp")
|
||||||
|
oldest_level = oldest.get("water_level")
|
||||||
|
newest_time = newest.get("timestamp")
|
||||||
|
newest_level = newest.get("water_level")
|
||||||
|
|
||||||
# Convert timestamp strings to datetime if needed
|
# Convert timestamp strings to datetime if needed
|
||||||
if isinstance(oldest_time, str):
|
if isinstance(oldest_time, str):
|
||||||
@@ -385,47 +388,49 @@ class WaterLevelAlertSystem:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# Get station info from latest data
|
# Get station info from latest data
|
||||||
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
|
station_info = next((m for m in latest if m.get("station_code") == station_code), {})
|
||||||
station_code = station_info.get('station_code', f'Station {station_id}')
|
station_name = station_info.get("station_name_th", station_code)
|
||||||
station_name = station_info.get('station_name_th', station_code)
|
|
||||||
|
|
||||||
# Get thresholds for this station
|
# Get thresholds for this station
|
||||||
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default'])
|
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds["default"])
|
||||||
|
|
||||||
alert_level = None
|
alert_level = None
|
||||||
threshold_value = None
|
threshold_value = None
|
||||||
alert_type = None
|
alert_type = None
|
||||||
|
|
||||||
if rate_per_hour >= station_rate_threshold['emergency']:
|
if rate_per_hour >= station_rate_threshold["emergency"]:
|
||||||
alert_level = AlertLevel.EMERGENCY
|
alert_level = AlertLevel.EMERGENCY
|
||||||
threshold_value = station_rate_threshold['emergency']
|
threshold_value = station_rate_threshold["emergency"]
|
||||||
alert_type = "Very Rapid Water Level Rise"
|
alert_type = "Very Rapid Water Level Rise"
|
||||||
elif rate_per_hour >= station_rate_threshold['critical']:
|
elif rate_per_hour >= station_rate_threshold["critical"]:
|
||||||
alert_level = AlertLevel.CRITICAL
|
alert_level = AlertLevel.CRITICAL
|
||||||
threshold_value = station_rate_threshold['critical']
|
threshold_value = station_rate_threshold["critical"]
|
||||||
alert_type = "Rapid Water Level Rise"
|
alert_type = "Rapid Water Level Rise"
|
||||||
elif rate_per_hour >= station_rate_threshold['warning']:
|
elif rate_per_hour >= station_rate_threshold["warning"]:
|
||||||
alert_level = AlertLevel.WARNING
|
alert_level = AlertLevel.WARNING
|
||||||
threshold_value = station_rate_threshold['warning']
|
threshold_value = station_rate_threshold["warning"]
|
||||||
alert_type = "Moderate Water Level Rise"
|
alert_type = "Moderate Water Level Rise"
|
||||||
|
|
||||||
if alert_level:
|
if alert_level:
|
||||||
message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)"
|
message = (
|
||||||
|
f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h "
|
||||||
|
f"(change: {level_change:+.2f}m)"
|
||||||
|
)
|
||||||
|
|
||||||
alert = WaterAlert(
|
alert = WaterAlert(
|
||||||
station_code=station_code,
|
station_code=station_code,
|
||||||
station_name=station_name or f'Station {station_code}',
|
station_name=station_name or f"Station {station_code}",
|
||||||
alert_type=alert_type,
|
alert_type=alert_type,
|
||||||
level=alert_level,
|
level=alert_level,
|
||||||
water_level=newest_level,
|
water_level=newest_level,
|
||||||
threshold=threshold_value,
|
threshold=threshold_value,
|
||||||
timestamp=newest_time,
|
timestamp=newest_time,
|
||||||
message=message
|
message=message,
|
||||||
)
|
)
|
||||||
alerts.append(alert)
|
alerts.append(alert)
|
||||||
|
|
||||||
except Exception as station_error:
|
except Exception as station_error:
|
||||||
logger.debug(f"Error checking rate of change for station {station_id}: {station_error}")
|
logger.debug(f"Error checking rate of change for station {station_code}: {station_error}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -480,9 +485,10 @@ class WaterLevelAlertSystem:
|
|||||||
"data_alerts": len(data_alerts),
|
"data_alerts": len(data_alerts),
|
||||||
"rate_alerts": len(rate_alerts),
|
"rate_alerts": len(rate_alerts),
|
||||||
"total_alerts": len(all_alerts),
|
"total_alerts": len(all_alerts),
|
||||||
"sent": sent_count
|
"sent": sent_count,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Standalone alerting check"""
|
"""Standalone alerting check"""
|
||||||
import argparse
|
import argparse
|
||||||
@@ -496,7 +502,10 @@ def main():
|
|||||||
|
|
||||||
if args.test:
|
if args.test:
|
||||||
if alerting.matrix_notifier:
|
if alerting.matrix_notifier:
|
||||||
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!"
|
test_message = (
|
||||||
|
"🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\n"
|
||||||
|
"If you received this, Matrix notifications are working correctly!"
|
||||||
|
)
|
||||||
success = alerting.matrix_notifier.send_message(test_message)
|
success = alerting.matrix_notifier.send_message(test_message)
|
||||||
print(f"Test message sent: {success}")
|
print(f"Test message sent: {success}")
|
||||||
else:
|
else:
|
||||||
@@ -509,5 +518,6 @@ def main():
|
|||||||
else:
|
else:
|
||||||
print("Use --check or --test")
|
print("Use --check or --test")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
17
src/main.py
17
src/main.py
@@ -64,7 +64,7 @@ def run_test_cycle():
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def run_continuous_monitoring():
|
def run_continuous_monitoring():
|
||||||
"""Run continuous monitoring with adaptive scheduling"""
|
"""Run continuous monitoring with adaptive scheduling and alerting"""
|
||||||
logger.info("Starting continuous monitoring...")
|
logger.info("Starting continuous monitoring...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -75,11 +75,16 @@ def run_continuous_monitoring():
|
|||||||
db_config = Config.get_database_config()
|
db_config = Config.get_database_config()
|
||||||
scraper = EnhancedWaterMonitorScraper(db_config)
|
scraper = EnhancedWaterMonitorScraper(db_config)
|
||||||
|
|
||||||
|
# Initialize alerting system
|
||||||
|
from .alerting import WaterLevelAlertSystem
|
||||||
|
alerting = WaterLevelAlertSystem()
|
||||||
|
|
||||||
# Setup signal handlers
|
# Setup signal handlers
|
||||||
setup_signal_handlers(scraper)
|
setup_signal_handlers(scraper)
|
||||||
|
|
||||||
logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
|
logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
|
||||||
logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
|
logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
|
||||||
|
logger.info("Alerts: automatic check after each successful data fetch")
|
||||||
logger.info("Press Ctrl+C to stop")
|
logger.info("Press Ctrl+C to stop")
|
||||||
|
|
||||||
# Run initial cycle
|
# Run initial cycle
|
||||||
@@ -109,6 +114,16 @@ def run_continuous_monitoring():
|
|||||||
|
|
||||||
if success:
|
if success:
|
||||||
last_successful_fetch = current_time
|
last_successful_fetch = current_time
|
||||||
|
|
||||||
|
# Run alert check after every successful new data fetch
|
||||||
|
logger.info("Running alert check...")
|
||||||
|
try:
|
||||||
|
alert_results = alerting.run_alert_check()
|
||||||
|
if alert_results.get('total_alerts', 0) > 0:
|
||||||
|
logger.info(f"Alerts: {alert_results['total_alerts']} generated, {alert_results['sent']} sent")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Alert check failed: {e}")
|
||||||
|
|
||||||
if retry_mode:
|
if retry_mode:
|
||||||
logger.info("✅ Data fetch successful - switching back to hourly schedule")
|
logger.info("✅ Data fetch successful - switching back to hourly schedule")
|
||||||
retry_mode = False
|
retry_mode = False
|
||||||
|
383
tests/test_alerting.py
Normal file
383
tests/test_alerting.py
Normal file
@@ -0,0 +1,383 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Comprehensive tests for the alerting system
|
||||||
|
Tests both zone-based and rate-of-change alerts
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import datetime
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
import gc
|
||||||
|
|
||||||
|
# Add src directory to path
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
|
from src.alerting import WaterLevelAlertSystem, AlertLevel
|
||||||
|
from src.database_adapters import create_database_adapter
|
||||||
|
|
||||||
|
|
||||||
|
def setup_test_database(test_name='default'):
|
||||||
|
"""Create a test database with sample data"""
|
||||||
|
db_path = f'test_alerts_{test_name}.db'
|
||||||
|
|
||||||
|
# Remove existing test database
|
||||||
|
if os.path.exists(db_path):
|
||||||
|
try:
|
||||||
|
os.remove(db_path)
|
||||||
|
except PermissionError:
|
||||||
|
# If locked, use a different name with timestamp
|
||||||
|
import random
|
||||||
|
db_path = f'test_alerts_{test_name}_{random.randint(1000, 9999)}.db'
|
||||||
|
|
||||||
|
# Create new database
|
||||||
|
conn = sqlite3.connect(db_path)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Create stations table
|
||||||
|
cursor.execute("""
|
||||||
|
CREATE TABLE stations (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
station_code TEXT NOT NULL UNIQUE,
|
||||||
|
english_name TEXT,
|
||||||
|
thai_name TEXT,
|
||||||
|
latitude REAL,
|
||||||
|
longitude REAL,
|
||||||
|
basin TEXT,
|
||||||
|
province TEXT,
|
||||||
|
status TEXT DEFAULT 'active',
|
||||||
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Create water_measurements table
|
||||||
|
cursor.execute("""
|
||||||
|
CREATE TABLE water_measurements (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
timestamp DATETIME NOT NULL,
|
||||||
|
station_id INTEGER NOT NULL,
|
||||||
|
water_level REAL NOT NULL,
|
||||||
|
discharge REAL,
|
||||||
|
discharge_percent REAL,
|
||||||
|
status TEXT DEFAULT 'active',
|
||||||
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
FOREIGN KEY (station_id) REFERENCES stations (id)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Insert P.1 station (id=8 to match existing data)
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO stations (id, station_code, english_name, thai_name, basin, province)
|
||||||
|
VALUES (8, 'P.1', 'Nawarat Bridge', 'สะพานนวรัฐ', 'Ping', 'Chiang Mai')
|
||||||
|
""")
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
return db_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_zone_level_alerts():
|
||||||
|
"""Test that zone-based alerts trigger correctly"""
|
||||||
|
print("="*70)
|
||||||
|
print("TEST 1: Zone-Based Water Level Alerts")
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
db_path = setup_test_database('zone_tests')
|
||||||
|
|
||||||
|
# Test cases for P.1 zone thresholds
|
||||||
|
test_cases = [
|
||||||
|
(2.5, None, "Below all zones"),
|
||||||
|
(3.7, AlertLevel.INFO, "Zone 1"),
|
||||||
|
(3.9, AlertLevel.INFO, "Zone 2"),
|
||||||
|
(4.0, AlertLevel.WARNING, "Zone 3"),
|
||||||
|
(4.2, AlertLevel.WARNING, "Zone 5"),
|
||||||
|
(4.3, AlertLevel.CRITICAL, "Zone 6"),
|
||||||
|
(4.6, AlertLevel.CRITICAL, "Zone 7"),
|
||||||
|
(4.8, AlertLevel.EMERGENCY, "Zone 8/NewEdge"),
|
||||||
|
(5.0, AlertLevel.EMERGENCY, "Above all zones"),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("\nTesting P.1 (Nawarat Bridge) zone thresholds:")
|
||||||
|
print("-" * 70)
|
||||||
|
|
||||||
|
passed = 0
|
||||||
|
failed = 0
|
||||||
|
|
||||||
|
for water_level, expected_level, zone_description in test_cases:
|
||||||
|
# Insert test data
|
||||||
|
conn = sqlite3.connect(db_path)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("DELETE FROM water_measurements")
|
||||||
|
|
||||||
|
current_time = datetime.datetime.now()
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
|
||||||
|
VALUES (?, 8, ?, 350.0)
|
||||||
|
""", (current_time, water_level))
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Check alerts
|
||||||
|
alerting = WaterLevelAlertSystem()
|
||||||
|
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
|
||||||
|
alerting.db_adapter.connect()
|
||||||
|
|
||||||
|
alerts = alerting.check_water_levels()
|
||||||
|
|
||||||
|
# Verify result
|
||||||
|
if expected_level is None:
|
||||||
|
# Should not trigger any alert
|
||||||
|
if len(alerts) == 0:
|
||||||
|
print(f"[PASS] {water_level:.1f}m: {zone_description} - No alert")
|
||||||
|
passed += 1
|
||||||
|
else:
|
||||||
|
print(f"[FAIL] {water_level:.1f}m: {zone_description} - Unexpected alert")
|
||||||
|
failed += 1
|
||||||
|
else:
|
||||||
|
# Should trigger alert with specific level
|
||||||
|
if len(alerts) > 0 and alerts[0].level == expected_level:
|
||||||
|
print(f"[PASS] {water_level:.1f}m: {zone_description} - {expected_level.value.upper()} alert")
|
||||||
|
passed += 1
|
||||||
|
elif len(alerts) == 0:
|
||||||
|
print(f"[FAIL] {water_level:.1f}m: {zone_description} - No alert triggered")
|
||||||
|
failed += 1
|
||||||
|
else:
|
||||||
|
print(f"[FAIL] {water_level:.1f}m: {zone_description} - Wrong alert level: {alerts[0].level.value}")
|
||||||
|
failed += 1
|
||||||
|
|
||||||
|
print("-" * 70)
|
||||||
|
print(f"Zone Alert Tests: {passed} passed, {failed} failed")
|
||||||
|
|
||||||
|
# Cleanup - force garbage collection and wait briefly before removing file
|
||||||
|
gc.collect()
|
||||||
|
time.sleep(0.5)
|
||||||
|
try:
|
||||||
|
os.remove(db_path)
|
||||||
|
except PermissionError:
|
||||||
|
print(f"Warning: Could not remove test database {db_path}")
|
||||||
|
|
||||||
|
return failed == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_rate_of_change_alerts():
|
||||||
|
"""Test that rate-of-change alerts trigger correctly"""
|
||||||
|
print("\n" + "="*70)
|
||||||
|
print("TEST 2: Rate-of-Change Water Level Alerts")
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
db_path = setup_test_database('rate_tests')
|
||||||
|
|
||||||
|
# Test cases: (initial_level, final_level, hours_elapsed, expected_alert_level, description)
|
||||||
|
test_cases = [
|
||||||
|
(3.0, 3.1, 3.0, None, "Slow rise (0.03m/h)"),
|
||||||
|
(3.0, 3.5, 3.0, AlertLevel.WARNING, "Moderate rise (0.17m/h)"),
|
||||||
|
(3.0, 3.8, 3.0, AlertLevel.CRITICAL, "Rapid rise (0.27m/h)"),
|
||||||
|
(3.0, 4.2, 3.0, AlertLevel.EMERGENCY, "Very rapid rise (0.40m/h)"),
|
||||||
|
(4.0, 3.5, 3.0, None, "Falling water (negative rate)"),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("\nTesting P.1 rate-of-change thresholds:")
|
||||||
|
print(" Warning: 0.15 m/h (15 cm/h)")
|
||||||
|
print(" Critical: 0.25 m/h (25 cm/h)")
|
||||||
|
print(" Emergency: 0.40 m/h (40 cm/h)")
|
||||||
|
print("-" * 70)
|
||||||
|
|
||||||
|
passed = 0
|
||||||
|
failed = 0
|
||||||
|
|
||||||
|
for initial_level, final_level, hours, expected_level, description in test_cases:
|
||||||
|
# Insert test data simulating water level change over time
|
||||||
|
conn = sqlite3.connect(db_path)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute("DELETE FROM water_measurements")
|
||||||
|
|
||||||
|
current_time = datetime.datetime.now()
|
||||||
|
start_time = current_time - datetime.timedelta(hours=hours)
|
||||||
|
|
||||||
|
# Insert initial measurement
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
|
||||||
|
VALUES (?, 8, ?, 350.0)
|
||||||
|
""", (start_time, initial_level))
|
||||||
|
|
||||||
|
# Insert final measurement
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
|
||||||
|
VALUES (?, 8, ?, 380.0)
|
||||||
|
""", (current_time, final_level))
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Check rate-of-change alerts
|
||||||
|
alerting = WaterLevelAlertSystem()
|
||||||
|
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
|
||||||
|
alerting.db_adapter.connect()
|
||||||
|
|
||||||
|
rate_alerts = alerting.check_rate_of_change(lookback_hours=int(hours) + 1)
|
||||||
|
|
||||||
|
# Calculate actual rate for display
|
||||||
|
level_change = final_level - initial_level
|
||||||
|
rate = level_change / hours if hours > 0 else 0
|
||||||
|
|
||||||
|
# Verify result
|
||||||
|
if expected_level is None:
|
||||||
|
# Should not trigger any alert
|
||||||
|
if len(rate_alerts) == 0:
|
||||||
|
print(f"[PASS] {rate:+.2f}m/h: {description} - No alert")
|
||||||
|
passed += 1
|
||||||
|
else:
|
||||||
|
print(f"[FAIL] {rate:+.2f}m/h: {description} - Unexpected alert")
|
||||||
|
print(f" Alert: {rate_alerts[0].alert_type} - {rate_alerts[0].level.value}")
|
||||||
|
failed += 1
|
||||||
|
else:
|
||||||
|
# Should trigger alert with specific level
|
||||||
|
if len(rate_alerts) > 0 and rate_alerts[0].level == expected_level:
|
||||||
|
print(f"[PASS] {rate:+.2f}m/h: {description} - {expected_level.value.upper()} alert")
|
||||||
|
print(f" Message: {rate_alerts[0].message}")
|
||||||
|
passed += 1
|
||||||
|
elif len(rate_alerts) == 0:
|
||||||
|
print(f"[FAIL] {rate:+.2f}m/h: {description} - No alert triggered")
|
||||||
|
failed += 1
|
||||||
|
else:
|
||||||
|
print(f"[FAIL] {rate:+.2f}m/h: {description} - Wrong alert level: {rate_alerts[0].level.value}")
|
||||||
|
failed += 1
|
||||||
|
|
||||||
|
print("-" * 70)
|
||||||
|
print(f"Rate-of-Change Tests: {passed} passed, {failed} failed")
|
||||||
|
|
||||||
|
# Cleanup - force garbage collection and wait briefly before removing file
|
||||||
|
gc.collect()
|
||||||
|
time.sleep(0.5)
|
||||||
|
try:
|
||||||
|
os.remove(db_path)
|
||||||
|
except PermissionError:
|
||||||
|
print(f"Warning: Could not remove test database {db_path}")
|
||||||
|
|
||||||
|
return failed == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_combined_alerts():
|
||||||
|
"""Test scenario where both zone and rate-of-change alerts trigger"""
|
||||||
|
print("\n" + "="*70)
|
||||||
|
print("TEST 3: Combined Zone + Rate-of-Change Alerts")
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
db_path = setup_test_database('combined_tests')
|
||||||
|
|
||||||
|
print("\nScenario: Water rising rapidly from 3.5m to 4.5m over 3 hours")
|
||||||
|
print(" Expected: Both Zone 7 alert AND Critical rate-of-change alert")
|
||||||
|
print("-" * 70)
|
||||||
|
|
||||||
|
# Insert test data
|
||||||
|
conn = sqlite3.connect(db_path)
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
current_time = datetime.datetime.now()
|
||||||
|
start_time = current_time - datetime.timedelta(hours=3)
|
||||||
|
|
||||||
|
# Water rising from 3.5m to 4.5m over 3 hours (0.33 m/h - Critical rate)
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
|
||||||
|
VALUES (?, 8, 3.5, 350.0)
|
||||||
|
""", (start_time,))
|
||||||
|
|
||||||
|
cursor.execute("""
|
||||||
|
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge)
|
||||||
|
VALUES (?, 8, 4.5, 450.0)
|
||||||
|
""", (current_time,))
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
# Check both types of alerts
|
||||||
|
alerting = WaterLevelAlertSystem()
|
||||||
|
alerting.db_adapter = create_database_adapter('sqlite', connection_string=f'sqlite:///{db_path}')
|
||||||
|
alerting.db_adapter.connect()
|
||||||
|
|
||||||
|
zone_alerts = alerting.check_water_levels()
|
||||||
|
rate_alerts = alerting.check_rate_of_change(lookback_hours=4)
|
||||||
|
|
||||||
|
all_alerts = zone_alerts + rate_alerts
|
||||||
|
|
||||||
|
print(f"\nTotal alerts triggered: {len(all_alerts)}")
|
||||||
|
|
||||||
|
zone_alert_found = False
|
||||||
|
rate_alert_found = False
|
||||||
|
|
||||||
|
for alert in all_alerts:
|
||||||
|
print(f"\n Alert Type: {alert.alert_type}")
|
||||||
|
print(f" Severity: {alert.level.value.upper()}")
|
||||||
|
print(f" Water Level: {alert.water_level:.2f}m")
|
||||||
|
if alert.message:
|
||||||
|
print(f" Details: {alert.message}")
|
||||||
|
|
||||||
|
if "Zone" in alert.alert_type:
|
||||||
|
zone_alert_found = True
|
||||||
|
if "Rise" in alert.alert_type or "rate" in alert.alert_type.lower():
|
||||||
|
rate_alert_found = True
|
||||||
|
|
||||||
|
print("-" * 70)
|
||||||
|
|
||||||
|
if zone_alert_found and rate_alert_found:
|
||||||
|
print("[PASS] Combined Alert Test - Both alert types triggered")
|
||||||
|
success = True
|
||||||
|
else:
|
||||||
|
print("[FAIL] Combined Alert Test")
|
||||||
|
if not zone_alert_found:
|
||||||
|
print(" Missing: Zone-based alert")
|
||||||
|
if not rate_alert_found:
|
||||||
|
print(" Missing: Rate-of-change alert")
|
||||||
|
success = False
|
||||||
|
|
||||||
|
# Cleanup - force garbage collection and wait briefly before removing file
|
||||||
|
gc.collect()
|
||||||
|
time.sleep(0.5)
|
||||||
|
try:
|
||||||
|
os.remove(db_path)
|
||||||
|
except PermissionError:
|
||||||
|
print(f"Warning: Could not remove test database {db_path}")
|
||||||
|
|
||||||
|
return success
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Run all alert tests"""
|
||||||
|
print("\n" + "="*70)
|
||||||
|
print("WATER LEVEL ALERTING SYSTEM - COMPREHENSIVE TESTS")
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
|
||||||
|
# Run tests
|
||||||
|
results.append(("Zone-Based Alerts", test_zone_level_alerts()))
|
||||||
|
results.append(("Rate-of-Change Alerts", test_rate_of_change_alerts()))
|
||||||
|
results.append(("Combined Alerts", test_combined_alerts()))
|
||||||
|
|
||||||
|
# Summary
|
||||||
|
print("\n" + "="*70)
|
||||||
|
print("TEST SUMMARY")
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
all_passed = True
|
||||||
|
for test_name, passed in results:
|
||||||
|
status = "PASS" if passed else "FAIL"
|
||||||
|
print(f"{test_name}: [{status}]")
|
||||||
|
if not passed:
|
||||||
|
all_passed = False
|
||||||
|
|
||||||
|
print("="*70)
|
||||||
|
|
||||||
|
if all_passed:
|
||||||
|
print("\nAll tests PASSED!")
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
print("\nSome tests FAILED!")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
Reference in New Issue
Block a user