"""Textual TUI application for LVSolarBuck debug console.""" import time from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical, VerticalScroll from textual.widgets import Header, Footer, RichLog from .protocol import ( CMD_TELEMETRY, CMD_PARAM_WRITE_ACK, CMD_PARAM_VALUE, CMD_PONG, CMD_ERROR_MSG, decode_telemetry, decode_param_value, build_ping, build_param_read_all, PARAM_BY_ID, ) from .serial_worker import SerialWorker from .data_logger import DataLogger from .widgets.telemetry_panel import TelemetryPanel from .widgets.param_group import ParamGroup from .widgets.status_bar import StatusBar class DebugConsoleApp(App): """LVSolarBuck Debug Console""" TITLE = "LVSolarBuck Debug Console" CSS = """ Screen { layout: vertical; } #main { layout: horizontal; height: 1fr; } #left-panel { width: 35; min-width: 30; border-right: solid $primary; } #right-panel { width: 1fr; min-width: 40; } #error-log { height: 8; min-height: 4; border-top: solid $error; background: $surface; } """ BINDINGS = [ ("q", "quit", "Quit"), ("p", "ping", "Ping"), ("r", "read_params", "Read Params"), ("f", "toggle_filter", "Filter On/Off"), ("l", "show_log_path", "Log Path"), ] def __init__(self, port: str, baudrate: int = 460800): super().__init__() self.serial_port = port self.serial_baudrate = baudrate self.worker: SerialWorker | None = None self.logger: DataLogger | None = None self._telem_count = 0 self._fps_time = time.monotonic() self._last_seq = -1 def compose(self) -> ComposeResult: yield Header() with Horizontal(id="main"): with Vertical(id="left-panel"): yield TelemetryPanel() with VerticalScroll(id="right-panel"): yield ParamGroup("Global", self._send_data) yield ParamGroup("Compensator", self._send_data) yield ParamGroup("Vfly", self._send_data) yield ParamGroup("CC", self._send_data) yield ParamGroup("MPPT", self._send_data) yield ParamGroup("Deadtime", self._send_data) yield RichLog(id="error-log", markup=True, wrap=True) yield StatusBar() yield Footer() def on_mount(self) -> None: self.sub_title = f"{self.serial_port} {self.serial_baudrate}" log = self.query_one("#error-log", RichLog) log.write("[dim]Error log ready[/dim]") self.logger = DataLogger(log_dir="logs") self.logger.log_session_info(self.serial_port, self.serial_baudrate) log.write(f"[dim]Logging to {self.logger.db_path}[/dim]") self.worker = SerialWorker(self.serial_port, self.serial_baudrate) self.worker.set_frame_callback(self._on_frame) self.worker.set_status_callback(self._on_status) self.worker.start() self.set_interval(0.5, self._update_status) self.set_interval(2.0, self._request_params) def on_unmount(self) -> None: if self.worker: self.worker.stop() if self.logger: self.logger.close() def _send_data(self, data: bytes) -> None: if self.worker: self.worker.send(data) def _request_params(self) -> None: if self.worker and self.worker.connected: self._send_data(build_param_read_all()) def _on_frame(self, cmd: int, payload: bytes) -> None: if cmd == CMD_TELEMETRY: t = decode_telemetry(payload) if t is not None: self._telem_count += 1 if self._last_seq >= 0: expected = (self._last_seq + 1) & 0xFF if t.seq != expected: diff = (t.seq - expected) & 0xFF if self.worker: self.worker.drop_count += diff self._last_seq = t.seq if self.logger: self.logger.log_telemetry(t) self.call_from_thread(self._update_ui, t) elif cmd == CMD_PARAM_VALUE: result = decode_param_value(payload) if result: param_id, value = result if self.logger: self.logger.log_param(param_id, value) self.call_from_thread(self._update_param, param_id, value) elif cmd == CMD_PARAM_WRITE_ACK: result = decode_param_value(payload) if result: param_id, value = result if self.logger: self.logger.log_param(param_id, value) self.call_from_thread(self._update_param, param_id, value) self.call_from_thread(self.notify, "Parameter ACK", severity="information") elif cmd == CMD_ERROR_MSG: msg = payload.decode("ascii", errors="replace") self.call_from_thread(self._log_error, msg) elif cmd == CMD_PONG: self.call_from_thread(self.notify, "PONG received") def _on_status(self, connected: bool) -> None: self.call_from_thread(self._update_connected, connected) def _log_error(self, msg: str) -> None: log = self.query_one("#error-log", RichLog) ts = time.strftime("%H:%M:%S") is_guard = msg.startswith("GUARD") or msg.startswith("DIAG") or msg.startswith("RAW") or msg.startswith("CTRL") or msg.startswith("HW ") or msg.startswith("HRTIM") or msg.startswith("FLAGS") or msg == "Startup" if is_guard: log.write(f"[yellow][{ts}] {msg}[/yellow]") else: log.write(f"[bold red][{ts}] {msg}[/bold red]") self.notify(f"STM32: {msg}", severity="error", timeout=10) def _update_ui(self, t) -> None: self.query_one(TelemetryPanel).update_telemetry(t) status = self.query_one(StatusBar) if self.worker: status.pkt_count = self.worker.pkt_count status.drop_count = self.worker.drop_count status.last_seq = t.seq status.refresh_status() def _update_param(self, param_id: int, value: float) -> None: for group in self.query(ParamGroup): group.update_param(param_id, value) self.query_one(TelemetryPanel).update_dt_param(param_id, value) def _update_connected(self, connected: bool) -> None: status = self.query_one(StatusBar) status.connected = connected status.refresh_status() if connected: self._request_params() def _update_status(self) -> None: now = time.monotonic() elapsed = now - self._fps_time fps = self._telem_count / elapsed if elapsed > 0 else 0.0 self._telem_count = 0 self._fps_time = now status = self.query_one(StatusBar) status.fps = fps if self.worker: status.connected = self.worker.connected status.refresh_status() def action_ping(self) -> None: self._send_data(build_ping()) def action_read_params(self) -> None: self._request_params() def action_toggle_filter(self) -> None: panel = self.query_one(TelemetryPanel) panel.filter_enabled = not panel.filter_enabled state = "ON" if panel.filter_enabled else "OFF" self.notify(f"Filter {state}") def action_show_log_path(self) -> None: if self.logger: self.notify(f"Log: {self.logger.db_path}", timeout=10) else: self.notify("Logger not active", severity="warning")