"""Synchronous serial link to the STM32 debug protocol. Provides blocking read/write of telemetry and parameters, suitable for automated tuning scripts (not a TUI). Reuses the binary protocol from code64/debug_console/protocol.py. """ from __future__ import annotations import struct import time from dataclasses import dataclass, field from typing import Optional import serial # ── Protocol constants ─────────────────────────────────────────────── SYNC_BYTE = 0xAA CMD_TELEMETRY = 0x01 CMD_PARAM_WRITE = 0x02 CMD_PARAM_WRITE_ACK = 0x03 CMD_PARAM_READ_ALL = 0x04 CMD_PARAM_VALUE = 0x05 CMD_PING = 0x10 CMD_PONG = 0x11 CMD_ERROR_MSG = 0xE0 PTYPE_FLOAT = 0 PTYPE_UINT16 = 1 PTYPE_UINT8 = 2 PTYPE_INT32 = 3 # ── CRC8 (poly 0x07) ──────────────────────────────────────────────── _CRC8_TABLE = [0] * 256 def _init_crc8(): for i in range(256): crc = i for _ in range(8): crc = ((crc << 1) ^ 0x07) & 0xFF if crc & 0x80 else (crc << 1) & 0xFF _CRC8_TABLE[i] = crc _init_crc8() def crc8(data: bytes) -> int: crc = 0x00 for b in data: crc = _CRC8_TABLE[crc ^ b] return crc # ── Telemetry ──────────────────────────────────────────────────────── @dataclass class Telemetry: """Decoded telemetry packet from the STM32.""" vin: float = 0.0 # mV vout: float = 0.0 # mV iin: float = 0.0 # mA (negative = into converter) iout: float = 0.0 # mA vfly: float = 0.0 # mV etemp: float = 0.0 # °C last_tmp: int = 0 VREF: int = 0 vfly_correction: int = 0 vfly_integral: float = 0.0 vfly_avg_debug: float = 0.0 cc_output_f: float = 0.0 mppt_iref: float = 0.0 mppt_last_vin: float = 0.0 mppt_last_iin: float = 0.0 p_in: float = 0.0 p_out: float = 0.0 seq: int = 0 timestamp: float = field(default_factory=time.time) @property def vin_V(self) -> float: return self.vin / 1000.0 @property def vout_V(self) -> float: return self.vout / 1000.0 @property def iin_A(self) -> float: return self.iin / 1000.0 @property def iout_A(self) -> float: return self.iout / 1000.0 @property def power_in_W(self) -> float: return self.vin * (-self.iin) / 1e6 @property def power_out_W(self) -> float: return self.vout * self.iout / 1e6 @property def efficiency(self) -> float: p_in = self.power_in_W return (self.power_out_W / p_in * 100.0) if p_in > 0.1 else 0.0 _TELEM_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes _TELEM_SIZE = struct.calcsize(_TELEM_FMT) def _decode_telemetry(payload: bytes) -> Optional[Telemetry]: if len(payload) < _TELEM_SIZE: return None v = struct.unpack(_TELEM_FMT, payload[:_TELEM_SIZE]) return Telemetry( vin=v[0], vout=v[1], iin=v[2], iout=v[3], vfly=v[4], etemp=v[5], last_tmp=v[6], VREF=v[7], vfly_correction=v[8], vfly_integral=v[10], vfly_avg_debug=v[11], cc_output_f=v[12], mppt_iref=v[13], mppt_last_vin=v[14], mppt_last_iin=v[15], p_in=v[16], p_out=v[17], seq=v[18], ) # ── Parameter definitions ──────────────────────────────────────────── @dataclass class ParamDef: id: int name: str ptype: int group: str min_val: float = -1e9 max_val: float = 1e9 fmt: str = ".4f" PARAMS = [ # Compensator ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"), # Vfly ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"), ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"), ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"), ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"), ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"), # CC ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"), ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"), ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"), ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"), ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"), ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"), # MPPT ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"), ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"), ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"), # Global ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"), # Deadtime ParamDef(0x60, "dt_0_3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x61, "dt_3_5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x62, "dt_5_10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x63, "dt_10_20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x64, "dt_20_30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x65, "dt_30_45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ] PARAM_BY_ID: dict[int, ParamDef] = {p.id: p for p in PARAMS} PARAM_BY_NAME: dict[str, ParamDef] = {p.name: p for p in PARAMS} # Deadtime brackets — current thresholds in mA matching firmware DT_BRACKETS = [ (0x60, "dt_0_3A", 0, 3000), (0x61, "dt_3_5A", 3000, 5000), (0x62, "dt_5_10A", 5000, 10000), (0x63, "dt_10_20A", 10000, 20000), (0x64, "dt_20_30A", 20000, 30000), (0x65, "dt_30_45A", 30000, 45000), ] # ── Frame building ─────────────────────────────────────────────────── def _build_frame(cmd: int, payload: bytes = b"") -> bytes: header = bytes([SYNC_BYTE, cmd, len(payload)]) frame = header + payload return frame + bytes([crc8(frame)]) def _build_param_write(param_id: int, ptype: int, value) -> bytes: if ptype == PTYPE_FLOAT: val_bytes = struct.pack(" Optional[tuple[int, float]]: if len(payload) < 8: return None param_id, ptype = payload[0], payload[1] vb = payload[4:8] if ptype == PTYPE_FLOAT: value = struct.unpack(" 128: self.state = 0 else: self.state = 3 elif self.state == 3: # WAIT_PAYLOAD self.payload.append(b) self.buf.append(b) self.idx += 1 if self.idx >= self.length: self.state = 4 elif self.state == 4: # WAIT_CRC expected = crc8(bytes(self.buf)) self.state = 0 if b == expected: yield (self.cmd, bytes(self.payload)) # ── STM32Link — synchronous serial interface ───────────────────────── class STM32Link: """Blocking serial link to STM32 debug protocol. Usage:: link = STM32Link("COM28") link.ping() t = link.read_telemetry() print(f"Vin={t.vin_V:.1f}V Iout={t.iout_A:.1f}A EFF={t.efficiency:.1f}%") link.write_param("dt_10_20A", 18) link.close() """ def __init__(self, port: str, baudrate: int = 460800, timeout: float = 2.0): self.ser = serial.Serial(port, baudrate, timeout=timeout) self._parser = _FrameParser() self._param_cache: dict[int, float] = {} def close(self): if self.ser and self.ser.is_open: self.ser.close() def __enter__(self): return self def __exit__(self, *exc): self.close() # ── Low-level ──────────────────────────────────────────────────── def _send(self, frame: bytes): self.ser.write(frame) def _recv_frames(self, timeout: float = 1.0) -> list[tuple[int, bytes]]: """Read available data and return decoded frames.""" frames = [] deadline = time.monotonic() + timeout while time.monotonic() < deadline: data = self.ser.read(self.ser.in_waiting or 1) if data: for cmd, payload in self._parser.feed(data): frames.append((cmd, payload)) if frames: # Drain any remaining data time.sleep(0.02) data = self.ser.read(self.ser.in_waiting) if data: for cmd, payload in self._parser.feed(data): frames.append((cmd, payload)) break return frames def _wait_for(self, target_cmd: int, timeout: float = 2.0) -> Optional[bytes]: """Wait for a specific command response, processing others.""" deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() if remaining <= 0: break data = self.ser.read(self.ser.in_waiting or 1) if data: for cmd, payload in self._parser.feed(data): if cmd == target_cmd: return payload # Cache param values seen in passing if cmd in (CMD_PARAM_VALUE, CMD_PARAM_WRITE_ACK): result = _decode_param_value(payload) if result: self._param_cache[result[0]] = result[1] # Cache telemetry too if cmd == CMD_TELEMETRY: self._last_telemetry = _decode_telemetry(payload) return None # ── Commands ───────────────────────────────────────────────────── def ping(self, timeout: float = 2.0) -> bool: """Send PING, return True if PONG received.""" self._send(_build_frame(CMD_PING)) return self._wait_for(CMD_PONG, timeout) is not None def read_telemetry(self, timeout: float = 2.0) -> Optional[Telemetry]: """Wait for next telemetry packet.""" payload = self._wait_for(CMD_TELEMETRY, timeout) if payload: return _decode_telemetry(payload) return None def read_telemetry_avg(self, n: int = 10, timeout: float = 5.0) -> Optional[Telemetry]: """Read n telemetry packets and return the average.""" samples: list[Telemetry] = [] deadline = time.monotonic() + timeout while len(samples) < n and time.monotonic() < deadline: t = self.read_telemetry(timeout=deadline - time.monotonic()) if t: samples.append(t) if not samples: return None # Average all float fields avg = Telemetry() for attr in ("vin", "vout", "iin", "iout", "vfly", "etemp", "vfly_integral", "vfly_avg_debug", "cc_output_f", "mppt_iref", "mppt_last_vin", "mppt_last_iin", "p_in", "p_out"): setattr(avg, attr, sum(getattr(s, attr) for s in samples) / len(samples)) avg.seq = samples[-1].seq return avg def request_all_params(self): """Request all parameter values from the STM32.""" self._send(_build_frame(CMD_PARAM_READ_ALL)) def read_all_params(self, timeout: float = 3.0) -> dict[str, float]: """Request and collect all parameter values.""" self._param_cache.clear() self.request_all_params() deadline = time.monotonic() + timeout while time.monotonic() < deadline: data = self.ser.read(self.ser.in_waiting or 1) if data: for cmd, payload in self._parser.feed(data): if cmd == CMD_PARAM_VALUE: result = _decode_param_value(payload) if result: self._param_cache[result[0]] = result[1] time.sleep(0.05) # Convert to name->value return { PARAM_BY_ID[pid].name: val for pid, val in self._param_cache.items() if pid in PARAM_BY_ID } def write_param(self, name: str, value: float, wait_ack: bool = True) -> bool: """Write a parameter by name. Returns True if ACK received.""" pdef = PARAM_BY_NAME.get(name) if not pdef: raise ValueError(f"Unknown parameter: {name!r}") if value < pdef.min_val or value > pdef.max_val: raise ValueError( f"{name}: {value} out of range [{pdef.min_val}, {pdef.max_val}]" ) frame = _build_param_write(pdef.id, pdef.ptype, value) self._send(frame) if wait_ack: payload = self._wait_for(CMD_PARAM_WRITE_ACK, timeout=2.0) if payload: result = _decode_param_value(payload) if result: self._param_cache[result[0]] = result[1] return True return False return True def write_param_by_id(self, param_id: int, value: float) -> bool: """Write a parameter by ID.""" pdef = PARAM_BY_ID.get(param_id) if not pdef: raise ValueError(f"Unknown param ID: 0x{param_id:02X}") return self.write_param(pdef.name, value)