15 Commits

Author SHA1 Message Date
de632cef90 Add rate-of-change alerting for sudden water level increases
- Implement check_rate_of_change() to detect rapid water level rises
- Monitor water level changes over configurable lookback period (default 3 hours)
- Define rate-of-change thresholds for P.1 and other stations
- Alert on moderate (15cm/h), rapid (25cm/h), and very rapid (40cm/h) rises
- Only alert on rising water levels (positive rate of change)
- Integrate rate-of-change checks into run_alert_check() cycle
- Support both SQLite and PostgreSQL database adapters with fallback

Rate thresholds for P.1 (Nawarat Bridge):
- Warning: 0.15 m/h (15 cm/hour) - moderate rise
- Critical: 0.25 m/h (25 cm/hour) - rapid rise
- Emergency: 0.40 m/h (40 cm/hour) - very rapid rise

Default thresholds for other stations:
- Warning: 0.20 m/h, Critical: 0.35 m/h, Emergency: 0.50 m/h

Alert messages include:
- Rate of change in m/h and cm/h
- Total level change over period
- Time period analyzed

This early warning system detects dangerous trends before absolute
thresholds are reached, allowing for earlier response to flooding events.
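
The rate check described in this commit can be sketched as follows. This is a minimal illustration, not the repository's `check_rate_of_change()`: the function name `classify_rise`, the `(timestamp, level)` reading format, and the first-to-last slope calculation are assumptions. Only the threshold values and the 3-hour default come from the commit message.

```python
from datetime import datetime, timedelta

# Thresholds in m/h, taken from the commit message.
RATE_THRESHOLDS = {
    "P.1": {"warning": 0.15, "critical": 0.25, "emergency": 0.40},
}
DEFAULT_THRESHOLDS = {"warning": 0.20, "critical": 0.35, "emergency": 0.50}


def classify_rise(station, readings, lookback_hours=3.0):
    """Return (severity, rate_m_per_h), or None if no alert applies.

    `readings` is a hypothetical list of (datetime, level_m) tuples sorted
    by time. The rate is the slope between the first and last reading
    inside the lookback window.
    """
    if not readings:
        return None
    cutoff = readings[-1][0] - timedelta(hours=lookback_hours)
    window = [r for r in readings if r[0] >= cutoff]
    if len(window) < 2:
        return None
    (t0, h0), (t1, h1) = window[0], window[-1]
    hours = (t1 - t0).total_seconds() / 3600
    if hours <= 0:
        return None
    rate = (h1 - h0) / hours
    if rate <= 0:  # only rising water triggers an alert
        return None
    limits = RATE_THRESHOLDS.get(station, DEFAULT_THRESHOLDS)
    # Check the most severe level first so the highest match wins.
    for severity in ("emergency", "critical", "warning"):
        if rate >= limits[severity]:
            return severity, rate
    return None
```

A rise of 0.6 m over two hours (0.3 m/h) would classify as critical at P.1 but only as a warning under the default thresholds.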

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:17:08 +07:00
e94b5b13f8 Fix Matrix API notification to use PUT method with transaction ID
- Change HTTP method from POST to PUT for Matrix API v3
- Matrix API requires PUT when transaction ID is included in URL path
- Move transaction ID construction before URL building for clarity
- Fixes "405 Method Not Allowed" error when sending notifications

The Matrix API v3 endpoint structure:
PUT /_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}

Previous error:
POST request was being rejected with 405 Method Not Allowed

Now working:
PUT request successfully sends messages to Matrix rooms
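
For illustration, a request against the endpoint above could be built like this. It is a sketch using only the standard library; the helper name `build_send_request`, the `alert_` transaction ID prefix, and the plain-text `m.room.message` payload are assumptions and may differ from what `src/alerting.py` does.

```python
import json
import time
import urllib.request
from urllib.parse import quote


def build_send_request(homeserver, room_id, access_token, body, txn_id=None):
    """Build (without sending) a PUT request for an m.room.message event."""
    # The transaction ID is created first because it is part of the URL path.
    txn_id = txn_id or f"alert_{int(time.time() * 1000)}"
    url = (
        f"{homeserver.rstrip('/')}/_matrix/client/v3/rooms/"
        f"{quote(room_id, safe='')}/send/m.room.message/{quote(txn_id, safe='')}"
    )
    payload = json.dumps({"msgtype": "m.text", "body": body}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        method="PUT",  # POST to this path is what produced the 405
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
    )
```

Passing the returned object to `urllib.request.urlopen()` would send the message. Room IDs contain `!` and `:`, so they are percent-encoded before being placed in the path.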

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:09:23 +07:00
c93d340f8e Implement zone-based alerting for P.1 (Nawarat Bridge) station
- Add 8 water level zones plus NewEdge threshold for P.1 station
- Zone 1: 3.7m (Info), Zone 2: 3.9m (Info)
- Zone 3-5: 4.0-4.2m (Warning levels)
- Zone 6-7: 4.3-4.6m (Critical levels)
- Zone 8/NewEdge: 4.8m (Emergency level)
- Implement special zone-based checking logic for P.1
- Maintain backward compatibility with standard warning/critical/emergency thresholds
- Keep standard threshold checking for other stations

Zone progression for P.1:
- 3.7m: Zone 1 alert (Info)
- 3.9m: Zone 2 alert (Info)
- 4.0m: Zone 3 alert (Warning)
- 4.1m: Zone 4 alert (Warning)
- 4.2m: Zone 5 alert (Warning)
- 4.3m: Zone 6 alert (Critical)
- 4.6m: Zone 7 alert (Critical)
- 4.8m: Zone 8/NewEdge alert (Emergency)
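
The zone progression can be expressed as a small lookup table. The sketch below is an assumption about shape only (the names `P1_ZONES` and `zone_for_level` are hypothetical); the thresholds and severities are the ones listed in this commit.

```python
# (threshold_m, zone_label, severity) in ascending order, from the commit message.
P1_ZONES = [
    (3.7, "Zone 1", "info"),
    (3.9, "Zone 2", "info"),
    (4.0, "Zone 3", "warning"),
    (4.1, "Zone 4", "warning"),
    (4.2, "Zone 5", "warning"),
    (4.3, "Zone 6", "critical"),
    (4.6, "Zone 7", "critical"),
    (4.8, "Zone 8/NewEdge", "emergency"),
]


def zone_for_level(level_m):
    """Return (label, severity) of the highest zone reached, or None below Zone 1."""
    reached = None
    for threshold, label, severity in P1_ZONES:
        if level_m >= threshold:
            reached = (label, severity)
    return reached
```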

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:04:12 +07:00
dff4dd067d Implement strict freshness detection without grace periods
- Remove tolerance windows and grace periods from data freshness checks
- Require data from current hour only - no exceptions or fallbacks
- If hourly check runs at 21:xx but only has data up to 20:xx, immediately switch to retry mode
- Simplify logic: latest_hour >= current_hour for fresh data
- Remove complex age calculations and tolerance conditions

This ensures the scheduler immediately detects when new hourly data
is not yet available and switches to minute-based retries without delay.

Behavior:
- 21:02 with data up to 21:xx → Fresh (continue hourly)
- 21:02 with data up to 20:xx → Stale (immediate retry mode)
- No grace periods, no tolerance windows, strict hour-based detection
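
A minimal sketch of the strict rule, assuming naive local timestamps and a hypothetical helper name `is_fresh_strict`. It compares timestamps truncated to the hour rather than bare hour numbers, which is an assumption made here so that the check still behaves sensibly across midnight.

```python
from datetime import datetime


def is_fresh_strict(latest, now):
    """True only if the newest reading falls in the current clock hour or later."""
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    return latest >= current_hour
```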

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 21:03:03 +07:00
5c6a41b2b9 Enhance freshness detection to check for current hour data availability
- Modify _check_data_freshness() to verify current hour data exists
- If running at 20:00 but only have data up to 19:xx, consider it stale
- Add tolerance: accept previous hour data if within first 10 minutes
- Combine current hour check with age limit (≤2 hours) for robustness
- Add detailed logging for current vs latest hour comparison

This solves the core issue where scheduler stayed in hourly mode despite
missing the expected current hour data from the API.

Example scenarios:
- 20:57 with data up to 20:xx: Fresh (has current hour)
- 20:57 with data up to 19:xx: Stale (missing current hour) → Retry mode
- 20:05 with data up to 19:xx: Fresh (tolerance for early hour)
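
The three scenarios can be reproduced with a sketch like the one below. The function name and parameters are hypothetical; the 10-minute tolerance and the 2-hour age limit are taken from this commit message, and the actual `_check_data_freshness()` may combine them differently.

```python
from datetime import datetime, timedelta


def is_fresh_with_tolerance(latest, now, grace_minutes=10, max_age_hours=2):
    """Current-hour check with an early-hour tolerance and an overall age limit."""
    if now - latest > timedelta(hours=max_age_hours):
        return False  # too old regardless of the hour comparison
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    if latest >= current_hour:
        return True  # data for the current hour is present
    # Early in the hour, data from the previous hour is still accepted.
    in_grace = now.minute < grace_minutes
    return in_grace and latest >= current_hour - timedelta(hours=1)
```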

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 20:58:19 +07:00
1c023369b3 Implement intelligent data freshness detection for adaptive scheduler
- Add _check_data_freshness() method to detect stale vs fresh data
- Consider data fresh only if latest timestamp is within 2 hours
- Modify run_scraping_cycle() to check data freshness, not just existence
- Return False for stale data to trigger adaptive scheduler retry mode
- Add detailed logging for data age and freshness decisions

This solves the issue where scheduler stayed in hourly mode despite getting
stale data from the API. Now it correctly detects when API returns old data
and switches to retry mode until fresh data becomes available.

Example behavior:
- Fresh data (0.6 hours old): Returns True, stays in hourly mode
- Stale data (68.6 hours old): Returns False, switches to retry mode

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 20:35:59 +07:00
60e70c2192 Fix validator to handle null discharge values properly
- Make discharge field optional in data validator
- Remove discharge from required fields list
- Add explicit null check for discharge before float conversion
- Prevent "float() argument must be a string or a real number, not 'NoneType'" errors
- Allow records with valid water levels but malformed/null discharge data

This completes the malformed data handling fix by updating the validator
to match the parser's new behavior of allowing null discharge values.

Before: Validator rejected records with null discharge
After: Validator accepts records with null discharge, validates only if present
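
As an illustration of the before/after behavior, a validator with an optional discharge field might look like this. The function name `validate_record` and the dict-based record are assumptions; the field names `water_level` and `discharge` follow the wording used in this commit series.

```python
def validate_record(record):
    """Return a list of problems; an empty list means the record is valid."""
    problems = []

    # Water level stays mandatory.
    level = record.get("water_level")
    if level is None:
        problems.append("water_level is required")
    else:
        try:
            float(level)
        except (TypeError, ValueError):
            problems.append("water_level is not numeric")

    # Discharge is optional: validate it only when a value is present,
    # which avoids calling float() on None.
    discharge = record.get("discharge")
    if discharge is not None:
        try:
            float(discharge)
        except (TypeError, ValueError):
            problems.append("discharge is not numeric")

    return problems
```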

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 18:55:20 +07:00
cc5c4522b8 Fix malformed discharge data handling to preserve water level data
- Change data parsing logic to make discharge data optional
- Water level data is now saved even when discharge values are malformed (e.g., "***")
- Handle malformed discharge values gracefully with null instead of skipping entire record
- Add specific handling for "***" discharge values from API
- Improve data completeness by not discarding valid water level measurements

Before: Entire station record was skipped if discharge was malformed
After: Water level data is preserved, discharge set to null for malformed values

Example fix:
- wlvalues8: 1.6 (valid) + qvalues8: "***" (malformed)
- Before: No record saved
- After: Record saved with water_level=1.6, discharge=null
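
The fix can be sketched as a parser that treats the two values independently. `parse_measurement` is a hypothetical helper, not the scraper's actual method; it shows the idea that only an unusable water level discards the record.

```python
def parse_measurement(raw_level, raw_discharge):
    """Return a record dict, or None when the water level itself is unusable."""
    try:
        level = float(raw_level)
    except (TypeError, ValueError):
        return None  # without a water level there is nothing worth saving

    try:
        discharge = float(raw_discharge)
    except (TypeError, ValueError):
        discharge = None  # covers "***" and missing values

    return {"water_level": level, "discharge": discharge}
```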

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 18:41:39 +07:00
6846091522 Implement smart date selection for data fetching
- Add intelligent date selection based on current time
- Before 01:00: fetch yesterday's data only (API not updated yet)
- After 01:00: try today's data first, fallback to yesterday if needed
- Improve data availability by adapting to API update patterns
- Add comprehensive logging for date selection decisions

This ensures optimal data fetching regardless of the time of day:
- Early morning (00:00-00:59): fetches yesterday (reliable)
- Rest of day (01:00-23:59): tries today first, falls back to yesterday
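
A minimal sketch of the selection rule, assuming a hypothetical helper `candidate_dates` that returns the dates to try in order:

```python
from datetime import datetime, timedelta


def candidate_dates(now):
    """Return the dates to request, in the order they should be tried."""
    today = now.date()
    yesterday = today - timedelta(days=1)
    if now.hour < 1:
        # 00:00-00:59: the API has not published today's data yet.
        return [yesterday]
    # 01:00-23:59: prefer today, fall back to yesterday.
    return [today, yesterday]
```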

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 18:30:53 +07:00
4cc792157f Implement adaptive scheduler with intelligent retry logic
- Replace fixed hourly schedule with adaptive scheduling system
- Switch to 1-minute retries when no data is available from API
- Return to hourly schedule once data is successfully fetched
- Fix data fetching to use yesterday's date (API has 1-day delay)
- Add comprehensive logging for scheduler mode changes
- Improve resilience against API data availability issues

The scheduler now intelligently adapts to data availability:
- Normal mode: hourly runs at top of each hour
- Retry mode: minute-based retries until data is available
- Automatic mode switching based on fetch success/failure
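
The mode switching can be pictured as a two-state machine. The class below is a sketch under assumptions (the name `AdaptiveSchedule` and its methods are hypothetical), not the scheduler shipped in this commit.

```python
from datetime import datetime, timedelta


class AdaptiveSchedule:
    """Tracks the scheduler mode: 'hourly' normally, 'retry' after a failed fetch."""

    def __init__(self):
        self.mode = "hourly"

    def record_result(self, fetch_ok):
        """Update the mode from the last fetch; return True if the mode changed."""
        new_mode = "hourly" if fetch_ok else "retry"
        changed = new_mode != self.mode
        self.mode = new_mode
        return changed

    def next_run(self, now):
        """Return when the next fetch should happen."""
        if self.mode == "retry":
            return now + timedelta(minutes=1)
        # Normal mode: run at the top of the next hour.
        top_of_hour = now.replace(minute=0, second=0, microsecond=0)
        return top_of_hour + timedelta(hours=1)
```

The boolean returned by `record_result()` is a convenient place to hook the mode-change logging mentioned above.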

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 18:24:33 +07:00
0ff58ecb13 Add historical data import functionality
- Add import_historical_data() method to EnhancedWaterMonitorScraper
- Support date range imports with Buddhist calendar API format
- Add CLI arguments --import-historical and --force-overwrite
- Include API rate limiting and skip existing data option
- Enable importing years of historical water level data
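
For readers unfamiliar with the Buddhist calendar format: the Thai Buddhist Era year is the Gregorian year plus 543. The sketch below shows the conversion and a simple date-range iterator. The `DD/MM/YYYY` string layout and both helper names are assumptions, since the commit message does not state the exact format the API expects.

```python
from datetime import date, timedelta

BUDDHIST_ERA_OFFSET = 543  # Thai Buddhist Era year = Gregorian year + 543


def to_buddhist_date_string(d):
    """Format a date with a Buddhist Era year (assumed DD/MM/YYYY layout)."""
    return f"{d.day:02d}/{d.month:02d}/{d.year + BUDDHIST_ERA_OFFSET}"


def iter_import_dates(start, end):
    """Yield every date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
```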

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 14:46:54 +07:00
bd812ca5ca Improve scheduler to run immediately then wait for next full hour
- Run initial data collection immediately on startup
- Calculate wait time to next full hour (e.g., 22:12 start waits until 23:00)
- Schedule subsequent runs at top of each hour (:00 minutes)
- Display next scheduled run time to user for better visibility
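
The wait-time calculation is small enough to show in full. This is a sketch with a hypothetical function name, not the code from the commit:

```python
from datetime import datetime, timedelta


def seconds_until_next_hour(now):
    """Seconds from `now` until the next :00 boundary."""
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour - now).total_seconds()
```

For a start at 22:12:00 this gives 2880 seconds, that is, a wait until 23:00.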

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-26 22:51:00 +07:00
ca730e484b Add comprehensive Matrix alerting system with Grafana integration
- Implement custom Python alerting system (src/alerting.py) with water level monitoring, data freshness checks, and Matrix notifications
- Add complete Grafana Matrix alerting setup guide (docs/GRAFANA_MATRIX_SETUP.md) with webhook configuration, alert rules, and notification policies
- Create Matrix quick start guide (docs/MATRIX_QUICK_START.md) for rapid deployment
- Integrate alerting commands into main application (--alert-check, --alert-test)
- Add Matrix configuration to environment variables (.env.example)
- Update Makefile with alerting targets (alert-check, alert-test)
- Enhance status command to show Matrix notification status
- Support station-specific water level thresholds and escalation rules
- Provide dual alerting approach: native Grafana alerts and custom Python system

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-26 16:18:02 +07:00
6c7c128b4d Major refactor: Migrate to uv, add PostgreSQL support, and comprehensive tooling
- **Migration to uv package manager**: Replace pip/requirements with modern pyproject.toml
  - Add pyproject.toml with complete dependency management
  - Update all scripts and Makefile to use uv commands
  - Maintain backward compatibility with existing workflows

- **PostgreSQL integration and migration tools**:
  - Enhanced config.py with automatic password URL encoding
  - Complete PostgreSQL setup scripts and documentation
  - High-performance SQLite to PostgreSQL migration tool (91x speed improvement)
  - Support for both connection strings and individual components

- **Executable distribution system**:
  - PyInstaller integration for standalone .exe creation
  - Automated build scripts with batch file generation
  - Complete packaging system for end-user distribution

- **Enhanced data management**:
  - Fix --fill-gaps command with proper method implementation
  - Add gap detection and historical data backfill capabilities
  - Implement data update functionality for existing records
  - Add comprehensive database adapter methods

- **Developer experience improvements**:
  - Password encoding tools for special characters
  - Interactive setup wizards for PostgreSQL configuration
  - Comprehensive documentation and migration guides
  - Automated testing and validation tools
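
The automatic password URL encoding mentioned under the PostgreSQL item can be illustrated with the standard library. The helper `build_postgres_url` is hypothetical and only sketches the idea; how `config.py` assembles the URL is not shown on this page.

```python
from urllib.parse import quote


def build_postgres_url(user, password, host, port, database):
    """Assemble a connection URL, percent-encoding the user and password."""
    # safe='' also encodes '/', which must not appear raw in the credentials.
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )
```

With the password `my:pass@word` this yields `postgresql://user:my%3Apass%40word@host:5432/db`, matching the special-character example in `.env.example`.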

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-26 15:10:10 +07:00
730cbac7ae fixed time import
2025-08-14 10:49:31 +07:00
27 changed files with 7885 additions and 103 deletions

@@ -2,7 +2,7 @@
# Copy this file to .env and customize for your environment
# Database Configuration
-DB_TYPE=sqlite
+DB_TYPE=postgresql
# Options: sqlite, mysql, postgresql, influxdb, victoriametrics
# SQLite Configuration (default)
@@ -20,8 +20,26 @@ INFLUX_DATABASE=ping_river_monitoring
INFLUX_USERNAME=
INFLUX_PASSWORD=
-# PostgreSQL Configuration
-POSTGRES_CONNECTION_STRING=postgresql://user:password@localhost:5432/ping_river_monitoring
+# PostgreSQL Configuration (Remote Server)
+# Option 1: Full connection string (URL encode special characters in password)
+POSTGRES_CONNECTION_STRING=postgresql://username:url_encoded_password@your-postgres-host:5432/water_monitoring
+# Option 2: Individual components (password will be automatically URL encoded)
+POSTGRES_HOST=your-postgres-host
+POSTGRES_PORT=5432
+POSTGRES_DB=water_monitoring
+POSTGRES_USER=username
+POSTGRES_PASSWORD=your:password@with!special#chars
+# Examples for connection string:
+# - Local: postgresql://postgres:password@localhost:5432/water_monitoring
+# - Remote: postgresql://user:pass@192.168.1.100:5432/water_monitoring
+# - With special chars: postgresql://user:my%3Apass%40word@host:5432/db
+# - With SSL: postgresql://user:pass@host:port/db?sslmode=require
+# - Connection pooling: postgresql://user:pass@host:port/db?pool_size=20&max_overflow=0
+# Special character URL encoding:
+# : → %3A   @ → %40   # → %23   ? → %3F   & → %26   / → %2F   % → %25
# MySQL Configuration
MYSQL_CONNECTION_STRING=mysql://user:password@localhost:3306/ping_river_monitoring
@@ -64,6 +82,18 @@ SMTP_PORT=587
SMTP_USERNAME=
SMTP_PASSWORD=
+# Matrix Alerting Configuration
+MATRIX_HOMESERVER=https://matrix.org
+MATRIX_ACCESS_TOKEN=
+MATRIX_ROOM_ID=
+# Grafana Integration
+GRAFANA_URL=http://localhost:3000
+# Alert Configuration
+ALERT_MAX_AGE_HOURS=2
+ALERT_CHECK_INTERVAL_MINUTES=15
# Development Settings
DEBUG=false
DEVELOPMENT_MODE=false

.env.postgres (new file, 2 lines)

@@ -0,0 +1,2 @@
DB_TYPE=postgresql
POSTGRES_CONNECTION_STRING=postgresql://postgres:password@localhost:5432/water_monitoring

MIGRATION_TO_UV.md (new file, 165 lines)

@@ -0,0 +1,165 @@
# Migration to uv
This document describes the migration from traditional Python package management (pip + requirements.txt) to [uv](https://docs.astral.sh/uv/), a fast Python package installer and resolver.
## What Changed
### Files Added
- `pyproject.toml` - Modern Python project configuration combining dependencies and metadata
- `.python-version` - Specifies Python version for uv
- `scripts/setup_uv.sh` - Unix setup script for uv environment
- `scripts/setup_uv.bat` - Windows setup script for uv environment
- This migration guide
### Files Modified
- `Makefile` - Updated all commands to use `uv run` instead of direct Python execution
### Files That Can Be Removed (Optional)
- `requirements.txt` - Dependencies now in pyproject.toml
- `requirements-dev.txt` - Dev dependencies now in pyproject.toml
- `setup.py` - Configuration now in pyproject.toml
## Installation
### Install uv
**Unix/macOS:**
```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```
**Windows (PowerShell):**
```powershell
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
```
### Setup Project
**Unix/macOS:**
```bash
# Run the setup script
chmod +x scripts/setup_uv.sh
./scripts/setup_uv.sh
# Or manually:
uv sync
uv run pre-commit install
```
**Windows:**
```batch
REM Run the setup script
scripts\setup_uv.bat
REM Or manually:
uv sync
uv run pre-commit install
```
## New Workflow
### Common Commands
| Old Command | New Command | Description |
|-------------|-------------|-------------|
| `pip install -r requirements.txt` | `uv sync --no-dev` | Install production dependencies |
| `pip install -r requirements-dev.txt` | `uv sync` | Install all dependencies (including dev) |
| `python run.py` | `uv run python run.py` | Run the application |
| `pytest` | `uv run pytest` | Run tests |
| `black src/` | `uv run black src/` | Format code |
### Using the Makefile
The Makefile has been updated to use uv, so all existing commands work the same:
```bash
make install-dev # Install dev dependencies with uv
make test # Run tests with uv
make run-api # Start API server with uv
make lint # Lint code with uv
make format # Format code with uv
```
### Adding Dependencies
**Production dependency:**
```bash
uv add requests
```
**Development dependency:**
```bash
uv add --dev pytest
```
**Specific version:**
```bash
uv add "fastapi==0.104.1"
```
### Managing Python Versions
uv can automatically manage Python versions:
```bash
# Install and use Python 3.11
uv python install 3.11
uv sync
# Use specific Python version
uv sync --python 3.11
```
## Benefits of uv
1. **Speed** - 10-100x faster than pip
2. **Reliability** - Better dependency resolution
3. **Simplicity** - Single tool for packages and Python versions
4. **Reproducibility** - Lock file ensures consistent environments
5. **Modern** - Built-in support for pyproject.toml
## Troubleshooting
### Command not found
Make sure uv is in your PATH after installation. Restart your terminal or run:
```bash
source ~/.bashrc # or ~/.zshrc
```
### Lock file conflicts
If you encounter lock file issues:
```bash
rm uv.lock
uv sync
```
### Python version issues
Ensure the Python version in `.python-version` is available:
```bash
uv python list
uv python install 3.11 # if needed
```
## Rollback (if needed)
If you need to rollback to the old system:
1. Use the original requirements files:
```bash
pip install -r requirements.txt
pip install -r requirements-dev.txt
```
2. Revert the Makefile changes to use `python` instead of `uv run python`
3. Remove uv-specific files:
```bash
rm pyproject.toml .python-version uv.lock
rm -rf .venv # if created by uv
```
## Additional Resources
- [uv Documentation](https://docs.astral.sh/uv/)
- [Migration Guide](https://docs.astral.sh/uv/guides/projects/)
- [pyproject.toml Reference](https://packaging.python.org/en/latest/specifications/pyproject-toml/)

@@ -21,39 +21,56 @@ help:
@echo " run Run the monitor in continuous mode"
@echo " run-api Run the web API server"
@echo " run-test Run a single test cycle"
@echo " run-status Show system status"
@echo ""
@echo "Alerting:"
@echo " alert-check Check water levels and send alerts"
@echo " alert-test Send test Matrix message"
@echo ""
@echo "Distribution:"
@echo " build-exe Build standalone executable"
@echo " package Build and create distribution package"
@echo ""
@echo "Docker:"
@echo " docker-build Build Docker image"
@echo " docker-run Run with Docker Compose"
@echo " docker-stop Stop Docker services"
@echo ""
@echo "Database:"
@echo " setup-postgres Setup PostgreSQL database"
@echo " test-postgres Test PostgreSQL connection"
@echo " encode-password URL encode password for connection string"
@echo " migrate-sqlite Migrate SQLite data to PostgreSQL"
@echo " migrate-fast Fast migration with 10K batch size"
@echo " analyze-sqlite Analyze SQLite database structure (dry run)"
@echo ""
@echo "Documentation:"
@echo " docs Generate documentation"
# Installation
install:
-pip install -r requirements.txt
+uv sync --no-dev
install-dev:
-pip install -r requirements-dev.txt
-pre-commit install
+uv sync
+uv run pre-commit install
# Testing
test:
-python test_integration.py
-python test_station_management.py
+uv run python test_integration.py
+uv run python test_station_management.py
test-cov:
-pytest --cov=src --cov-report=html --cov-report=term
+uv run pytest --cov=src --cov-report=html --cov-report=term
# Code quality
lint:
-flake8 src/ --max-line-length=100
-mypy src/
+uv run flake8 src/ --max-line-length=100
+uv run mypy src/
format:
-black src/ *.py
-isort src/ *.py
+uv run black src/ *.py
+uv run isort src/ *.py
# Cleanup
clean:
@@ -69,16 +86,23 @@ clean:
# Running
run:
-python run.py
+uv run python run.py
run-api:
-python run.py --web-api
+uv run python run.py --web-api
run-test:
-python run.py --test
+uv run python run.py --test
run-status:
-python run.py --status
+uv run python run.py --status
+# Alerting
+alert-check:
+uv run python run.py --alert-check
+alert-test:
+uv run python run.py --alert-test
# Docker
docker-build:
@@ -99,7 +123,7 @@ docs:
# Database management
db-migrate:
-python scripts/migrate_geolocation.py
+uv run python scripts/migrate_geolocation.py
# Monitoring
health-check:
@@ -116,9 +140,38 @@ dev-setup: install-dev
# Production deployment
deploy-check:
-python run.py --test
+uv run python run.py --test
@echo "Deployment check passed!"
+# Database management
+setup-postgres:
+uv run python scripts/setup_postgres.py
+test-postgres:
+uv run python -c "from scripts.setup_postgres import test_postgres_connection; from src.config import Config; config = Config.get_database_config(); test_postgres_connection(config['connection_string'])"
+encode-password:
+uv run python scripts/encode_password.py
+migrate-sqlite:
+uv run python scripts/migrate_sqlite_to_postgres.py
+migrate-fast:
+uv run python scripts/migrate_sqlite_to_postgres.py --fast
+analyze-sqlite:
+uv run python scripts/migrate_sqlite_to_postgres.py --dry-run
+# Distribution
+build-exe:
+uv run python build_simple.py
+package: build-exe
+@echo "Creating distribution package..."
+@if exist dist\ping-river-monitor-distribution.zip del dist\ping-river-monitor-distribution.zip
+@cd dist && powershell -Command "Compress-Archive -Path * -DestinationPath ping-river-monitor-distribution.zip -Force"
+@echo "✅ Distribution package created: dist/ping-river-monitor-distribution.zip"
# Git helpers
git-setup:
git remote add origin https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor.git
@@ -134,7 +187,7 @@ validate-workflows:
@echo "Validating Gitea Actions workflows..."
@for file in .gitea/workflows/*.yml; do \
echo "Checking $$file..."; \
-python -c "import yaml; yaml.safe_load(open('$$file', encoding='utf-8'))" || exit 1; \
+uv run python -c "import yaml; yaml.safe_load(open('$$file', encoding='utf-8'))" || exit 1; \
done
@echo "✅ All workflows are valid"

POSTGRESQL_SETUP.md (new file, 287 lines)

@@ -0,0 +1,287 @@
# PostgreSQL Setup for Northern Thailand Ping River Monitor
This guide helps you configure PostgreSQL as the database backend for the water monitoring system.
## Prerequisites
- PostgreSQL server running on a remote machine (already available)
- Network connectivity to the PostgreSQL server
- Database credentials (username, password, host, port)
## Quick Setup
### 1. Configure Environment
Copy the example environment file and configure it:
```bash
cp .env.example .env
```
Edit `.env` and update the PostgreSQL configuration:
```bash
# Database Configuration
DB_TYPE=postgresql
# PostgreSQL Configuration (Remote Server)
POSTGRES_CONNECTION_STRING=postgresql://username:password@your-postgres-host:5432/water_monitoring
```
### 2. Run Setup Script
Use the interactive setup script:
```bash
# Using uv
uv run python scripts/setup_postgres.py
# Or using make
make setup-postgres
```
The script will:
- Test your database connection
- Create the database if it doesn't exist
- Initialize the required tables and indexes
- Set up sample monitoring stations
### 3. Test Connection
Test your PostgreSQL connection:
```bash
make test-postgres
```
### 4. Run the Application
Start collecting data:
```bash
# Run a test cycle
make run-test
# Start the web API
make run-api
```
## Manual Configuration
If you prefer manual setup, here's what you need:
### Connection String Format
```
postgresql://username:password@host:port/database
```
**Examples:**
- Basic: `postgresql://postgres:mypassword@192.168.1.100:5432/water_monitoring`
- With SSL: `postgresql://user:pass@host:5432/db?sslmode=require`
- With connection pooling: `postgresql://user:pass@host:5432/db?pool_size=20&max_overflow=0`
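
To check how a connection string will be interpreted before using it, the pieces can be pulled apart with the standard library. This is a sketch; `describe_connection_string` is a hypothetical helper and not part of the project's scripts.

```python
from urllib.parse import parse_qs, unquote, urlsplit


def describe_connection_string(url):
    """Split a postgresql:// URL into its components, decoding the credentials."""
    parts = urlsplit(url)
    return {
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),  # %3A -> ':' and so on
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
        # Keep the first value of each query parameter, e.g. sslmode.
        "options": {k: v[0] for k, v in parse_qs(parts.query).items()},
    }
```

If the decoded password does not match the real one, the special characters were probably not URL encoded correctly.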
### Environment Variables
| Variable | Description | Example |
|----------|-------------|---------|
| `DB_TYPE` | Database type | `postgresql` |
| `POSTGRES_CONNECTION_STRING` | Full connection string | See above |
### Database Schema
The application uses these main tables:
1. **stations** - Monitoring station information
2. **water_measurements** - Time series water level data
3. **alert_thresholds** - Warning/danger level definitions
4. **data_quality_log** - Data collection issue tracking
See `sql/init_postgres.sql` for the complete schema.
## Connection Options
### SSL Connection
For secure connections, add SSL parameters:
```bash
POSTGRES_CONNECTION_STRING=postgresql://user:pass@host:5432/db?sslmode=require
```
SSL modes:
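- `disable` - No SSL
- `require` - Require SSL, without verifying the server certificate
- `prefer` - Try SSL first, fall back to an unencrypted connection
- `verify-ca` - Require SSL and verify the server certificate against a trusted certificate authority
- `verify-full` - As `verify-ca`, and also verify that the server host name matches the certificate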
### Connection Pooling
For high-performance applications, configure connection pooling:
```bash
POSTGRES_CONNECTION_STRING=postgresql://user:pass@host:5432/db?pool_size=20&max_overflow=0
```
Parameters:
- `pool_size` - Number of connections to maintain
- `max_overflow` - Additional connections allowed
- `pool_timeout` - Seconds to wait for connection
- `pool_recycle` - Seconds before connection refresh
## Troubleshooting
### Common Issues
**1. Connection Refused**
```
psycopg2.OperationalError: could not connect to server
```
- Check if PostgreSQL server is running
- Verify host/port in connection string
- Check firewall settings
**2. Authentication Failed**
```
psycopg2.OperationalError: FATAL: password authentication failed
```
- Verify username/password in connection string
- Check PostgreSQL pg_hba.conf configuration
- Ensure user has database access permissions
**3. Database Does Not Exist**
```
psycopg2.OperationalError: FATAL: database "water_monitoring" does not exist
```
- Run the setup script to create the database
- Or manually create: `CREATE DATABASE water_monitoring;`
**4. Permission Denied**
```
psycopg2.ProgrammingError: permission denied for table
```
- Ensure user has appropriate permissions
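- Grant table access (assuming the tables live in the `public` schema): `GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO username;`. Note that `GRANT ... ON DATABASE` alone does not grant access to tables.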
### Network Configuration
For remote PostgreSQL servers, ensure:
1. **PostgreSQL allows remote connections** (`postgresql.conf`):
```
listen_addresses = '*'
port = 5432
```
2. **Client authentication is configured** (`pg_hba.conf`):
```
# Allow connections from your application server
host water_monitoring username your.app.ip/32 md5
```
3. **Firewall allows PostgreSQL port**:
```bash
# On PostgreSQL server
sudo ufw allow 5432/tcp
```
### Performance Tuning
For optimal performance with time series data:
1. **Increase work_mem** for sorting operations
2. **Tune shared_buffers** for caching
3. **Configure maintenance_work_mem** for indexing
4. **Set up regular VACUUM and ANALYZE** for statistics
Example PostgreSQL configuration additions:
```
# postgresql.conf
shared_buffers = 256MB
work_mem = 16MB
maintenance_work_mem = 256MB
effective_cache_size = 1GB
```
## Monitoring
### Check Application Status
```bash
# View current configuration
uv run python -c "from src.config import Config; Config.print_settings()"
# Test database connection
make test-postgres
# Check latest data
psql "postgresql://user:pass@host:5432/water_monitoring" -c "SELECT COUNT(*) FROM water_measurements;"
```
### PostgreSQL Monitoring
Connect directly to check database status:
```bash
# Connect to database (the commands below are entered at the psql prompt)
psql "postgresql://username:password@host:5432/water_monitoring"
-- Check table sizes
\dt+
-- View latest measurements
SELECT * FROM latest_measurements LIMIT 10;
-- Check data quality
SELECT issue_type, COUNT(*) FROM data_quality_log
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY issue_type;
```
## Backup and Maintenance
### Backup Database
```bash
# Full backup
pg_dump "postgresql://user:pass@host:5432/water_monitoring" > backup.sql
# Data only
pg_dump --data-only "postgresql://user:pass@host:5432/water_monitoring" > data_backup.sql
```
### Restore Database
```bash
# Restore full backup
psql "postgresql://user:pass@host:5432/water_monitoring" < backup.sql
# Restore data only
psql "postgresql://user:pass@host:5432/water_monitoring" < data_backup.sql
```
### Regular Maintenance
Set up regular maintenance tasks:
```sql
-- Update table statistics (run weekly)
ANALYZE;
-- Mark dead rows as reusable (run monthly; only VACUUM FULL returns space to the operating system)
VACUUM;
-- Reindex tables (run quarterly)
REINDEX DATABASE water_monitoring;
```
## Next Steps
1. Set up monitoring and alerting
2. Configure data retention policies
3. Set up automated backups
4. Implement connection pooling if needed
5. Configure SSL for production use
For more advanced configuration, see the [PostgreSQL documentation](https://www.postgresql.org/docs/).

278
SQLITE_MIGRATION.md Normal file

@@ -0,0 +1,278 @@
# SQLite to PostgreSQL Migration Guide
This guide helps you migrate your existing SQLite water monitoring data to PostgreSQL.
## Quick Migration
### 1. Analyze Your SQLite Database (Optional)
First, check what's in your SQLite database:
```bash
# Analyze without migrating
make analyze-sqlite
# Or specify a specific SQLite file
uv run python scripts/migrate_sqlite_to_postgres.py --dry-run /path/to/your/database.db
```
### 2. Run the Migration
```bash
# Auto-detect SQLite file and migrate
make migrate-sqlite
# Or specify a specific SQLite file
uv run python scripts/migrate_sqlite_to_postgres.py /path/to/your/database.db
```
The migration tool will:
- ✅ Connect to both databases
- ✅ Analyze your SQLite schema automatically
- ✅ Migrate station information
- ✅ Migrate all measurement data in batches
- ✅ Handle different SQLite table structures
- ✅ Verify the migration results
- ✅ Generate a detailed log file
## What Gets Migrated
### Station Data
- Station IDs and codes
- Thai and English names
- Coordinates (latitude/longitude)
- Geohash data (if available)
- Creation/update timestamps
### Measurement Data
- Water level readings
- Discharge measurements
- Discharge percentages
- Timestamps
- Station associations
- Data quality status
## Supported SQLite Schemas
The migration tool automatically detects and handles various SQLite table structures:
### Modern Schema
```sql
-- Stations
stations: id, station_code, station_name_th, station_name_en, latitude, longitude, geohash
-- Measurements
water_measurements: timestamp, station_id, water_level, discharge, discharge_percent, status
```
### Legacy Schema
```sql
-- Stations
water_stations: station_id, station_code, station_name, lat, lon
-- Measurements
measurements: timestamp, station_id, water_level, discharge, discharge_percent
```
### Simple Schema
```sql
-- Any table with basic water level data
-- The tool will adapt and map columns automatically
```
## Migration Process
### Step 1: Database Connection
- Connects to your SQLite database
- Verifies PostgreSQL connection
- Validates configuration
### Step 2: Schema Analysis
- Scans SQLite tables and columns
- Reports data counts
- Identifies table structures
### Step 3: Station Migration
- Extracts station metadata
- Maps to PostgreSQL format
- Handles missing data gracefully
### Step 4: Measurement Migration
- Processes data in batches (1000 records at a time)
- Converts timestamps correctly
- Preserves all measurement values
- Shows progress during migration
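The batching in Step 4 can be sketched with the standard `sqlite3` module. This is an illustration under assumptions, not the migration script itself: `iter_batches` and `count_migrated` are hypothetical names, and the PostgreSQL insert is left as a comment.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_batches(conn: sqlite3.Connection, query: str, batch_size: int = 1000) -> Iterator[List[Tuple]]:
    """Yield the rows returned by `query` in lists of at most `batch_size`."""
    cursor = conn.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


def count_migrated(conn: sqlite3.Connection, batch_size: int = 1000) -> int:
    """Walk the measurements table batch by batch and report progress."""
    migrated = 0
    query = "SELECT timestamp, station_id, water_level FROM water_measurements"
    for rows in iter_batches(conn, query, batch_size):
        # A real migration would insert `rows` into PostgreSQL here.
        migrated += len(rows)
        print(f"Migrated {migrated} measurements")
    return migrated
```

Because `fetchmany` pulls at most one batch into memory at a time, memory use stays bounded regardless of table size.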
### Step 5: Verification
- Compares record counts
- Validates data integrity
- Reports migration statistics
## Command Options
```bash
# Basic migration (auto-detects SQLite file)
uv run python scripts/migrate_sqlite_to_postgres.py
# Specify SQLite database path
uv run python scripts/migrate_sqlite_to_postgres.py /path/to/database.db
# Dry run (analyze only, no migration)
uv run python scripts/migrate_sqlite_to_postgres.py --dry-run
# Custom batch size for large databases
uv run python scripts/migrate_sqlite_to_postgres.py --batch-size 5000
```
## Auto-Detection
The tool automatically searches for SQLite files in common locations:
- `water_levels.db`
- `water_monitoring.db`
- `database.db`
- `../water_levels.db`
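A minimal sketch of this search, assuming the candidates are tried in the order listed and resolved relative to the working directory (the function name `find_sqlite_file` is hypothetical):

```python
from pathlib import Path
from typing import Optional

# Candidate locations copied from the list above, in search order.
CANDIDATES = ("water_levels.db", "water_monitoring.db", "database.db", "../water_levels.db")


def find_sqlite_file(base_dir: Path = Path(".")) -> Optional[Path]:
    """Return the first candidate that exists under base_dir, else None."""
    for name in CANDIDATES:
        path = base_dir / name
        if path.is_file():
            return path
    return None
```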
## Migration Output
The tool provides detailed logging:
```
========================================
SQLite to PostgreSQL Migration Tool
========================================
SQLite database: water_levels.db
PostgreSQL: postgresql
Step 1: Connecting to databases...
Connected to SQLite database: water_levels.db
Connected to PostgreSQL database
Step 2: Analyzing SQLite database structure...
Table 'stations': 8 columns, 25 rows
Table 'water_measurements': 7 columns, 15420 rows
Step 3: Migrating station data...
Migrated 25 stations
Step 4: Migrating measurement data...
Found 15420 measurements to migrate
Migrated 1000/15420 measurements
Migrated 2000/15420 measurements
...
Successfully migrated 15420 measurements
Step 5: Verifying migration...
SQLite stations: 25
SQLite measurements: 15420
PostgreSQL measurements retrieved: 15420
Migrated stations: 25
Migrated measurements: 15420
========================================
MIGRATION COMPLETED
========================================
Duration: 0:02:15
Stations migrated: 25
Measurements migrated: 15420
No errors encountered
```
## Error Handling
The migration tool is robust and handles:
- **Missing tables** - Tries alternative table names
- **Different column names** - Maps common variations
- **Missing data** - Uses sensible defaults
- **Invalid timestamps** - Attempts multiple date formats
- **Connection issues** - Provides clear error messages
- **Large datasets** - Processes in batches to avoid memory issues
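Trying several date formats in turn might look like the sketch below. The format list is an assumption chosen for illustration; the formats the tool actually accepts are not documented here.

```python
from datetime import datetime
from typing import Optional

# Hypothetical format list; the migration tool's real list may differ.
FORMATS = (
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%d/%m/%Y %H:%M",
)


def parse_timestamp(raw: str) -> Optional[datetime]:
    """Return the first successful parse of `raw`, or None if no format fits."""
    text = raw.strip()
    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    return None
```

Returning `None` instead of raising lets the caller decide whether to skip the row or log it.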
## Log Files
Migration creates a detailed log file:
- `migration.log` - Complete migration log
- Shows all operations, errors, and statistics
- Useful for troubleshooting
## Troubleshooting
### Common Issues
**1. SQLite file not found**
```
SQLite database file not found. Please specify the path:
python migrate_sqlite_to_postgres.py /path/to/database.db
```
**Solution**: Specify the correct path to your SQLite file
**2. PostgreSQL not configured**
```
Error: PostgreSQL not configured. Set DB_TYPE=postgresql in your .env file
```
**Solution**: Ensure your .env file has `DB_TYPE=postgresql`
**3. Connection failed**
```
Database connection error: connection refused
```
**Solution**: Check your PostgreSQL connection settings
**4. No tables found**
```
Could not analyze SQLite database structure
```
**Solution**: Verify your SQLite file contains water monitoring data
### Performance Tips
- **Large databases**: Use `--batch-size 5000` for faster processing
- **Slow networks**: Reduce batch size to `--batch-size 100`
- **Memory issues**: Process smaller batches
## After Migration
Once migration is complete:
1. **Verify data**:
```bash
make run-test
make run-api
```
2. **Check the web interface**: Latest readings should show your migrated data
3. **Back up your SQLite file**: Keep the original file as a backup
4. **Update configurations**: Remove SQLite references from configs
## Rollback
If you need to roll back:
1. **Clear PostgreSQL data**:
```sql
DELETE FROM water_measurements;
DELETE FROM stations;
```
2. **Switch back to SQLite**:
```bash
# In .env file
DB_TYPE=sqlite
WATER_DB_PATH=water_levels.db
```
3. **Test the rollback**:
```bash
make run-test
```
The migration tool is designed to be safe and can be run multiple times - it handles duplicates appropriately.
## Next Steps
After successful migration:
- Set up automated backups for PostgreSQL
- Configure monitoring and alerting
- Consider data retention policies
- Update documentation references

301
build_executable.py Normal file

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""
Build script to create a standalone executable for Northern Thailand Ping River Monitor
"""
import os
import sys
import shutil
from pathlib import Path
def create_spec_file():
    """Create PyInstaller spec file"""
    spec_content = """
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
# Data files to include
data_files = [
    ('.env', '.'),
    ('sql/*.sql', 'sql'),
    ('README.md', '.'),
    ('POSTGRESQL_SETUP.md', '.'),
    ('SQLITE_MIGRATION.md', '.'),
]
# Hidden imports that PyInstaller might miss
# ('psycopg2-binary' is the PyPI distribution name; the importable module is 'psycopg2')
hidden_imports = [
    'psycopg2',
    'sqlalchemy.dialects.postgresql',
    'sqlalchemy.dialects.sqlite',
    'sqlalchemy.dialects.mysql',
    'influxdb',
    'pymysql',
    'dotenv',
    'pydantic',
    'fastapi',
    'uvicorn',
    'schedule',
    'pandas',
    'requests',
    'psutil',
]
a = Analysis(
    ['run.py'],
    pathex=['.'],
    binaries=[],
    datas=data_files,
    hiddenimports=hidden_imports,
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[
        'tkinter',
        'matplotlib',
        'PIL',
        'jupyter',
        'notebook',
        'IPython',
    ],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='ping-river-monitor',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    icon='icon.ico' if os.path.exists('icon.ico') else None,
)
"""
    with open('ping-river-monitor.spec', 'w') as f:
        f.write(spec_content.strip())
    print("[OK] Created ping-river-monitor.spec")
def install_pyinstaller():
    """Install PyInstaller if not present"""
    try:
        import PyInstaller
        print("[OK] PyInstaller already installed")
    except ImportError:
        print("Installing PyInstaller...")
        # os.system returns a non-zero status when the install command fails
        if os.system("uv add --dev pyinstaller") == 0:
            print("[OK] PyInstaller installed")
        else:
            print("[ERROR] Could not install PyInstaller")
def build_executable():
    """Build the executable"""
    print("🔨 Building executable...")
    # Clean previous builds
    if os.path.exists('dist'):
        shutil.rmtree('dist')
    if os.path.exists('build'):
        shutil.rmtree('build')
    # Build with PyInstaller using uv
    result = os.system("uv run pyinstaller ping-river-monitor.spec --clean --noconfirm")
    if result == 0:
        print("✅ Executable built successfully!")
        # Copy additional files to dist directory
        dist_dir = Path('dist')
        if dist_dir.exists():
            # Copy .env file if it exists
            if os.path.exists('.env'):
                shutil.copy2('.env', dist_dir / '.env')
                print("✅ Copied .env file")
            # Copy documentation
            for doc in ['README.md', 'POSTGRESQL_SETUP.md', 'SQLITE_MIGRATION.md']:
                if os.path.exists(doc):
                    shutil.copy2(doc, dist_dir / doc)
                    print(f"✅ Copied {doc}")
            # Copy SQL files
            if os.path.exists('sql'):
                shutil.copytree('sql', dist_dir / 'sql', dirs_exist_ok=True)
                print("✅ Copied SQL files")
            print(f"\n🎉 Executable created: {dist_dir / 'ping-river-monitor.exe'}")
            print(f"📁 All files in: {dist_dir.absolute()}")
    else:
        print("❌ Build failed!")
        return False
    return True
def create_batch_files():
    """Create convenient batch files"""
    batch_files = {
        'start.bat': '''@echo off
echo Starting Ping River Monitor...
ping-river-monitor.exe
pause
''',
        'start-api.bat': '''@echo off
echo Starting Ping River Monitor Web API...
ping-river-monitor.exe --web-api
pause
''',
        'test.bat': '''@echo off
echo Running Ping River Monitor test...
ping-river-monitor.exe --test
pause
''',
        'status.bat': '''@echo off
echo Checking Ping River Monitor status...
ping-river-monitor.exe --status
pause
'''
    }
    dist_dir = Path('dist')
    for filename, content in batch_files.items():
        batch_file = dist_dir / filename
        with open(batch_file, 'w') as f:
            f.write(content)
        print(f"✅ Created {filename}")
def create_readme():
    """Create deployment README"""
    readme_content = """# Ping River Monitor - Standalone Executable
This is a standalone executable version of the Northern Thailand Ping River Monitor.
## Quick Start
1. **Configure Database**: Edit `.env` file with your PostgreSQL settings
2. **Test Connection**: Double-click `test.bat`
3. **Start Monitoring**: Double-click `start.bat`
4. **Web Interface**: Double-click `start-api.bat`
## Files Included
- `ping-river-monitor.exe` - Main executable
- `.env` - Configuration file (EDIT THIS!)
- `start.bat` - Start continuous monitoring
- `start-api.bat` - Start web API server
- `test.bat` - Run a test cycle
- `status.bat` - Check system status
- `README.md`, `POSTGRESQL_SETUP.md`, `SQLITE_MIGRATION.md` - Documentation
- `sql/` - Database initialization scripts
## Configuration
Edit `.env` file:
```
DB_TYPE=postgresql
POSTGRES_HOST=your-server-ip
POSTGRES_PORT=5432
POSTGRES_DB=water_monitoring
POSTGRES_USER=your-username
POSTGRES_PASSWORD=your-password
```
## Usage
### Command Line
```cmd
# Continuous monitoring
ping-river-monitor.exe
# Single test run
ping-river-monitor.exe --test
# Web API server
ping-river-monitor.exe --web-api
# Check status
ping-river-monitor.exe --status
```
### Batch Files
- Just double-click the `.bat` files for easy operation
## Troubleshooting
1. **Database Connection Issues**
   - Check `.env` file settings
   - Verify PostgreSQL server is accessible
   - Test with `test.bat`
2. **Permission Issues**
   - Run as administrator if needed
   - Check firewall settings for API mode
3. **Log Files**
   - Check `water_monitor.log` for detailed logs
   - Logs are created in the same directory as the executable
## Support
For issues or questions, check the documentation files included.
"""
    with open('dist/DEPLOYMENT_README.txt', 'w') as f:
        f.write(readme_content)
    print("✅ Created DEPLOYMENT_README.txt")
def main():
    """Main build process"""
    print("Building Ping River Monitor Executable")
    print("=" * 50)
    # Check if we're in the right directory
    if not os.path.exists('run.py'):
        print("❌ Error: run.py not found. Please run this from the project root directory.")
        return False
    # Install PyInstaller
    install_pyinstaller()
    # Create spec file
    create_spec_file()
    # Build executable
    if not build_executable():
        return False
    # Create convenience files
    create_batch_files()
    create_readme()
    print("\n" + "=" * 50)
    print("🎉 BUILD COMPLETE!")
    print("📁 Check the 'dist' folder for your executable")
    print("💡 Edit the .env file before distributing")
    print("🚀 Ready for deployment!")
    return True
if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)

107
build_simple.py Normal file

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
Simple build script for standalone executable
"""
import os
import shutil
import subprocess
import sys
from pathlib import Path
def main():
    print("Building Ping River Monitor Executable")
    print("=" * 50)
    # Check if PyInstaller is installed
    try:
        import PyInstaller
        print("[OK] PyInstaller available")
    except ImportError:
        print("[INFO] Installing PyInstaller...")
        os.system("uv add --dev pyinstaller")
    # Clean previous builds
    if os.path.exists('dist'):
        shutil.rmtree('dist')
        print("[CLEAN] Removed old dist directory")
    if os.path.exists('build'):
        shutil.rmtree('build')
        print("[CLEAN] Removed old build directory")
    # The --add-data separator is platform-specific: ';' on Windows, ':' elsewhere
    sep = os.pathsep
    # Build command with all necessary options
    cmd = [
        "uv", "run", "pyinstaller",
        "--onefile",
        "--console",
        "--name=ping-river-monitor",
        f"--add-data=.env{sep}.",
        f"--add-data=sql{sep}sql",
        f"--add-data=README.md{sep}.",
        f"--add-data=POSTGRESQL_SETUP.md{sep}.",
        f"--add-data=SQLITE_MIGRATION.md{sep}.",
        "--hidden-import=psycopg2",
        "--hidden-import=sqlalchemy.dialects.postgresql",
        "--hidden-import=sqlalchemy.dialects.sqlite",
        "--hidden-import=dotenv",
        "--hidden-import=pydantic",
        "--hidden-import=fastapi",
        "--hidden-import=uvicorn",
        "--hidden-import=schedule",
        "--hidden-import=pandas",
        "--clean",
        "--noconfirm",
        "run.py"
    ]
    print("[BUILD] Running PyInstaller...")
    print("[CMD] " + " ".join(cmd))
    # Pass the argument list directly so no shell quoting is involved
    result = subprocess.run(cmd).returncode
    if result == 0:
        print("[SUCCESS] Executable built successfully!")
        # Copy .env file to dist if it exists
        if os.path.exists('.env') and os.path.exists('dist'):
            shutil.copy2('.env', 'dist/.env')
            print("[COPY] .env file copied to dist/")
        # Create batch files for easy usage
        batch_files = {
            'start.bat': '''@echo off
echo Starting Ping River Monitor...
ping-river-monitor.exe
pause
''',
            'start-api.bat': '''@echo off
echo Starting Web API...
ping-river-monitor.exe --web-api
pause
''',
            'test.bat': '''@echo off
echo Running test...
ping-river-monitor.exe --test
pause
'''
        }
        for filename, content in batch_files.items():
            if os.path.exists('dist'):
                with open(f'dist/{filename}', 'w') as f:
                    f.write(content)
                print(f"[CREATE] {filename}")
        print("\n" + "=" * 50)
        print("BUILD COMPLETE!")
        print("Executable: dist/ping-river-monitor.exe")
        print("Batch files: start.bat, start-api.bat, test.bat")
        print("Don't forget to edit .env file before using!")
        return True
    else:
        print("[ERROR] Build failed!")
        return False
if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)


@@ -0,0 +1,168 @@
# Grafana Matrix Alerting Setup
## Overview
Configure Grafana to send water level alerts directly to Matrix channels when thresholds are exceeded.
## Prerequisites
- Grafana instance with your PostgreSQL data source
- Matrix account and access token
- Matrix room for alerts
## Step 1: Configure Matrix Contact Point
1. **In Grafana, go to Alerting → Contact Points**
2. **Add new contact point:**
```
Name: matrix-water-alerts
Integration: Webhook
URL: https://matrix.org/_matrix/client/v3/rooms/!ROOM_ID:matrix.org/send/m.room.message
HTTP Method: POST
```
3. **Add Headers:**
```
Authorization: Bearer YOUR_MATRIX_ACCESS_TOKEN
Content-Type: application/json
```
4. **Message Template:**
```json
{
"msgtype": "m.text",
"body": "🌊 WATER ALERT: {{ .CommonLabels.alertname }}\n\nStation: {{ .CommonLabels.station_code }}\nLevel: {{ .CommonAnnotations.water_level }}m\nStatus: {{ .CommonLabels.severity }}\n\nTime: {{ .CommonAnnotations.time }}"
}
```
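The Matrix client-server API v3 defines message sending as `PUT /_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}`, and a POST to a URL that includes a transaction ID is rejected with 405. The sketch below builds such a PUT request with the standard library so the call can be tested outside Grafana. The function name and the timestamp-based transaction ID are illustrative assumptions.

```python
import json
import time
import urllib.request
from typing import Optional
from urllib.parse import quote


def build_matrix_request(homeserver: str, room_id: str, token: str,
                         body: str, txn_id: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) a PUT request for an m.room.message event."""
    # The transaction ID should differ for every message sent with the same token;
    # a millisecond timestamp is one simple (assumed) way to get that.
    txn_id = txn_id or f"alert_{int(time.time() * 1000)}"
    url = (
        f"{homeserver}/_matrix/client/v3/rooms/{quote(room_id, safe='')}"
        f"/send/m.room.message/{quote(txn_id, safe='')}"
    )
    payload = json.dumps({"msgtype": "m.text", "body": body}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        method="PUT",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )
```

To actually send the message, pass the request to `urllib.request.urlopen(request, timeout=10)`.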
## Step 2: Create Alert Rules
### High Water Level Alert
```yaml
Rule Name: high-water-level
Query: water_level > 6.0
Condition: IS ABOVE 6.0 FOR 5m
Labels:
- severity: critical
- station_code: {{ .station_code }}
Annotations:
- water_level: {{ .water_level }}
- summary: "Critical water level at {{ .station_code }}"
```
### Low Water Level Alert
```yaml
Rule Name: low-water-level
Query: water_level < 1.0
Condition: IS BELOW 1.0 FOR 10m
Labels:
- severity: warning
- station_code: {{ .station_code }}
```
### Data Gap Alert
```yaml
Rule Name: data-gap
Query: increase(measurements_total[1h]) == 0
Condition: IS EQUAL TO 0 FOR 30m
Labels:
- severity: warning
- issue: data-gap
```
## Step 3: Matrix Setup
### Get Matrix Access Token
```bash
curl -X POST https://matrix.org/_matrix/client/v3/login \
-H "Content-Type: application/json" \
-d '{
"type": "m.login.password",
"user": "your_username",
"password": "your_password"
}'
```
### Create Alert Room
```bash
curl -X POST "https://matrix.org/_matrix/client/v3/createRoom" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Water Level Alerts - Northern Thailand",
"topic": "Automated alerts for Ping River water monitoring",
"preset": "trusted_private_chat"
}'
```
## Example Alert Queries
### Critical Water Levels
```promql
# High water alert
water_level{station_code=~"P.1|P.4A|P.20"} > 6.0
# Dangerous discharge
discharge{station_code=~".*"} > 500
# Rapid level change
increase(water_level[15m]) > 0.5
```
### System Health
```promql
# No data received
up{job="water-monitor"} == 0
# Old data
(time() - timestamp) > 7200
```
## Alert Notification Format
Your Matrix messages will look like:
```
🌊 WATER ALERT: High Water Level
Station: P.1 (Chiang Mai)
Level: 6.2m (CRITICAL)
Discharge: 450 cms
Status: DANGER
Time: 2025-09-26 14:30:00
Trend: Rising (+0.3m in 30min)
📍 Location: 18.7883°N, 98.9853°E
```
## Advanced Features
### Escalation Rules
```yaml
# Send to different rooms based on severity
- if: severity == "critical"
  receiver: matrix-emergency
- if: severity == "warning"
  receiver: matrix-alerts
- if: time_of_day() outside "08:00-20:00"
  receiver: matrix-night-duty
```
### Rate Limiting
```yaml
group_wait: 5m
group_interval: 10m
repeat_interval: 30m
```
## Testing Alerts
1. **Test Contact Point** - Use Grafana's test button
2. **Simulate Alert** - Manually trigger with test data
3. **Verify Matrix** - Check message formatting and delivery
## Troubleshooting
### Common Issues
- **403 Forbidden**: Confirm the token's user has joined the room and may post there (an invalid or expired token normally returns 401)
- **Room not found**: Verify room ID format
- **No alerts**: Check query syntax and thresholds
- **Spam**: Configure proper grouping and intervals


@@ -0,0 +1,351 @@
# Complete Grafana Matrix Alerting Setup Guide
## Overview
Configure Grafana to send water level alerts directly to Matrix channels when thresholds are exceeded.
## Prerequisites
- Grafana instance running (v8.0+)
- PostgreSQL data source configured in Grafana
- Matrix account
- Matrix room for alerts
## Step 1: Get Matrix Access Token
### Method 1: Using curl
```bash
curl -X POST https://matrix.org/_matrix/client/v3/login \
-H "Content-Type: application/json" \
-d '{
"type": "m.login.password",
"user": "your_username",
"password": "your_password"
}'
```
### Method 2: Using Element Web Client
1. Open Element in browser: https://app.element.io
2. Login to your account
3. Go to Settings → Help & About → Advanced
4. Copy your Access Token
### Method 3: Using Matrix Admin Panel
- If you have admin access to your homeserver, generate token via admin API
## Step 2: Create Alert Room
```bash
curl -X POST "https://matrix.org/_matrix/client/v3/createRoom" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Water Level Alerts - Northern Thailand",
"topic": "Automated alerts for Ping River water monitoring",
"preset": "private_chat"
}'
```
Save the `room_id` from the response (format: `!roomid:homeserver.com`).
## Step 3: Configure Grafana Contact Point
### Navigate to Alerting
1. In Grafana, go to **Alerting → Contact Points**
2. Click **Add contact point**
### Contact Point Settings
```
Name: matrix-water-alerts
Integration: Webhook
URL: https://matrix.org/_matrix/client/v3/rooms/!YOUR_ROOM_ID:matrix.org/send/m.room.message/{{ .GroupLabels.alertname }}_{{ .GroupLabels.severity }}_{{ now.Unix }}
HTTP Method: PUT
```
### Headers
```
Authorization: Bearer YOUR_MATRIX_ACCESS_TOKEN
Content-Type: application/json
```
### Message Template (JSON Body)
```json
{
"msgtype": "m.text",
"body": "🌊 **PING RIVER WATER ALERT**\n\n**Alert:** {{ .GroupLabels.alertname }}\n**Severity:** {{ .GroupLabels.severity | toUpper }}\n**Station:** {{ .GroupLabels.station_code }} ({{ .GroupLabels.station_name }})\n\n{{ range .Alerts }}**Status:** {{ .Status | toUpper }}\n**Water Level:** {{ .Annotations.water_level }}m\n**Threshold:** {{ .Annotations.threshold }}m\n**Time:** {{ .StartsAt.Format \"2006-01-02 15:04:05\" }}\n{{ if .Annotations.discharge }}**Discharge:** {{ .Annotations.discharge }} cms\n{{ end }}{{ if .Annotations.message }}**Details:** {{ .Annotations.message }}\n{{ end }}{{ end }}\n📈 **Dashboard:** {{ .ExternalURL }}\n📍 **Location:** Northern Thailand Ping River"
}
```
## Step 4: Create Alert Rules
### High Water Level Alert
```yaml
# Rule Configuration
Rule Name: high-water-level
Evaluation Group: water-level-alerts
Folder: Water Monitoring
# Query A
SELECT
station_code,
station_name_th as station_name,
water_level,
discharge,
timestamp
FROM water_measurements
WHERE
timestamp > now() - interval '5 minutes'
AND water_level > 6.0
# Condition
IS ABOVE 6.0 FOR 5 minutes
# Labels
severity: critical
alertname: High Water Level
station_code: {{ $labels.station_code }}
station_name: {{ $labels.station_name }}
# Annotations
water_level: {{ $values.water_level }}
threshold: 6.0
discharge: {{ $values.discharge }}
summary: Critical water level detected at {{ $labels.station_code }}
```
### Emergency Water Level Alert
```yaml
Rule Name: emergency-water-level
Query: water_level > 8.0
Condition: IS ABOVE 8.0 FOR 2 minutes
Labels:
  severity: emergency
  alertname: Emergency Water Level
Annotations:
  threshold: 8.0
  message: IMMEDIATE ACTION REQUIRED - Flood risk imminent
```
### Low Water Level Alert
```yaml
Rule Name: low-water-level
Query: water_level < 1.0
Condition: IS BELOW 1.0 FOR 15 minutes
Labels:
  severity: warning
  alertname: Low Water Level
Annotations:
  threshold: 1.0
  message: Drought conditions detected
```
### Data Gap Alert
```yaml
Rule Name: data-gap
Query:
SELECT
station_code,
MAX(timestamp) as last_seen
FROM water_measurements
GROUP BY station_code
HAVING MAX(timestamp) < now() - interval '2 hours'
Condition: HAS NO DATA FOR 30 minutes
Labels:
severity: warning
alertname: Data Gap
issue: missing-data
```
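The same staleness check can be sketched in Python for testing outside Grafana. It mirrors the `HAVING MAX(timestamp) < now() - interval '2 hours'` condition; the function name and input shape are assumptions.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def stale_stations(last_seen: Dict[str, datetime], now: datetime,
                   max_age: timedelta = timedelta(hours=2)) -> List[str]:
    """Return station codes whose latest measurement is older than max_age."""
    return sorted(code for code, ts in last_seen.items() if now - ts > max_age)
```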
### Rapid Level Change Alert
```yaml
Rule Name: rapid-level-change
Query:
  SELECT station_code, water_level, prev_level
  FROM (
    SELECT
      station_code,
      water_level,
      LAG(water_level, 1) OVER (PARTITION BY station_code ORDER BY timestamp) AS prev_level
    FROM water_measurements
    WHERE timestamp > now() - interval '15 minutes'
  ) AS recent
  WHERE ABS(water_level - prev_level) > 0.5
Condition: CHANGE > 0.5m FOR 1 minute
Labels:
severity: warning
alertname: Rapid Water Level Change
```
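For reference, the rule's logic (compare each reading with the previous one for the same station and flag jumps above 0.5 m) can be sketched in plain Python. The function name and the tuple layout are assumptions made for this example.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple

Reading = Tuple[str, int, float]  # (station_code, timestamp, water_level in metres)


def rapid_changes(readings: Iterable[Reading], threshold: float = 0.5) -> List[Tuple[str, int, float]]:
    """Return (station_code, timestamp, change) for consecutive jumps above threshold."""
    by_station = defaultdict(list)
    for code, ts, level in readings:
        by_station[code].append((ts, level))
    flagged = []
    for code, series in by_station.items():
        series.sort()  # order each station's readings by timestamp
        for (_, prev_level), (ts, level) in zip(series, series[1:]):
            change = level - prev_level
            if abs(change) > threshold:
                flagged.append((code, ts, round(change, 3)))
    return flagged
```

Timestamps are integers here for brevity; `datetime` values work the same way because only ordering is used.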
## Step 5: Configure Notification Policy
### Create Notification Policy
```yaml
# Policy Tree
- receiver: matrix-water-alerts
  # match compares exact values; a pattern such as a|b needs match_re
  match_re:
    severity: emergency|critical
  group_wait: 10s
  group_interval: 5m
  repeat_interval: 30m
- receiver: matrix-water-alerts
  match:
    severity: warning
  group_wait: 30s
  group_interval: 10m
  repeat_interval: 2h
### Grouping Rules
```yaml
group_by: [alertname, station_code]
group_wait: 10s
group_interval: 5m
repeat_interval: 1h
```
## Step 6: Station-Specific Thresholds
Create separate rules for each station with appropriate thresholds:
```sql
-- P.1 (Chiang Mai) - Urban area, higher thresholds
SELECT * FROM water_measurements
WHERE station_code = 'P.1' AND water_level > 6.5;
-- P.4A (Mae Ping) - Agricultural area
SELECT * FROM water_measurements
WHERE station_code = 'P.4A' AND water_level > 5.0;
-- P.20 (Downstream) - Lower threshold
SELECT * FROM water_measurements
WHERE station_code = 'P.20' AND water_level > 4.0;
```
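If the thresholds are also needed in application code, a lookup table keeps them in one place. The sketch below reuses the three values from the queries above; the 6.0 m fallback is an assumption borrowed from the generic high-water rule, and the names are hypothetical.

```python
# Per-station critical levels in metres, taken from the queries above.
THRESHOLDS_M = {"P.1": 6.5, "P.4A": 5.0, "P.20": 4.0}

# Assumed fallback for stations without a specific entry.
DEFAULT_THRESHOLD_M = 6.0


def exceeds_threshold(station_code: str, water_level: float) -> bool:
    """Return True when water_level is above the station's critical level."""
    return water_level > THRESHOLDS_M.get(station_code, DEFAULT_THRESHOLD_M)
```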
## Step 7: Advanced Features
### Time-Based Routing
```yaml
# Different receivers for day/night
time_intervals:
  - name: working_hours
    time_intervals:
      - times:
          - start_time: '08:00'
            end_time: '20:00'
        weekdays: ['monday:friday']
routes:
  - receiver: matrix-alerts-day
    match:
      severity: warning
    active_time_intervals: [working_hours]
    # Time intervals only mute a route; let alerts fall through to the night route
    continue: true
  - receiver: matrix-alerts-night
    match:
      severity: warning
    # Interval names cannot be negated; muting during working hours has the same effect
    mute_time_intervals: [working_hours]
```
### Multi-Channel Alerts
```yaml
# Send critical alerts to multiple rooms
- receiver: matrix-emergency
  webhook_configs:
    - url: https://matrix.org/_matrix/client/v3/rooms/!emergency:matrix.org/send/m.room.message
      http_config:
        authorization:
          # The scheme defaults to Bearer, so credentials holds only the token
          credentials: "EMERGENCY_TOKEN"
    - url: https://matrix.org/_matrix/client/v3/rooms/!general:matrix.org/send/m.room.message
      http_config:
        authorization:
          credentials: "GENERAL_TOKEN"
```
## Step 8: Testing
### Test Contact Point
1. Go to Contact Points in Grafana
2. Select your Matrix contact point
3. Click "Test" button
4. Check Matrix room for test message
### Test Alert Rules
1. Temporarily lower thresholds
2. Wait for condition to trigger
3. Verify alert appears in Grafana
4. Verify Matrix message received
5. Reset thresholds
### Manual Alert Trigger
```sql
-- Simulate high water level in database
INSERT INTO water_measurements (station_code, water_level, timestamp)
VALUES ('P.1', 7.5, NOW());
```
## Troubleshooting
### Common Issues
#### 403 Forbidden
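- **Cause**: The token's user is not in the room or lacks permission to post (an invalid token normally returns 401)
- **Fix**: Join the alert room with that account, or regenerate the token if it has expired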
#### Room Not Found
- **Cause**: Incorrect room ID format
- **Fix**: Ensure room ID starts with ! and includes homeserver
#### No Alerts Firing
- **Cause**: Query returns no results
- **Fix**: Test queries in Grafana Explore, check data availability
#### Alert Spam
- **Cause**: No grouping configured
- **Fix**: Configure proper group_by and intervals
#### Messages Not Formatted
- **Cause**: Template syntax errors
- **Fix**: Validate JSON template, check Grafana template docs
### Debug Steps
1. Check Grafana alert rule status
2. Verify contact point test succeeds
3. Check Grafana logs: `/var/log/grafana/grafana.log`
4. Test Matrix API directly with curl
5. Verify database connectivity and query results
## Environment Variables
Add to your `.env`:
```bash
MATRIX_HOMESERVER=https://matrix.org
MATRIX_ACCESS_TOKEN=your_access_token_here
MATRIX_ROOM_ID=!your_room_id:matrix.org
GRAFANA_URL=http://your-grafana-host:3000
```
## Example Alert Message
Your Matrix messages will appear as:
```
🌊 **PING RIVER WATER ALERT**
**Alert:** High Water Level
**Severity:** CRITICAL
**Station:** P.1 (สถานีเชียงใหม่)
**Status:** FIRING
**Water Level:** 6.75m
**Threshold:** 6.0m
**Time:** 2025-09-26 14:30:00
**Discharge:** 450.2 cms
📈 **Dashboard:** http://grafana:3000
📍 **Location:** Northern Thailand Ping River
```
## Security Notes
- Store Matrix tokens securely (environment variables)
- Use room-specific tokens when possible
- Enable rate limiting to prevent spam
- Consider using dedicated alerting user account
- Regularly rotate access tokens
This setup provides comprehensive water level monitoring with immediate Matrix notifications when thresholds are exceeded.


@@ -0,0 +1,85 @@
# Quick Matrix Alerting Setup
## Step 1: Get Matrix Account
1. Go to https://app.element.io or install Element app
2. Create account or login with existing Matrix account
## Step 2: Get Access Token
### Method 1: Element Web (Recommended)
1. Open Element in browser: https://app.element.io
2. Login to your account
3. Click Settings (gear icon) → Help & About → Advanced
4. Copy your "Access Token" (starts with `syt_...` or similar)
### Method 2: Command Line
```bash
curl -X POST https://matrix.org/_matrix/client/v3/login \
-H "Content-Type: application/json" \
-d '{
"type": "m.login.password",
"user": "your_username",
"password": "your_password"
}'
```
## Step 3: Create Alert Room
1. In Element, click "+" to create new room
2. Name: "Water Level Alerts"
3. Set to Private
4. Copy the room ID from room settings (format: `!roomid:matrix.org`)
## Step 4: Configure .env File
Add these to your `.env` file:
```bash
# Matrix Alerting Configuration
MATRIX_HOMESERVER=https://matrix.org
MATRIX_ACCESS_TOKEN=syt_your_access_token_here
MATRIX_ROOM_ID=!your_room_id:matrix.org
# Grafana Integration (optional)
GRAFANA_URL=http://localhost:3000
```
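Before running the tests in the next step, the settings can be sanity-checked with a short script. This is a sketch under assumptions: the room ID pattern follows the `!roomid:homeserver` format described in this guide, and `validate_matrix_config` is a hypothetical helper, not part of `run.py`.

```python
import re
from typing import List, Mapping

REQUIRED = ("MATRIX_HOMESERVER", "MATRIX_ACCESS_TOKEN", "MATRIX_ROOM_ID")

# Room ID format used in this guide: "!" + local part + ":" + homeserver.
ROOM_ID_RE = re.compile(r"^![^:\s]+:\S+$")


def validate_matrix_config(env: Mapping[str, str]) -> List[str]:
    """Return a list of human-readable problems; empty means the config looks sane."""
    problems = [f"{key} is not set" for key in REQUIRED if not env.get(key)]
    homeserver = env.get("MATRIX_HOMESERVER", "")
    if homeserver and not homeserver.startswith(("http://", "https://")):
        problems.append("MATRIX_HOMESERVER must start with http:// or https://")
    room_id = env.get("MATRIX_ROOM_ID", "")
    if room_id and not ROOM_ID_RE.match(room_id):
        problems.append("MATRIX_ROOM_ID must look like !roomid:homeserver")
    return problems
```

Calling `validate_matrix_config(os.environ)` after loading the `.env` file (and importing `os`) would check the live settings.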
## Step 5: Test Configuration
```bash
# Test Matrix connection
uv run python run.py --alert-test
# Check system status (shows Matrix config)
uv run python run.py --status
# Run alert check
uv run python run.py --alert-check
```
## Example Alert Message
When thresholds are exceeded, you'll receive messages like:
```
🌊 **WATER LEVEL ALERT**
**Station:** P.1 (สถานีเชียงใหม่)
**Alert Type:** Critical Water Level
**Severity:** CRITICAL
**Current Level:** 6.75m
**Threshold:** 6.0m
**Difference:** +0.75m
**Discharge:** 450.2 cms
**Time:** 2025-09-26 14:30:00
📈 View dashboard: http://localhost:3000
```
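The **Difference** field in the sample is simply the current level minus the threshold (6.75 m - 6.0 m = +0.75 m). As an illustration only, the message could be assembled like this; `format_alert` and its parameter names are hypothetical and are not taken from the project's source.

```python
def format_alert(station: str, alert_type: str, severity: str,
                 level_m: float, threshold_m: float, discharge_cms: float,
                 timestamp: str, dashboard_url: str) -> str:
    """Render an alert in the same layout as the sample message."""
    difference = level_m - threshold_m  # positive when above the threshold
    lines = [
        "🌊 **WATER LEVEL ALERT**",
        f"**Station:** {station}",
        f"**Alert Type:** {alert_type}",
        f"**Severity:** {severity.upper()}",
        f"**Current Level:** {level_m:.2f}m",
        f"**Threshold:** {threshold_m:.1f}m",
        f"**Difference:** {difference:+.2f}m",
        f"**Discharge:** {discharge_cms:.1f} cms",
        f"**Time:** {timestamp}",
        f"📈 View dashboard: {dashboard_url}",
    ]
    return "\n".join(lines)


message = format_alert(
    "P.1 (สถานีเชียงใหม่)", "Critical Water Level", "critical",
    6.75, 6.0, 450.2, "2025-09-26 14:30:00", "http://localhost:3000",
)
```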
## Cron Job Setup (Optional)
Add to crontab for automatic alerting:
```bash
# Check water levels every 15 minutes
*/15 * * * * cd /path/to/monitor && uv run python run.py --alert-check >> alerts.log 2>&1
```
## Troubleshooting
- **403 Error**: Check that the Matrix access token is valid and that the account has joined the alert room and is allowed to post in it
- **Room Not Found**: Verify room ID includes `!` prefix and `:homeserver.com` suffix
- **No Alerts**: Check database has recent data with `uv run python run.py --status`

ping-river-monitor.spec Normal file

@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['run.py'],
pathex=[],
binaries=[],
datas=[('.env', '.'), ('sql', 'sql'), ('README.md', '.'), ('POSTGRESQL_SETUP.md', '.'), ('SQLITE_MIGRATION.md', '.')],
hiddenimports=['psycopg2', 'sqlalchemy.dialects.postgresql', 'sqlalchemy.dialects.sqlite', 'dotenv', 'pydantic', 'fastapi', 'uvicorn', 'schedule', 'pandas'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='ping-river-monitor',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

pyproject.toml Normal file

@@ -0,0 +1,129 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "northern-thailand-ping-river-monitor"
version = "3.1.3"
description = "Real-time water level monitoring system for the Ping River Basin in Northern Thailand"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Ping River Monitor Team", email = "contact@example.com"}
]
keywords = [
"water monitoring",
"hydrology",
"thailand",
"ping river",
"environmental monitoring",
"time series",
"fastapi",
"real-time data"
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Science/Research",
"Intended Audience :: System Administrators",
"Topic :: Scientific/Engineering :: Hydrology",
"Topic :: System :: Monitoring",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Operating System :: OS Independent",
"Environment :: Web Environment",
"Framework :: FastAPI"
]
requires-python = ">=3.9"
dependencies = [
# Core dependencies
"requests==2.31.0",
"schedule==1.2.0",
"pandas==2.0.3",
# Web API framework
"fastapi==0.104.1",
"uvicorn[standard]==0.24.0",
"pydantic==2.5.0",
# Database adapters
"sqlalchemy==2.0.23",
"influxdb==5.3.1",
"pymysql==1.1.0",
"psycopg2-binary==2.9.9",
# Monitoring and metrics
"psutil==5.9.6"
]
[project.optional-dependencies]
dev = [
# Testing
"pytest==7.4.3",
"pytest-cov==4.1.0",
"pytest-asyncio==0.21.1",
# Code formatting and linting
"black==23.11.0",
"flake8==6.1.0",
"isort==5.12.0",
"mypy==1.7.1",
# Pre-commit hooks
"pre-commit==3.5.0",
# Development tools
"ipython==8.17.2",
"jupyter==1.0.0",
# Type stubs
"types-requests==2.31.0.10",
"types-python-dateutil==2.8.19.14"
]
docs = [
"sphinx==7.2.6",
"sphinx-rtd-theme==1.3.0",
"sphinx-autodoc-typehints==1.25.2"
]
all = [
"influxdb==5.3.1",
"pymysql==1.1.0",
"psycopg2-binary==2.9.9"
]
[project.scripts]
ping-river-monitor = "src.main:main"
ping-river-api = "src.web_api:main"
[project.urls]
Homepage = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor"
Repository = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor"
Issues = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/issues"
Documentation = "https://git.b4l.co.th/B4L/Northern-Thailand-Ping-River-Monitor/wiki"
[tool.uv]
dev-dependencies = [
# Testing
"pytest==7.4.3",
"pytest-cov==4.1.0",
"pytest-asyncio==0.21.1",
# Code formatting and linting
"black==23.11.0",
"flake8==6.1.0",
"isort==5.12.0",
"mypy==1.7.1",
# Pre-commit hooks
"pre-commit==3.5.0",
# Development tools
"ipython==8.17.2",
"jupyter==1.0.0",
# Type stubs
"types-requests==2.31.0.10",
"types-python-dateutil==2.8.19.14",
# Documentation
"sphinx==7.2.6",
"sphinx-rtd-theme==1.3.0",
"sphinx-autodoc-typehints==1.25.2",
"pyinstaller>=6.16.0",
]
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-dir]
"" = "src"


@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
Password URL encoder for PostgreSQL connection strings
"""
import urllib.parse
import sys


def encode_password(password: str) -> str:
    """URL encode a password for use in connection strings"""
    return urllib.parse.quote(password, safe='')


def build_connection_string(username: str, password: str, host: str, port: int, database: str) -> str:
    """Build a properly encoded PostgreSQL connection string"""
    encoded_password = encode_password(password)
    return f"postgresql://{username}:{encoded_password}@{host}:{port}/{database}"


def main():
    print("PostgreSQL Password URL Encoder")
    print("=" * 40)

    if len(sys.argv) > 1:
        # Password provided as argument
        password = sys.argv[1]
    else:
        # Interactive mode
        password = input("Enter your password: ")

    encoded = encode_password(password)
    print(f"\nOriginal password: {password}")
    print(f"URL encoded: {encoded}")

    # Optional: build full connection string
    try:
        build_full = input("\nBuild full connection string? (y/N): ").strip().lower() == 'y'
    except (EOFError, KeyboardInterrupt):
        print("\nDone!")
        return

    if build_full:
        username = input("Username: ").strip()
        host = input("Host: ").strip()
        port = input("Port [5432]: ").strip() or "5432"
        database = input("Database [water_monitoring]: ").strip() or "water_monitoring"

        connection_string = build_connection_string(username, password, host, int(port), database)

        print("\nComplete connection string:")
        print(f"POSTGRES_CONNECTION_STRING={connection_string}")
        print("\nAdd this to your .env file:")
        print("DB_TYPE=postgresql")
        print(f"POSTGRES_CONNECTION_STRING={connection_string}")


if __name__ == "__main__":
    main()
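
A short illustration of why the encoding step matters, using only the standard library. The credentials and host below are made up for the example. Note that `urlparse` does not undo percent-encoding, so any code that later extracts the password from the URL has to call `unquote` itself.

```python
from urllib.parse import quote, unquote, urlparse

password = "p@ss/word#1"              # hypothetical password with URL-reserved characters
encoded = quote(password, safe="")    # "p%40ss%2Fword%231"

good_url = f"postgresql://monitor:{encoded}@db.example.com:5432/water_monitoring"
bad_url = f"postgresql://monitor:{password}@db.example.com:5432/water_monitoring"

good = urlparse(good_url)
bad = urlparse(bad_url)

# With encoding, the host and port are recovered correctly, and the password
# comes back still percent-encoded until it is explicitly decoded.
decoded_password = unquote(good.password)

# Without encoding, the '@' and '/' inside the password shift the URL
# boundaries, so the parsed host is no longer db.example.com.
host_is_wrong = bad.hostname != "db.example.com"
```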


@@ -0,0 +1,619 @@
#!/usr/bin/env python3
"""
SQLite to PostgreSQL Migration Tool
Migrates all data from SQLite database to PostgreSQL
"""
import os
import sys
import logging
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field

# Add src to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))


@dataclass
class MigrationStats:
    stations_migrated: int = 0
    measurements_migrated: int = 0
    # default_factory gives every instance its own list of error messages
    errors: List[str] = field(default_factory=list)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
class SQLiteToPostgresMigrator:
def __init__(self, sqlite_path: str, postgres_config: Dict[str, Any]):
self.sqlite_path = sqlite_path
self.postgres_config = postgres_config
self.sqlite_conn = None
self.postgres_adapter = None
self.stats = MigrationStats()
# Setup logging with UTF-8 encoding
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('migration.log', encoding='utf-8')
]
)
self.logger = logging.getLogger(__name__)
def connect_databases(self) -> bool:
"""Connect to both SQLite and PostgreSQL databases"""
try:
# Connect to SQLite
if not os.path.exists(self.sqlite_path):
self.logger.error(f"SQLite database not found: {self.sqlite_path}")
return False
self.sqlite_conn = sqlite3.connect(self.sqlite_path)
self.sqlite_conn.row_factory = sqlite3.Row # For dict-like access
self.logger.info(f"Connected to SQLite database: {self.sqlite_path}")
# Connect to PostgreSQL
from database_adapters import create_database_adapter
self.postgres_adapter = create_database_adapter(
self.postgres_config['type'],
connection_string=self.postgres_config['connection_string']
)
if not self.postgres_adapter.connect():
self.logger.error("Failed to connect to PostgreSQL")
return False
self.logger.info("Connected to PostgreSQL database")
return True
except Exception as e:
self.logger.error(f"Database connection error: {e}")
return False
def analyze_sqlite_schema(self) -> Dict[str, List[str]]:
"""Analyze SQLite database structure"""
try:
cursor = self.sqlite_conn.cursor()
# Get all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [row[0] for row in cursor.fetchall()]
schema_info = {}
for table in tables:
cursor.execute(f"PRAGMA table_info({table})")
columns = [row[1] for row in cursor.fetchall()]
schema_info[table] = columns
# Get row count
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
self.logger.info(f"Table '{table}': {len(columns)} columns, {count} rows")
return schema_info
except Exception as e:
self.logger.error(f"Schema analysis error: {e}")
return {}
def migrate_stations(self) -> bool:
"""Migrate station data"""
try:
cursor = self.sqlite_conn.cursor()
# Try different possible table names and structures
station_queries = [
# Modern structure
"""SELECT id, station_code, station_name_th as thai_name, station_name_en as english_name,
latitude, longitude, geohash, created_at, updated_at
FROM stations""",
# Alternative structure 1
"""SELECT id, station_code, thai_name, english_name,
latitude, longitude, geohash, created_at, updated_at
FROM stations""",
# Legacy structure
"""SELECT station_id as id, station_code, station_name as thai_name,
station_name as english_name, lat as latitude, lon as longitude,
NULL as geohash, datetime('now') as created_at, datetime('now') as updated_at
FROM water_stations""",
# Simple structure
"""SELECT rowid as id, station_code, name as thai_name, name as english_name,
NULL as latitude, NULL as longitude, NULL as geohash,
datetime('now') as created_at, datetime('now') as updated_at
FROM stations""",
]
stations_data = []
for query in station_queries:
try:
cursor.execute(query)
rows = cursor.fetchall()
if rows:
self.logger.info(f"Found {len(rows)} stations using query variant")
for row in rows:
station = {
'station_id': row[0],
'station_code': row[1] or f"STATION_{row[0]}",
'station_name_th': row[2] or f"Station {row[0]}",
'station_name_en': row[3] or f"Station {row[0]}",
'latitude': row[4],
'longitude': row[5],
'geohash': row[6],
'status': 'active'
}
stations_data.append(station)
break
except sqlite3.OperationalError as e:
if "no such table" in str(e).lower() or "no such column" in str(e).lower():
continue
else:
raise
if not stations_data:
self.logger.warning("No stations found in SQLite database")
return True
# Insert stations into PostgreSQL using raw SQL
# Since the adapter is designed for measurements, we'll use direct SQL
try:
from sqlalchemy import create_engine, text
engine = create_engine(self.postgres_config['connection_string'])
# Process stations individually to avoid transaction rollback issues
for station in stations_data:
try:
with engine.begin() as conn:
# Use PostgreSQL UPSERT syntax with correct column names
station_sql = """
INSERT INTO stations (id, station_code, thai_name, english_name, latitude, longitude, geohash)
VALUES (:station_id, :station_code, :thai_name, :english_name, :latitude, :longitude, :geohash)
ON CONFLICT (id) DO UPDATE SET
thai_name = EXCLUDED.thai_name,
english_name = EXCLUDED.english_name,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
geohash = EXCLUDED.geohash,
updated_at = CURRENT_TIMESTAMP
"""
conn.execute(text(station_sql), {
'station_id': station['station_id'],
'station_code': station['station_code'],
'thai_name': station['station_name_th'],
'english_name': station['station_name_en'],
'latitude': station.get('latitude'),
'longitude': station.get('longitude'),
'geohash': station.get('geohash')
})
self.stats.stations_migrated += 1
except Exception as e:
error_msg = f"Error migrating station {station.get('station_code', 'unknown')}: {str(e)[:100]}..."
self.logger.warning(error_msg)
self.stats.errors.append(error_msg)
except Exception as e:
self.logger.error(f"Station migration failed: {e}")
return False
self.logger.info(f"Migrated {self.stats.stations_migrated} stations")
return True
except Exception as e:
self.logger.error(f"Station migration error: {e}")
return False
    def migrate_measurements(self, batch_size: int = 5000) -> bool:
        """Migrate measurement data in batches"""
        try:
            cursor = self.sqlite_conn.cursor()

            # Try different possible measurement table structures
            measurement_queries = [
                # Modern structure
                """SELECT w.timestamp, w.station_id, s.station_code, s.station_name_th, s.station_name_en,
                          w.water_level, w.discharge, w.discharge_percent, w.status
                   FROM water_measurements w
                   JOIN stations s ON w.station_id = s.id
                   ORDER BY w.timestamp""",
                # Alternative with different join
                """SELECT w.timestamp, w.station_id, s.station_code, s.thai_name, s.english_name,
                          w.water_level, w.discharge, w.discharge_percent, 'active' as status
                   FROM water_measurements w
                   JOIN stations s ON w.station_id = s.id
                   ORDER BY w.timestamp""",
                # Legacy structure
                """SELECT timestamp, station_id, station_code, station_name, station_name,
                          water_level, discharge, discharge_percent, 'active' as status
                   FROM measurements
                   ORDER BY timestamp""",
                # Simple structure without joins
                """SELECT timestamp, station_id, 'UNKNOWN' as station_code, 'Unknown' as station_name_th, 'Unknown' as station_name_en,
                          water_level, discharge, discharge_percent, 'active' as status
                   FROM water_measurements
                   ORDER BY timestamp""",
            ]

            measurements_processed = 0
            for query in measurement_queries:
                try:
                    # Get total count first by wrapping the whole query as a subquery
                    cursor.execute(f"SELECT COUNT(*) FROM ({query})")
                    total_measurements = cursor.fetchone()[0]
                    if total_measurements == 0:
                        continue
                    self.logger.info(f"Found {total_measurements} measurements to migrate")

                    # Process in batches
                    offset = 0
                    while True:
                        cursor.execute(f"{query} LIMIT {batch_size} OFFSET {offset}")
                        rows = cursor.fetchall()
                        if not rows:
                            break

                        # Convert to measurement format
                        measurements = []
                        for row in rows:
                            try:
                                # Parse timestamp
                                timestamp_str = row[0]
                                if isinstance(timestamp_str, str):
                                    try:
                                        timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
                                    except ValueError:
                                        # Try other common formats
                                        for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S']:
                                            try:
                                                timestamp = datetime.strptime(timestamp_str, fmt)
                                                break
                                            except ValueError:
                                                continue
                                        else:
                                            # No format matched: fall back to the current time
                                            timestamp = datetime.now()
                                else:
                                    timestamp = timestamp_str

                                measurement = {
                                    'timestamp': timestamp,
                                    'station_id': row[1] or 999,
                                    'station_code': row[2] or 'UNKNOWN',
                                    'station_name_th': row[3] or 'Unknown',
                                    'station_name_en': row[4] or 'Unknown',
                                    'water_level': float(row[5]) if row[5] is not None else None,
                                    'discharge': float(row[6]) if row[6] is not None else None,
                                    'discharge_percent': float(row[7]) if row[7] is not None else None,
                                    'status': row[8] or 'active'
                                }
                                measurements.append(measurement)
                            except Exception as e:
                                error_msg = f"Error processing measurement row: {e}"
                                self.logger.warning(error_msg)
                                continue

                        # Save batch to PostgreSQL using fast bulk insert
                        if measurements:
                            try:
                                # _fast_bulk_insert reports failure through its return value,
                                # so only count the batch when it actually succeeded
                                if self._fast_bulk_insert(measurements):
                                    measurements_processed += len(measurements)
                                    self.stats.measurements_migrated += len(measurements)
                                    self.logger.info(f"Migrated {measurements_processed}/{total_measurements} measurements")
                                else:
                                    error_msg = f"Failed to save measurement batch at offset {offset}"
                                    self.logger.error(error_msg)
                                    self.stats.errors.append(error_msg)
                            except Exception as e:
                                error_msg = f"Error saving measurement batch: {e}"
                                self.logger.error(error_msg)
                                self.stats.errors.append(error_msg)

                        offset += batch_size

                    # If we processed measurements, we're done
                    if measurements_processed > 0:
                        break
                except sqlite3.OperationalError as e:
                    if "no such table" in str(e).lower() or "no such column" in str(e).lower():
                        continue
                    else:
                        raise

            if measurements_processed == 0:
                self.logger.warning("No measurements found in SQLite database")
            else:
                self.logger.info(f"Successfully migrated {measurements_processed} measurements")
            return True
        except Exception as e:
            self.logger.error(f"Measurement migration error: {e}")
            return False
    def _fast_bulk_insert(self, measurements: List[Dict]) -> bool:
        """Bulk insert using PostgreSQL COPY, falling back to a parameterized INSERT"""
        try:
            import io
            import psycopg2
            from urllib.parse import unquote, urlparse

            # Parse connection string for direct psycopg2 connection
            parsed = urlparse(self.postgres_config['connection_string'])

            # Try the COPY method first
            try:
                conn = psycopg2.connect(
                    host=parsed.hostname,
                    port=parsed.port or 5432,
                    dbname=parsed.path[1:],
                    # urlparse leaves percent-encoding in place, so decode credentials here
                    user=unquote(parsed.username) if parsed.username else None,
                    password=unquote(parsed.password) if parsed.password else None,
                )
                try:
                    with conn:  # commits on success, rolls back on error
                        with conn.cursor() as cur:
                            # Prepare data for COPY
                            data_buffer = io.StringIO()
                            null_val = '\\N'

                            def copy_value(value):
                                # Only None becomes NULL; a legitimate 0.0 reading is kept
                                return null_val if value is None else value

                            for m in measurements:
                                data_buffer.write(
                                    f"{m['timestamp']}\t{m['station_id']}\t{copy_value(m['water_level'])}\t"
                                    f"{copy_value(m['discharge'])}\t{copy_value(m['discharge_percent'])}\t{m['status']}\n"
                                )
                            data_buffer.seek(0)

                            # Use COPY for maximum speed
                            cur.copy_from(
                                data_buffer,
                                'water_measurements',
                                columns=('timestamp', 'station_id', 'water_level', 'discharge', 'discharge_percent', 'status'),
                                sep='\t'
                            )
                finally:
                    # "with conn" ends the transaction but does not close the connection
                    conn.close()
                return True
            except Exception as copy_error:
                # Fallback to SQLAlchemy insert with ON CONFLICT handling
                self.logger.debug(f"COPY failed, using parameterized INSERT: {copy_error}")
                from sqlalchemy import create_engine, text
                engine = create_engine(self.postgres_config['connection_string'])

                # Bound parameters avoid building SQL from raw values and map None to NULL
                insert_sql = text("""
                    INSERT INTO water_measurements (timestamp, station_id, water_level, discharge, discharge_percent, status)
                    VALUES (:timestamp, :station_id, :water_level, :discharge, :discharge_percent, :status)
                    ON CONFLICT (timestamp, station_id) DO UPDATE SET
                        water_level = EXCLUDED.water_level,
                        discharge = EXCLUDED.discharge,
                        discharge_percent = EXCLUDED.discharge_percent,
                        status = EXCLUDED.status
                """)
                keys = ('timestamp', 'station_id', 'water_level', 'discharge', 'discharge_percent', 'status')
                params = [{key: m[key] for key in keys} for m in measurements]
                with engine.begin() as conn:
                    conn.execute(insert_sql, params)
                return True
        except Exception as e:
            self.logger.warning(f"Fast bulk insert failed: {e}")
            # Final fallback to original method
            try:
                success = self.postgres_adapter.save_measurements(measurements)
                return success
            except Exception as fallback_e:
                self.logger.error(f"All insert methods failed: {fallback_e}")
                return False
def verify_migration(self) -> bool:
"""Verify the migration by comparing counts"""
try:
# Get SQLite counts
cursor = self.sqlite_conn.cursor()
sqlite_stations = 0
sqlite_measurements = 0
# Try to get station count
for table in ['stations', 'water_stations']:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
sqlite_stations = cursor.fetchone()[0]
break
except sqlite3.OperationalError:
continue
# Try to get measurement count
for table in ['water_measurements', 'measurements']:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
sqlite_measurements = cursor.fetchone()[0]
break
except sqlite3.OperationalError:
continue
# Get PostgreSQL counts
postgres_measurements = self.postgres_adapter.get_latest_measurements(limit=999999)
postgres_count = len(postgres_measurements)
self.logger.info("Migration Verification:")
self.logger.info(f"SQLite stations: {sqlite_stations}")
self.logger.info(f"SQLite measurements: {sqlite_measurements}")
self.logger.info(f"PostgreSQL measurements retrieved: {postgres_count}")
self.logger.info(f"Migrated stations: {self.stats.stations_migrated}")
self.logger.info(f"Migrated measurements: {self.stats.measurements_migrated}")
return True
except Exception as e:
self.logger.error(f"Verification error: {e}")
return False
def run_migration(self, sqlite_path: str = None) -> bool:
"""Run the complete migration process"""
self.stats.start_time = datetime.now()
if sqlite_path:
self.sqlite_path = sqlite_path
self.logger.info("=" * 60)
self.logger.info("SQLite to PostgreSQL Migration Tool")
self.logger.info("=" * 60)
self.logger.info(f"SQLite database: {self.sqlite_path}")
self.logger.info(f"PostgreSQL: {self.postgres_config['type']}")
try:
# Step 1: Connect to databases
self.logger.info("Step 1: Connecting to databases...")
if not self.connect_databases():
return False
# Step 2: Analyze SQLite schema
self.logger.info("Step 2: Analyzing SQLite database structure...")
schema_info = self.analyze_sqlite_schema()
if not schema_info:
self.logger.error("Could not analyze SQLite database structure")
return False
# Step 3: Migrate stations
self.logger.info("Step 3: Migrating station data...")
if not self.migrate_stations():
self.logger.error("Station migration failed")
return False
# Step 4: Migrate measurements
self.logger.info("Step 4: Migrating measurement data...")
if not self.migrate_measurements():
self.logger.error("Measurement migration failed")
return False
# Step 5: Verify migration
self.logger.info("Step 5: Verifying migration...")
self.verify_migration()
self.stats.end_time = datetime.now()
duration = self.stats.end_time - self.stats.start_time
# Final report
self.logger.info("=" * 60)
self.logger.info("MIGRATION COMPLETED")
self.logger.info("=" * 60)
self.logger.info(f"Duration: {duration}")
self.logger.info(f"Stations migrated: {self.stats.stations_migrated}")
self.logger.info(f"Measurements migrated: {self.stats.measurements_migrated}")
if self.stats.errors:
self.logger.warning(f"Errors encountered: {len(self.stats.errors)}")
for error in self.stats.errors[:10]: # Show first 10 errors
self.logger.warning(f" - {error}")
if len(self.stats.errors) > 10:
self.logger.warning(f" ... and {len(self.stats.errors) - 10} more errors")
else:
self.logger.info("No errors encountered")
return True
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
finally:
# Cleanup
if self.sqlite_conn:
self.sqlite_conn.close()
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="Migrate SQLite data to PostgreSQL")
parser.add_argument("sqlite_path", nargs="?", help="Path to SQLite database file")
parser.add_argument("--batch-size", type=int, default=5000, help="Batch size for processing measurements")
parser.add_argument("--fast", action="store_true", help="Use maximum speed mode (batch-size 10000)")
parser.add_argument("--dry-run", action="store_true", help="Analyze only, don't migrate")
args = parser.parse_args()
# Set fast mode
if args.fast:
args.batch_size = 10000
# Get SQLite path
sqlite_path = args.sqlite_path
if not sqlite_path:
# Try to find common SQLite database files
possible_paths = [
"water_levels.db",
"water_monitoring.db",
"database.db",
"../water_levels.db"
]
for path in possible_paths:
if os.path.exists(path):
sqlite_path = path
break
if not sqlite_path:
print("SQLite database file not found. Please specify the path:")
print(" python migrate_sqlite_to_postgres.py /path/to/database.db")
return False
# Get PostgreSQL configuration
try:
from config import Config
postgres_config = Config.get_database_config()
if postgres_config['type'] != 'postgresql':
print("Error: PostgreSQL not configured. Set DB_TYPE=postgresql in your .env file")
return False
except Exception as e:
print(f"Error loading PostgreSQL configuration: {e}")
return False
# Run migration
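migrator = SQLiteToPostgresMigrator(sqlite_path, postgres_config)
# run_migration() calls migrate_measurements() without arguments, so bind the
# requested batch size here to make --batch-size and --fast take effect
import functools
migrator.migrate_measurements = functools.partial(migrator.migrate_measurements, batch_size=args.batch_size)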
if args.dry_run:
print("DRY RUN MODE - Analyzing SQLite database structure only")
if migrator.connect_databases():
schema_info = migrator.analyze_sqlite_schema()
print("\nSQLite database structure analysis complete.")
print("Run without --dry-run to perform the actual migration.")
return True
success = migrator.run_migration()
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)
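
The timestamp handling in `migrate_measurements` can be read as a small standalone routine: try ISO 8601 first, then a list of explicit formats. The sketch below isolates that idea under one deliberate difference, which is an assumption of this example and not the script's behavior: `parse_timestamp` (a hypothetical name) returns `None` for unparseable input, whereas the script substitutes the current time.

```python
from datetime import datetime
from typing import Optional

# Same fallback formats as the migration script uses
FALLBACK_FORMATS = (
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S",
)


def parse_timestamp(value: str) -> Optional[datetime]:
    """Parse a timestamp string, returning None when no format matches."""
    try:
        # A trailing 'Z' is rewritten as an explicit UTC offset, since
        # fromisoformat() does not accept it on older Python versions.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        pass
    for fmt in FALLBACK_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    return None
```

Returning `None` lets the caller decide whether to skip the row or log it, instead of silently storing a measurement under the migration time.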

scripts/setup_postgres.py Normal file

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
PostgreSQL setup script for Northern Thailand Ping River Monitor
This script helps you configure and test your PostgreSQL connection
"""
import os
import sys
import logging
from typing import Optional
from urllib.parse import urlparse
def setup_logging():
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
def test_postgres_connection(connection_string: str) -> bool:
"""Test connection to PostgreSQL database"""
try:
from sqlalchemy import create_engine, text
# Test connection
engine = create_engine(connection_string, pool_pre_ping=True)
with engine.connect() as conn:
result = conn.execute(text("SELECT version()"))
version = result.fetchone()[0]
logging.info(f"✅ Connected to PostgreSQL successfully!")
logging.info(f"Database version: {version}")
return True
except ImportError:
logging.error("❌ psycopg2-binary not installed. Run: uv add psycopg2-binary")
return False
except Exception as e:
logging.error(f"❌ Connection failed: {e}")
return False
def parse_connection_string(connection_string: str) -> dict:
"""Parse PostgreSQL connection string into components"""
try:
parsed = urlparse(connection_string)
return {
'host': parsed.hostname,
'port': parsed.port or 5432,
'database': parsed.path[1:] if parsed.path else None,
'username': parsed.username,
'password': parsed.password,
}
except Exception as e:
logging.error(f"Failed to parse connection string: {e}")
return {}
def create_database_if_not_exists(connection_string: str, database_name: str) -> bool:
    """Create database if it doesn't exist"""
    try:
        from sqlalchemy import create_engine, text

        # Connect to the default "postgres" database to create our database.
        # Rebuild the URL from its parsed parts so that only the path changes,
        # even if the database name also appears in the username or host.
        parsed = urlparse(connection_string)
        admin_connection = parsed._replace(path="/postgres").geturl()

        # CREATE DATABASE cannot run inside a transaction block, so use autocommit
        engine = create_engine(admin_connection, pool_pre_ping=True, isolation_level="AUTOCOMMIT")
        with engine.connect() as conn:
            # Check if database exists
            result = conn.execute(text(
                "SELECT 1 FROM pg_database WHERE datname = :db_name"
            ), {"db_name": database_name})
            if result.fetchone():
                logging.info(f"✅ Database '{database_name}' already exists")
                return True

            # Create database
            conn.execute(text(f'CREATE DATABASE "{database_name}"'))
            logging.info(f"✅ Created database '{database_name}'")
            return True
    except Exception as e:
        logging.error(f"❌ Failed to create database: {e}")
        return False
def initialize_tables(connection_string: str) -> bool:
"""Initialize database tables"""
try:
# Import the database adapter to create tables
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from database_adapters import SQLAdapter
adapter = SQLAdapter(connection_string=connection_string, db_type='postgresql')
if adapter.connect():
logging.info("✅ Database tables initialized successfully")
return True
else:
logging.error("❌ Failed to initialize tables")
return False
except Exception as e:
logging.error(f"❌ Failed to initialize tables: {e}")
return False
def interactive_setup():
    """Interactive setup wizard"""
    from urllib.parse import quote

    print("🐘 PostgreSQL Setup Wizard for Ping River Monitor")
    print("=" * 50)

    # Get connection details
    host = input("PostgreSQL host (e.g., 192.168.1.100): ").strip()
    port = input("PostgreSQL port [5432]: ").strip() or "5432"
    database = input("Database name [water_monitoring]: ").strip() or "water_monitoring"
    username = input("Username: ").strip()
    password = input("Password: ").strip()

    # Optional SSL
    use_ssl = input("Use SSL connection? (y/N): ").strip().lower() == 'y'
    ssl_params = "?sslmode=require" if use_ssl else ""

    # Percent-encode the credentials so characters such as @, : or / do not break the URL
    connection_string = (
        f"postgresql://{quote(username, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}{ssl_params}"
    )

    print("\nGenerated connection string:")
    print(f"POSTGRES_CONNECTION_STRING={connection_string}")
    return connection_string
def main():
    setup_logging()
    print("🚀 Northern Thailand Ping River Monitor - PostgreSQL Setup")
    print("=" * 60)

    # Check if connection string is provided via environment
    connection_string = os.getenv('POSTGRES_CONNECTION_STRING')
    if not connection_string:
        print("No POSTGRES_CONNECTION_STRING found in environment.")
        print("Starting interactive setup...\n")
        connection_string = interactive_setup()

        # Suggest adding to .env file
        print("\n💡 Add this to your .env file:")
        print("DB_TYPE=postgresql")
        print(f"POSTGRES_CONNECTION_STRING={connection_string}")

    # Parse connection details
    config = parse_connection_string(connection_string)
    if not config.get('host'):
        logging.error("Invalid connection string format")
        return False

    print(f"\n🔗 Connecting to PostgreSQL at {config['host']}:{config['port']}")

    # Try to create the database first (via the default "postgres" database);
    # testing the connection beforehand would fail if the target does not exist yet
    database_name = config.get('database', 'water_monitoring')
    if database_name:
        create_database_if_not_exists(connection_string, database_name)

    # Test connection
    if not test_postgres_connection(connection_string):
        return False

    # Initialize tables
    if not initialize_tables(connection_string):
        return False

    print("\n🎉 PostgreSQL setup completed successfully!")
    print("\nNext steps:")
    print("1. Update your .env file with the connection string")
    print("2. Run: make run-test")
    print("3. Run: make run-api")
    return True
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)
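
Pointing a connection URL at a different database can be done either with plain string replacement or by reassembling the parsed URL. The sketch below compares the two on a made-up URL whose username happens to equal the database name; `with_database` is a hypothetical helper, not part of the script.

```python
from urllib.parse import urlparse


def with_database(connection_string: str, database: str) -> str:
    """Return the same URL with only the database (path) component replaced."""
    parsed = urlparse(connection_string)
    return parsed._replace(path=f"/{database}").geturl()


url = "postgresql://water_monitoring:secret@db.example.com:5432/water_monitoring?sslmode=require"

# Parsed approach: only the path changes; credentials and query string survive.
safe = with_database(url, "postgres")

# Naive approach: str.replace also rewrites the matching username.
naive = url.replace("/water_monitoring", "/postgres")
```

Here `safe` keeps the `water_monitoring` user, while `naive` ends up trying to log in as `postgres`.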

scripts/setup_uv.bat Normal file

@@ -0,0 +1,48 @@
@echo off
REM Setup script for uv-based development environment on Windows
echo 🚀 Setting up Northern Thailand Ping River Monitor with uv...
REM Check if uv is installed
uv --version >nul 2>&1
if %errorlevel% neq 0 (
echo ❌ uv is not installed. Please install it first:
echo powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
exit /b 1
)
echo ✅ uv found
uv --version
REM Initialize uv project if not already initialized
if not exist "uv.lock" (
echo 🔧 Initializing uv project...
uv sync
) else (
echo 📦 Syncing dependencies with uv...
uv sync
)
REM Install pre-commit hooks
echo 🎣 Installing pre-commit hooks...
uv run pre-commit install
REM Create .env file if it doesn't exist
if not exist ".env" (
if exist ".env.example" (
echo 📝 Creating .env file from template...
copy .env.example .env
echo ⚠️ Please edit .env file with your configuration
)
)
echo ✅ Setup complete!
echo.
echo 📚 Quick start commands:
echo make install-dev # Install all dependencies
echo make run-test # Run a test cycle
echo make run-api # Start the web API
echo make test # Run tests
echo make lint # Check code quality
echo.
echo 🎉 Happy monitoring!

scripts/setup_uv.sh Normal file

@@ -0,0 +1,46 @@
#!/bin/bash
# Setup script for uv-based development environment
set -e
echo "🚀 Setting up Northern Thailand Ping River Monitor with uv..."
# Check if uv is installed
if ! command -v uv &> /dev/null; then
echo "❌ uv is not installed. Please install it first:"
echo " curl -LsSf https://astral.sh/uv/install.sh | sh"
exit 1
fi
echo "✅ uv found: $(uv --version)"
# Initialize uv project if not already initialized
if [ ! -f "uv.lock" ]; then
echo "🔧 Initializing uv project..."
uv sync
else
echo "📦 Syncing dependencies with uv..."
uv sync
fi
# Install pre-commit hooks
echo "🎣 Installing pre-commit hooks..."
uv run pre-commit install
# Create .env file if it doesn't exist
if [ ! -f ".env" ] && [ -f ".env.example" ]; then
echo "📝 Creating .env file from template..."
cp .env.example .env
echo "⚠️ Please edit .env file with your configuration"
fi
echo "✅ Setup complete!"
echo ""
echo "📚 Quick start commands:"
echo " make install-dev # Install all dependencies"
echo " make run-test # Run a test cycle"
echo " make run-api # Start the web API"
echo " make test # Run tests"
echo " make lint # Check code quality"
echo ""
echo "🎉 Happy monitoring!"

sql/init_postgres.sql Normal file

@@ -0,0 +1,162 @@
-- Northern Thailand Ping River Monitor - PostgreSQL Database Schema
-- This script initializes the database tables for water monitoring data
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Create schema for better organization
CREATE SCHEMA IF NOT EXISTS water_monitor;
SET search_path TO water_monitor, public;
-- Stations table - stores monitoring station information
CREATE TABLE IF NOT EXISTS stations (
id SERIAL PRIMARY KEY,
station_code VARCHAR(10) UNIQUE NOT NULL,
thai_name VARCHAR(255) NOT NULL,
english_name VARCHAR(255) NOT NULL,
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
geohash VARCHAR(20),
elevation DECIMAL(8,2), -- meters above sea level
river_basin VARCHAR(100),
province VARCHAR(100),
district VARCHAR(100),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Water measurements table - stores time series data
CREATE TABLE IF NOT EXISTS water_measurements (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
station_id INTEGER NOT NULL,
water_level NUMERIC(10,3), -- meters
discharge NUMERIC(10,2), -- cubic meters per second
discharge_percent NUMERIC(5,2), -- percentage of normal discharge
status VARCHAR(20) DEFAULT 'active',
data_quality VARCHAR(20) DEFAULT 'good', -- good, fair, poor, missing
remarks TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (station_id) REFERENCES stations(id) ON DELETE CASCADE,
UNIQUE(timestamp, station_id)
);
-- Alert thresholds table - stores warning/danger levels for each station
CREATE TABLE IF NOT EXISTS alert_thresholds (
id SERIAL PRIMARY KEY,
station_id INTEGER NOT NULL,
threshold_type VARCHAR(20) NOT NULL, -- 'warning', 'danger', 'critical'
water_level_min NUMERIC(10,3),
water_level_max NUMERIC(10,3),
discharge_min NUMERIC(10,2),
discharge_max NUMERIC(10,2),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
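FOREIGN KEY (station_id) REFERENCES stations(id) ON DELETE CASCADE,
UNIQUE(station_id, threshold_type) -- gives the sample-data INSERT's ON CONFLICT DO NOTHING a constraint to act on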
);
-- Data quality log - tracks data collection issues
CREATE TABLE IF NOT EXISTS data_quality_log (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
station_id INTEGER,
issue_type VARCHAR(50) NOT NULL, -- 'connection_failed', 'invalid_data', 'missing_data'
description TEXT,
severity VARCHAR(20) DEFAULT 'info', -- 'info', 'warning', 'error', 'critical'
resolved_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (station_id) REFERENCES stations(id) ON DELETE SET NULL
);
-- Create indexes for better query performance
CREATE INDEX IF NOT EXISTS idx_water_measurements_timestamp ON water_measurements(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_water_measurements_station_id ON water_measurements(station_id);
CREATE INDEX IF NOT EXISTS idx_water_measurements_station_timestamp ON water_measurements(station_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_water_measurements_status ON water_measurements(status);
CREATE INDEX IF NOT EXISTS idx_stations_code ON stations(station_code);
CREATE INDEX IF NOT EXISTS idx_stations_active ON stations(is_active);
CREATE INDEX IF NOT EXISTS idx_data_quality_timestamp ON data_quality_log(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_data_quality_station ON data_quality_log(station_id);
-- Create a view for latest measurements per station
CREATE OR REPLACE VIEW latest_measurements AS
SELECT
s.id as station_id,
s.station_code,
s.english_name,
s.thai_name,
s.latitude,
s.longitude,
s.province,
s.river_basin,
m.timestamp,
m.water_level,
m.discharge,
m.discharge_percent,
m.status,
m.data_quality,
CASE
WHEN m.timestamp > CURRENT_TIMESTAMP - INTERVAL '2 hours' THEN 'online'
WHEN m.timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours' THEN 'delayed'
ELSE 'offline'
END as station_status
FROM stations s
LEFT JOIN LATERAL (
SELECT * FROM water_measurements
WHERE station_id = s.id
ORDER BY timestamp DESC
LIMIT 1
) m ON true
WHERE s.is_active = true
ORDER BY s.station_code;
-- Create a function to update the updated_at timestamp
CREATE OR REPLACE FUNCTION update_modified_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
-- Create triggers to automatically update updated_at
DROP TRIGGER IF EXISTS update_stations_modtime ON stations;
CREATE TRIGGER update_stations_modtime
BEFORE UPDATE ON stations
FOR EACH ROW
EXECUTE FUNCTION update_modified_column();
-- Insert sample stations (Northern Thailand Ping River stations)
INSERT INTO stations (id, station_code, thai_name, english_name, latitude, longitude, province, river_basin) VALUES
(1, 'P.1', 'เชียงใหม่', 'Chiang Mai', 18.7883, 98.9853, 'Chiang Mai', 'Ping River'),
(2, 'P.4A', 'ท่าแพ', 'Tha Phae', 18.7875, 99.0045, 'Chiang Mai', 'Ping River'),
(3, 'P.12', 'สันป่าตอง', 'San Pa Tong', 18.6167, 98.9500, 'Chiang Mai', 'Ping River'),
(4, 'P.20', 'ลำพูน', 'Lamphun', 18.5737, 99.0081, 'Lamphun', 'Ping River'),
(5, 'P.30', 'ลี้', 'Li', 17.4833, 99.3000, 'Lamphun', 'Ping River'),
(6, 'P.35', 'ป่าซาง', 'Pa Sang', 18.5444, 98.9397, 'Lamphun', 'Ping River'),
(7, 'P.67', 'ตาก', 'Tak', 16.8839, 99.1267, 'Tak', 'Ping River'),
(8, 'P.75', 'สามเงา', 'Sam Ngao', 17.1019, 99.4644, 'Tak', 'Ping River')
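ON CONFLICT (id) DO NOTHING;
-- Explicit ids bypass the SERIAL sequence, so advance it past the seeded rows
SELECT setval(pg_get_serial_sequence('stations', 'id'), (SELECT MAX(id) FROM stations));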
-- Insert sample alert thresholds
INSERT INTO alert_thresholds (station_id, threshold_type, water_level_min, water_level_max) VALUES
(1, 'warning', 4.5, NULL),
(1, 'danger', 6.0, NULL),
(1, 'critical', 7.5, NULL),
(2, 'warning', 4.0, NULL),
(2, 'danger', 5.5, NULL),
(2, 'critical', 7.0, NULL)
ON CONFLICT DO NOTHING;
-- Grant permissions (adjust as needed for your setup)
GRANT USAGE ON SCHEMA water_monitor TO postgres;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA water_monitor TO postgres;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA water_monitor TO postgres;
-- Optional: Create a read-only user for reporting
-- CREATE USER water_monitor_readonly WITH PASSWORD 'readonly_password';
-- GRANT USAGE ON SCHEMA water_monitor TO water_monitor_readonly;
-- GRANT SELECT ON ALL TABLES IN SCHEMA water_monitor TO water_monitor_readonly;
COMMIT;
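
The `latest_measurements` view labels each station `online`, `delayed`, or `offline` from the age of its newest row. The same rule can be sketched in Python for use in tests or client code. This is a minimal illustration, not part of the PR; the function name `classify_station_status` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def classify_station_status(last_seen: Optional[datetime], now: datetime) -> str:
    """Mirror the CASE expression in the latest_measurements view."""
    if last_seen is None:
        # The LEFT JOIN LATERAL yields NULL for stations with no rows; the SQL
        # CASE then falls through to ELSE, so such stations count as 'offline'.
        return "offline"
    age = now - last_seen
    if age < timedelta(hours=2):
        return "online"
    if age < timedelta(hours=24):
        return "delayed"
    return "offline"


if __name__ == "__main__":
    now = datetime(2025, 10, 3, 12, 0, 0)
    for minutes in (30, 180, 1800):
        print(minutes, classify_station_status(now - timedelta(minutes=minutes), now))
```

The comparisons are strict, matching `m.timestamp > CURRENT_TIMESTAMP - INTERVAL '2 hours'`: a row exactly two hours old is already `delayed`.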

src/alerting.py Normal file

@@ -0,0 +1,513 @@
#!/usr/bin/env python3
"""
Water Level Alerting System with Matrix Integration
"""
import os
import json
import requests
import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
try:
from .config import Config
from .database_adapters import create_database_adapter
from .logging_config import get_logger
except ImportError:
from config import Config
from database_adapters import create_database_adapter
import logging
def get_logger(name):
return logging.getLogger(name)
logger = get_logger(__name__)
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class WaterAlert:
station_code: str
station_name: str
alert_type: str
level: AlertLevel
water_level: float
threshold: float
discharge: Optional[float] = None
timestamp: Optional[datetime.datetime] = None
message: Optional[str] = None
class MatrixNotifier:
def __init__(self, homeserver: str, access_token: str, room_id: str):
self.homeserver = homeserver.rstrip('/')
self.access_token = access_token
self.room_id = room_id
self.session = requests.Session()
def send_message(self, message: str, msgtype: str = "m.text") -> bool:
"""Send message to Matrix room"""
try:
# Add transaction ID to prevent duplicates
txn_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn_id}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
data = {
"msgtype": msgtype,
"body": message
}
# Matrix API requires PUT when transaction ID is in the URL path
response = self.session.put(url, headers=headers, json=data, timeout=10)
response.raise_for_status()
logger.info(f"Matrix message sent successfully: {response.json().get('event_id')}")
return True
except Exception as e:
logger.error(f"Failed to send Matrix message: {e}")
return False
def send_alert(self, alert: WaterAlert) -> bool:
"""Send formatted water alert to Matrix"""
emoji_map = {
AlertLevel.INFO: "",
AlertLevel.WARNING: "⚠️",
AlertLevel.CRITICAL: "🚨",
AlertLevel.EMERGENCY: "🆘"
}
emoji = emoji_map.get(alert.level, "📊")
message = f"""{emoji} **WATER LEVEL ALERT**
**Station:** {alert.station_code} ({alert.station_name})
**Alert Type:** {alert.alert_type}
**Severity:** {alert.level.value.upper()}
**Current Level:** {alert.water_level:.2f}m
**Threshold:** {alert.threshold:.2f}m
**Difference:** {(alert.water_level - alert.threshold):+.2f}m
"""
if alert.discharge is not None: # a discharge of 0.0 is still a valid reading
message += f"**Discharge:** {alert.discharge:.1f} cms\n"
if alert.timestamp:
message += f"**Time:** {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
if alert.message:
message += f"\n**Details:** {alert.message}\n"
message += f"\n📈 View dashboard: {os.getenv('GRAFANA_URL', 'http://localhost:3000')}"
return self.send_message(message)
class WaterLevelAlertSystem:
def __init__(self):
self.db_adapter = None
self.matrix_notifier = None
self.thresholds = self._load_thresholds()
# Matrix configuration from environment
matrix_homeserver = os.getenv('MATRIX_HOMESERVER', 'https://matrix.org')
matrix_token = os.getenv('MATRIX_ACCESS_TOKEN')
matrix_room = os.getenv('MATRIX_ROOM_ID')
if matrix_token and matrix_room:
self.matrix_notifier = MatrixNotifier(matrix_homeserver, matrix_token, matrix_room)
logger.info("Matrix notifications enabled")
else:
logger.warning("Matrix configuration missing - notifications disabled")
def _load_thresholds(self) -> Dict[str, Dict[str, float]]:
"""Load alert thresholds from config or database"""
# Default thresholds for Northern Thailand stations
return {
"P.1": {
# Zone-based thresholds for Nawarat Bridge (P.1)
"zone_1": 3.7,
"zone_2": 3.9,
"zone_3": 4.0,
"zone_4": 4.1,
"zone_5": 4.2,
"zone_6": 4.3,
"zone_7": 4.6,
"zone_8": 4.8,
"newedge": 4.8, # Same as zone 8 or adjust as needed
# Keep legacy thresholds for compatibility
"warning": 3.7,
"critical": 4.3,
"emergency": 4.8
},
"P.4A": {"warning": 4.5, "critical": 6.0, "emergency": 7.5},
"P.20": {"warning": 3.0, "critical": 4.5, "emergency": 6.0},
"P.21": {"warning": 4.0, "critical": 5.5, "emergency": 7.0},
"P.67": {"warning": 6.0, "critical": 8.0, "emergency": 10.0},
"P.75": {"warning": 5.5, "critical": 7.5, "emergency": 9.5},
"P.103": {"warning": 7.0, "critical": 9.0, "emergency": 11.0},
# Default for unknown stations
"default": {"warning": 4.0, "critical": 6.0, "emergency": 8.0}
}
def connect_database(self):
"""Initialize database connection"""
try:
db_config = Config.get_database_config()
self.db_adapter = create_database_adapter(
db_config['type'],
connection_string=db_config['connection_string']
)
if self.db_adapter.connect():
logger.info("Database connection established for alerting")
return True
else:
logger.error("Failed to connect to database")
return False
except Exception as e:
logger.error(f"Database connection error: {e}")
return False
def check_water_levels(self) -> List[WaterAlert]:
"""Check current water levels against thresholds"""
alerts = []
if not self.db_adapter:
logger.error("Database not connected")
return alerts
try:
# Get latest measurements
measurements = self.db_adapter.get_latest_measurements(limit=50)
for measurement in measurements:
station_code = measurement.get('station_code', 'UNKNOWN')
water_level = measurement.get('water_level')
if water_level is None: # skip only missing readings; 0.0 is a valid level
continue
# Get thresholds for this station
station_thresholds = self.thresholds.get(station_code, self.thresholds['default'])
# Check each threshold level
alert_level = None
threshold_value = None
alert_type = None
# Special handling for P.1 with zone-based thresholds
if station_code == "P.1" and 'zone_1' in station_thresholds:
# Check all zones in reverse order (highest to lowest)
zones = [
("zone_8", 4.8, AlertLevel.EMERGENCY, "Zone 8 - Emergency"),
("newedge", 4.8, AlertLevel.EMERGENCY, "NewEdge Alert Level"),
("zone_7", 4.6, AlertLevel.CRITICAL, "Zone 7 - Critical"),
("zone_6", 4.3, AlertLevel.CRITICAL, "Zone 6 - Critical"),
("zone_5", 4.2, AlertLevel.WARNING, "Zone 5 - Warning"),
("zone_4", 4.1, AlertLevel.WARNING, "Zone 4 - Warning"),
("zone_3", 4.0, AlertLevel.WARNING, "Zone 3 - Warning"),
("zone_2", 3.9, AlertLevel.INFO, "Zone 2 - Info"),
("zone_1", 3.7, AlertLevel.INFO, "Zone 1 - Info"),
]
for zone_name, zone_threshold, zone_alert_level, zone_description in zones:
if water_level >= zone_threshold:
alert_level = zone_alert_level
threshold_value = zone_threshold
alert_type = zone_description
break
else:
# Standard threshold checking for other stations
if water_level >= station_thresholds.get('emergency', float('inf')):
alert_level = AlertLevel.EMERGENCY
threshold_value = station_thresholds['emergency']
alert_type = "Emergency Water Level"
elif water_level >= station_thresholds.get('critical', float('inf')):
alert_level = AlertLevel.CRITICAL
threshold_value = station_thresholds['critical']
alert_type = "Critical Water Level"
elif water_level >= station_thresholds.get('warning', float('inf')):
alert_level = AlertLevel.WARNING
threshold_value = station_thresholds['warning']
alert_type = "High Water Level"
if alert_level:
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
alert_type=alert_type,
level=alert_level,
water_level=water_level,
threshold=threshold_value,
discharge=measurement.get('discharge'),
timestamp=measurement.get('timestamp')
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking water levels: {e}")
return alerts
def check_data_freshness(self, max_age_hours: int = 2) -> List[WaterAlert]:
"""Check if data is fresh enough"""
alerts = []
if not self.db_adapter:
return alerts
try:
measurements = self.db_adapter.get_latest_measurements(limit=20)
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=max_age_hours)
for measurement in measurements:
timestamp = measurement.get('timestamp')
if timestamp and timestamp < cutoff_time:
station_code = measurement.get('station_code', 'UNKNOWN')
age_hours = (datetime.datetime.now() - timestamp).total_seconds() / 3600
alert = WaterAlert(
station_code=station_code,
station_name=measurement.get('station_name_th', f'Station {station_code}'),
alert_type="Stale Data",
level=AlertLevel.WARNING,
water_level=measurement.get('water_level') or 0.0, # key may be present with a None value, which would break the :.2f formatting in send_alert
threshold=max_age_hours,
timestamp=timestamp,
message=f"No fresh data for {age_hours:.1f} hours"
)
alerts.append(alert)
except Exception as e:
logger.error(f"Error checking data freshness: {e}")
return alerts
def check_rate_of_change(self, lookback_hours: int = 3) -> List[WaterAlert]:
"""Check for rapid water level changes over recent hours"""
alerts = []
if not self.db_adapter:
return alerts
try:
# Define rate-of-change thresholds (meters per hour)
rate_thresholds = {
"P.1": {
"warning": 0.15, # 15cm/hour - moderate rise
"critical": 0.25, # 25cm/hour - rapid rise
"emergency": 0.40 # 40cm/hour - very rapid rise
},
"default": {
"warning": 0.20,
"critical": 0.35,
"emergency": 0.50
}
}
# Get recent measurements for each station
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=lookback_hours)
# Get unique stations from latest data
latest = self.db_adapter.get_latest_measurements(limit=20)
station_ids = set(m.get('station_id') for m in latest if m.get('station_id'))
for station_id in station_ids:
try:
# Get measurements for this station in the time window using database adapter
# Try direct connection for SQLite/PostgreSQL
results = []
# Direct query only when the adapter exposes a raw DB-API connection
conn = getattr(self.db_adapter, 'conn', None)
if conn:
try:
# '?' is the sqlite3 placeholder style; psycopg2 expects '%s', so a
# PostgreSQL connection raises here and the fallback below takes over
query = """
SELECT timestamp, water_level, station_id
FROM water_measurements
WHERE station_id = ? AND timestamp >= ?
ORDER BY timestamp ASC
"""
cursor = conn.cursor()
cursor.execute(query, (station_id, cutoff_time))
results = cursor.fetchall()
except Exception as query_error:
logger.debug(f"Direct query failed for station {station_id}: {query_error}")
results = []
if not results:
# Fallback: use get_latest_measurements and filter (also covers adapters without .conn)
all_measurements = self.db_adapter.get_latest_measurements(limit=500)
for m in all_measurements:
if (m.get('station_id') == station_id and m.get('timestamp')
and m.get('water_level') is not None and m['timestamp'] >= cutoff_time):
results.append((m['timestamp'], m['water_level'], m.get('station_id')))
# Order oldest to newest so results[0] and results[-1] are the window endpoints
results.sort(key=lambda r: r[0])
if len(results) < 2:
continue # Need at least 2 points to calculate rate
# Get oldest and newest measurements
oldest = results[0]
newest = results[-1]
oldest_time, oldest_level, _ = oldest
newest_time, newest_level, _ = newest
# Convert timestamp strings to datetime if needed
if isinstance(oldest_time, str):
oldest_time = datetime.datetime.fromisoformat(oldest_time)
if isinstance(newest_time, str):
newest_time = datetime.datetime.fromisoformat(newest_time)
# Calculate rate of change
time_diff_hours = (newest_time - oldest_time).total_seconds() / 3600
if time_diff_hours == 0:
continue
level_change = newest_level - oldest_level
rate_per_hour = level_change / time_diff_hours
# Only alert on rising water (positive rate)
if rate_per_hour <= 0:
continue
# Get station info from latest data
station_info = next((m for m in latest if m.get('station_id') == station_id), {})
station_code = station_info.get('station_code', f'Station {station_id}')
station_name = station_info.get('station_name_th', station_code)
# Get thresholds for this station
station_rate_threshold = rate_thresholds.get(station_code, rate_thresholds['default'])
alert_level = None
threshold_value = None
alert_type = None
if rate_per_hour >= station_rate_threshold['emergency']:
alert_level = AlertLevel.EMERGENCY
threshold_value = station_rate_threshold['emergency']
alert_type = "Very Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['critical']:
alert_level = AlertLevel.CRITICAL
threshold_value = station_rate_threshold['critical']
alert_type = "Rapid Water Level Rise"
elif rate_per_hour >= station_rate_threshold['warning']:
alert_level = AlertLevel.WARNING
threshold_value = station_rate_threshold['warning']
alert_type = "Moderate Water Level Rise"
if alert_level:
message = f"Rising at {rate_per_hour:.2f}m/h over last {time_diff_hours:.1f}h (change: {level_change:+.2f}m)"
alert = WaterAlert(
station_code=station_code,
station_name=station_name or f'Station {station_code}',
alert_type=alert_type,
level=alert_level,
water_level=newest_level,
threshold=threshold_value,
timestamp=newest_time,
message=message
)
alerts.append(alert)
except Exception as station_error:
logger.debug(f"Error checking rate of change for station {station_id}: {station_error}")
continue
except Exception as e:
logger.error(f"Error checking rate of change: {e}")
return alerts
def send_alerts(self, alerts: List[WaterAlert]) -> int:
"""Send alerts via configured channels"""
sent_count = 0
if not alerts:
return sent_count
if self.matrix_notifier:
for alert in alerts:
if self.matrix_notifier.send_alert(alert):
sent_count += 1
# Could add other notification channels here:
# - Email
# - Discord
# - Telegram
# - SMS
return sent_count
def run_alert_check(self) -> Dict[str, int]:
"""Run complete alert check cycle"""
if not self.connect_database():
return {"error": 1}
# Check water levels
water_alerts = self.check_water_levels()
# Check data freshness
data_alerts = self.check_data_freshness()
# Check rate of change (rapid rises)
rate_alerts = self.check_rate_of_change()
# Combine alerts
all_alerts = water_alerts + data_alerts + rate_alerts
# Send alerts
sent_count = self.send_alerts(all_alerts)
logger.info(f"Alert check complete: {len(all_alerts)} alerts, {sent_count} sent")
return {
"water_alerts": len(water_alerts),
"data_alerts": len(data_alerts),
"rate_alerts": len(rate_alerts),
"total_alerts": len(all_alerts),
"sent": sent_count
}
def main():
"""Standalone alerting check"""
import argparse
parser = argparse.ArgumentParser(description="Water Level Alert System")
parser.add_argument("--check", action="store_true", help="Run alert check")
parser.add_argument("--test", action="store_true", help="Send test message")
args = parser.parse_args()
alerting = WaterLevelAlertSystem()
if args.test:
if alerting.matrix_notifier:
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Water Level Alert System.\n\nIf you received this, Matrix notifications are working correctly!"
success = alerting.matrix_notifier.send_message(test_message)
print(f"Test message sent: {success}")
else:
print("Matrix notifier not configured")
elif args.check:
results = alerting.run_alert_check()
print(f"Alert check results: {results}")
else:
print("Use --check or --test")
if __name__ == "__main__":
main()
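
The core of `check_rate_of_change()` is a two-point slope followed by a threshold lookup. The sketch below isolates that arithmetic so it can be tested without a database. The threshold values are copied from the diff above, but the helper names `rate_per_hour` and `classify_rate` are hypothetical and not part of the PR.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# Thresholds in metres per hour, copied from check_rate_of_change()
RATE_THRESHOLDS = {
    "P.1": {"warning": 0.15, "critical": 0.25, "emergency": 0.40},
    "default": {"warning": 0.20, "critical": 0.35, "emergency": 0.50},
}


def rate_per_hour(points: List[Tuple[datetime, float]]) -> Optional[float]:
    """Return the change in m/h between the oldest and newest point."""
    if len(points) < 2:
        return None  # need at least two readings to form a slope
    ordered = sorted(points, key=lambda p: p[0])  # do not rely on input order
    (t0, level0), (t1, level1) = ordered[0], ordered[-1]
    hours = (t1 - t0).total_seconds() / 3600
    if hours <= 0:
        return None
    return (level1 - level0) / hours


def classify_rate(station_code: str, rate: Optional[float]) -> Optional[str]:
    """Map a rising rate to 'warning', 'critical' or 'emergency'."""
    if rate is None or rate <= 0:
        return None  # only rising water raises an alert
    limits = RATE_THRESHOLDS.get(station_code, RATE_THRESHOLDS["default"])
    for severity in ("emergency", "critical", "warning"):
        if rate >= limits[severity]:
            return severity
    return None


if __name__ == "__main__":
    readings = [
        (datetime(2025, 10, 3, 13, 0), 3.90),
        (datetime(2025, 10, 3, 10, 0), 3.00),
    ]
    rate = rate_per_hour(readings)
    print(f"{rate:.2f} m/h ->", classify_rate("P.1", rate))
```

Because only the first and last readings are used, a single noisy sample at either end of the window shifts the result; a longer `lookback_hours` dampens that at the cost of slower detection.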


@@ -1,6 +1,14 @@
import os
from typing import Dict, Any, Optional
# Load environment variables from .env file
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
# python-dotenv not installed, continue without it
pass
try:
from .exceptions import ConfigurationError
from .models import DatabaseType, DatabaseConfig
@@ -49,6 +57,11 @@ class Config:
# PostgreSQL settings
POSTGRES_CONNECTION_STRING = os.getenv('POSTGRES_CONNECTION_STRING')
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = int(os.getenv('POSTGRES_PORT', '5432'))
POSTGRES_DB = os.getenv('POSTGRES_DB', 'water_monitoring')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'postgres')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
# MySQL settings
MYSQL_CONNECTION_STRING = os.getenv('MYSQL_CONNECTION_STRING')
@@ -93,10 +106,21 @@ class Config:
errors.append("INFLUX_DATABASE is required for InfluxDB")
elif cls.DB_TYPE in ['postgresql', 'mysql']:
connection_string = (cls.POSTGRES_CONNECTION_STRING if cls.DB_TYPE == 'postgresql'
else cls.MYSQL_CONNECTION_STRING)
if not connection_string:
errors.append(f"Connection string is required for {cls.DB_TYPE.upper()}")
if cls.DB_TYPE == 'postgresql':
# Check if either connection string or individual components are provided
if not cls.POSTGRES_CONNECTION_STRING:
# If no connection string, check individual components
if not cls.POSTGRES_HOST:
errors.append("POSTGRES_HOST is required for PostgreSQL")
if not cls.POSTGRES_USER:
errors.append("POSTGRES_USER is required for PostgreSQL")
if not cls.POSTGRES_PASSWORD:
errors.append("POSTGRES_PASSWORD is required for PostgreSQL")
if not cls.POSTGRES_DB:
errors.append("POSTGRES_DB is required for PostgreSQL")
else: # mysql
if not cls.MYSQL_CONNECTION_STRING:
errors.append("MYSQL_CONNECTION_STRING is required for MySQL")
# Validate numeric settings
if cls.SCRAPING_INTERVAL_HOURS <= 0:
@@ -129,11 +153,21 @@ class Config:
'password': cls.INFLUX_PASSWORD
}
elif cls.DB_TYPE == 'postgresql':
return {
'type': 'postgresql',
'connection_string': cls.POSTGRES_CONNECTION_STRING or
'postgresql://postgres:password@localhost:5432/water_monitoring'
}
# Use individual components if POSTGRES_CONNECTION_STRING is not provided
if cls.POSTGRES_CONNECTION_STRING:
return {
'type': 'postgresql',
'connection_string': cls.POSTGRES_CONNECTION_STRING
}
else:
# Build connection string from components (automatically URL-encodes password)
import urllib.parse
password = urllib.parse.quote(cls.POSTGRES_PASSWORD or 'password', safe='')
connection_string = f'postgresql://{cls.POSTGRES_USER}:{password}@{cls.POSTGRES_HOST}:{cls.POSTGRES_PORT}/{cls.POSTGRES_DB}'
return {
'type': 'postgresql',
'connection_string': connection_string
}
elif cls.DB_TYPE == 'mysql':
return {
'type': 'mysql',
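
The reason the new `get_database_config()` branch percent-encodes the password is that characters such as `@`, `:` and `/` would otherwise be read as URL delimiters. A minimal sketch of the same idea follows; `build_postgres_url` is a hypothetical helper, and unlike the diff it also encodes the user name, which is an assumption about what is desirable rather than what the PR does.

```python
from urllib.parse import quote, unquote, urlsplit


def build_postgres_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a postgresql:// URL with percent-encoded credentials."""
    # safe='' makes quote() encode every reserved character, including '/'
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


if __name__ == "__main__":
    url = build_postgres_url("postgres", "p@ss:w/rd#1", "localhost", 5432, "water_monitoring")
    print(url)
    parts = urlsplit(url)
    # The encoded password parses cleanly and decodes back to the original
    print(parts.hostname, parts.port, unquote(parts.password))
```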


@@ -28,6 +28,10 @@ class DatabaseAdapter(ABC):
station_codes: Optional[List[str]] = None) -> List[Dict]:
pass
@abstractmethod
def get_measurements_for_date(self, target_date: datetime.datetime) -> List[Dict]:
pass
# InfluxDB Adapter
class InfluxDBAdapter(DatabaseAdapter):
def __init__(self, host: str = "localhost", port: int = 8086,
@@ -525,6 +529,52 @@ class SQLAdapter(DatabaseAdapter):
logging.error(f"Error querying {self.db_type.upper()}: {e}")
return []
def get_measurements_for_date(self, target_date: datetime.datetime) -> List[Dict]:
"""Get all measurements for a specific date"""
if not self.engine:
return []
try:
from sqlalchemy import text
# Get start and end of the target date
start_of_day = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = target_date.replace(hour=23, minute=59, second=59, microsecond=999999)
query = """
SELECT m.timestamp, m.station_id, s.station_code, s.thai_name,
m.water_level, m.discharge, m.discharge_percent, m.status
FROM water_measurements m
LEFT JOIN stations s ON m.station_id = s.id
WHERE m.timestamp >= :start_time AND m.timestamp <= :end_time
ORDER BY m.timestamp DESC
"""
with self.engine.connect() as conn:
result = conn.execute(text(query), {
'start_time': start_of_day,
'end_time': end_of_day
})
measurements = []
for row in result:
measurements.append({
'timestamp': row[0],
'station_id': row[1],
'station_code': row[2] or f"Station_{row[1]}",
'station_name_th': row[3] or f"Station {row[1]}",
'water_level': float(row[4]) if row[4] is not None else None, # keep legitimate 0 readings
'discharge': float(row[5]) if row[5] is not None else None,
'discharge_percent': float(row[6]) if row[6] is not None else None,
'status': row[7]
})
return measurements
except Exception as e:
logging.error(f"Error querying {self.db_type.upper()} for date {target_date.date()}: {e}")
return []
# VictoriaMetrics Adapter (using Prometheus format)
class VictoriaMetricsAdapter(DatabaseAdapter):
def __init__(self, host: str = "localhost", port: int = 8428):
@@ -638,6 +688,11 @@ class VictoriaMetricsAdapter(DatabaseAdapter):
logging.warning("get_measurements_by_timerange not fully implemented for VictoriaMetrics")
return []
def get_measurements_for_date(self, target_date: datetime.datetime) -> List[Dict]:
"""Get all measurements for a specific date"""
logging.warning("get_measurements_for_date not fully implemented for VictoriaMetrics")
return []
# Factory function to create appropriate adapter
def create_database_adapter(db_type: str, **kwargs) -> DatabaseAdapter:
"""


@@ -7,6 +7,7 @@ import argparse
import asyncio
import sys
import signal
import time
from datetime import datetime
from typing import Optional
@@ -63,7 +64,7 @@ def run_test_cycle():
return False
def run_continuous_monitoring():
"""Run continuous monitoring with scheduling"""
"""Run continuous monitoring with adaptive scheduling"""
logger.info("Starting continuous monitoring...")
try:
@@ -78,20 +79,57 @@ def run_continuous_monitoring():
setup_signal_handlers(scraper)
logger.info(f"Monitoring started with {Config.SCRAPING_INTERVAL_HOURS}h interval")
logger.info("Adaptive retry: switches to 1-minute intervals when no data available")
logger.info("Press Ctrl+C to stop")
# Run initial cycle
logger.info("Running initial data collection...")
scraper.run_scraping_cycle()
initial_success = scraper.run_scraping_cycle()
# Start scheduled monitoring
import schedule
# Adaptive scheduling state
from datetime import datetime, timedelta
retry_mode = not initial_success
last_successful_fetch = None if not initial_success else datetime.now()
schedule.every(Config.SCRAPING_INTERVAL_HOURS).hours.do(scraper.run_scraping_cycle)
if retry_mode:
logger.warning("No data fetched in initial run - entering retry mode")
next_run = datetime.now() + timedelta(minutes=1)
else:
logger.info("Initial data fetch successful - using hourly schedule")
next_run = (datetime.now() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
logger.info(f"Next run at {next_run.strftime('%H:%M')}")
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
current_time = datetime.now()
if current_time >= next_run:
logger.info("Running scheduled data collection...")
success = scraper.run_scraping_cycle()
if success:
last_successful_fetch = current_time
if retry_mode:
logger.info("✅ Data fetch successful - switching back to hourly schedule")
retry_mode = False
# Schedule next run at the next full hour
next_run = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
else:
# Continue hourly schedule
next_run = (current_time + timedelta(hours=Config.SCRAPING_INTERVAL_HOURS)).replace(minute=0, second=0, microsecond=0)
logger.info(f"Next scheduled run at {next_run.strftime('%H:%M')}")
else:
if not retry_mode:
logger.warning("⚠️ No data fetched - switching to retry mode (1-minute intervals)")
retry_mode = True
# Schedule retry in 1 minute
next_run = current_time + timedelta(minutes=1)
logger.info(f"Retrying in 1 minute at {next_run.strftime('%H:%M')}")
# Sleep for 10 seconds and check again
time.sleep(10)
except KeyboardInterrupt:
logger.info("Monitoring stopped by user")
@@ -153,6 +191,45 @@ def run_data_update(days_back: int):
logger.error(f"❌ Data update failed: {e}")
return False
def run_historical_import(start_date_str: str, end_date_str: str, skip_existing: bool = True):
"""Import historical data for a date range"""
try:
# Parse dates
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
if start_date > end_date:
logger.error("Start date must be before or equal to end date")
return False
logger.info(f"Importing historical data from {start_date.date()} to {end_date.date()}")
if skip_existing:
logger.info("Skipping dates that already have data")
# Validate configuration
Config.validate_config()
# Initialize scraper
db_config = Config.get_database_config()
scraper = EnhancedWaterMonitorScraper(db_config)
# Import historical data
imported_count = scraper.import_historical_data(start_date, end_date, skip_existing)
if imported_count > 0:
logger.info(f"✅ Imported {imported_count} historical data points")
else:
logger.info("✅ No new data imported")
return True
except ValueError as e:
logger.error(f"❌ Invalid date format. Use YYYY-MM-DD: {e}")
return False
except Exception as e:
logger.error(f"❌ Historical import failed: {e}")
return False
def run_web_api():
"""Run the FastAPI web interface"""
logger.info("Starting web API server...")
@@ -179,6 +256,65 @@ def run_web_api():
logger.error(f"Web API failed: {e}")
return False
def run_alert_check():
"""Run water level alert check"""
logger.info("Running water level alert check...")
try:
from .alerting import WaterLevelAlertSystem
# Initialize alerting system
alerting = WaterLevelAlertSystem()
# Run alert check
results = alerting.run_alert_check()
if 'error' in results:
logger.error("❌ Alert check failed due to database connection")
return False
logger.info("✅ Alert check completed:")
logger.info(f" • Water level alerts: {results['water_alerts']}")
logger.info(f" • Data freshness alerts: {results['data_alerts']}")
logger.info(f" • Rate-of-change alerts: {results.get('rate_alerts', 0)}")
logger.info(f" • Total alerts generated: {results['total_alerts']}")
logger.info(f" • Alerts sent: {results['sent']}")
return True
except Exception as e:
logger.error(f"❌ Alert check failed: {e}")
return False
def run_alert_test():
"""Send test alert message"""
logger.info("Sending test alert message...")
try:
from .alerting import WaterLevelAlertSystem
# Initialize alerting system
alerting = WaterLevelAlertSystem()
if not alerting.matrix_notifier:
logger.error("❌ Matrix notifier not configured")
logger.info("Please set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in your .env file")
return False
# Send test message
test_message = "🧪 **Test Alert**\n\nThis is a test message from the Northern Thailand Ping River Monitor.\n\nIf you received this, Matrix notifications are working correctly!"
success = alerting.matrix_notifier.send_message(test_message)
if success:
logger.info("✅ Test alert message sent successfully")
else:
logger.error("❌ Test alert message failed to send")
return success
except Exception as e:
logger.error(f"❌ Test alert failed: {e}")
return False
def show_status():
"""Show current system status"""
logger.info("=== Northern Thailand Ping River Monitor Status ===")
@@ -209,6 +345,20 @@ def show_status():
else:
logger.error("❌ Database connection failed")
# Test alerting system
logger.info("\n=== Alerting System Status ===")
try:
from .alerting import WaterLevelAlertSystem
alerting = WaterLevelAlertSystem()
if alerting.matrix_notifier:
logger.info("✅ Matrix notifications configured")
else:
logger.warning("⚠️ Matrix notifications not configured")
logger.info("Set MATRIX_ACCESS_TOKEN and MATRIX_ROOM_ID in .env file")
except Exception as e:
logger.error(f"❌ Alerting system error: {e}")
# Show metrics if available
metrics_collector = get_metrics_collector()
metrics = metrics_collector.get_all_metrics()
@@ -237,7 +387,10 @@ Examples:
%(prog)s --web-api # Start web API server
%(prog)s --fill-gaps 7 # Fill missing data for last 7 days
%(prog)s --update-data 2 # Update existing data for last 2 days
%(prog)s --import-historical 2024-01-01 2024-01-31 # Import historical data
%(prog)s --status # Show system status
%(prog)s --alert-check # Check water levels and send alerts
%(prog)s --alert-test # Send test Matrix message
"""
)
@@ -267,12 +420,37 @@ Examples:
help="Update existing data for the specified number of days back"
)
parser.add_argument(
"--import-historical",
nargs=2,
metavar=("START_DATE", "END_DATE"),
help="Import historical data for date range (YYYY-MM-DD format)"
)
parser.add_argument(
"--force-overwrite",
action="store_true",
help="Overwrite existing data when importing historical data"
)
parser.add_argument(
"--status",
action="store_true",
help="Show current system status"
)
parser.add_argument(
"--alert-check",
action="store_true",
help="Run water level alert check"
)
parser.add_argument(
"--alert-test",
action="store_true",
help="Send test alert message to Matrix"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
@@ -311,8 +489,16 @@ Examples:
success = run_gap_filling(args.fill_gaps)
elif args.update_data is not None:
success = run_data_update(args.update_data)
elif args.import_historical is not None:
start_date, end_date = args.import_historical
skip_existing = not args.force_overwrite
success = run_historical_import(start_date, end_date, skip_existing)
elif args.status:
success = show_status()
elif args.alert_check:
success = run_alert_check()
elif args.alert_test:
success = run_alert_test()
else:
success = run_continuous_monitoring()
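Reviewer note: the hunk above passes the two `--import-historical` values on as raw strings, and the diff does not show where (or whether) `run_historical_import` parses them. A minimal sketch of validating the `YYYY-MM-DD` format at the argparse layer, assuming nothing about the real helper; `parse_date`, `build_parser`, and the `prog` name are hypothetical:

```python
import argparse
import datetime


def parse_date(value: str) -> datetime.datetime:
    """Convert a YYYY-MM-DD string to a datetime, or report a usage error."""
    try:
        return datetime.datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected YYYY-MM-DD, got {value!r}")


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical stand-in for the project's parser; only two flags are mirrored.
    parser = argparse.ArgumentParser(prog="monitor")
    parser.add_argument(
        "--import-historical",
        nargs=2,
        type=parse_date,  # applied to each of the two values
        metavar=("START_DATE", "END_DATE"),
        help="Import historical data for date range (YYYY-MM-DD format)",
    )
    parser.add_argument("--force-overwrite", action="store_true")
    return parser


args = build_parser().parse_args(["--import-historical", "2024-01-01", "2024-01-31"])
start_date, end_date = args.import_historical
skip_existing = not args.force_overwrite  # same inversion as in the diff
```

With this shape a malformed date fails at argument parsing with a usage message instead of surfacing later inside the import loop.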


@@ -26,8 +26,8 @@ class DataValidator:
def validate_measurement(cls, measurement: Dict[str, Any]) -> bool:
"""Validate a single measurement"""
try:
-# Check required fields
-required_fields = ['timestamp', 'station_id', 'water_level', 'discharge']
+# Check required fields (discharge is now optional)
+required_fields = ['timestamp', 'station_id', 'water_level']
for field in required_fields:
if field not in measurement:
logger.warning(f"Missing required field: {field}")
@@ -38,17 +38,22 @@ class DataValidator:
logger.warning(f"Invalid timestamp type: {type(measurement['timestamp'])}")
return False
-# Validate water level
+# Validate water level (required)
+if measurement['water_level'] is None:
+logger.warning("Water level cannot be None")
+return False
water_level = float(measurement['water_level'])
if not (cls.WATER_LEVEL_MIN <= water_level <= cls.WATER_LEVEL_MAX):
logger.warning(f"Water level out of range: {water_level}")
return False
-# Validate discharge
-discharge = float(measurement['discharge'])
-if not (cls.DISCHARGE_MIN <= discharge <= cls.DISCHARGE_MAX):
-logger.warning(f"Discharge out of range: {discharge}")
-return False
+# Validate discharge (optional - can be None)
+discharge_value = measurement.get('discharge')
+if discharge_value is not None:
+discharge = float(discharge_value)
+if not (cls.DISCHARGE_MIN <= discharge <= cls.DISCHARGE_MAX):
+logger.warning(f"Discharge out of range: {discharge}")
+return False
# Validate discharge percent if present
if measurement.get('discharge_percent') is not None:
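Reviewer note: the validator change above makes `water_level` mandatory and `discharge` optional. A self-contained sketch of that rule, assuming placeholder limits (the real `WATER_LEVEL_MIN`/`MAX` and `DISCHARGE_MIN`/`MAX` values are not shown in this diff); `in_range` and `is_valid` are hypothetical names:

```python
from typing import Any, Dict, Optional, Tuple

# Placeholder limits for illustration only; not the project's actual constants.
WATER_LEVEL_RANGE = (-10.0, 50.0)
DISCHARGE_RANGE = (0.0, 10000.0)


def in_range(value: Optional[Any], bounds: Tuple[float, float], required: bool) -> bool:
    """Return True if value is numeric and within bounds; None passes only if optional."""
    if value is None:
        return not required
    try:
        number = float(value)
    except (TypeError, ValueError):
        return False
    low, high = bounds
    return low <= number <= high


def is_valid(measurement: Dict[str, Any]) -> bool:
    """Water level is required; discharge may be absent or None."""
    return (
        in_range(measurement.get("water_level"), WATER_LEVEL_RANGE, required=True)
        and in_range(measurement.get("discharge"), DISCHARGE_RANGE, required=False)
    )
```

A record such as `{"water_level": 2.5}` passes under this rule, while a record with a missing or `None` water level is still rejected.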


@@ -338,21 +338,39 @@ class EnhancedWaterMonitorScraper:
q_key = f'qvalues{station_num}'
qp_key = f'QPercent{station_num}'
-# Check if both water level and discharge data exist
-if wl_key in row and q_key in row:
+# Check if water level data exists (required)
+if wl_key in row:
try:
water_level = row[wl_key]
-discharge = row[q_key]
-discharge_percent = row.get(qp_key)
-# Skip if values are None or invalid
-if water_level is None or discharge is None:
+# Skip if water level is None or invalid
+if water_level is None:
continue
-# Convert to float
+# Convert water level to float (required)
water_level = float(water_level)
-discharge = float(discharge)
-discharge_percent = float(discharge_percent) if discharge_percent is not None else None
+# Try to parse discharge data (optional)
+discharge = None
+discharge_percent = None
+if q_key in row:
+try:
+discharge_raw = row[q_key]
+if discharge_raw is not None and discharge_raw != "***":
+discharge = float(discharge_raw)
+# Only parse discharge percent if discharge is valid
+discharge_percent_raw = row.get(qp_key)
+if discharge_percent_raw is not None:
+try:
+discharge_percent = float(discharge_percent_raw)
+except (ValueError, TypeError):
+discharge_percent = None
+else:
+logger.debug(f"Skipping malformed discharge data for station {station_num}: {discharge_raw}")
+except (ValueError, TypeError) as e:
+logger.debug(f"Could not parse discharge for station {station_num}: {e}")
station_info = self.station_mapping.get(str(station_num), {
'code': f'P.{19+station_num}',
@@ -380,7 +398,7 @@ class EnhancedWaterMonitorScraper:
station_count += 1
except (ValueError, TypeError) as e:
-logger.warning(f"Could not parse data for station {station_num}: {e}")
+logger.warning(f"Could not parse water level for station {station_num}: {e}")
continue
logger.debug(f"Processed {station_count} stations for time {time_str}")
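Reviewer note: the scraper hunk above treats `"***"` as a missing-discharge marker and parses the discharge percent only when the discharge itself is usable. The same rule can be sketched as two small helpers; `to_float` and `parse_discharge` are hypothetical names, and the key names in the usage line follow the `qvalues{n}` / `QPercent{n}` pattern from the diff:

```python
from typing import Any, Mapping, Optional, Tuple

MISSING_MARKERS = {"***"}  # sentinel handled in the diff; other markers are unknown


def to_float(raw: Any) -> Optional[float]:
    """Return raw as a float, or None when it is missing or unparseable."""
    if raw is None:
        return None
    if isinstance(raw, str) and raw.strip() in MISSING_MARKERS:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def parse_discharge(
    row: Mapping[str, Any], q_key: str, qp_key: str
) -> Tuple[Optional[float], Optional[float]]:
    """Parse (discharge, discharge_percent); the percent is kept only with a valid discharge."""
    discharge = to_float(row.get(q_key))
    if discharge is None:
        return None, None
    return discharge, to_float(row.get(qp_key))


discharge, discharge_percent = parse_discharge(
    {"qvalues1": "***", "QPercent1": "50"}, "qvalues1", "QPercent1"
)
```

Unlike the committed code, this sketch does not log skipped values; it only illustrates the fallback-to-`None` behaviour.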
@@ -407,9 +425,34 @@ class EnhancedWaterMonitorScraper:
return None
def fetch_water_data(self) -> Optional[List[Dict]]:
-"""Fetch water levels and discharge data from API for current date"""
-current_date = datetime.datetime.now()
-return self.fetch_water_data_for_date(current_date)
+"""Fetch water levels and discharge data from API with smart date selection"""
+current_time = datetime.datetime.now()
+# If it's past 01:00, try today's data first, then yesterday as fallback
+if current_time.hour >= 1:
+logger.info("After 01:00 - trying today's data first, will fallback to yesterday if needed")
+# Try today's data first
+today_data = self.fetch_water_data_for_date(current_time)
+if today_data and len(today_data) > 0:
+logger.info(f"Successfully fetched {len(today_data)} data points for today")
+return today_data
+# Fallback to yesterday's data
+logger.info("No data available for today, trying yesterday's data")
+yesterday = current_time - datetime.timedelta(days=1)
+yesterday_data = self.fetch_water_data_for_date(yesterday)
+if yesterday_data and len(yesterday_data) > 0:
+logger.info(f"Successfully fetched {len(yesterday_data)} data points for yesterday")
+return yesterday_data
+logger.warning("No data available for today or yesterday")
+return None
+else:
+# Before 01:00 - only try yesterday's data (API likely hasn't updated yet)
+logger.info("Before 01:00 - fetching yesterday's data only")
+yesterday = current_time - datetime.timedelta(days=1)
+return self.fetch_water_data_for_date(yesterday)
def save_to_database(self, water_data: List[Dict], max_retries: int = 3) -> bool:
"""Save water measurements to database with retry logic"""
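Reviewer note: the date selection added to `fetch_water_data` (today then yesterday after 01:00, yesterday only before 01:00) can be expressed as a pure function that is easy to unit test. A sketch under that reading of the hunk; `candidate_dates`, `fetch_first_available`, and the `fetch` callback are hypothetical and stand in for `fetch_water_data_for_date`:

```python
import datetime
from typing import Callable, List, Optional


def candidate_dates(now: datetime.datetime, cutoff_hour: int = 1) -> List[datetime.date]:
    """Dates to try, in order: today then yesterday after the cutoff, else yesterday only."""
    today = now.date()
    yesterday = today - datetime.timedelta(days=1)
    return [today, yesterday] if now.hour >= cutoff_hour else [yesterday]


def fetch_first_available(
    now: datetime.datetime,
    fetch: Callable[[datetime.date], Optional[list]],
) -> Optional[list]:
    """Return the first non-empty result among the candidate dates, or None."""
    for day in candidate_dates(now):
        data = fetch(day)
        if data:
            return data
    return None
```

Keeping the clock as a parameter (`now`) rather than calling `datetime.datetime.now()` inside the function is a design choice for testability, not something the commit does.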
@@ -456,21 +499,68 @@ class EnhancedWaterMonitorScraper:
logger.error(f"Error getting latest data: {e}")
return []
def _check_data_freshness(self, water_data: List[Dict]) -> bool:
"""Check if the fetched data contains new data for the current hour"""
if not water_data:
return False
current_time = datetime.datetime.now()
current_hour = current_time.hour
# Find the most recent timestamp in the data
latest_timestamp = None
for data_point in water_data:
timestamp = data_point.get('timestamp')
if timestamp and (latest_timestamp is None or timestamp > latest_timestamp):
latest_timestamp = timestamp
if latest_timestamp is None:
logger.warning("No valid timestamps found in data")
return False
latest_hour = latest_timestamp.hour
time_diff = current_time - latest_timestamp
minutes_old = time_diff.total_seconds() / 60
logger.info(f"Current time: {current_time.strftime('%H:%M')}, Latest data: {latest_timestamp.strftime('%H:%M')}")
logger.info(f"Current hour: {current_hour}, Latest data hour: {latest_hour}, Age: {minutes_old:.1f} minutes")
# Strict check: we need data from the current hour
# If it's 20:xx and we only have data up to 19:xx, that's stale - go to retry mode
has_current_hour_data = latest_hour >= current_hour
if not has_current_hour_data:
logger.warning(f"No new data available - expected hour {current_hour}, got {latest_hour}")
logger.warning("Switching to retry mode until new data becomes available")
return False
else:
logger.info(f"Fresh data available for current hour {current_hour}")
return True
def run_scraping_cycle(self) -> bool:
-"""Run a complete scraping cycle"""
+"""Run a complete scraping cycle with freshness check"""
logger.info("Starting scraping cycle...")
try:
# Fetch current data
water_data = self.fetch_water_data()
if water_data:
-success = self.save_to_database(water_data)
-if success:
-logger.info("Scraping cycle completed successfully")
-increment_counter("scraping_cycles_successful")
-return True
+# Check if data is fresh/recent
+is_fresh = self._check_data_freshness(water_data)
+if is_fresh:
+success = self.save_to_database(water_data)
+if success:
+logger.info("Scraping cycle completed successfully with fresh data")
+increment_counter("scraping_cycles_successful")
+return True
+else:
+logger.error("Failed to save data")
+increment_counter("scraping_cycles_failed")
+return False
else:
-logger.error("Failed to save data")
+# Data exists but is stale
+logger.warning("Data fetched but is stale - treating as no fresh data available")
increment_counter("scraping_cycles_failed")
return False
else:
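Reviewer note: `_check_data_freshness` compares only the hour components (`latest_hour >= current_hour`), so the date is ignored. With the yesterday fallback in `fetch_water_data`, a reading stamped 23:00 yesterday would count as fresh at 20:15 today because 23 >= 20. A possible alternative, sketched under the assumption that both values are naive datetimes in the same local timezone, is to compare against the start of the current hour; `has_current_hour_data` is a hypothetical free function here, not the committed attribute:

```python
import datetime


def has_current_hour_data(latest: datetime.datetime, now: datetime.datetime) -> bool:
    """True only if the latest reading falls in (or after) the current clock hour."""
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    return latest >= hour_start


now = datetime.datetime(2025, 10, 3, 20, 15)
fresh = has_current_hour_data(datetime.datetime(2025, 10, 3, 20, 0), now)
stale_same_day = has_current_hour_data(datetime.datetime(2025, 10, 3, 19, 0), now)
stale_yesterday = has_current_hour_data(datetime.datetime(2025, 10, 2, 23, 0), now)
```

Whether the strict current-hour requirement is appropriate at all depends on how promptly the upstream API publishes each hour, which this diff does not state.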
@@ -483,6 +573,151 @@ class EnhancedWaterMonitorScraper:
increment_counter("scraping_cycles_failed")
return False
def fill_data_gaps(self, days_back: int) -> int:
"""Fill gaps in data for the specified number of days back"""
logger = get_logger(__name__)
filled_count = 0
try:
# Calculate date range
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days=days_back)
logger.info(f"Checking for gaps from {start_date.date()} to {end_date.date()}")
# Iterate through each date in the range
current_date = start_date
while current_date <= end_date:
# Check if we have data for this date
has_data = self._check_data_exists_for_date(current_date)
if not has_data:
logger.info(f"Filling gap for date: {current_date.date()}")
# Fetch data for this specific date
data = self.fetch_water_data_for_date(current_date)
if data:
# Save the data
if self.save_to_database(data):
filled_count += len(data)
logger.info(f"Filled {len(data)} measurements for {current_date.date()}")
else:
logger.warning(f"Failed to save data for {current_date.date()}")
else:
logger.warning(f"No data available for {current_date.date()}")
current_date += datetime.timedelta(days=1)
except Exception as e:
logger.error(f"Gap filling error: {e}")
return filled_count
def update_existing_data(self, days_back: int) -> int:
"""Update existing data with latest values for the specified number of days back"""
logger = get_logger(__name__)
updated_count = 0
try:
# Calculate date range
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days=days_back)
logger.info(f"Updating data from {start_date.date()} to {end_date.date()}")
# Iterate through each date in the range
current_date = start_date
while current_date <= end_date:
logger.info(f"Updating data for date: {current_date.date()}")
# Fetch fresh data for this date
data = self.fetch_water_data_for_date(current_date)
if data:
# Save the data (this will update existing records)
if self.save_to_database(data):
updated_count += len(data)
logger.info(f"Updated {len(data)} measurements for {current_date.date()}")
else:
logger.warning(f"Failed to update data for {current_date.date()}")
else:
logger.warning(f"No data available for {current_date.date()}")
current_date += datetime.timedelta(days=1)
except Exception as e:
logger.error(f"Data update error: {e}")
return updated_count
def _check_data_exists_for_date(self, target_date: datetime.datetime) -> bool:
"""Check if data exists for a specific date"""
try:
if not self.db_adapter:
return False
# Get data for the specific date
measurements = self.db_adapter.get_measurements_for_date(target_date)
return len(measurements) > 0
except Exception as e:
logger = get_logger(__name__)
logger.debug(f"Error checking data existence: {e}")
return False
def import_historical_data(self, start_date: datetime.datetime, end_date: datetime.datetime,
skip_existing: bool = True) -> int:
"""
Import historical data for a date range
Args:
start_date: Start date for historical import
end_date: End date for historical import
skip_existing: Skip dates that already have data (default: True)
Returns:
Number of data points imported
"""
logger.info(f"Starting historical data import from {start_date.date()} to {end_date.date()}")
total_imported = 0
current_date = start_date
while current_date <= end_date:
try:
# Check if data already exists for this date
if skip_existing and self._check_data_exists_for_date(current_date):
logger.info(f"Data already exists for {current_date.date()}, skipping...")
current_date += datetime.timedelta(days=1)
continue
logger.info(f"Importing data for {current_date.date()}...")
# Fetch data for this date
data = self.fetch_water_data_for_date(current_date)
if data:
# Save to database
if self.save_to_database(data):
total_imported += len(data)
logger.info(f"Successfully imported {len(data)} data points for {current_date.date()}")
else:
logger.warning(f"Failed to save data for {current_date.date()}")
else:
logger.warning(f"No data available for {current_date.date()}")
# Add small delay to be respectful to the API
time.sleep(1)
except Exception as e:
logger.error(f"Error importing data for {current_date.date()}: {e}")
current_date += datetime.timedelta(days=1)
logger.info(f"Historical import completed. Total data points imported: {total_imported}")
return total_imported
# Main execution for standalone usage
if __name__ == "__main__":
import argparse
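Reviewer note: `fill_data_gaps`, `update_existing_data`, and `import_historical_data` each repeat the same `while current_date <= end_date` loop with a one-day increment. A small generator could carry that iteration in one place; `iter_days` is a hypothetical helper, not part of the commit:

```python
import datetime
from typing import Iterator


def iter_days(start: datetime.datetime, end: datetime.datetime) -> Iterator[datetime.datetime]:
    """Yield start, start + 1 day, ... up to and including end."""
    current = start
    while current <= end:
        yield current
        current += datetime.timedelta(days=1)


end = datetime.datetime(2025, 10, 3, 16, 17)
days = list(iter_days(end - datetime.timedelta(days=7), end))
```

Because both bounds are inclusive, a `days_back` of 7 visits eight calendar days, which matches the behaviour of the loops in the diff.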

3653
uv.lock generated Normal file

File diff suppressed because it is too large