STM32G474RB firmware for solar buck converter with MPPT, CC control, Vfly compensation, and adaptive deadtime. Includes Textual TUI debug console for real-time telemetry, parameter tuning, and SQLite logging. Added pyproject.toml for uv: `cd code64 && uv run debug-console` Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
210 lines
7.6 KiB
Python
210 lines
7.6 KiB
Python
"""Textual TUI application for LVSolarBuck debug console."""
|
|
import time
|
|
from textual.app import App, ComposeResult
|
|
from textual.containers import Horizontal, Vertical, VerticalScroll
|
|
from textual.widgets import Header, Footer, RichLog
|
|
|
|
from .protocol import (
|
|
CMD_TELEMETRY, CMD_PARAM_WRITE_ACK, CMD_PARAM_VALUE, CMD_PONG, CMD_ERROR_MSG,
|
|
decode_telemetry, decode_param_value, build_ping, build_param_read_all,
|
|
PARAM_BY_ID,
|
|
)
|
|
from .serial_worker import SerialWorker
|
|
from .data_logger import DataLogger
|
|
from .widgets.telemetry_panel import TelemetryPanel
|
|
from .widgets.param_group import ParamGroup
|
|
from .widgets.status_bar import StatusBar
|
|
|
|
|
|
class DebugConsoleApp(App):
|
|
"""LVSolarBuck Debug Console"""
|
|
|
|
TITLE = "LVSolarBuck Debug Console"
|
|
|
|
CSS = """
|
|
Screen {
|
|
layout: vertical;
|
|
}
|
|
#main {
|
|
layout: horizontal;
|
|
height: 1fr;
|
|
}
|
|
#left-panel {
|
|
width: 35;
|
|
min-width: 30;
|
|
border-right: solid $primary;
|
|
}
|
|
#right-panel {
|
|
width: 1fr;
|
|
min-width: 40;
|
|
}
|
|
#error-log {
|
|
height: 8;
|
|
min-height: 4;
|
|
border-top: solid $error;
|
|
background: $surface;
|
|
}
|
|
"""
|
|
|
|
BINDINGS = [
|
|
("q", "quit", "Quit"),
|
|
("p", "ping", "Ping"),
|
|
("r", "read_params", "Read Params"),
|
|
("f", "toggle_filter", "Filter On/Off"),
|
|
("l", "show_log_path", "Log Path"),
|
|
]
|
|
|
|
def __init__(self, port: str, baudrate: int = 460800):
|
|
super().__init__()
|
|
self.serial_port = port
|
|
self.serial_baudrate = baudrate
|
|
self.worker: SerialWorker | None = None
|
|
self.logger: DataLogger | None = None
|
|
self._telem_count = 0
|
|
self._fps_time = time.monotonic()
|
|
self._last_seq = -1
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
with Horizontal(id="main"):
|
|
with Vertical(id="left-panel"):
|
|
yield TelemetryPanel()
|
|
with VerticalScroll(id="right-panel"):
|
|
yield ParamGroup("Global", self._send_data)
|
|
yield ParamGroup("Compensator", self._send_data)
|
|
yield ParamGroup("Vfly", self._send_data)
|
|
yield ParamGroup("CC", self._send_data)
|
|
yield ParamGroup("MPPT", self._send_data)
|
|
yield ParamGroup("Deadtime", self._send_data)
|
|
yield RichLog(id="error-log", markup=True, wrap=True)
|
|
yield StatusBar()
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
self.sub_title = f"{self.serial_port} {self.serial_baudrate}"
|
|
log = self.query_one("#error-log", RichLog)
|
|
log.write("[dim]Error log ready[/dim]")
|
|
self.logger = DataLogger(log_dir="logs")
|
|
self.logger.log_session_info(self.serial_port, self.serial_baudrate)
|
|
log.write(f"[dim]Logging to {self.logger.db_path}[/dim]")
|
|
self.worker = SerialWorker(self.serial_port, self.serial_baudrate)
|
|
self.worker.set_frame_callback(self._on_frame)
|
|
self.worker.set_status_callback(self._on_status)
|
|
self.worker.start()
|
|
self.set_interval(0.5, self._update_status)
|
|
self.set_interval(2.0, self._request_params)
|
|
|
|
def on_unmount(self) -> None:
|
|
if self.worker:
|
|
self.worker.stop()
|
|
if self.logger:
|
|
self.logger.close()
|
|
|
|
def _send_data(self, data: bytes) -> None:
|
|
if self.worker:
|
|
self.worker.send(data)
|
|
|
|
def _request_params(self) -> None:
|
|
if self.worker and self.worker.connected:
|
|
self._send_data(build_param_read_all())
|
|
|
|
def _on_frame(self, cmd: int, payload: bytes) -> None:
|
|
if cmd == CMD_TELEMETRY:
|
|
t = decode_telemetry(payload)
|
|
if t is not None:
|
|
self._telem_count += 1
|
|
if self._last_seq >= 0:
|
|
expected = (self._last_seq + 1) & 0xFF
|
|
if t.seq != expected:
|
|
diff = (t.seq - expected) & 0xFF
|
|
if self.worker:
|
|
self.worker.drop_count += diff
|
|
self._last_seq = t.seq
|
|
if self.logger:
|
|
self.logger.log_telemetry(t)
|
|
self.call_from_thread(self._update_ui, t)
|
|
elif cmd == CMD_PARAM_VALUE:
|
|
result = decode_param_value(payload)
|
|
if result:
|
|
param_id, value = result
|
|
if self.logger:
|
|
self.logger.log_param(param_id, value)
|
|
self.call_from_thread(self._update_param, param_id, value)
|
|
elif cmd == CMD_PARAM_WRITE_ACK:
|
|
result = decode_param_value(payload)
|
|
if result:
|
|
param_id, value = result
|
|
if self.logger:
|
|
self.logger.log_param(param_id, value)
|
|
self.call_from_thread(self._update_param, param_id, value)
|
|
self.call_from_thread(self.notify, "Parameter ACK", severity="information")
|
|
elif cmd == CMD_ERROR_MSG:
|
|
msg = payload.decode("ascii", errors="replace")
|
|
self.call_from_thread(self._log_error, msg)
|
|
elif cmd == CMD_PONG:
|
|
self.call_from_thread(self.notify, "PONG received")
|
|
|
|
def _on_status(self, connected: bool) -> None:
|
|
self.call_from_thread(self._update_connected, connected)
|
|
|
|
def _log_error(self, msg: str) -> None:
|
|
log = self.query_one("#error-log", RichLog)
|
|
ts = time.strftime("%H:%M:%S")
|
|
is_guard = msg.startswith("GUARD") or msg.startswith("DIAG") or msg.startswith("RAW") or msg.startswith("CTRL") or msg.startswith("HW ") or msg.startswith("HRTIM") or msg.startswith("FLAGS") or msg == "Startup"
|
|
if is_guard:
|
|
log.write(f"[yellow][{ts}] {msg}[/yellow]")
|
|
else:
|
|
log.write(f"[bold red][{ts}] {msg}[/bold red]")
|
|
self.notify(f"STM32: {msg}", severity="error", timeout=10)
|
|
|
|
def _update_ui(self, t) -> None:
|
|
self.query_one(TelemetryPanel).update_telemetry(t)
|
|
status = self.query_one(StatusBar)
|
|
if self.worker:
|
|
status.pkt_count = self.worker.pkt_count
|
|
status.drop_count = self.worker.drop_count
|
|
status.last_seq = t.seq
|
|
status.refresh_status()
|
|
|
|
def _update_param(self, param_id: int, value: float) -> None:
|
|
for group in self.query(ParamGroup):
|
|
group.update_param(param_id, value)
|
|
self.query_one(TelemetryPanel).update_dt_param(param_id, value)
|
|
|
|
def _update_connected(self, connected: bool) -> None:
|
|
status = self.query_one(StatusBar)
|
|
status.connected = connected
|
|
status.refresh_status()
|
|
if connected:
|
|
self._request_params()
|
|
|
|
def _update_status(self) -> None:
|
|
now = time.monotonic()
|
|
elapsed = now - self._fps_time
|
|
fps = self._telem_count / elapsed if elapsed > 0 else 0.0
|
|
self._telem_count = 0
|
|
self._fps_time = now
|
|
status = self.query_one(StatusBar)
|
|
status.fps = fps
|
|
if self.worker:
|
|
status.connected = self.worker.connected
|
|
status.refresh_status()
|
|
|
|
def action_ping(self) -> None:
|
|
self._send_data(build_ping())
|
|
|
|
def action_read_params(self) -> None:
|
|
self._request_params()
|
|
|
|
def action_toggle_filter(self) -> None:
|
|
panel = self.query_one(TelemetryPanel)
|
|
panel.filter_enabled = not panel.filter_enabled
|
|
state = "ON" if panel.filter_enabled else "OFF"
|
|
self.notify(f"Filter {state}")
|
|
|
|
def action_show_log_path(self) -> None:
|
|
if self.logger:
|
|
self.notify(f"Log: {self.logger.db_path}", timeout=10)
|
|
else:
|
|
self.notify("Logger not active", severity="warning")
|