"""SQLite data logger for continuous telemetry recording.""" import os import sqlite3 import time from dataclasses import fields from .protocol import TelemetryData, PARAM_BY_ID # Telemetry fields to log (all numeric fields from TelemetryData) _TELEM_FIELDS = [f.name for f in fields(TelemetryData)] # Columns that are integers in the DB _INT_FIELDS = {"last_tmp", "VREF", "vfly_correction", "seq"} class DataLogger: """Streaming SQLite logger — one DB file per session.""" BATCH_SIZE = 50 # commit every N rows (~5s at 10Hz) def __init__(self, log_dir: str = "logs"): os.makedirs(log_dir, exist_ok=True) ts = time.strftime("%Y%m%d_%H%M%S") self.db_path = os.path.join(log_dir, f"session_{ts}.db") self._conn = sqlite3.connect(self.db_path, isolation_level=None, check_same_thread=False) self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA synchronous=NORMAL") self._create_tables() self._pending = 0 self._conn.execute("BEGIN") self._t0 = time.monotonic() def _create_tables(self): self._conn.execute( "CREATE TABLE IF NOT EXISTS session_info " "(key TEXT PRIMARY KEY, value TEXT)" ) self._conn.execute( "CREATE TABLE IF NOT EXISTS params (" " timestamp REAL NOT NULL," " param_id INTEGER PRIMARY KEY," " param_name TEXT NOT NULL," " param_group TEXT NOT NULL," " param_type INTEGER NOT NULL," " value REAL NOT NULL" ")" ) # Build telemetry column list from dataclass fields col_defs = ["rowid INTEGER PRIMARY KEY", "timestamp REAL NOT NULL", "mono REAL NOT NULL"] for name in _TELEM_FIELDS: if name in _INT_FIELDS: col_defs.append(f"{name} INTEGER") else: col_defs.append(f"{name} REAL") # Add computed columns col_defs.append("p_in_local REAL") col_defs.append("p_out_local REAL") self._conn.execute( f"CREATE TABLE IF NOT EXISTS telemetry ({', '.join(col_defs)})" ) self._conn.execute( "CREATE INDEX IF NOT EXISTS idx_telem_time ON telemetry(timestamp)" ) def log_session_info(self, port: str, baudrate: int): self._conn.execute( "INSERT OR REPLACE INTO session_info VALUES (?, ?)", ("start_time", time.strftime("%Y-%m-%d %H:%M:%S")), ) self._conn.execute( "INSERT OR REPLACE INTO session_info VALUES (?, ?)", ("serial_port", port), ) self._conn.execute( "INSERT OR REPLACE INTO session_info VALUES (?, ?)", ("baudrate", str(baudrate)), ) def log_telemetry(self, t: TelemetryData): now = time.time() mono = time.monotonic() - self._t0 values = [now, mono] for name in _TELEM_FIELDS: values.append(getattr(t, name)) # computed p_in/p_out from raw telemetry values values.append(t.vin * (-t.iin) / 1e6) values.append(t.vout * t.iout / 1e6) placeholders = ", ".join(["?"] * len(values)) self._conn.execute( f"INSERT INTO telemetry (timestamp, mono, " f"{', '.join(_TELEM_FIELDS)}, p_in_local, p_out_local) " f"VALUES ({placeholders})", values, ) self._pending += 1 if self._pending >= self.BATCH_SIZE: self._conn.execute("COMMIT") self._conn.execute("BEGIN") self._pending = 0 def log_param(self, param_id: int, value: float): pdef = PARAM_BY_ID.get(param_id) if not pdef: return self._conn.execute( "INSERT OR REPLACE INTO params VALUES (?, ?, ?, ?, ?, ?)", (time.time(), param_id, pdef.name, pdef.group, pdef.ptype, value), ) def close(self): if self._conn: try: self._conn.execute("COMMIT") except sqlite3.OperationalError: pass self._conn.close() self._conn = None # ---- Static analysis helpers ---- @staticmethod def load_telemetry(db_path: str): """Load telemetry table into a numpy structured array. Requires numpy (not needed for logging, only for post-analysis). """ import numpy as np conn = sqlite3.connect(db_path) cur = conn.execute("SELECT * FROM telemetry") col_names = [d[0] for d in cur.description] rows = cur.fetchall() conn.close() if not rows: return np.array([]) dtypes = [] for name in col_names: if name in ("rowid", "last_tmp", "VREF", "vfly_correction", "seq"): dtypes.append((name, "i4")) else: dtypes.append((name, "f8")) return np.array([tuple(r) for r in rows], dtype=dtypes) @staticmethod def load_params(db_path: str) -> dict[str, float]: """Load parameter snapshot as {name: value} dict.""" conn = sqlite3.connect(db_path) rows = conn.execute( "SELECT param_name, value FROM params" ).fetchall() conn.close() return {name: val for name, val in rows}