Files
Northern-Thailand-Ping-Rive…/scripts/migrate_sqlite_to_postgres.py
grabowski 6c7c128b4d Major refactor: Migrate to uv, add PostgreSQL support, and comprehensive tooling
- **Migration to uv package manager**: Replace pip/requirements with modern pyproject.toml
  - Add pyproject.toml with complete dependency management
  - Update all scripts and Makefile to use uv commands
  - Maintain backward compatibility with existing workflows

- **PostgreSQL integration and migration tools**:
  - Enhanced config.py with automatic password URL encoding
  - Complete PostgreSQL setup scripts and documentation
  - High-performance SQLite to PostgreSQL migration tool (91x speed improvement)
  - Support for both connection strings and individual components

- **Executable distribution system**:
  - PyInstaller integration for standalone .exe creation
  - Automated build scripts with batch file generation
  - Complete packaging system for end-user distribution

- **Enhanced data management**:
  - Fix --fill-gaps command with proper method implementation
  - Add gap detection and historical data backfill capabilities
  - Implement data update functionality for existing records
  - Add comprehensive database adapter methods

- **Developer experience improvements**:
  - Password encoding tools for special characters
  - Interactive setup wizards for PostgreSQL configuration
  - Comprehensive documentation and migration guides
  - Automated testing and validation tools

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-26 15:10:10 +07:00

619 lines
26 KiB
Python

