Some checks failed
Security & Dependency Updates / Dependency Security Scan (push) Successful in 29s
Security & Dependency Updates / Docker Security Scan (push) Failing after 53s
Security & Dependency Updates / License Compliance (push) Successful in 13s
Security & Dependency Updates / Check for Dependency Updates (push) Successful in 19s
Security & Dependency Updates / Code Quality Metrics (push) Successful in 11s
Security & Dependency Updates / Security Summary (push) Successful in 7s
Features: - Real-time water level monitoring for Ping River Basin (16 stations) - Coverage from Chiang Dao to Nakhon Sawan in Northern Thailand - FastAPI web interface with interactive dashboard and station management - Multi-database support (SQLite, MySQL, PostgreSQL, InfluxDB, VictoriaMetrics) - Comprehensive monitoring with health checks and metrics collection - Docker deployment with Grafana integration - Production-ready architecture with enterprise-grade observability CI/CD & Automation: - Complete Gitea Actions workflows for CI/CD, security, and releases - Multi-Python version testing (3.9-3.12) - Multi-architecture Docker builds (amd64, arm64) - Daily security scanning and dependency monitoring - Automated documentation generation - Performance testing and validation Production Ready: - Type safety with Pydantic models and comprehensive type hints - Data validation layer with range checking and error handling - Rate limiting and request tracking for API protection - Enhanced logging with rotation, colors, and performance metrics - Station management API for dynamic CRUD operations - Comprehensive documentation and deployment guides Technical Stack: - Python 3.9+ with FastAPI and Pydantic - Multi-database architecture with adapter pattern - Docker containerization with multi-stage builds - Grafana dashboards for visualization - Gitea Actions for CI/CD automation - Enterprise monitoring and alerting Ready for deployment to B4L infrastructure!
210 lines
6.2 KiB
Python
210 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Integration test script for the enhanced water monitoring system
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Put the directory one level above this script's folder at the front of
# sys.path so the `src.*` imports used by the tests below can be resolved.
_script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(_script_dir)
sys.path.insert(0, project_root)
|
|
|
|
def test_imports():
    """Check that every core module imports and exposes the expected names.

    Each module is imported independently, so one broken module no longer
    hides problems in the modules listed after it: every missing module or
    missing name is reported on its own line.

    Only ``ImportError`` (which includes ``ModuleNotFoundError``) is treated
    as a test failure; any other exception raised while a module is being
    imported propagates to the caller.

    Returns:
        bool: True when all modules and names were found, False otherwise.
    """
    # Local import: keeps this function self-contained without touching the
    # file-level import block.
    import importlib

    print("Testing imports...")

    # (module path, names that must be importable from it)
    required = (
        ("src.config", ("Config",)),
        ("src.models", ("WaterMeasurement", "StationInfo")),
        ("src.exceptions", ("WaterMonitorException",)),
        ("src.validators", ("DataValidator",)),
        ("src.metrics", ("get_metrics_collector",)),
        ("src.health_check", ("HealthCheckManager",)),
        ("src.rate_limiter", ("RateLimiter",)),
        ("src.logging_config", ("setup_logging",)),
    )

    all_ok = True
    for module_name, names in required:
        try:
            module = importlib.import_module(module_name)
        except ImportError as e:
            print(f"❌ Import failed: {e}")
            all_ok = False
            continue

        for name in names:
            if not hasattr(module, name):
                print(
                    f"❌ Import failed: cannot import name '{name}' "
                    f"from '{module_name}'"
                )
                all_ok = False

    if all_ok:
        print("✅ All imports successful")
    return all_ok
|
|
|
|
def test_configuration():
    """Exercise the configuration loader and the database settings lookup.

    Calls ``Config.get_all_settings()`` and ``Config.get_database_config()``
    and prints a summary line for each.

    Returns:
        bool: True when both calls succeed; False if anything raised
        (the exception message is printed).
    """
    print("Testing configuration...")

    try:
        from src.config import Config

        # Full settings collection: only its size is reported.
        all_settings = Config.get_all_settings()
        setting_count = len(all_settings)
        print(f"✅ Configuration loaded: {setting_count} settings")

        # Database section: report which backend type is configured.
        database = Config.get_database_config()
        database_type = database['type']
        print(f"✅ Database config: {database_type}")
    except Exception as error:
        print(f"❌ Configuration test failed: {error}")
        return False
    else:
        return True
|
|
|
|
def test_metrics():
    """Record one counter, one gauge and one histogram value, then read them back.

    Returns:
        bool: True when the metrics could be recorded and the collector
        snapshot exposes 'counters' and 'gauges'; False if anything raised
        (the exception message is printed).
    """
    print("Testing metrics...")

    try:
        from src.metrics import (
            increment_counter,
            set_gauge,
            record_histogram,
            get_metrics_collector,
        )

        # Push one sample through each metric helper.
        increment_counter("test_counter", 5)
        set_gauge("test_gauge", 42.0)
        record_histogram("test_histogram", 1.5)

        # Read everything back from the collector.
        snapshot = get_metrics_collector().get_all_metrics()
        counter_total = len(snapshot['counters'])
        gauge_total = len(snapshot['gauges'])

        print(f"✅ Metrics collected: {counter_total} counters, {gauge_total} gauges")
    except Exception as error:
        print(f"❌ Metrics test failed: {error}")
        return False
    else:
        return True
