#!/usr/bin/env python3 """ SQLite to PostgreSQL Migration Tool Migrates all data from SQLite database to PostgreSQL """ import os import sys import logging import sqlite3 from datetime import datetime, timezone from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) @dataclass class MigrationStats: stations_migrated: int = 0 measurements_migrated: int = 0 errors: List[str] = None start_time: Optional[datetime] = None end_time: Optional[datetime] = None def __post_init__(self): if self.errors is None: self.errors = [] class SQLiteToPostgresMigrator: def __init__(self, sqlite_path: str, postgres_config: Dict[str, Any]): self.sqlite_path = sqlite_path self.postgres_config = postgres_config self.sqlite_conn = None self.postgres_adapter = None self.stats = MigrationStats() # Setup logging with UTF-8 encoding logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('migration.log', encoding='utf-8') ] ) self.logger = logging.getLogger(__name__) def connect_databases(self) -> bool: """Connect to both SQLite and PostgreSQL databases""" try: # Connect to SQLite if not os.path.exists(self.sqlite_path): self.logger.error(f"SQLite database not found: {self.sqlite_path}") return False self.sqlite_conn = sqlite3.connect(self.sqlite_path) self.sqlite_conn.row_factory = sqlite3.Row # For dict-like access self.logger.info(f"Connected to SQLite database: {self.sqlite_path}") # Connect to PostgreSQL from database_adapters import create_database_adapter self.postgres_adapter = create_database_adapter( self.postgres_config['type'], connection_string=self.postgres_config['connection_string'] ) if not self.postgres_adapter.connect(): self.logger.error("Failed to connect to PostgreSQL") return False self.logger.info("Connected to PostgreSQL database") return True except Exception as e: self.logger.error(f"Database connection error: {e}") return False def analyze_sqlite_schema(self) -> Dict[str, List[str]]: """Analyze SQLite database structure""" try: cursor = self.sqlite_conn.cursor() # Get all tables cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'") tables = [row[0] for row in cursor.fetchall()] schema_info = {} for table in tables: cursor.execute(f"PRAGMA table_info({table})") columns = [row[1] for row in cursor.fetchall()] schema_info[table] = columns # Get row count cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] self.logger.info(f"Table '{table}': {len(columns)} columns, {count} rows") return schema_info except Exception as e: self.logger.error(f"Schema analysis error: {e}") return {} def migrate_stations(self) -> bool: """Migrate station data""" try: cursor = self.sqlite_conn.cursor() # Try different possible table names and structures station_queries = [ # Modern structure """SELECT id, station_code, station_name_th as thai_name, station_name_en as english_name, latitude, longitude, geohash, created_at, updated_at FROM stations""", # Alternative structure 1 """SELECT id, station_code, thai_name, english_name, latitude, longitude, geohash, created_at, updated_at FROM stations""", # Legacy structure """SELECT station_id as id, station_code, station_name as thai_name, station_name as english_name, lat as latitude, lon as longitude, NULL as geohash, datetime('now') as created_at, datetime('now') as updated_at FROM water_stations""", # Simple structure """SELECT rowid as id, station_code, name as thai_name, name as english_name, NULL as latitude, NULL as longitude, NULL as geohash, datetime('now') as created_at, datetime('now') as updated_at FROM stations""", ] stations_data = [] for query in station_queries: try: cursor.execute(query) rows = cursor.fetchall() if rows: self.logger.info(f"Found {len(rows)} stations using query variant") for row in rows: station = { 'station_id': row[0], 'station_code': row[1] or f"STATION_{row[0]}", 'station_name_th': row[2] or f"Station {row[0]}", 'station_name_en': row[3] or f"Station {row[0]}", 'latitude': row[4], 'longitude': row[5], 'geohash': row[6], 'status': 'active' } stations_data.append(station) break except sqlite3.OperationalError as e: if "no such table" in str(e).lower() or "no such column" in str(e).lower(): continue else: raise if not stations_data: self.logger.warning("No stations found in SQLite database") return True # Insert stations into PostgreSQL using raw SQL # Since the adapter is designed for measurements, we'll use direct SQL try: from sqlalchemy import create_engine, text engine = create_engine(self.postgres_config['connection_string']) # Process stations individually to avoid transaction rollback issues for station in stations_data: try: with engine.begin() as conn: # Use PostgreSQL UPSERT syntax with correct column names station_sql = """ INSERT INTO stations (id, station_code, thai_name, english_name, latitude, longitude, geohash) VALUES (:station_id, :station_code, :thai_name, :english_name, :latitude, :longitude, :geohash) ON CONFLICT (id) DO UPDATE SET thai_name = EXCLUDED.thai_name, english_name = EXCLUDED.english_name, latitude = EXCLUDED.latitude, longitude = EXCLUDED.longitude, geohash = EXCLUDED.geohash, updated_at = CURRENT_TIMESTAMP """ conn.execute(text(station_sql), { 'station_id': station['station_id'], 'station_code': station['station_code'], 'thai_name': station['station_name_th'], 'english_name': station['station_name_en'], 'latitude': station.get('latitude'), 'longitude': station.get('longitude'), 'geohash': station.get('geohash') }) self.stats.stations_migrated += 1 except Exception as e: error_msg = f"Error migrating station {station.get('station_code', 'unknown')}: {str(e)[:100]}..." self.logger.warning(error_msg) self.stats.errors.append(error_msg) self.logger.info(f"Migrated {self.stats.stations_migrated} stations") except Exception as e: self.logger.error(f"Station migration failed: {e}") return False self.logger.info(f"Migrated {self.stats.stations_migrated} stations") return True except Exception as e: self.logger.error(f"Station migration error: {e}") return False def migrate_measurements(self, batch_size: int = 5000) -> bool: """Migrate measurement data in batches""" try: cursor = self.sqlite_conn.cursor() # Try different possible measurement table structures measurement_queries = [ # Modern structure """SELECT w.timestamp, w.station_id, s.station_code, s.station_name_th, s.station_name_en, w.water_level, w.discharge, w.discharge_percent, w.status FROM water_measurements w JOIN stations s ON w.station_id = s.id ORDER BY w.timestamp""", # Alternative with different join """SELECT w.timestamp, w.station_id, s.station_code, s.thai_name, s.english_name, w.water_level, w.discharge, w.discharge_percent, 'active' as status FROM water_measurements w JOIN stations s ON w.station_id = s.id ORDER BY w.timestamp""", # Legacy structure """SELECT timestamp, station_id, station_code, station_name, station_name, water_level, discharge, discharge_percent, 'active' as status FROM measurements ORDER BY timestamp""", # Simple structure without joins """SELECT timestamp, station_id, 'UNKNOWN' as station_code, 'Unknown' as station_name_th, 'Unknown' as station_name_en, water_level, discharge, discharge_percent, 'active' as status FROM water_measurements ORDER BY timestamp""", ] measurements_processed = 0 for query in measurement_queries: try: # Get total count first count_query = query.replace("SELECT", "SELECT COUNT(*) FROM (SELECT").replace("ORDER BY w.timestamp", "") + ")" cursor.execute(count_query) total_measurements = cursor.fetchone()[0] if total_measurements == 0: continue self.logger.info(f"Found {total_measurements} measurements to migrate") # Process in batches offset = 0 while True: batch_query = f"{query} LIMIT {batch_size} OFFSET {offset}" cursor.execute(batch_query) rows = cursor.fetchall() if not rows: break # Convert to measurement format measurements = [] for row in rows: try: # Parse timestamp timestamp_str = row[0] if isinstance(timestamp_str, str): try: timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) except: # Try other common formats for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S']: try: timestamp = datetime.strptime(timestamp_str, fmt) break except: continue else: timestamp = datetime.now() else: timestamp = timestamp_str measurement = { 'timestamp': timestamp, 'station_id': row[1] or 999, 'station_code': row[2] or 'UNKNOWN', 'station_name_th': row[3] or 'Unknown', 'station_name_en': row[4] or 'Unknown', 'water_level': float(row[5]) if row[5] is not None else None, 'discharge': float(row[6]) if row[6] is not None else None, 'discharge_percent': float(row[7]) if row[7] is not None else None, 'status': row[8] or 'active' } measurements.append(measurement) except Exception as e: error_msg = f"Error processing measurement row: {e}" self.logger.warning(error_msg) continue # Save batch to PostgreSQL using fast bulk insert if measurements: try: self._fast_bulk_insert(measurements) measurements_processed += len(measurements) self.stats.measurements_migrated += len(measurements) self.logger.info(f"Migrated {measurements_processed}/{total_measurements} measurements") except Exception as e: error_msg = f"Error saving measurement batch: {e}" self.logger.error(error_msg) self.stats.errors.append(error_msg) offset += batch_size # If we processed measurements, we're done if measurements_processed > 0: break except sqlite3.OperationalError as e: if "no such table" in str(e).lower() or "no such column" in str(e).lower(): continue else: raise if measurements_processed == 0: self.logger.warning("No measurements found in SQLite database") else: self.logger.info(f"Successfully migrated {measurements_processed} measurements") return True except Exception as e: self.logger.error(f"Measurement migration error: {e}") return False def _fast_bulk_insert(self, measurements: List[Dict]) -> bool: """Super fast bulk insert using PostgreSQL COPY or VALUES clause""" try: import psycopg2 from urllib.parse import urlparse import io # Parse connection string for direct psycopg2 connection parsed = urlparse(self.postgres_config['connection_string']) # Try super fast COPY method first try: conn = psycopg2.connect( host=parsed.hostname, port=parsed.port or 5432, database=parsed.path[1:], user=parsed.username, password=parsed.password ) with conn: with conn.cursor() as cur: # Prepare data for COPY data_buffer = io.StringIO() null_val = '\\N' for m in measurements: data_buffer.write(f"{m['timestamp']}\t{m['station_id']}\t{m['water_level'] or null_val}\t{m['discharge'] or null_val}\t{m['discharge_percent'] or null_val}\t{m['status']}\n") data_buffer.seek(0) # Use COPY for maximum speed cur.copy_from( data_buffer, 'water_measurements', columns=('timestamp', 'station_id', 'water_level', 'discharge', 'discharge_percent', 'status'), sep='\t' ) conn.close() return True except Exception as copy_error: # Fallback to SQLAlchemy bulk insert self.logger.debug(f"COPY failed, using bulk VALUES: {copy_error}") from sqlalchemy import create_engine, text engine = create_engine(self.postgres_config['connection_string']) with engine.begin() as conn: # Use PostgreSQL's fast bulk insert with ON CONFLICT values_list = [] for m in measurements: timestamp = m['timestamp'].isoformat() if hasattr(m['timestamp'], 'isoformat') else str(m['timestamp']) values_list.append( f"('{timestamp}', {m['station_id']}, {m['water_level'] or 'NULL'}, " f"{m['discharge'] or 'NULL'}, {m['discharge_percent'] or 'NULL'}, '{m['status']}')" ) # Build bulk insert query with ON CONFLICT handling bulk_sql = f""" INSERT INTO water_measurements (timestamp, station_id, water_level, discharge, discharge_percent, status) VALUES {','.join(values_list)} ON CONFLICT (timestamp, station_id) DO UPDATE SET water_level = EXCLUDED.water_level, discharge = EXCLUDED.discharge, discharge_percent = EXCLUDED.discharge_percent, status = EXCLUDED.status """ conn.execute(text(bulk_sql)) return True except Exception as e: self.logger.warning(f"Fast bulk insert failed: {e}") # Final fallback to original method try: success = self.postgres_adapter.save_measurements(measurements) return success except Exception as fallback_e: self.logger.error(f"All insert methods failed: {fallback_e}") return False def verify_migration(self) -> bool: """Verify the migration by comparing counts""" try: # Get SQLite counts cursor = self.sqlite_conn.cursor() sqlite_stations = 0 sqlite_measurements = 0 # Try to get station count for table in ['stations', 'water_stations']: try: cursor.execute(f"SELECT COUNT(*) FROM {table}") sqlite_stations = cursor.fetchone()[0] break except: continue # Try to get measurement count for table in ['water_measurements', 'measurements']: try: cursor.execute(f"SELECT COUNT(*) FROM {table}") sqlite_measurements = cursor.fetchone()[0] break except: continue # Get PostgreSQL counts postgres_measurements = self.postgres_adapter.get_latest_measurements(limit=999999) postgres_count = len(postgres_measurements) self.logger.info("Migration Verification:") self.logger.info(f"SQLite stations: {sqlite_stations}") self.logger.info(f"SQLite measurements: {sqlite_measurements}") self.logger.info(f"PostgreSQL measurements retrieved: {postgres_count}") self.logger.info(f"Migrated stations: {self.stats.stations_migrated}") self.logger.info(f"Migrated measurements: {self.stats.measurements_migrated}") return True except Exception as e: self.logger.error(f"Verification error: {e}") return False def run_migration(self, sqlite_path: str = None) -> bool: """Run the complete migration process""" self.stats.start_time = datetime.now() if sqlite_path: self.sqlite_path = sqlite_path self.logger.info("=" * 60) self.logger.info("SQLite to PostgreSQL Migration Tool") self.logger.info("=" * 60) self.logger.info(f"SQLite database: {self.sqlite_path}") self.logger.info(f"PostgreSQL: {self.postgres_config['type']}") try: # Step 1: Connect to databases self.logger.info("Step 1: Connecting to databases...") if not self.connect_databases(): return False # Step 2: Analyze SQLite schema self.logger.info("Step 2: Analyzing SQLite database structure...") schema_info = self.analyze_sqlite_schema() if not schema_info: self.logger.error("Could not analyze SQLite database structure") return False # Step 3: Migrate stations self.logger.info("Step 3: Migrating station data...") if not self.migrate_stations(): self.logger.error("Station migration failed") return False # Step 4: Migrate measurements self.logger.info("Step 4: Migrating measurement data...") if not self.migrate_measurements(): self.logger.error("Measurement migration failed") return False # Step 5: Verify migration self.logger.info("Step 5: Verifying migration...") self.verify_migration() self.stats.end_time = datetime.now() duration = self.stats.end_time - self.stats.start_time # Final report self.logger.info("=" * 60) self.logger.info("MIGRATION COMPLETED") self.logger.info("=" * 60) self.logger.info(f"Duration: {duration}") self.logger.info(f"Stations migrated: {self.stats.stations_migrated}") self.logger.info(f"Measurements migrated: {self.stats.measurements_migrated}") if self.stats.errors: self.logger.warning(f"Errors encountered: {len(self.stats.errors)}") for error in self.stats.errors[:10]: # Show first 10 errors self.logger.warning(f" - {error}") if len(self.stats.errors) > 10: self.logger.warning(f" ... and {len(self.stats.errors) - 10} more errors") else: self.logger.info("No errors encountered") return True except Exception as e: self.logger.error(f"Migration failed: {e}") return False finally: # Cleanup if self.sqlite_conn: self.sqlite_conn.close() def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser(description="Migrate SQLite data to PostgreSQL") parser.add_argument("sqlite_path", nargs="?", help="Path to SQLite database file") parser.add_argument("--batch-size", type=int, default=5000, help="Batch size for processing measurements") parser.add_argument("--fast", action="store_true", help="Use maximum speed mode (batch-size 10000)") parser.add_argument("--dry-run", action="store_true", help="Analyze only, don't migrate") args = parser.parse_args() # Set fast mode if args.fast: args.batch_size = 10000 # Get SQLite path sqlite_path = args.sqlite_path if not sqlite_path: # Try to find common SQLite database files possible_paths = [ "water_levels.db", "water_monitoring.db", "database.db", "../water_levels.db" ] for path in possible_paths: if os.path.exists(path): sqlite_path = path break if not sqlite_path: print("SQLite database file not found. Please specify the path:") print(" python migrate_sqlite_to_postgres.py /path/to/database.db") return False # Get PostgreSQL configuration try: from config import Config postgres_config = Config.get_database_config() if postgres_config['type'] != 'postgresql': print("Error: PostgreSQL not configured. Set DB_TYPE=postgresql in your .env file") return False except Exception as e: print(f"Error loading PostgreSQL configuration: {e}") return False # Run migration migrator = SQLiteToPostgresMigrator(sqlite_path, postgres_config) if args.dry_run: print("DRY RUN MODE - Analyzing SQLite database structure only") if migrator.connect_databases(): schema_info = migrator.analyze_sqlite_schema() print("\nSQLite database structure analysis complete.") print("Run without --dry-run to perform the actual migration.") return True success = migrator.run_migration() return success if __name__ == "__main__": success = main() sys.exit(0 if success else 1)