#!/usr/bin/env python3
"""
SQLite to PostgreSQL Migration Tool
Migrates all data from SQLite database to PostgreSQL
"""
import os
import sys
import logging
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
# Add src to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
@dataclass
class MigrationStats:
stations_migrated: int = 0
measurements_migrated: int = 0
errors: List[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
def __post_init__(self):
if self.errors is None:
self.errors = []
class SQLiteToPostgresMigrator:
def __init__(self, sqlite_path: str, postgres_config: Dict[str, Any]):
self.sqlite_path = sqlite_path
self.postgres_config = postgres_config
self.sqlite_conn = None
self.postgres_adapter = None
self.stats = MigrationStats()
# Setup logging with UTF-8 encoding
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('migration.log', encoding='utf-8')
]
)
self.logger = logging.getLogger(__name__)
def connect_databases(self) -> bool:
"""Connect to both SQLite and PostgreSQL databases"""
try:
# Connect to SQLite
if not os.path.exists(self.sqlite_path):
self.logger.error(f"SQLite database not found: {self.sqlite_path}")
return False
self.sqlite_conn = sqlite3.connect(self.sqlite_path)
self.sqlite_conn.row_factory = sqlite3.Row # For dict-like access
self.logger.info(f"Connected to SQLite database: {self.sqlite_path}")
# Connect to PostgreSQL
from database_adapters import create_database_adapter
self.postgres_adapter = create_database_adapter(
self.postgres_config['type'],
connection_string=self.postgres_config['connection_string']
)
if not self.postgres_adapter.connect():
self.logger.error("Failed to connect to PostgreSQL")
return False
self.logger.info("Connected to PostgreSQL database")
return True
except Exception as e:
self.logger.error(f"Database connection error: {e}")
return False
def analyze_sqlite_schema(self) -> Dict[str, List[str]]:
"""Analyze SQLite database structure"""
try:
cursor = self.sqlite_conn.cursor()
# Get all tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [row[0] for row in cursor.fetchall()]
schema_info = {}
for table in tables:
cursor.execute(f"PRAGMA table_info({table})")
columns = [row[1] for row in cursor.fetchall()]
schema_info[table] = columns
# Get row count
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
self.logger.info(f"Table '{table}': {len(columns)} columns, {count} rows")
return schema_info
except Exception as e:
self.logger.error(f"Schema analysis error: {e}")
return {}
def migrate_stations(self) -> bool:
"""Migrate station data"""
try:
cursor = self.sqlite_conn.cursor()
# Try different possible table names and structures
station_queries = [
# Modern structure
"""SELECT id, station_code, station_name_th as thai_name, station_name_en as english_name,
latitude, longitude, geohash, created_at, updated_at
FROM stations""",
# Alternative structure 1
"""SELECT id, station_code, thai_name, english_name,
latitude, longitude, geohash, created_at, updated_at
FROM stations""",
# Legacy structure
"""SELECT station_id as id, station_code, station_name as thai_name,
station_name as english_name, lat as latitude, lon as longitude,
NULL as geohash, datetime('now') as created_at, datetime('now') as updated_at
FROM water_stations""",
# Simple structure
"""SELECT rowid as id, station_code, name as thai_name, name as english_name,
NULL as latitude, NULL as longitude, NULL as geohash,
datetime('now') as created_at, datetime('now') as updated_at
FROM stations""",
]
stations_data = []
for query in station_queries:
try:
cursor.execute(query)
rows = cursor.fetchall()
if rows:
self.logger.info(f"Found {len(rows)} stations using query variant")
for row in rows:
station = {
'station_id': row[0],
'station_code': row[1] or f"STATION_{row[0]}",
'station_name_th': row[2] or f"Station {row[0]}",
'station_name_en': row[3] or f"Station {row[0]}",
'latitude': row[4],
'longitude': row[5],
'geohash': row[6],
'status': 'active'
}
stations_data.append(station)
break
except sqlite3.OperationalError as e:
if "no such table" in str(e).lower() or "no such column" in str(e).lower():
continue
else:
raise
if not stations_data:
self.logger.warning("No stations found in SQLite database")
return True
# Insert stations into PostgreSQL using raw SQL
# Since the adapter is designed for measurements, we'll use direct SQL
try:
from sqlalchemy import create_engine, text
engine = create_engine(self.postgres_config['connection_string'])
# Process stations individually to avoid transaction rollback issues
for station in stations_data:
try:
with engine.begin() as conn:
# Use PostgreSQL UPSERT syntax with correct column names
station_sql = """
INSERT INTO stations (id, station_code, thai_name, english_name, latitude, longitude, geohash)
VALUES (:station_id, :station_code, :thai_name, :english_name, :latitude, :longitude, :geohash)
ON CONFLICT (id) DO UPDATE SET
thai_name = EXCLUDED.thai_name,
english_name = EXCLUDED.english_name,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
geohash = EXCLUDED.geohash,
updated_at = CURRENT_TIMESTAMP
"""
conn.execute(text(station_sql), {
'station_id': station['station_id'],
'station_code': station['station_code'],
'thai_name': station['station_name_th'],
'english_name': station['station_name_en'],
'latitude': station.get('latitude'),
'longitude': station.get('longitude'),
'geohash': station.get('geohash')
})
self.stats.stations_migrated += 1
except Exception as e:
error_msg = f"Error migrating station {station.get('station_code', 'unknown')}: {str(e)[:100]}..."
self.logger.warning(error_msg)
self.stats.errors.append(error_msg)
self.logger.info(f"Migrated {self.stats.stations_migrated} stations")
except Exception as e:
self.logger.error(f"Station migration failed: {e}")
return False
self.logger.info(f"Migrated {self.stats.stations_migrated} stations")
return True
except Exception as e:
self.logger.error(f"Station migration error: {e}")
return False
def migrate_measurements(self, batch_size: int = 5000) -> bool:
"""Migrate measurement data in batches"""
try:
cursor = self.sqlite_conn.cursor()
# Try different possible measurement table structures
measurement_queries = [
# Modern structure
"""SELECT w.timestamp, w.station_id, s.station_code, s.station_name_th, s.station_name_en,
w.water_level, w.discharge, w.discharge_percent, w.status
FROM water_measurements w
JOIN stations s ON w.station_id = s.id
ORDER BY w.timestamp""",
# Alternative with different join
"""SELECT w.timestamp, w.station_id, s.station_code, s.thai_name, s.english_name,
w.water_level, w.discharge, w.discharge_percent, 'active' as status
FROM water_measurements w
JOIN stations s ON w.station_id = s.id
ORDER BY w.timestamp""",
# Legacy structure
"""SELECT timestamp, station_id, station_code, station_name, station_name,
water_level, discharge, discharge_percent, 'active' as status
FROM measurements
ORDER BY timestamp""",
# Simple structure without joins
"""SELECT timestamp, station_id, 'UNKNOWN' as station_code, 'Unknown' as station_name_th, 'Unknown' as station_name_en,
water_level, discharge, discharge_percent, 'active' as status
FROM water_measurements
ORDER BY timestamp""",
]
measurements_processed = 0
for query in measurement_queries:
try:
# Get total count first
count_query = query.replace("SELECT", "SELECT COUNT(*) FROM (SELECT").replace("ORDER BY w.timestamp", "") + ")"
cursor.execute(count_query)
total_measurements = cursor.fetchone()[0]
if total_measurements == 0:
continue
self.logger.info(f"Found {total_measurements} measurements to migrate")
# Process in batches
offset = 0
while True:
batch_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
cursor.execute(batch_query)
rows = cursor.fetchall()
if not rows:
break
# Convert to measurement format
measurements = []
for row in rows:
try:
# Parse timestamp
timestamp_str = row[0]
if isinstance(timestamp_str, str):
try:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
except:
# Try other common formats
for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S']:
try:
timestamp = datetime.strptime(timestamp_str, fmt)
break
except:
continue
else:
timestamp = datetime.now()
else:
timestamp = timestamp_str
measurement = {
'timestamp': timestamp,
'station_id': row[1] or 999,
'station_code': row[2] or 'UNKNOWN',
'station_name_th': row[3] or 'Unknown',
'station_name_en': row[4] or 'Unknown',
'water_level': float(row[5]) if row[5] is not None else None,
'discharge': float(row[6]) if row[6] is not None else None,
'discharge_percent': float(row[7]) if row[7] is not None else None,
'status': row[8] or 'active'
}
measurements.append(measurement)
except Exception as e:
error_msg = f"Error processing measurement row: {e}"
self.logger.warning(error_msg)
continue
# Save batch to PostgreSQL using fast bulk insert
if measurements:
try:
self._fast_bulk_insert(measurements)
measurements_processed += len(measurements)
self.stats.measurements_migrated += len(measurements)
self.logger.info(f"Migrated {measurements_processed}/{total_measurements} measurements")
except Exception as e:
error_msg = f"Error saving measurement batch: {e}"
self.logger.error(error_msg)
self.stats.errors.append(error_msg)
offset += batch_size
# If we processed measurements, we're done
if measurements_processed > 0:
break
except sqlite3.OperationalError as e:
if "no such table" in str(e).lower() or "no such column" in str(e).lower():
continue
else:
raise
if measurements_processed == 0:
self.logger.warning("No measurements found in SQLite database")
else:
self.logger.info(f"Successfully migrated {measurements_processed} measurements")
return True
except Exception as e:
self.logger.error(f"Measurement migration error: {e}")
return False
def _fast_bulk_insert(self, measurements: List[Dict]) -> bool:
"""Super fast bulk insert using PostgreSQL COPY or VALUES clause"""
try:
import psycopg2
from urllib.parse import urlparse
import io
# Parse connection string for direct psycopg2 connection
parsed = urlparse(self.postgres_config['connection_string'])
# Try super fast COPY method first
try:
conn = psycopg2.connect(
host=parsed.hostname,
port=parsed.port or 5432,
database=parsed.path[1:],
user=parsed.username,
password=parsed.password
)
with conn:
with conn.cursor() as cur:
# Prepare data for COPY
data_buffer = io.StringIO()
null_val = '\\N'
for m in measurements:
data_buffer.write(f"{m['timestamp']}\t{m['station_id']}\t{m['water_level'] or null_val}\t{m['discharge'] or null_val}\t{m['discharge_percent'] or null_val}\t{m['status']}\n")
data_buffer.seek(0)
# Use COPY for maximum speed
cur.copy_from(
data_buffer,
'water_measurements',
columns=('timestamp', 'station_id', 'water_level', 'discharge', 'discharge_percent', 'status'),
sep='\t'
)
conn.close()
return True
except Exception as copy_error:
# Fallback to SQLAlchemy bulk insert
self.logger.debug(f"COPY failed, using bulk VALUES: {copy_error}")
from sqlalchemy import create_engine, text
engine = create_engine(self.postgres_config['connection_string'])
with engine.begin() as conn:
# Use PostgreSQL's fast bulk insert with ON CONFLICT
values_list = []
for m in measurements:
timestamp = m['timestamp'].isoformat() if hasattr(m['timestamp'], 'isoformat') else str(m['timestamp'])
values_list.append(
f"('{timestamp}', {m['station_id']}, {m['water_level'] or 'NULL'}, "
f"{m['discharge'] or 'NULL'}, {m['discharge_percent'] or 'NULL'}, '{m['status']}')"
)
# Build bulk insert query with ON CONFLICT handling
bulk_sql = f"""
INSERT INTO water_measurements (timestamp, station_id, water_level, discharge, discharge_percent, status)
VALUES {','.join(values_list)}
ON CONFLICT (timestamp, station_id) DO UPDATE SET
water_level = EXCLUDED.water_level,
discharge = EXCLUDED.discharge,
discharge_percent = EXCLUDED.discharge_percent,
status = EXCLUDED.status
"""
conn.execute(text(bulk_sql))
return True
except Exception as e:
self.logger.warning(f"Fast bulk insert failed: {e}")
# Final fallback to original method
try:
success = self.postgres_adapter.save_measurements(measurements)
return success
except Exception as fallback_e:
self.logger.error(f"All insert methods failed: {fallback_e}")
return False
def verify_migration(self) -> bool:
"""Verify the migration by comparing counts"""
try:
# Get SQLite counts
cursor = self.sqlite_conn.cursor()
sqlite_stations = 0
sqlite_measurements = 0
# Try to get station count
for table in ['stations', 'water_stations']:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
sqlite_stations = cursor.fetchone()[0]
break
except:
continue
# Try to get measurement count
for table in ['water_measurements', 'measurements']:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
sqlite_measurements = cursor.fetchone()[0]
break
except:
continue
# Get PostgreSQL counts
postgres_measurements = self.postgres_adapter.get_latest_measurements(limit=999999)
postgres_count = len(postgres_measurements)
self.logger.info("Migration Verification:")
self.logger.info(f"SQLite stations: {sqlite_stations}")
self.logger.info(f"SQLite measurements: {sqlite_measurements}")
self.logger.info(f"PostgreSQL measurements retrieved: {postgres_count}")
self.logger.info(f"Migrated stations: {self.stats.stations_migrated}")
self.logger.info(f"Migrated measurements: {self.stats.measurements_migrated}")
return True
except Exception as e:
self.logger.error(f"Verification error: {e}")
return False
def run_migration(self, sqlite_path: str = None) -> bool:
"""Run the complete migration process"""
self.stats.start_time = datetime.now()
if sqlite_path:
self.sqlite_path = sqlite_path
self.logger.info("=" * 60)
self.logger.info("SQLite to PostgreSQL Migration Tool")
self.logger.info("=" * 60)
self.logger.info(f"SQLite database: {self.sqlite_path}")
self.logger.info(f"PostgreSQL: {self.postgres_config['type']}")
try:
# Step 1: Connect to databases
self.logger.info("Step 1: Connecting to databases...")
if not self.connect_databases():
return False
# Step 2: Analyze SQLite schema
self.logger.info("Step 2: Analyzing SQLite database structure...")
schema_info = self.analyze_sqlite_schema()
if not schema_info:
self.logger.error("Could not analyze SQLite database structure")
return False
# Step 3: Migrate stations
self.logger.info("Step 3: Migrating station data...")
if not self.migrate_stations():
self.logger.error("Station migration failed")
return False
# Step 4: Migrate measurements
self.logger.info("Step 4: Migrating measurement data...")
if not self.migrate_measurements():
self.logger.error("Measurement migration failed")
return False
# Step 5: Verify migration
self.logger.info("Step 5: Verifying migration...")
self.verify_migration()
self.stats.end_time = datetime.now()
duration = self.stats.end_time - self.stats.start_time
# Final report
self.logger.info("=" * 60)
self.logger.info("MIGRATION COMPLETED")
self.logger.info("=" * 60)
self.logger.info(f"Duration: {duration}")
self.logger.info(f"Stations migrated: {self.stats.stations_migrated}")
self.logger.info(f"Measurements migrated: {self.stats.measurements_migrated}")
if self.stats.errors:
self.logger.warning(f"Errors encountered: {len(self.stats.errors)}")
for error in self.stats.errors[:10]: # Show first 10 errors
self.logger.warning(f" - {error}")
if len(self.stats.errors) > 10:
self.logger.warning(f" ... and {len(self.stats.errors) - 10} more errors")
else:
self.logger.info("No errors encountered")
return True
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
finally:
# Cleanup
if self.sqlite_conn:
self.sqlite_conn.close()
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="Migrate SQLite data to PostgreSQL")
parser.add_argument("sqlite_path", nargs="?", help="Path to SQLite database file")
parser.add_argument("--batch-size", type=int, default=5000, help="Batch size for processing measurements")
parser.add_argument("--fast", action="store_true", help="Use maximum speed mode (batch-size 10000)")
parser.add_argument("--dry-run", action="store_true", help="Analyze only, don't migrate")
args = parser.parse_args()
# Set fast mode
if args.fast:
args.batch_size = 10000
# Get SQLite path
sqlite_path = args.sqlite_path
if not sqlite_path:
# Try to find common SQLite database files
possible_paths = [
"water_levels.db",
"water_monitoring.db",
"database.db",
"../water_levels.db"
]
for path in possible_paths:
if os.path.exists(path):
sqlite_path = path
break
if not sqlite_path:
print("SQLite database file not found. Please specify the path:")
print(" python migrate_sqlite_to_postgres.py /path/to/database.db")
return False
# Get PostgreSQL configuration
try:
from config import Config
postgres_config = Config.get_database_config()
if postgres_config['type'] != 'postgresql':
print("Error: PostgreSQL not configured. Set DB_TYPE=postgresql in your .env file")
return False
except Exception as e:
print(f"Error loading PostgreSQL configuration: {e}")
return False
# Run migration
migrator = SQLiteToPostgresMigrator(sqlite_path, postgres_config)
if args.dry_run:
print("DRY RUN MODE - Analyzing SQLite database structure only")
if migrator.connect_databases():
schema_info = migrator.analyze_sqlite_schema()
print("\nSQLite database structure analysis complete.")
print("Run without --dry-run to perform the actual migration.")
return True
success = migrator.run_migration()
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)