STM32G474RB firmware for a solar buck converter with MPPT, CC control, Vfly compensation, and adaptive deadtime. Includes a Textual TUI debug console for real-time telemetry, parameter tuning, and SQLite logging. Added pyproject.toml for uv; run the console with `cd code64 && uv run debug-console`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
154 lines
5.3 KiB
Python
154 lines
5.3 KiB
Python
"""SQLite data logger for continuous telemetry recording."""
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from dataclasses import fields
|
|
|
|
from .protocol import TelemetryData, PARAM_BY_ID
|
|
|
|
# Names of every TelemetryData dataclass field, in declaration order; each
# one becomes a column of the telemetry table.
_TELEM_FIELDS = [field.name for field in fields(TelemetryData)]

# Telemetry fields declared as INTEGER columns; every other field is REAL.
_INT_FIELDS = {"seq", "vfly_correction", "VREF", "last_tmp"}
|
|
|
|
|
|
class DataLogger:
    """Streaming SQLite logger — one DB file per session.

    The connection runs in autocommit mode (``isolation_level=None``) and the
    logger manages transactions itself: telemetry rows are inserted inside an
    open transaction that is committed every ``BATCH_SIZE`` rows and on
    ``close()``.  Instances can be used as context managers; leaving the
    ``with`` block calls ``close()``.
    """

    BATCH_SIZE = 50  # commit every N rows (~5s at 10Hz)

    def __init__(self, log_dir: str = "logs"):
        """Create ``log_dir`` if needed and open ``session_<timestamp>.db`` in it.

        Args:
            log_dir: Directory that receives the session database file.
        """
        os.makedirs(log_dir, exist_ok=True)
        ts = time.strftime("%Y%m%d_%H%M%S")
        self.db_path = os.path.join(log_dir, f"session_{ts}.db")
        self._conn = sqlite3.connect(
            self.db_path, isolation_level=None, check_same_thread=False
        )
        try:
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA synchronous=NORMAL")
            self._create_tables()
            # The column list never changes during a session, so build the
            # INSERT statement once instead of on every telemetry sample.
            columns = ["timestamp", "mono", *_TELEM_FIELDS,
                       "p_in_local", "p_out_local"]
            self._insert_sql = (
                f"INSERT INTO telemetry ({', '.join(columns)}) "
                f"VALUES ({', '.join('?' * len(columns))})"
            )
            self._pending = 0
            self._conn.execute("BEGIN")
        except Exception:
            # Do not leave a half-initialised connection open.
            self._conn.close()
            raise
        self._t0 = time.monotonic()

    def __enter__(self):
        """Return the logger itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the logger when the ``with`` block ends; never suppresses errors."""
        self.close()

    def _create_tables(self):
        """Create the session_info, params and telemetry tables if missing."""
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS session_info "
            "(key TEXT PRIMARY KEY, value TEXT)"
        )
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS params ("
            "  timestamp REAL NOT NULL,"
            "  param_id INTEGER PRIMARY KEY,"
            "  param_name TEXT NOT NULL,"
            "  param_group TEXT NOT NULL,"
            "  param_type INTEGER NOT NULL,"
            "  value REAL NOT NULL"
            ")"
        )
        # Build telemetry column list from dataclass fields
        col_defs = ["rowid INTEGER PRIMARY KEY", "timestamp REAL NOT NULL",
                    "mono REAL NOT NULL"]
        col_defs.extend(
            f"{name} INTEGER" if name in _INT_FIELDS else f"{name} REAL"
            for name in _TELEM_FIELDS
        )
        # Add computed columns
        col_defs.append("p_in_local REAL")
        col_defs.append("p_out_local REAL")
        self._conn.execute(
            f"CREATE TABLE IF NOT EXISTS telemetry ({', '.join(col_defs)})"
        )
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_telem_time ON telemetry(timestamp)"
        )

    def _commit_batch(self):
        """Commit the open transaction and immediately start the next one."""
        self._conn.execute("COMMIT")
        self._conn.execute("BEGIN")
        self._pending = 0

    def log_session_info(self, port: str, baudrate: int):
        """Record start time, serial port and baud rate in ``session_info``.

        The rows are committed right away so the session metadata is on disk
        even if the process stops before the first telemetry batch fills.

        Args:
            port: Serial port name, stored under the key ``serial_port``.
            baudrate: Baud rate, stored as text under the key ``baudrate``.
        """
        entries = (
            ("start_time", time.strftime("%Y-%m-%d %H:%M:%S")),
            ("serial_port", port),
            ("baudrate", str(baudrate)),
        )
        for entry in entries:
            self._conn.execute(
                "INSERT OR REPLACE INTO session_info VALUES (?, ?)", entry
            )
        self._commit_batch()

    def log_telemetry(self, t: "TelemetryData"):
        """Insert one telemetry sample, committing every ``BATCH_SIZE`` rows.

        Each row stores the wall-clock time, the monotonic time since the
        logger was created, every ``TelemetryData`` field, and two derived
        columns: ``p_in_local = vin * (-iin) / 1e6`` and
        ``p_out_local = vout * iout / 1e6``.

        Args:
            t: The telemetry sample to store.
        """
        now = time.time()
        mono = time.monotonic() - self._t0
        values = [now, mono]
        values.extend(getattr(t, name) for name in _TELEM_FIELDS)
        # computed p_in/p_out from raw telemetry values
        values.append(t.vin * (-t.iin) / 1e6)
        values.append(t.vout * t.iout / 1e6)
        self._conn.execute(self._insert_sql, values)
        self._pending += 1
        if self._pending >= self.BATCH_SIZE:
            self._commit_batch()

    def log_param(self, param_id: int, value: float):
        """Store the latest value of a parameter, replacing any earlier one.

        Unknown ``param_id`` values (absent from ``PARAM_BY_ID``) are ignored.
        The row joins the open transaction and is committed with the next
        telemetry batch or on ``close()``.

        Args:
            param_id: Parameter identifier; primary key of the ``params`` table.
            value: Parameter value to record.
        """
        pdef = PARAM_BY_ID.get(param_id)
        if pdef is None:
            return
        self._conn.execute(
            "INSERT OR REPLACE INTO params VALUES (?, ?, ?, ?, ?, ?)",
            (time.time(), param_id, pdef.name, pdef.group, pdef.ptype, value),
        )

    def close(self):
        """Commit pending rows and close the database; safe to call twice."""
        conn = self._conn
        if conn is None:
            return
        self._conn = None
        try:
            conn.execute("COMMIT")
        except sqlite3.OperationalError:
            # Best effort: e.g. no transaction is active, nothing to flush.
            pass
        finally:
            # Release the file handle even if COMMIT failed unexpectedly.
            conn.close()

    # ---- Static analysis helpers ----

    @staticmethod
    def load_telemetry(db_path: str):
        """Load telemetry table into a numpy structured array.

        Requires numpy (not needed for logging, only for post-analysis).

        Args:
            db_path: Path of a session database file.

        Returns:
            A one-dimensional structured array with one field per table
            column (``i4`` for rowid/last_tmp/VREF/vfly_correction/seq,
            ``f8`` otherwise).  An empty table yields a zero-length array
            with the same fields, so column access works on empty sessions.
        """
        import numpy as np

        conn = sqlite3.connect(db_path)
        try:
            cur = conn.execute("SELECT * FROM telemetry")
            col_names = [d[0] for d in cur.description]
            rows = cur.fetchall()
        finally:
            conn.close()
        int_columns = {"rowid", "last_tmp", "VREF", "vfly_correction", "seq"}
        dtypes = [
            (name, "i4" if name in int_columns else "f8") for name in col_names
        ]
        if not rows:
            return np.empty(0, dtype=dtypes)
        return np.array([tuple(r) for r in rows], dtype=dtypes)

    @staticmethod
    def load_params(db_path: str) -> dict[str, float]:
        """Load parameter snapshot as {name: value} dict.

        Args:
            db_path: Path of a session database file.
        """
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute(
                "SELECT param_name, value FROM params"
            ).fetchall()
        finally:
            conn.close()
        return dict(rows)
|