#!/usr/bin/env python3 """ Enhanced Water Monitor Scraper with multiple database backend support """ import requests import datetime import time import schedule import json import os from typing import List, Dict, Optional try: from .database_adapters import create_database_adapter, DatabaseAdapter from .models import WaterMeasurement, StationInfo, ScrapingResult, StationStatus from .validators import DataValidator from .exceptions import APIConnectionError, DataValidationError, DatabaseConnectionError from .metrics import increment_counter, set_gauge, record_histogram, Timer from .rate_limiter import RateLimiter, RequestTracker from .logging_config import get_logger except ImportError: # Handle case when running as standalone script from database_adapters import create_database_adapter, DatabaseAdapter import logging def get_logger(name): return logging.getLogger(name) def increment_counter(*args, **kwargs): pass def set_gauge(*args, **kwargs): pass def record_histogram(*args, **kwargs): pass class Timer: def __init__(self, *args, **kwargs): pass def __enter__(self): return self def __exit__(self, *args): pass class RateLimiter: def __init__(self, *args, **kwargs): pass def wait_if_needed(self): pass class RequestTracker: def __init__(self): pass def record_request(self, *args, **kwargs): pass class DataValidator: @staticmethod def validate_measurements(measurements): return measurements # Get logger instance logger = get_logger(__name__) class EnhancedWaterMonitorScraper: def __init__(self, db_config: Dict): """ Initialize scraper with database configuration Args: db_config: Database configuration dictionary """ self.api_url = "https://hyd-app-db.rid.go.th/webservice/getGroupHourlyWaterLevelReportAllHL.ashx" self.db_config = db_config.copy() # Make a copy to avoid modifying original self.db_adapter = None # Scheduler state tracking self.last_successful_update = None self.retry_mode = False self.next_hourly_check = None # Rate limiting and request tracking self.rate_limiter = RateLimiter(max_requests=10, time_window_seconds=60) self.request_tracker = RequestTracker() # HTTP session for API requests self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'X-Requested-With': 'XMLHttpRequest' }) # Station mapping with correct names and geolocation data self.station_mapping = { '1': { 'code': 'P.20', 'thai_name': 'บ้านเชียงดาว', 'english_name': 'Ban Chiang Dao', 'latitude': 19.36731448032191, 'longitude': 98.9688487015384, 'geohash': None }, '2': { 'code': 'P.75', 'thai_name': 'บ้านช่อแล', 'english_name': 'Ban Chai Lat', 'latitude': 19.145972935976225, 'longitude': 99.00735727149247, 'geohash': None }, '3': { 'code': 'P.92', 'thai_name': 'บ้านเมืองกึ๊ด', 'english_name': 'Ban Muang Aut', 'latitude': 19.220518985435646, 'longitude': 98.84733127007874, 'geohash': None }, '4': { 'code': 'P.4A', 'thai_name': 'บ้านแม่แตง', 'english_name': 'Ban Mae Taeng', 'latitude': 19.1222679952378, 'longitude': 98.94437462084075, 'geohash': None }, '5': { 'code': 'P.67', 'thai_name': 'บ้านแม่แต', 'english_name': 'Ban Tae', 'latitude': 19.009762080002453, 'longitude': 98.95978297135508, 'geohash': None }, '6': { 'code': 'P.21', 'thai_name': 'บ้านริมใต้', 'english_name': 'Ban Rim Tai', 'latitude': 18.917459157963293, 'longitude': 98.97018092996231, 'geohash': None }, '7': { 'code': 'P.103', 'thai_name': 'สะพานวงแหวนรอบ 3', 'english_name': 'Ring Bridge 3', 'latitude': 18.86664807441675, 'longitude': 98.9781107622432, 'geohash': None }, '8': { 'code': 'P.1', 'thai_name': 'สะพานนวรัฐ', 'english_name': 'Nawarat Bridge', 'latitude': 18.7875, 'longitude': 99.0045, 'geohash': 'w5q6uuhvfcfp25' }, '9': { 'code': 'P.82', 'thai_name': 'บ้านสบวิน', 'english_name': 'Ban Sob win', 'latitude': 18.6519444, 'longitude': 98.69, 'geohash': None }, '10': { 'code': 'P.84', 'thai_name': 'บ้านพันตน', 'english_name': 'Ban Panton', 'latitude': 18.591315274591334, 'longitude': 98.79657058508496, 'geohash': None }, '11': { 'code': 'P.81', 'thai_name': 'บ้านโป่ง', 'english_name': 'Ban Pong', 'latitude': 13.805661820610888, 'longitude': 99.87174946122846, 'geohash': None }, '12': { 'code': 'P.5', 'thai_name': 'สะพานท่านาง', 'english_name': 'Tha Nang Bridge', 'latitude': 18.580269437546555, 'longitude': 99.01021397084362, 'geohash': None }, '13': { 'code': 'P.77', 'thai_name': 'บ้านสบแม่สะป๊วด', 'english_name': 'Baan Sop Mae Sapuord', 'latitude': 18.433347475179602, 'longitude': 99.08510036666527, 'geohash': None }, '14': { 'code': 'P.87', 'thai_name': 'บ้านป่าซาง', 'english_name': 'Ban Pa Sang', 'latitude': 18.519121825282486, 'longitude': 98.94224374138238, 'geohash': None }, '15': { 'code': 'P.76', 'thai_name': 'บ้านแม่อีไฮ', 'english_name': 'Banb Mae I Hai', 'latitude': 18.141465831254404, 'longitude': 98.89642508267181, 'geohash': None }, '16': { 'code': 'P.85', 'thai_name': 'บ้านหล่ายแก้ว', 'english_name': 'Baan Lai Kaew', 'latitude': 18.17856361002219, 'longitude': 98.63023114782287, 'geohash': None } } self.init_database() def init_database(self): """Initialize database connection""" try: # Extract db_type and pass remaining config as kwargs db_config_copy = self.db_config.copy() db_type = db_config_copy.pop('type') self.db_adapter = create_database_adapter(db_type, **db_config_copy) success = self.db_adapter.connect() if success: logger.info(f"Successfully connected to {db_type.upper()} database") set_gauge("database_connected", 1) increment_counter("database_connections_successful") else: logger.error(f"Failed to connect to {db_type.upper()} database") set_gauge("database_connected", 0) increment_counter("database_connections_failed") except Exception as e: logger.error(f"Error initializing database: {e}") set_gauge("database_connected", 0) increment_counter("database_connections_failed") self.db_adapter = None def fetch_water_data_for_date(self, target_date: datetime.datetime) -> Optional[List[Dict]]: """Fetch water levels and discharge data from API for a specific date""" with Timer("api_request_duration"): try: logger.info(f"Starting data fetch from API for date: {target_date.strftime('%Y-%m-%d')}") # Rate limiting self.rate_limiter.wait_if_needed() # Create Thai format date (Buddhist calendar) thai_year = target_date.year + 543 thai_date = f"{target_date.day:02d}/{target_date.month:02d}/{thai_year}" # API parameters payload = { 'DW[UtokID]': '1', 'DW[BasinID]': '6', 'DW[TimeCurrent]': thai_date, '_search': 'false', 'nd': str(int(time.time() * 1000)), 'rows': '100', 'page': '1', 'sidx': 'indexhourly', 'sord': 'asc' } logger.debug(f"API parameters: {payload}") # POST request to API start_time = time.time() response = self.session.post(self.api_url, data=payload, timeout=30) response_time = time.time() - start_time response.raise_for_status() # Record successful request self.request_tracker.record_request(True, response_time) increment_counter("api_requests_successful") record_histogram("api_response_time", response_time) # Parse JSON response try: json_data = response.json() logger.debug(f"API response received: {len(str(json_data))} characters") except ValueError as e: logger.error(f"Error parsing JSON response: {e}") self.request_tracker.record_request(False, response_time, "json_parse_error") increment_counter("api_requests_failed") return None water_data = [] # Parse JSON data if json_data and isinstance(json_data, dict) and 'rows' in json_data: for row in json_data['rows']: try: # Parse timestamp time_str = row.get('hourlytime', '') if not time_str: continue try: # Format: "1.00", "2.00", ..., "24.00" api_hour = int(float(time_str)) if api_hour < 1 or api_hour > 24: continue if api_hour == 24: # Hour 24 = midnight (00:00) of the next day data_time = target_date.replace(hour=0, minute=0, second=0, microsecond=0) data_time = data_time + datetime.timedelta(days=1) else: # Hours 1-23 = 01:00-23:00 of the same day data_time = target_date.replace(hour=api_hour, minute=0, second=0, microsecond=0) except (ValueError, IndexError): logger.warning(f"Could not parse timestamp: {time_str}") continue # Parse all water levels and discharge values station_count = 0 for station_num in range(1, 17): # Stations 1-16 wl_key = f'wlvalues{station_num}' q_key = f'qvalues{station_num}' qp_key = f'QPercent{station_num}' # Check if both water level and discharge data exist if wl_key in row and q_key in row: try: water_level = row[wl_key] discharge = row[q_key] discharge_percent = row.get(qp_key) # Skip if values are None or invalid if water_level is None or discharge is None: continue # Convert to float water_level = float(water_level) discharge = float(discharge) discharge_percent = float(discharge_percent) if discharge_percent is not None else None station_info = self.station_mapping.get(str(station_num), { 'code': f'P.{19+station_num}', 'thai_name': f'Station {station_num}', 'english_name': f'Station {station_num}' }) water_data.append({ 'timestamp': data_time, 'station_id': station_num, 'station_code': station_info['code'], 'station_name_en': station_info['english_name'], 'station_name_th': station_info['thai_name'], 'latitude': station_info.get('latitude'), 'longitude': station_info.get('longitude'), 'geohash': station_info.get('geohash'), 'water_level': water_level, 'water_level_unit': 'm', 'discharge': discharge, 'discharge_unit': 'cms', 'discharge_percent': discharge_percent, 'status': 'active' }) station_count += 1 except (ValueError, TypeError) as e: logger.warning(f"Could not parse data for station {station_num}: {e}") continue logger.debug(f"Processed {station_count} stations for time {time_str}") except Exception as e: logger.warning(f"Error processing data row: {e}") continue # Validate data water_data = DataValidator.validate_measurements(water_data) logger.info(f"Successfully fetched {len(water_data)} data points from API for {target_date.strftime('%Y-%m-%d')}") return water_data except requests.RequestException as e: logger.error(f"Network error fetching API data: {e}") self.request_tracker.record_request(False, 0, "network_error") increment_counter("api_requests_failed") return None except Exception as e: logger.error(f"Unexpected error fetching API data: {e}") self.request_tracker.record_request(False, 0, "unexpected_error") increment_counter("api_requests_failed") return None def fetch_water_data(self) -> Optional[List[Dict]]: """Fetch water levels and discharge data from API for current date""" current_date = datetime.datetime.now() return self.fetch_water_data_for_date(current_date) def save_to_database(self, water_data: List[Dict], max_retries: int = 3) -> bool: """Save water measurements to database with retry logic""" if not self.db_adapter: logger.error("Database adapter not initialized") return False if not water_data: logger.warning("No data to save") return False for attempt in range(max_retries): try: success = self.db_adapter.save_measurements(water_data) if success: logger.info(f"Successfully saved {len(water_data)} measurements to database") increment_counter("database_saves_successful") set_gauge("last_save_timestamp", time.time()) return True else: logger.warning(f"Save attempt {attempt + 1} failed, retrying...") except Exception as e: if "database is locked" in str(e).lower() and attempt < max_retries - 1: logger.warning(f"Database locked on attempt {attempt + 1}, retrying in {2 ** attempt} seconds...") time.sleep(2 ** attempt) # Exponential backoff continue else: logger.error(f"Error saving to database (attempt {attempt + 1}): {e}") if attempt == max_retries - 1: increment_counter("database_saves_failed") return False return False def get_latest_data(self, limit: int = 100) -> List[Dict]: """Get latest data from database""" if not self.db_adapter: return [] try: return self.db_adapter.get_latest_measurements(limit=limit) except Exception as e: logger.error(f"Error getting latest data: {e}") return [] def run_scraping_cycle(self) -> bool: """Run a complete scraping cycle""" logger.info("Starting scraping cycle...") try: # Fetch current data water_data = self.fetch_water_data() if water_data: success = self.save_to_database(water_data) if success: logger.info("Scraping cycle completed successfully") increment_counter("scraping_cycles_successful") return True else: logger.error("Failed to save data") increment_counter("scraping_cycles_failed") return False else: logger.warning("No data fetched") increment_counter("scraping_cycles_failed") return False except Exception as e: logger.error(f"Scraping cycle failed: {e}") increment_counter("scraping_cycles_failed") return False # Main execution for standalone usage if __name__ == "__main__": import argparse import sys # Configure basic logging for standalone usage import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('water_monitor.log'), logging.StreamHandler() ] ) parser = argparse.ArgumentParser(description="Thailand Water Monitor") parser.add_argument("--test", action="store_true", help="Run single test cycle") args = parser.parse_args() # Default SQLite configuration db_config = { 'type': 'sqlite', 'connection_string': 'sqlite:///water_levels.db' } try: scraper = EnhancedWaterMonitorScraper(db_config) if args.test: logger.info("Running test cycle...") result = scraper.run_scraping_cycle() if result: logger.info("✅ Test completed successfully") sys.exit(0) else: logger.error("❌ Test failed") sys.exit(1) else: logger.info("Starting continuous monitoring...") schedule.every(1).hours.do(scraper.run_scraping_cycle) # Run initial cycle scraper.run_scraping_cycle() while True: schedule.run_pending() time.sleep(60) except KeyboardInterrupt: logger.info("Monitoring stopped by user") except Exception as e: logger.error(f"Error: {e}") sys.exit(1)