#!/usr/bin/env python3
"""FastAPI web interface for water monitoring system"""

import asyncio
import threading
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from .water_scraper_v3 import EnhancedWaterMonitorScraper
from .config import Config
from .models import (
    WaterMeasurement,
    StationInfo,
    ScrapingResult,
    StationCreateRequest,
    StationUpdateRequest,
    StationStatus,
)
from .health_check import (
    HealthCheckManager,
    DatabaseHealthCheck,
    APIHealthCheck,
    MemoryHealthCheck,
)
from .metrics import get_metrics_collector, increment_counter, set_gauge
from .logging_config import setup_logging, get_logger

logger = get_logger(__name__)


# Pydantic models for API responses
class StationResponse(BaseModel):
    station_id: int
    station_code: str
    thai_name: str
    english_name: str
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geohash: Optional[str] = None
    status: str = "active"


class StationCreateModel(BaseModel):
    station_code: str = Field(..., description="Station code (e.g., P.1, P.20)")
    thai_name: str = Field(..., description="Thai name of the station")
    english_name: str = Field(..., description="English name of the station")
    latitude: Optional[float] = Field(None, ge=-90, le=90, description="Latitude coordinate")
    longitude: Optional[float] = Field(None, ge=-180, le=180, description="Longitude coordinate")
    geohash: Optional[str] = Field(None, description="Geohash for the location")
    status: str = Field("active", description="Station status")


class StationUpdateModel(BaseModel):
    thai_name: Optional[str] = Field(None, description="Thai name of the station")
    english_name: Optional[str] = Field(None, description="English name of the station")
    latitude: Optional[float] = Field(None, ge=-90, le=90, description="Latitude coordinate")
    longitude: Optional[float] = Field(None, ge=-180, le=180, description="Longitude coordinate")
    geohash: Optional[str] = Field(None, description="Geohash for the location")
    status: Optional[str] = Field(None, description="Station status")


class MeasurementResponse(BaseModel):
    timestamp: datetime
    station_code: str
    station_name_en: str
    station_name_th: str
    water_level: float
    discharge: float
    discharge_percent: Optional[float] = None
    status: str = "active"


class HealthResponse(BaseModel):
    overall_status: str
    timestamp: str
    checks: Dict[str, Dict[str, Any]]


class MetricsResponse(BaseModel):
    counters: Dict[str, float]
    gauges: Dict[str, float]
    histograms: Dict[str, Dict[str, float]]


class ScrapingStatusResponse(BaseModel):
    is_running: bool
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    total_runs: int = 0
    successful_runs: int = 0
    failed_runs: int = 0


# Global application state
app_state = {
    "scraper": None,
    "health_manager": None,
    "scraping_task": None,
    "is_scraping": False,
    "scraping_stats": {
        "total_runs": 0,
        "successful_runs": 0,
        "failed_runs": 0,
        "last_run": None,
        "next_run": None,
    },
}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager"""
    # Startup
    logger.info("Starting Water Monitor API...")

    # Initialize configuration
    try:
        Config.validate_config()
        logger.info("Configuration validated successfully")
    except Exception as e:
        logger.error(f"Configuration validation failed: {e}")
        raise
    # Initialize scraper
    db_config = Config.get_database_config()
    app_state["scraper"] = EnhancedWaterMonitorScraper(db_config)

    # Initialize health checks
    health_manager = HealthCheckManager()
    health_manager.add_check(DatabaseHealthCheck(app_state["scraper"].db_adapter))
    health_manager.add_check(APIHealthCheck(Config.API_URL, app_state["scraper"].session))
    health_manager.add_check(MemoryHealthCheck(max_memory_mb=1000))
    app_state["health_manager"] = health_manager

    # Start background scraping task
    app_state["scraping_task"] = asyncio.create_task(background_scraping_task())

    logger.info("Water Monitor API started successfully")

    yield

    # Shutdown
    logger.info("Shutting down Water Monitor API...")

    if app_state["scraping_task"]:
        app_state["scraping_task"].cancel()
        try:
            await app_state["scraping_task"]
        except asyncio.CancelledError:
            pass

    logger.info("Water Monitor API shutdown complete")


# Create FastAPI app
app = FastAPI(
    title="Northern Thailand Ping River Monitor API",
    description="Real-time water level monitoring system for Northern Thailand's Ping River Basin stations",
    version="3.1.3",
    lifespan=lifespan,
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def background_scraping_task():
    """Background task for periodic data scraping"""
    while True:
        try:
            if not app_state["is_scraping"]:
                app_state["is_scraping"] = True

                # Run scraping cycle
                scraper = app_state["scraper"]
                if scraper:
                    logger.info("Starting background scraping cycle")
                    start_time = datetime.now()

                    try:
                        result = scraper.run_scraping_cycle()

                        # Update stats
                        app_state["scraping_stats"]["total_runs"] += 1
                        app_state["scraping_stats"]["last_run"] = start_time

                        if result:
                            app_state["scraping_stats"]["successful_runs"] += 1
                            increment_counter("scraping_cycles_successful")
                            logger.info("Background scraping cycle completed successfully")
                        else:
                            app_state["scraping_stats"]["failed_runs"] += 1
                            increment_counter("scraping_cycles_failed")
                            logger.warning("Background scraping cycle completed with no new data")

                        # Update metrics
                        set_gauge("last_scraping_timestamp", start_time.timestamp())

                    except Exception as e:
                        app_state["scraping_stats"]["failed_runs"] += 1
                        increment_counter("scraping_cycles_failed")
                        logger.error(f"Background scraping cycle failed: {e}")

                app_state["is_scraping"] = False

            # Calculate next run time
            interval_seconds = Config.SCRAPING_INTERVAL_HOURS * 3600
            app_state["scraping_stats"]["next_run"] = datetime.now() + timedelta(seconds=interval_seconds)

            # Wait for next cycle
            await asyncio.sleep(interval_seconds)

        except asyncio.CancelledError:
            logger.info("Background scraping task cancelled")
            break
        except Exception as e:
            logger.error(f"Error in background scraping task: {e}")
            await asyncio.sleep(60)  # Wait a minute before retrying


# API Routes
@app.get("/", response_class=HTMLResponse)
async def root():
    """Root endpoint with basic dashboard"""
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Northern Thailand Ping River Monitor</title>
    </head>
    <body>

        <h1>🏔️ Northern Thailand Ping River Monitor API</h1>
        <p>Real-time water level monitoring system for the Ping River Basin in Northern Thailand</p>

        <h2>📊 Quick Status</h2>
        <p>API is running and monitoring 16 water stations along the Ping River</p>
        <p>Coverage: From Chiang Dao to Nakhon Sawan</p>
        <p>Data collection interval: Every hour</p>

        <h2>🔗 API Endpoints</h2>
        <ul>
            <li><code>GET /health</code> - System health status</li>
            <li><code>GET /metrics</code> - Application metrics</li>
            <li><code>GET /stations</code> - List all monitoring stations</li>
            <li><code>POST /stations</code> - Add new monitoring station</li>
            <li><code>PUT /stations/{station_id}</code> - Update station information</li>
            <li><code>GET /measurements/latest</code> - Latest measurements</li>
            <li><code>GET /measurements/station/{station_code}</code> - Station-specific data</li>
            <li><code>POST /scrape/trigger</code> - Trigger manual data collection</li>
            <li><code>GET /scraping/status</code> - Scraping status</li>
            <li><code>GET /docs</code> - Interactive API documentation</li>
        </ul>

        <h2>📈 Monitoring</h2>
        <ul>
            <li>Grafana dashboards available for data visualization</li>
            <li>Health checks monitor database, API, and system resources</li>
            <li>Metrics collection for performance monitoring</li>
        </ul>
""" return HTMLResponse(content=html_content) @app.get("/health", response_model=HealthResponse) async def get_health(): """Get system health status""" increment_counter("api_requests", labels={"endpoint": "health"}) health_manager = app_state["health_manager"] if not health_manager: raise HTTPException(status_code=503, detail="Health manager not initialized") # Run health checks results = health_manager.run_all_checks() summary = health_manager.get_health_summary() return HealthResponse(**summary) @app.get("/metrics", response_model=MetricsResponse) async def get_metrics(): """Get application metrics""" increment_counter("api_requests", labels={"endpoint": "metrics"}) metrics_collector = get_metrics_collector() metrics = metrics_collector.get_all_metrics() return MetricsResponse(**metrics) @app.get("/stations", response_model=List[StationResponse]) async def get_stations(): """Get list of all monitoring stations""" increment_counter("api_requests", labels={"endpoint": "stations"}) scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") stations = [] for station_id, station_info in scraper.station_mapping.items(): stations.append(StationResponse( station_id=int(station_id), station_code=station_info["code"], thai_name=station_info["thai_name"], english_name=station_info["english_name"], latitude=station_info.get("latitude"), longitude=station_info.get("longitude"), status="active" )) return stations @app.post("/stations", response_model=StationResponse) async def create_station(station: StationCreateModel): """Create a new monitoring station""" increment_counter("api_requests", labels={"endpoint": "create_station"}) scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") try: # Find next available station ID existing_ids = [int(sid) for sid in scraper.station_mapping.keys()] new_station_id = max(existing_ids) + 1 if existing_ids else 1 # Add to station mapping scraper.station_mapping[str(new_station_id)] = { 'code': station.station_code, 'thai_name': station.thai_name, 'english_name': station.english_name, 'latitude': station.latitude, 'longitude': station.longitude, 'geohash': station.geohash } logger.info(f"Created new station: {station.station_code} ({station.english_name})") return StationResponse( station_id=new_station_id, station_code=station.station_code, thai_name=station.thai_name, english_name=station.english_name, latitude=station.latitude, longitude=station.longitude, geohash=station.geohash, status=station.status ) except Exception as e: logger.error(f"Error creating station: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.put("/stations/{station_id}", response_model=StationResponse) async def update_station(station_id: int, updates: StationUpdateModel): """Update an existing monitoring station""" increment_counter("api_requests", labels={"endpoint": "update_station"}) scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") station_key = str(station_id) if station_key not in scraper.station_mapping: raise HTTPException(status_code=404, detail="Station not found") try: station_info = scraper.station_mapping[station_key] # Update fields if provided if updates.thai_name is not None: station_info['thai_name'] = updates.thai_name if updates.english_name is not None: station_info['english_name'] = updates.english_name if updates.latitude is not None: station_info['latitude'] = 
updates.latitude if updates.longitude is not None: station_info['longitude'] = updates.longitude if updates.geohash is not None: station_info['geohash'] = updates.geohash logger.info(f"Updated station {station_id}: {station_info['code']}") return StationResponse( station_id=station_id, station_code=station_info['code'], thai_name=station_info['thai_name'], english_name=station_info['english_name'], latitude=station_info.get('latitude'), longitude=station_info.get('longitude'), geohash=station_info.get('geohash'), status=updates.status or "active" ) except Exception as e: logger.error(f"Error updating station {station_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/stations/{station_id}") async def delete_station(station_id: int): """Delete a monitoring station""" increment_counter("api_requests", labels={"endpoint": "delete_station"}) scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") station_key = str(station_id) if station_key not in scraper.station_mapping: raise HTTPException(status_code=404, detail="Station not found") try: station_info = scraper.station_mapping.pop(station_key) logger.info(f"Deleted station {station_id}: {station_info['code']}") return {"message": f"Station {station_info['code']} deleted successfully"} except Exception as e: logger.error(f"Error deleting station {station_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/stations/{station_id}", response_model=StationResponse) async def get_station(station_id: int): """Get details of a specific monitoring station""" increment_counter("api_requests", labels={"endpoint": "get_station"}) scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") station_key = str(station_id) if station_key not in scraper.station_mapping: raise HTTPException(status_code=404, detail="Station not found") station_info = scraper.station_mapping[station_key] return StationResponse( station_id=station_id, station_code=station_info['code'], thai_name=station_info['thai_name'], english_name=station_info['english_name'], latitude=station_info.get('latitude'), longitude=station_info.get('longitude'), geohash=station_info.get('geohash'), status="active" ) @app.get("/measurements/latest", response_model=List[MeasurementResponse]) async def get_latest_measurements(limit: int = 100): """Get latest measurements from all stations""" increment_counter("api_requests", labels={"endpoint": "measurements_latest"}) scraper = app_state["scraper"] if not scraper or not scraper.db_adapter: raise HTTPException(status_code=503, detail="Database not available") try: measurements = scraper.get_latest_data(limit=limit) response = [] for measurement in measurements: response.append(MeasurementResponse( timestamp=measurement["timestamp"], station_code=measurement["station_code"], station_name_en=measurement["station_name_en"], station_name_th=measurement["station_name_th"], water_level=measurement["water_level"], discharge=measurement["discharge"], discharge_percent=measurement.get("discharge_percent"), status=measurement.get("status", "active") )) return response except Exception as e: logger.error(f"Error fetching latest measurements: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/measurements/station/{station_code}", response_model=List[MeasurementResponse]) async def get_station_measurements( station_code: str, hours: int = 24, limit: int = 1000 ): """Get measurements 
for a specific station""" increment_counter("api_requests", labels={"endpoint": "measurements_station"}) scraper = app_state["scraper"] if not scraper or not scraper.db_adapter: raise HTTPException(status_code=503, detail="Database not available") try: # Get measurements for the specified time range end_time = datetime.now() start_time = end_time - timedelta(hours=hours) measurements = scraper.db_adapter.get_measurements_by_timerange( start_time, end_time, station_codes=[station_code] ) # Limit results measurements = measurements[:limit] response = [] for measurement in measurements: response.append(MeasurementResponse( timestamp=measurement["timestamp"], station_code=measurement["station_code"], station_name_en=measurement["station_name_en"], station_name_th=measurement["station_name_th"], water_level=measurement["water_level"], discharge=measurement["discharge"], discharge_percent=measurement.get("discharge_percent"), status=measurement.get("status", "active") )) return response except Exception as e: logger.error(f"Error fetching station measurements: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/scrape/trigger") async def trigger_scraping(background_tasks: BackgroundTasks): """Trigger manual data scraping""" increment_counter("api_requests", labels={"endpoint": "scrape_trigger"}) if app_state["is_scraping"]: raise HTTPException(status_code=409, detail="Scraping already in progress") scraper = app_state["scraper"] if not scraper: raise HTTPException(status_code=503, detail="Scraper not initialized") def run_scraping(): """Background task to run scraping""" try: app_state["is_scraping"] = True logger.info("Manual scraping triggered via API") result = scraper.run_scraping_cycle() # Update stats app_state["scraping_stats"]["total_runs"] += 1 app_state["scraping_stats"]["last_run"] = datetime.now() if result: app_state["scraping_stats"]["successful_runs"] += 1 increment_counter("manual_scraping_successful") else: app_state["scraping_stats"]["failed_runs"] += 1 increment_counter("manual_scraping_failed") except Exception as e: app_state["scraping_stats"]["failed_runs"] += 1 increment_counter("manual_scraping_failed") logger.error(f"Manual scraping failed: {e}") finally: app_state["is_scraping"] = False background_tasks.add_task(run_scraping) return {"message": "Scraping triggered", "status": "started"} @app.get("/scraping/status", response_model=ScrapingStatusResponse) async def get_scraping_status(): """Get current scraping status""" increment_counter("api_requests", labels={"endpoint": "scraping_status"}) stats = app_state["scraping_stats"] return ScrapingStatusResponse( is_running=app_state["is_scraping"], last_run=stats["last_run"], next_run=stats["next_run"], total_runs=stats["total_runs"], successful_runs=stats["successful_runs"], failed_runs=stats["failed_runs"] ) @app.get("/config") async def get_config(): """Get current configuration (sensitive data masked)""" increment_counter("api_requests", labels={"endpoint": "config"}) config = Config.get_all_settings() # Mask sensitive information for key in config: if 'password' in key.lower() or 'secret' in key.lower(): if config[key]: config[key] = '*' * 8 return config if __name__ == "__main__": import uvicorn # Setup logging setup_logging( log_level=Config.LOG_LEVEL, log_file=Config.LOG_FILE, enable_console=True, enable_colors=True ) # Run the API server uvicorn.run( "web_api:app", host="0.0.0.0", port=8000, reload=False, log_config=None # Use our custom logging )
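
# Example usage (illustrative sketch only; the module path below is an assumption and
# depends on how this package is laid out and deployed -- adjust it to your project):
#
#   # Start the API server with the settings used in __main__ above
#   uvicorn web_api:app --host 0.0.0.0 --port 8000
#
#   # Check system health and pull the latest measurements
#   curl http://localhost:8000/health
#   curl "http://localhost:8000/measurements/latest?limit=10"
#
#   # Trigger a manual scraping cycle and check its status
#   curl -X POST http://localhost:8000/scrape/trigger
#   curl http://localhost:8000/scraping/status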