Add LVSolarBuck64 firmware and debug console with uv support

STM32G474RB firmware for solar buck converter with MPPT, CC control,
Vfly compensation, and adaptive deadtime. Includes Textual TUI debug
console for real-time telemetry, parameter tuning, and SQLite logging.

Added pyproject.toml for uv: `cd code64 && uv run debug-console`

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 16:38:23 +07:00
parent 3f65b5f2f2
commit e7a23a3c7e
151 changed files with 231098 additions and 0 deletions
View File
+48
View File
@@ -0,0 +1,48 @@
"""Entry point: python -m debug_console [COM_PORT] [baudrate]"""
import sys
import serial.tools.list_ports
from .app import DebugConsoleApp
def select_port() -> str:
ports = sorted(serial.tools.list_ports.comports(), key=lambda p: p.device)
if not ports:
print("No COM ports found.")
sys.exit(1)
print("Available COM ports:")
for i, p in enumerate(ports):
desc = f" - {p.description}" if p.description and p.description != p.device else ""
print(f" [{i}] {p.device}{desc}")
while True:
choice = input(f"\nSelect port [0-{len(ports)-1}]: ").strip()
try:
idx = int(choice)
if 0 <= idx < len(ports):
return ports[idx].device
except ValueError:
# Allow typing the port name directly
for p in ports:
if choice.upper() == p.device.upper():
return p.device
print("Invalid selection, try again.")
def main():
if len(sys.argv) >= 2:
port = sys.argv[1]
else:
port = select_port()
baudrate = int(sys.argv[2]) if len(sys.argv) > 2 else 460800
print(f"Connecting to {port} at {baudrate} baud...")
app = DebugConsoleApp(port, baudrate)
app.run()
if __name__ == "__main__":
main()
+209
View File
@@ -0,0 +1,209 @@
"""Textual TUI application for LVSolarBuck debug console."""
import time
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import Header, Footer, RichLog
from .protocol import (
CMD_TELEMETRY, CMD_PARAM_WRITE_ACK, CMD_PARAM_VALUE, CMD_PONG, CMD_ERROR_MSG,
decode_telemetry, decode_param_value, build_ping, build_param_read_all,
PARAM_BY_ID,
)
from .serial_worker import SerialWorker
from .data_logger import DataLogger
from .widgets.telemetry_panel import TelemetryPanel
from .widgets.param_group import ParamGroup
from .widgets.status_bar import StatusBar
class DebugConsoleApp(App):
"""LVSolarBuck Debug Console"""
TITLE = "LVSolarBuck Debug Console"
CSS = """
Screen {
layout: vertical;
}
#main {
layout: horizontal;
height: 1fr;
}
#left-panel {
width: 35;
min-width: 30;
border-right: solid $primary;
}
#right-panel {
width: 1fr;
min-width: 40;
}
#error-log {
height: 8;
min-height: 4;
border-top: solid $error;
background: $surface;
}
"""
BINDINGS = [
("q", "quit", "Quit"),
("p", "ping", "Ping"),
("r", "read_params", "Read Params"),
("f", "toggle_filter", "Filter On/Off"),
("l", "show_log_path", "Log Path"),
]
def __init__(self, port: str, baudrate: int = 460800):
super().__init__()
self.serial_port = port
self.serial_baudrate = baudrate
self.worker: SerialWorker | None = None
self.logger: DataLogger | None = None
self._telem_count = 0
self._fps_time = time.monotonic()
self._last_seq = -1
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main"):
with Vertical(id="left-panel"):
yield TelemetryPanel()
with VerticalScroll(id="right-panel"):
yield ParamGroup("Global", self._send_data)
yield ParamGroup("Compensator", self._send_data)
yield ParamGroup("Vfly", self._send_data)
yield ParamGroup("CC", self._send_data)
yield ParamGroup("MPPT", self._send_data)
yield ParamGroup("Deadtime", self._send_data)
yield RichLog(id="error-log", markup=True, wrap=True)
yield StatusBar()
yield Footer()
def on_mount(self) -> None:
self.sub_title = f"{self.serial_port} {self.serial_baudrate}"
log = self.query_one("#error-log", RichLog)
log.write("[dim]Error log ready[/dim]")
self.logger = DataLogger(log_dir="logs")
self.logger.log_session_info(self.serial_port, self.serial_baudrate)
log.write(f"[dim]Logging to {self.logger.db_path}[/dim]")
self.worker = SerialWorker(self.serial_port, self.serial_baudrate)
self.worker.set_frame_callback(self._on_frame)
self.worker.set_status_callback(self._on_status)
self.worker.start()
self.set_interval(0.5, self._update_status)
self.set_interval(2.0, self._request_params)
def on_unmount(self) -> None:
if self.worker:
self.worker.stop()
if self.logger:
self.logger.close()
def _send_data(self, data: bytes) -> None:
if self.worker:
self.worker.send(data)
def _request_params(self) -> None:
if self.worker and self.worker.connected:
self._send_data(build_param_read_all())
def _on_frame(self, cmd: int, payload: bytes) -> None:
if cmd == CMD_TELEMETRY:
t = decode_telemetry(payload)
if t is not None:
self._telem_count += 1
if self._last_seq >= 0:
expected = (self._last_seq + 1) & 0xFF
if t.seq != expected:
diff = (t.seq - expected) & 0xFF
if self.worker:
self.worker.drop_count += diff
self._last_seq = t.seq
if self.logger:
self.logger.log_telemetry(t)
self.call_from_thread(self._update_ui, t)
elif cmd == CMD_PARAM_VALUE:
result = decode_param_value(payload)
if result:
param_id, value = result
if self.logger:
self.logger.log_param(param_id, value)
self.call_from_thread(self._update_param, param_id, value)
elif cmd == CMD_PARAM_WRITE_ACK:
result = decode_param_value(payload)
if result:
param_id, value = result
if self.logger:
self.logger.log_param(param_id, value)
self.call_from_thread(self._update_param, param_id, value)
self.call_from_thread(self.notify, "Parameter ACK", severity="information")
elif cmd == CMD_ERROR_MSG:
msg = payload.decode("ascii", errors="replace")
self.call_from_thread(self._log_error, msg)
elif cmd == CMD_PONG:
self.call_from_thread(self.notify, "PONG received")
def _on_status(self, connected: bool) -> None:
self.call_from_thread(self._update_connected, connected)
def _log_error(self, msg: str) -> None:
log = self.query_one("#error-log", RichLog)
ts = time.strftime("%H:%M:%S")
is_guard = msg.startswith("GUARD") or msg.startswith("DIAG") or msg.startswith("RAW") or msg.startswith("CTRL") or msg.startswith("HW ") or msg.startswith("HRTIM") or msg.startswith("FLAGS") or msg == "Startup"
if is_guard:
log.write(f"[yellow][{ts}] {msg}[/yellow]")
else:
log.write(f"[bold red][{ts}] {msg}[/bold red]")
self.notify(f"STM32: {msg}", severity="error", timeout=10)
def _update_ui(self, t) -> None:
self.query_one(TelemetryPanel).update_telemetry(t)
status = self.query_one(StatusBar)
if self.worker:
status.pkt_count = self.worker.pkt_count
status.drop_count = self.worker.drop_count
status.last_seq = t.seq
status.refresh_status()
def _update_param(self, param_id: int, value: float) -> None:
for group in self.query(ParamGroup):
group.update_param(param_id, value)
self.query_one(TelemetryPanel).update_dt_param(param_id, value)
def _update_connected(self, connected: bool) -> None:
status = self.query_one(StatusBar)
status.connected = connected
status.refresh_status()
if connected:
self._request_params()
def _update_status(self) -> None:
now = time.monotonic()
elapsed = now - self._fps_time
fps = self._telem_count / elapsed if elapsed > 0 else 0.0
self._telem_count = 0
self._fps_time = now
status = self.query_one(StatusBar)
status.fps = fps
if self.worker:
status.connected = self.worker.connected
status.refresh_status()
def action_ping(self) -> None:
self._send_data(build_ping())
def action_read_params(self) -> None:
self._request_params()
def action_toggle_filter(self) -> None:
panel = self.query_one(TelemetryPanel)
panel.filter_enabled = not panel.filter_enabled
state = "ON" if panel.filter_enabled else "OFF"
self.notify(f"Filter {state}")
def action_show_log_path(self) -> None:
if self.logger:
self.notify(f"Log: {self.logger.db_path}", timeout=10)
else:
self.notify("Logger not active", severity="warning")
+153
View File
@@ -0,0 +1,153 @@
"""SQLite data logger for continuous telemetry recording."""
import os
import sqlite3
import time
from dataclasses import fields
from .protocol import TelemetryData, PARAM_BY_ID
# Telemetry fields to log (all numeric fields from TelemetryData)
_TELEM_FIELDS = [f.name for f in fields(TelemetryData)]
# Columns that are integers in the DB
_INT_FIELDS = {"last_tmp", "VREF", "vfly_correction", "seq"}
class DataLogger:
"""Streaming SQLite logger — one DB file per session."""
BATCH_SIZE = 50 # commit every N rows (~5s at 10Hz)
def __init__(self, log_dir: str = "logs"):
os.makedirs(log_dir, exist_ok=True)
ts = time.strftime("%Y%m%d_%H%M%S")
self.db_path = os.path.join(log_dir, f"session_{ts}.db")
self._conn = sqlite3.connect(self.db_path, isolation_level=None,
check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._create_tables()
self._pending = 0
self._conn.execute("BEGIN")
self._t0 = time.monotonic()
def _create_tables(self):
self._conn.execute(
"CREATE TABLE IF NOT EXISTS session_info "
"(key TEXT PRIMARY KEY, value TEXT)"
)
self._conn.execute(
"CREATE TABLE IF NOT EXISTS params ("
" timestamp REAL NOT NULL,"
" param_id INTEGER PRIMARY KEY,"
" param_name TEXT NOT NULL,"
" param_group TEXT NOT NULL,"
" param_type INTEGER NOT NULL,"
" value REAL NOT NULL"
")"
)
# Build telemetry column list from dataclass fields
col_defs = ["rowid INTEGER PRIMARY KEY", "timestamp REAL NOT NULL",
"mono REAL NOT NULL"]
for name in _TELEM_FIELDS:
if name in _INT_FIELDS:
col_defs.append(f"{name} INTEGER")
else:
col_defs.append(f"{name} REAL")
# Add computed columns
col_defs.append("p_in_local REAL")
col_defs.append("p_out_local REAL")
self._conn.execute(
f"CREATE TABLE IF NOT EXISTS telemetry ({', '.join(col_defs)})"
)
self._conn.execute(
"CREATE INDEX IF NOT EXISTS idx_telem_time ON telemetry(timestamp)"
)
def log_session_info(self, port: str, baudrate: int):
self._conn.execute(
"INSERT OR REPLACE INTO session_info VALUES (?, ?)",
("start_time", time.strftime("%Y-%m-%d %H:%M:%S")),
)
self._conn.execute(
"INSERT OR REPLACE INTO session_info VALUES (?, ?)",
("serial_port", port),
)
self._conn.execute(
"INSERT OR REPLACE INTO session_info VALUES (?, ?)",
("baudrate", str(baudrate)),
)
def log_telemetry(self, t: TelemetryData):
now = time.time()
mono = time.monotonic() - self._t0
values = [now, mono]
for name in _TELEM_FIELDS:
values.append(getattr(t, name))
# computed p_in/p_out from raw telemetry values
values.append(t.vin * (-t.iin) / 1e6)
values.append(t.vout * t.iout / 1e6)
placeholders = ", ".join(["?"] * len(values))
self._conn.execute(
f"INSERT INTO telemetry (timestamp, mono, "
f"{', '.join(_TELEM_FIELDS)}, p_in_local, p_out_local) "
f"VALUES ({placeholders})",
values,
)
self._pending += 1
if self._pending >= self.BATCH_SIZE:
self._conn.execute("COMMIT")
self._conn.execute("BEGIN")
self._pending = 0
def log_param(self, param_id: int, value: float):
pdef = PARAM_BY_ID.get(param_id)
if not pdef:
return
self._conn.execute(
"INSERT OR REPLACE INTO params VALUES (?, ?, ?, ?, ?, ?)",
(time.time(), param_id, pdef.name, pdef.group, pdef.ptype, value),
)
def close(self):
if self._conn:
try:
self._conn.execute("COMMIT")
except sqlite3.OperationalError:
pass
self._conn.close()
self._conn = None
# ---- Static analysis helpers ----
@staticmethod
def load_telemetry(db_path: str):
"""Load telemetry table into a numpy structured array.
Requires numpy (not needed for logging, only for post-analysis).
"""
import numpy as np
conn = sqlite3.connect(db_path)
cur = conn.execute("SELECT * FROM telemetry")
col_names = [d[0] for d in cur.description]
rows = cur.fetchall()
conn.close()
if not rows:
return np.array([])
dtypes = []
for name in col_names:
if name in ("rowid", "last_tmp", "VREF", "vfly_correction", "seq"):
dtypes.append((name, "i4"))
else:
dtypes.append((name, "f8"))
return np.array([tuple(r) for r in rows], dtype=dtypes)
@staticmethod
def load_params(db_path: str) -> dict[str, float]:
"""Load parameter snapshot as {name: value} dict."""
conn = sqlite3.connect(db_path)
rows = conn.execute(
"SELECT param_name, value FROM params"
).fetchall()
conn.close()
return {name: val for name, val in rows}
+233
View File
@@ -0,0 +1,233 @@
"""Binary protocol encoder/decoder matching STM32 debug_protocol.h"""
import struct
from dataclasses import dataclass, field
from typing import Optional
SYNC_BYTE = 0xAA
HEADER_SIZE = 3
CMD_TELEMETRY = 0x01
CMD_PARAM_WRITE = 0x02
CMD_PARAM_WRITE_ACK = 0x03
CMD_PARAM_READ_ALL = 0x04
CMD_PARAM_VALUE = 0x05
CMD_PING = 0x10
CMD_PONG = 0x11
CMD_ERROR_MSG = 0xE0
PTYPE_FLOAT = 0
PTYPE_UINT16 = 1
PTYPE_UINT8 = 2
PTYPE_INT32 = 3
# CRC8 table (poly 0x07)
_CRC8_TABLE = [0] * 256
def _init_crc8():
for i in range(256):
crc = i
for _ in range(8):
if crc & 0x80:
crc = ((crc << 1) ^ 0x07) & 0xFF
else:
crc = (crc << 1) & 0xFF
_CRC8_TABLE[i] = crc
_init_crc8()
def crc8(data: bytes) -> int:
crc = 0x00
for b in data:
crc = _CRC8_TABLE[crc ^ b]
return crc
@dataclass
class TelemetryData:
vin: float = 0.0
vout: float = 0.0
iin: float = 0.0
iout: float = 0.0
vfly: float = 0.0
etemp: float = 0.0
last_tmp: int = 0
VREF: int = 0
vfly_correction: int = 0
vfly_integral: float = 0.0
vfly_avg_debug: float = 0.0
cc_output_f: float = 0.0
mppt_iref: float = 0.0
mppt_last_vin: float = 0.0
mppt_last_iin: float = 0.0
p_in: float = 0.0
p_out: float = 0.0
seq: int = 0
TELEMETRY_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes
TELEMETRY_SIZE = struct.calcsize(TELEMETRY_FMT)
def decode_telemetry(payload: bytes) -> Optional[TelemetryData]:
if len(payload) < TELEMETRY_SIZE:
return None
vals = struct.unpack(TELEMETRY_FMT, payload[:TELEMETRY_SIZE])
return TelemetryData(
vin=vals[0], vout=vals[1], iin=vals[2], iout=vals[3],
vfly=vals[4], etemp=vals[5],
last_tmp=vals[6], VREF=vals[7], vfly_correction=vals[8],
# vals[9] is pad
vfly_integral=vals[10], vfly_avg_debug=vals[11],
cc_output_f=vals[12], mppt_iref=vals[13],
mppt_last_vin=vals[14], mppt_last_iin=vals[15],
p_in=vals[16], p_out=vals[17],
seq=vals[18],
)
def build_frame(cmd: int, payload: bytes = b"") -> bytes:
header = bytes([SYNC_BYTE, cmd, len(payload)])
frame_no_crc = header + payload
return frame_no_crc + bytes([crc8(frame_no_crc)])
def build_param_write(param_id: int, param_type: int, value) -> bytes:
if param_type == PTYPE_FLOAT:
val_bytes = struct.pack("<f", float(value))
elif param_type == PTYPE_UINT16:
val_bytes = struct.pack("<HH", int(value), 0) # pad to 4 bytes
elif param_type == PTYPE_UINT8:
val_bytes = struct.pack("<Bxxx", int(value))
elif param_type == PTYPE_INT32:
val_bytes = struct.pack("<i", int(value))
else:
val_bytes = struct.pack("<I", int(value))
payload = struct.pack("<BBxx", param_id, param_type) + val_bytes
return build_frame(CMD_PARAM_WRITE, payload)
def build_ping() -> bytes:
return build_frame(CMD_PING)
def build_param_read_all() -> bytes:
return build_frame(CMD_PARAM_READ_ALL)
def decode_param_value(payload: bytes) -> Optional[tuple[int, float]]:
"""Decode a PARAM_VALUE payload. Returns (param_id, value) or None."""
if len(payload) < 8:
return None
param_id = payload[0]
param_type = payload[1]
value_bytes = payload[4:8]
if param_type == PTYPE_FLOAT:
value = struct.unpack("<f", value_bytes)[0]
elif param_type == PTYPE_UINT16:
value = float(struct.unpack("<H", value_bytes[:2])[0])
elif param_type == PTYPE_UINT8:
value = float(value_bytes[0])
elif param_type == PTYPE_INT32:
value = float(struct.unpack("<i", value_bytes)[0])
else:
value = float(struct.unpack("<I", value_bytes)[0])
return (param_id, value)
class FrameParser:
"""State machine parser for incoming frames."""
WAIT_SYNC = 0
WAIT_CMD = 1
WAIT_LEN = 2
WAIT_PAYLOAD = 3
WAIT_CRC = 4
def __init__(self):
self.state = self.WAIT_SYNC
self.cmd = 0
self.length = 0
self.buf = bytearray()
self.payload = bytearray()
self.idx = 0
def feed(self, data: bytes):
"""Feed bytes, yield (cmd, payload) tuples for complete frames."""
for b in data:
if self.state == self.WAIT_SYNC:
if b == SYNC_BYTE:
self.buf = bytearray([b])
self.state = self.WAIT_CMD
elif self.state == self.WAIT_CMD:
self.cmd = b
self.buf.append(b)
self.state = self.WAIT_LEN
elif self.state == self.WAIT_LEN:
self.length = b
self.buf.append(b)
self.payload = bytearray()
self.idx = 0
if b == 0:
self.state = self.WAIT_CRC
elif b > 128:
self.state = self.WAIT_SYNC
else:
self.state = self.WAIT_PAYLOAD
elif self.state == self.WAIT_PAYLOAD:
self.payload.append(b)
self.buf.append(b)
self.idx += 1
if self.idx >= self.length:
self.state = self.WAIT_CRC
elif self.state == self.WAIT_CRC:
expected = crc8(bytes(self.buf))
self.state = self.WAIT_SYNC
if b == expected:
yield (self.cmd, bytes(self.payload))
# Parameter registry
@dataclass
class ParamDef:
id: int
name: str
ptype: int
group: str
min_val: float = -1e9
max_val: float = 1e9
fmt: str = ".4f"
PARAMS = [
# Compensator
ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"),
# Vfly
ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"),
ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"),
ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"),
ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"),
ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"),
# CC
ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"),
ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"),
ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"),
ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"),
ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"),
ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"),
# MPPT
ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"),
ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"),
ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"),
# Global
ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"),
# Deadtime
ParamDef(0x60, "dt 0-3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x61, "dt 3-5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x62, "dt 5-10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x63, "dt 10-20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x64, "dt 20-30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x65, "dt 30-45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
]
PARAM_BY_ID = {p.id: p for p in PARAMS}
PARAM_BY_NAME = {p.name: p for p in PARAMS}
+2
View File
@@ -0,0 +1,2 @@
pyserial>=3.5
textual>=0.40.0
+100
View File
@@ -0,0 +1,100 @@
"""Threading-based serial reader/writer."""
import threading
import queue
import time
from typing import Callable, Optional
import serial
from .protocol import FrameParser, build_ping
class SerialWorker:
"""Manages serial port in a background thread."""
def __init__(self, port: str, baudrate: int = 460800):
self.port = port
self.baudrate = baudrate
self.ser: Optional[serial.Serial] = None
self._rx_thread: Optional[threading.Thread] = None
self._running = False
self._parser = FrameParser()
self._tx_queue: queue.Queue = queue.Queue()
self._frame_callback: Optional[Callable] = None
self._status_callback: Optional[Callable] = None
self.connected = False
self.pkt_count = 0
self.drop_count = 0
self._last_seq = -1
def set_frame_callback(self, cb: Callable):
self._frame_callback = cb
def set_status_callback(self, cb: Callable):
self._status_callback = cb
def start(self):
self._running = True
self._rx_thread = threading.Thread(target=self._run, daemon=True)
self._rx_thread.start()
def stop(self):
self._running = False
if self._rx_thread:
self._rx_thread.join(timeout=2)
if self.ser and self.ser.is_open:
self.ser.close()
self.connected = False
def send(self, data: bytes):
self._tx_queue.put(data)
def _notify_status(self, connected: bool):
self.connected = connected
if self._status_callback:
self._status_callback(connected)
def _run(self):
while self._running:
# Connect
if not self.ser or not self.ser.is_open:
try:
self.ser = serial.Serial(
self.port, self.baudrate, timeout=0.1
)
self._parser = FrameParser()
self._notify_status(True)
except serial.SerialException:
self._notify_status(False)
time.sleep(1)
continue
# TX
try:
while not self._tx_queue.empty():
data = self._tx_queue.get_nowait()
self.ser.write(data)
except (serial.SerialException, OSError):
self._handle_disconnect()
continue
# RX
try:
data = self.ser.read(256)
if data:
for cmd, payload in self._parser.feed(data):
self.pkt_count += 1
if self._frame_callback:
self._frame_callback(cmd, payload)
except (serial.SerialException, OSError):
self._handle_disconnect()
def _handle_disconnect(self):
try:
if self.ser:
self.ser.close()
except Exception:
pass
self.ser = None
self._notify_status(False)
time.sleep(1)
+102
View File
@@ -0,0 +1,102 @@
"""Editable parameter rows per controller group."""
from textual.app import ComposeResult
from textual.containers import Vertical, Horizontal
from textual.widgets import Static, Input, Button
from ..protocol import ParamDef, PARAMS, build_param_write
class ParamRow(Horizontal):
"""Single parameter row with label, current value, input, and send button."""
DEFAULT_CSS = """
ParamRow {
height: auto;
width: 100%;
padding: 0;
margin: 0;
}
ParamRow .param-label {
width: 16;
height: 1;
padding: 0 1;
}
ParamRow .param-value {
width: 14;
height: 1;
padding: 0 1;
color: $success;
}
ParamRow Input {
width: 1fr;
}
ParamRow Button {
width: 6;
min-width: 6;
}
"""
def __init__(self, param: ParamDef, send_callback):
super().__init__()
self.param = param
self._send_callback = send_callback
def compose(self) -> ComposeResult:
yield Static(self.param.name, classes="param-label")
yield Static("---", id=f"val_{self.param.id:02x}", classes="param-value")
yield Input(placeholder="new", id=f"input_{self.param.id:02x}")
yield Button("Go", id=f"btn_{self.param.id:02x}", variant="primary")
def set_current_value(self, value: float):
fmt = self.param.fmt
label = self.query_one(f"#val_{self.param.id:02x}", Static)
label.update(f"{value:{fmt}}")
def on_button_pressed(self, event: Button.Pressed) -> None:
inp = self.query_one(Input)
try:
value = float(inp.value)
except ValueError:
return
frame = build_param_write(self.param.id, self.param.ptype, value)
self._send_callback(frame)
inp.value = ""
def on_input_submitted(self, event: Input.Submitted) -> None:
try:
value = float(event.value)
except ValueError:
return
frame = build_param_write(self.param.id, self.param.ptype, value)
self._send_callback(frame)
event.input.value = ""
class ParamGroup(Vertical):
"""A group of parameter rows for one controller."""
DEFAULT_CSS = """
ParamGroup {
width: 100%;
height: auto;
padding: 0 1;
margin: 0 0 1 0;
}
"""
def __init__(self, group_name: str, send_callback):
super().__init__()
self.group_name = group_name
self._send_callback = send_callback
self._params = [p for p in PARAMS if p.group == group_name]
def compose(self) -> ComposeResult:
yield Static(f"[bold]{self.group_name} CONTROLLER[/bold]")
for p in self._params:
yield ParamRow(p, self._send_callback)
def update_param(self, param_id: int, value: float):
for row in self.query(ParamRow):
if row.param.id == param_id:
row.set_current_value(value)
return
@@ -0,0 +1,28 @@
"""Connection status bar."""
from textual.widgets import Static
class StatusBar(Static):
"""Bottom status bar showing connection info."""
DEFAULT_CSS = """
StatusBar {
dock: bottom;
height: 1;
background: $primary-background;
color: $text;
padding: 0 1;
}
"""
def __init__(self):
super().__init__("")
self.connected = False
self.pkt_count = 0
self.drop_count = 0
self.last_seq = 0
self.fps = 0.0
def refresh_status(self):
conn = "[green]CONN:OK[/green]" if self.connected else "[red]CONN:LOST[/red]"
self.update(f" {conn} PKTS:{self.pkt_count} DROPS:{self.drop_count} SEQ:{self.last_seq} FPS:{self.fps:.1f}")
@@ -0,0 +1,106 @@
"""Read-only telemetry measurement display."""
from textual.widgets import Static
from ..protocol import TelemetryData
class TelemetryPanel(Static):
"""Displays all telemetry values."""
DEFAULT_CSS = """
TelemetryPanel {
width: 100%;
height: auto;
min-height: 20;
padding: 1;
}
"""
ALPHA = 0.05 # EMA filter coefficient
DT_BREAKPOINTS = [0, 3000, 5000, 10000, 20000, 30000, 45000]
DT_DEFAULTS = [25, 20, 20, 20, 15, 15]
DT_PARAM_IDS = [0x60, 0x61, 0x62, 0x63, 0x64, 0x65]
def __init__(self):
super().__init__("Waiting for telemetry...")
self._data: TelemetryData | None = None
self._vin_f: float = 0.0
self._vout_f: float = 0.0
self._iin_f: float = 0.0
self._iout_f: float = 0.0
self._vfly_f: float = 0.0
self._seeded: bool = False
self.filter_enabled: bool = True
self._dt_values: list[int] = list(self.DT_DEFAULTS)
def update_telemetry(self, t: TelemetryData):
self._data = t
# EMA filter on V/I
if not self._seeded:
self._vin_f = t.vin
self._vout_f = t.vout
self._iin_f = t.iin
self._iout_f = t.iout
self._vfly_f = t.vfly
self._seeded = True
else:
a = self.ALPHA
self._vin_f += a * (t.vin - self._vin_f)
self._vout_f += a * (t.vout - self._vout_f)
self._iin_f += a * (t.iin - self._iin_f)
self._iout_f += a * (t.iout - self._iout_f)
self._vfly_f += a * (t.vfly - self._vfly_f)
# Compute power and efficiency from filtered or raw values
if self.filter_enabled:
v_in, v_out, i_in, i_out = self._vin_f, self._vout_f, self._iin_f, self._iout_f
else:
v_in, v_out, i_in, i_out = t.vin, t.vout, t.iin, t.iout
p_in = v_in * (-i_in) / 1e6
p_out = v_out * i_out / 1e6
if p_in > 0.1:
eta = p_out / p_in * 100.0
else:
eta = 0.0
# Compute active deadtime from iout using same lookup as ISR
active_dt = self._dt_values[0]
for i in range(len(self.DT_BREAKPOINTS) - 1, -1, -1):
if i_out >= self.DT_BREAKPOINTS[i]:
active_dt = self._dt_values[min(i, len(self._dt_values) - 1)]
break
lines = [
"[bold]MEASUREMENTS[/bold]",
"",
f" vin : {self._vin_f if self.filter_enabled else t.vin:8.0f} mV",
f" vout : {self._vout_f if self.filter_enabled else t.vout:8.0f} mV",
f" iin : {self._iin_f if self.filter_enabled else t.iin:8.0f} mA",
f" iout : {self._iout_f if self.filter_enabled else t.iout:8.0f} mA",
f" P_IN : {p_in:8.2f} W",
f" P_OUT : {p_out:8.2f} W",
f" EFF : {eta:8.1f} %" if p_in > 0.1 else " EFF : --- %",
f" vfly : {self._vfly_f if self.filter_enabled else t.vfly:8.0f} mV",
f" etemp : {t.etemp:8.1f} C",
f" deadtime : {active_dt:8d} ticks",
f" : {active_dt / 1.36:8.1f} ns",
"",
f" last_tmp : {t.last_tmp:8d}",
f" VREF : {t.VREF:8d}",
f" vfly_corr : {t.vfly_correction:8d}",
"",
f" vfly_int : {t.vfly_integral:10.3f}",
f" vfly_avg : {t.vfly_avg_debug:10.1f}",
f" cc.out_f : {t.cc_output_f:10.1f}",
f" mppt.iref : {t.mppt_iref:8.0f} mA",
f" mppt.vin : {t.mppt_last_vin:8.0f}",
f" mppt.iin : {t.mppt_last_iin:8.0f}",
]
self.update("\n".join(lines))
def update_dt_param(self, param_id: int, value: float):
if param_id in self.DT_PARAM_IDS:
idx = self.DT_PARAM_IDS.index(param_id)
self._dt_values[idx] = int(value)