|
|
|
|
def test_validation():
    """Verify the validator accepts a plausible measurement and rejects a bad one.

    Returns:
        bool: True when the good sample validates and the bad sample is
        rejected; False when either expectation is not met or anything
        raised (the exception message is printed).
    """
    print("Testing data validation...")

    try:
        from src.validators import DataValidator

        # A measurement that is expected to pass validation.
        good_sample = {
            'timestamp': datetime.now(),
            'station_id': 1,
            'water_level': 5.5,
            'discharge': 100.0,
            'discharge_percent': 75.0
        }
        if not DataValidator.validate_measurement(good_sample):
            print("❌ Valid measurement failed validation")
            return False
        print("✅ Valid measurement passed validation")

        # A measurement that is expected to be rejected.
        bad_sample = {
            'timestamp': datetime.now(),
            'station_id': 999,  # Invalid station ID
            'water_level': -999.0,  # Invalid water level
            'discharge': -50.0  # Invalid discharge
        }
        if DataValidator.validate_measurement(bad_sample):
            print("❌ Invalid measurement incorrectly accepted")
            return False
        print("✅ Invalid measurement correctly rejected")

        return True
    except Exception as error:
        print(f"❌ Validation test failed: {error}")
        return False
|
|
|
|
def test_rate_limiter():
    """Check that the limiter admits two requests and refuses a third.

    The limiter is built with ``max_requests=2`` and
    ``time_window_seconds=1``; three calls to ``is_allowed()`` are made
    back to back.

    Returns:
        bool: True when the first two calls are allowed and the third is
        blocked; False otherwise or if anything raised (the exception
        message is printed).
    """
    print("Testing rate limiter...")

    try:
        from src.rate_limiter import RateLimiter

        limiter = RateLimiter(max_requests=2, time_window_seconds=1)

        # The first two requests fit inside the budget.
        for attempt in (1, 2):
            if limiter.is_allowed():
                continue
            print(f"❌ Request {attempt} should be allowed")
            return False

        # The budget is now used up, so the next request must be refused.
        if limiter.is_allowed():
            print("❌ Third request should be blocked")
            return False

        print("✅ Rate limiter working correctly")
        return True
    except Exception as error:
        print(f"❌ Rate limiter test failed: {error}")
        return False
|
|
|
|
def test_logging():
    """Configure logging and emit one message through a named logger.

    Returns:
        bool: True when setup and the log call complete; False if anything
        raised (the exception message is printed).
    """
    print("Testing logging...")

    try:
        from src.logging_config import setup_logging, get_logger

        # The return value is not inspected; it is only kept bound for the
        # duration of the test.
        _configured = setup_logging(log_level="INFO", enable_console=False)

        # Emit one record through a freshly obtained logger.
        get_logger("test").info("Test log message")

        print("✅ Logging setup successful")
    except Exception as error:
        print(f"❌ Logging test failed: {error}")
        return False
    else:
        return True
|
|
|
|
def main():
    """Run every integration check in order and print a summary.

    A check counts as passed when it returns a truthy value; a falsy
    return or a raised exception counts as failed. A blank line is
    printed after each check.

    Returns:
        bool: True when no check failed, False otherwise.
    """
    separator = "=" * 60

    print("🧪 Running integration tests for Northern Thailand Ping River Monitor v3.1.0")
    print(separator)

    suite = (
        test_imports,
        test_configuration,
        test_metrics,
        test_validation,
        test_rate_limiter,
        test_logging,
    )

    # One boolean per check, in execution order.
    outcomes = []
    for check in suite:
        try:
            succeeded = bool(check())
        except Exception as error:
            print(f"❌ Test {check.__name__} crashed: {error}")
            succeeded = False
        outcomes.append(succeeded)
        print()

    passed = sum(outcomes)
    failed = len(outcomes) - passed

    print(separator)
    print(f"Test Results: {passed} passed, {failed} failed")

    if failed:
        print("❌ Some tests failed. Please check the errors above.")
        return False

    print("🎉 All tests passed! The system is ready to use.")
    print("\nNext steps:")
    print("1. Run 'python run.py --test' to test data collection")
    print("2. Run 'python run.py --web-api' to start the web interface")
    print("3. Visit http://localhost:8000 for the dashboard")
    return True
|
|
|
|
if __name__ == "__main__":
    # Exit status mirrors the overall outcome: 0 when main() reports
    # success, 1 otherwise.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)