#!/usr/bin/env python3 """ Migration script to add geolocation columns to existing water monitoring database """ import os import sys import sqlite3 import logging from typing import Dict, Any # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) def migrate_sqlite(db_path: str = 'water_monitoring.db') -> bool: """Migrate SQLite database to add geolocation columns""" try: logging.info(f"Migrating SQLite database: {db_path}") # Connect to database conn = sqlite3.connect(db_path) cursor = conn.cursor() # Check if columns already exist cursor.execute("PRAGMA table_info(stations)") columns = [column[1] for column in cursor.fetchall()] logging.info(f"Current columns in stations table: {columns}") # Add columns if they don't exist columns_added = [] if 'latitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN latitude REAL") columns_added.append('latitude') logging.info("Added latitude column") if 'longitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN longitude REAL") columns_added.append('longitude') logging.info("Added longitude column") if 'geohash' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN geohash TEXT") columns_added.append('geohash') logging.info("Added geohash column") if columns_added: # Update P.1 station with sample geolocation data cursor.execute(""" UPDATE stations SET latitude = 15.6944, longitude = 100.2028, geohash = 'w5q6uuhvfcfp25' WHERE station_code = 'P.1' """) # Commit changes conn.commit() logging.info(f"Successfully added columns: {', '.join(columns_added)}") logging.info("Updated P.1 station with sample geolocation data") else: logging.info("All geolocation columns already exist") # Verify the changes cursor.execute("SELECT station_code, latitude, longitude, geohash FROM stations WHERE station_code = 'P.1'") result = cursor.fetchone() if result: logging.info(f"P.1 station geolocation: {result}") conn.close() return True except Exception as e: logging.error(f"Error migrating SQLite database: {e}") return False def migrate_postgresql(connection_string: str) -> bool: """Migrate PostgreSQL database to add geolocation columns""" try: import psycopg2 from urllib.parse import urlparse logging.info("Migrating PostgreSQL database") # Parse connection string parsed = urlparse(connection_string) # Connect to database conn = psycopg2.connect( host=parsed.hostname, port=parsed.port or 5432, database=parsed.path[1:], # Remove leading slash user=parsed.username, password=parsed.password ) cursor = conn.cursor() # Check if columns exist cursor.execute(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'stations' """) columns = [row[0] for row in cursor.fetchall()] logging.info(f"Current columns in stations table: {columns}") # Add columns if they don't exist columns_added = [] if 'latitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN latitude DECIMAL(10,8)") columns_added.append('latitude') logging.info("Added latitude column") if 'longitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN longitude DECIMAL(11,8)") columns_added.append('longitude') logging.info("Added longitude column") if 'geohash' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN geohash VARCHAR(20)") columns_added.append('geohash') logging.info("Added geohash column") if columns_added: # Update P.1 station with sample geolocation data cursor.execute(""" UPDATE stations SET latitude = 15.6944, longitude = 100.2028, geohash = 'w5q6uuhvfcfp25' WHERE station_code = 'P.1' """) # Commit changes conn.commit() logging.info(f"Successfully added columns: {', '.join(columns_added)}") logging.info("Updated P.1 station with sample geolocation data") else: logging.info("All geolocation columns already exist") conn.close() return True except ImportError: logging.error("psycopg2 not installed. Run: pip install psycopg2-binary") return False except Exception as e: logging.error(f"Error migrating PostgreSQL database: {e}") return False def migrate_mysql(connection_string: str) -> bool: """Migrate MySQL database to add geolocation columns""" try: import pymysql from urllib.parse import urlparse logging.info("Migrating MySQL database") # Parse connection string parsed = urlparse(connection_string) # Connect to database conn = pymysql.connect( host=parsed.hostname, port=parsed.port or 3306, database=parsed.path[1:], # Remove leading slash user=parsed.username, password=parsed.password ) cursor = conn.cursor() # Check if columns exist cursor.execute("DESCRIBE stations") columns = [row[0] for row in cursor.fetchall()] logging.info(f"Current columns in stations table: {columns}") # Add columns if they don't exist columns_added = [] if 'latitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN latitude DECIMAL(10,8)") columns_added.append('latitude') logging.info("Added latitude column") if 'longitude' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN longitude DECIMAL(11,8)") columns_added.append('longitude') logging.info("Added longitude column") if 'geohash' not in columns: cursor.execute("ALTER TABLE stations ADD COLUMN geohash VARCHAR(20)") columns_added.append('geohash') logging.info("Added geohash column") if columns_added: # Update P.1 station with sample geolocation data cursor.execute(""" UPDATE stations SET latitude = 15.6944, longitude = 100.2028, geohash = 'w5q6uuhvfcfp25' WHERE station_code = 'P.1' """) # Commit changes conn.commit() logging.info(f"Successfully added columns: {', '.join(columns_added)}") logging.info("Updated P.1 station with sample geolocation data") else: logging.info("All geolocation columns already exist") conn.close() return True except ImportError: logging.error("pymysql not installed. Run: pip install pymysql") return False except Exception as e: logging.error(f"Error migrating MySQL database: {e}") return False def load_config_from_env() -> Dict[str, Any]: """Load database configuration from environment variables""" db_type = os.getenv('DB_TYPE', 'sqlite').lower() if db_type == 'postgresql': return { 'type': 'postgresql', 'connection_string': os.getenv('POSTGRES_CONNECTION_STRING', 'postgresql://postgres:password@localhost/water_monitoring') } elif db_type == 'mysql': return { 'type': 'mysql', 'connection_string': os.getenv('MYSQL_CONNECTION_STRING', 'mysql://root:password@localhost/water_monitoring') } elif db_type == 'victoriametrics': logging.info("VictoriaMetrics doesn't require schema migration") return {'type': 'victoriametrics'} elif db_type == 'influxdb': logging.info("InfluxDB doesn't require schema migration") return {'type': 'influxdb'} else: # Default to SQLite return { 'type': 'sqlite', 'db_path': os.getenv('SQLITE_DB_PATH', 'water_monitoring.db') } def main(): """Main migration function""" logging.info("Starting geolocation column migration...") # Load configuration config = load_config_from_env() db_type = config['type'] logging.info(f"Detected database type: {db_type.upper()}") success = False if db_type == 'sqlite': db_path = config.get('db_path', 'water_monitoring.db') if not os.path.exists(db_path): logging.error(f"Database file not found: {db_path}") sys.exit(1) success = migrate_sqlite(db_path) elif db_type == 'postgresql': success = migrate_postgresql(config['connection_string']) elif db_type == 'mysql': success = migrate_mysql(config['connection_string']) elif db_type in ['victoriametrics', 'influxdb']: logging.info(f"{db_type.upper()} doesn't require schema migration") success = True else: logging.error(f"Unsupported database type: {db_type}") sys.exit(1) if success: logging.info("✅ Migration completed successfully!") logging.info("You can now restart your water monitoring application") logging.info("The system will automatically use the new geolocation columns") else: logging.error("❌ Migration failed!") sys.exit(1) if __name__ == "__main__": main()