"""Binary protocol encoder/decoder matching STM32 debug_protocol.h""" import struct from dataclasses import dataclass, field from typing import Optional SYNC_BYTE = 0xAA HEADER_SIZE = 3 CMD_TELEMETRY = 0x01 CMD_PARAM_WRITE = 0x02 CMD_PARAM_WRITE_ACK = 0x03 CMD_PARAM_READ_ALL = 0x04 CMD_PARAM_VALUE = 0x05 CMD_PING = 0x10 CMD_PONG = 0x11 CMD_ERROR_MSG = 0xE0 PTYPE_FLOAT = 0 PTYPE_UINT16 = 1 PTYPE_UINT8 = 2 PTYPE_INT32 = 3 # CRC8 table (poly 0x07) _CRC8_TABLE = [0] * 256 def _init_crc8(): for i in range(256): crc = i for _ in range(8): if crc & 0x80: crc = ((crc << 1) ^ 0x07) & 0xFF else: crc = (crc << 1) & 0xFF _CRC8_TABLE[i] = crc _init_crc8() def crc8(data: bytes) -> int: crc = 0x00 for b in data: crc = _CRC8_TABLE[crc ^ b] return crc @dataclass class TelemetryData: vin: float = 0.0 vout: float = 0.0 iin: float = 0.0 iout: float = 0.0 vfly: float = 0.0 etemp: float = 0.0 last_tmp: int = 0 VREF: int = 0 vfly_correction: int = 0 vfly_integral: float = 0.0 vfly_avg_debug: float = 0.0 cc_output_f: float = 0.0 mppt_iref: float = 0.0 mppt_last_vin: float = 0.0 mppt_last_iin: float = 0.0 p_in: float = 0.0 p_out: float = 0.0 seq: int = 0 TELEMETRY_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes TELEMETRY_SIZE = struct.calcsize(TELEMETRY_FMT) def decode_telemetry(payload: bytes) -> Optional[TelemetryData]: if len(payload) < TELEMETRY_SIZE: return None vals = struct.unpack(TELEMETRY_FMT, payload[:TELEMETRY_SIZE]) return TelemetryData( vin=vals[0], vout=vals[1], iin=vals[2], iout=vals[3], vfly=vals[4], etemp=vals[5], last_tmp=vals[6], VREF=vals[7], vfly_correction=vals[8], # vals[9] is pad vfly_integral=vals[10], vfly_avg_debug=vals[11], cc_output_f=vals[12], mppt_iref=vals[13], mppt_last_vin=vals[14], mppt_last_iin=vals[15], p_in=vals[16], p_out=vals[17], seq=vals[18], ) def build_frame(cmd: int, payload: bytes = b"") -> bytes: header = bytes([SYNC_BYTE, cmd, len(payload)]) frame_no_crc = header + payload return frame_no_crc + bytes([crc8(frame_no_crc)]) def build_param_write(param_id: int, param_type: int, value) -> bytes: if param_type == PTYPE_FLOAT: val_bytes = struct.pack(" bytes: return build_frame(CMD_PING) def build_param_read_all() -> bytes: return build_frame(CMD_PARAM_READ_ALL) def decode_param_value(payload: bytes) -> Optional[tuple[int, float]]: """Decode a PARAM_VALUE payload. Returns (param_id, value) or None.""" if len(payload) < 8: return None param_id = payload[0] param_type = payload[1] value_bytes = payload[4:8] if param_type == PTYPE_FLOAT: value = struct.unpack(" 128: self.state = self.WAIT_SYNC else: self.state = self.WAIT_PAYLOAD elif self.state == self.WAIT_PAYLOAD: self.payload.append(b) self.buf.append(b) self.idx += 1 if self.idx >= self.length: self.state = self.WAIT_CRC elif self.state == self.WAIT_CRC: expected = crc8(bytes(self.buf)) self.state = self.WAIT_SYNC if b == expected: yield (self.cmd, bytes(self.payload)) # Parameter registry @dataclass class ParamDef: id: int name: str ptype: int group: str min_val: float = -1e9 max_val: float = 1e9 fmt: str = ".4f" PARAMS = [ # Compensator ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"), # Vfly ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"), ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"), ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"), ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"), ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"), # CC ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"), ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"), ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"), ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"), ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"), ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"), # MPPT ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"), ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"), ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"), # Global ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"), # Deadtime ParamDef(0x60, "dt 0-3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x61, "dt 3-5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x62, "dt 5-10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x63, "dt 10-20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x64, "dt 20-30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ParamDef(0x65, "dt 30-45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), ] PARAM_BY_ID = {p.id: p for p in PARAMS} PARAM_BY_NAME = {p.name: p for p in PARAMS}