- stm32_link.py: synchronous serial interface to STM32 debug protocol (ping, telemetry read/avg, param read/write, frame parser) - tuner.py: automated tuning combining HIOKI + STM32 measurements (param sweep, deadtime optimization, multi-point sweep, CSV/plot output) - CLI commands: stm32-read, stm32-write, tune-param, tune-deadtime - README: complete step-by-step guide covering setup, sweeps, analysis, tuning, shade profiles, debug console, and parameter reference Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
441 lines
16 KiB
Python
441 lines
16 KiB
Python
"""Synchronous serial link to the STM32 debug protocol.
|
|
|
|
Provides blocking read/write of telemetry and parameters, suitable
|
|
for automated tuning scripts (not a TUI). Reuses the binary protocol
|
|
from code64/debug_console/protocol.py.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import struct
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
import serial
|
|
|
|
# ── Protocol constants ───────────────────────────────────────────────
|
|
|
|
SYNC_BYTE = 0xAA
|
|
|
|
CMD_TELEMETRY = 0x01
|
|
CMD_PARAM_WRITE = 0x02
|
|
CMD_PARAM_WRITE_ACK = 0x03
|
|
CMD_PARAM_READ_ALL = 0x04
|
|
CMD_PARAM_VALUE = 0x05
|
|
CMD_PING = 0x10
|
|
CMD_PONG = 0x11
|
|
CMD_ERROR_MSG = 0xE0
|
|
|
|
PTYPE_FLOAT = 0
|
|
PTYPE_UINT16 = 1
|
|
PTYPE_UINT8 = 2
|
|
PTYPE_INT32 = 3
|
|
|
|
|
|
# ── CRC8 (poly 0x07) ────────────────────────────────────────────────
|
|
|
|
_CRC8_TABLE = [0] * 256
|
|
|
|
def _init_crc8():
|
|
for i in range(256):
|
|
crc = i
|
|
for _ in range(8):
|
|
crc = ((crc << 1) ^ 0x07) & 0xFF if crc & 0x80 else (crc << 1) & 0xFF
|
|
_CRC8_TABLE[i] = crc
|
|
|
|
_init_crc8()
|
|
|
|
|
|
def crc8(data: bytes) -> int:
|
|
crc = 0x00
|
|
for b in data:
|
|
crc = _CRC8_TABLE[crc ^ b]
|
|
return crc
|
|
|
|
|
|
# ── Telemetry ────────────────────────────────────────────────────────
|
|
|
|
@dataclass
|
|
class Telemetry:
|
|
"""Decoded telemetry packet from the STM32."""
|
|
vin: float = 0.0 # mV
|
|
vout: float = 0.0 # mV
|
|
iin: float = 0.0 # mA (negative = into converter)
|
|
iout: float = 0.0 # mA
|
|
vfly: float = 0.0 # mV
|
|
etemp: float = 0.0 # °C
|
|
last_tmp: int = 0
|
|
VREF: int = 0
|
|
vfly_correction: int = 0
|
|
vfly_integral: float = 0.0
|
|
vfly_avg_debug: float = 0.0
|
|
cc_output_f: float = 0.0
|
|
mppt_iref: float = 0.0
|
|
mppt_last_vin: float = 0.0
|
|
mppt_last_iin: float = 0.0
|
|
p_in: float = 0.0
|
|
p_out: float = 0.0
|
|
seq: int = 0
|
|
timestamp: float = field(default_factory=time.time)
|
|
|
|
@property
|
|
def vin_V(self) -> float:
|
|
return self.vin / 1000.0
|
|
|
|
@property
|
|
def vout_V(self) -> float:
|
|
return self.vout / 1000.0
|
|
|
|
@property
|
|
def iin_A(self) -> float:
|
|
return self.iin / 1000.0
|
|
|
|
@property
|
|
def iout_A(self) -> float:
|
|
return self.iout / 1000.0
|
|
|
|
@property
|
|
def power_in_W(self) -> float:
|
|
return self.vin * (-self.iin) / 1e6
|
|
|
|
@property
|
|
def power_out_W(self) -> float:
|
|
return self.vout * self.iout / 1e6
|
|
|
|
@property
|
|
def efficiency(self) -> float:
|
|
p_in = self.power_in_W
|
|
return (self.power_out_W / p_in * 100.0) if p_in > 0.1 else 0.0
|
|
|
|
|
|
_TELEM_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes
|
|
_TELEM_SIZE = struct.calcsize(_TELEM_FMT)
|
|
|
|
|
|
def _decode_telemetry(payload: bytes) -> Optional[Telemetry]:
|
|
if len(payload) < _TELEM_SIZE:
|
|
return None
|
|
v = struct.unpack(_TELEM_FMT, payload[:_TELEM_SIZE])
|
|
return Telemetry(
|
|
vin=v[0], vout=v[1], iin=v[2], iout=v[3], vfly=v[4], etemp=v[5],
|
|
last_tmp=v[6], VREF=v[7], vfly_correction=v[8],
|
|
vfly_integral=v[10], vfly_avg_debug=v[11],
|
|
cc_output_f=v[12], mppt_iref=v[13],
|
|
mppt_last_vin=v[14], mppt_last_iin=v[15],
|
|
p_in=v[16], p_out=v[17], seq=v[18],
|
|
)
|
|
|
|
|
|
# ── Parameter definitions ────────────────────────────────────────────
|
|
|
|
@dataclass
|
|
class ParamDef:
|
|
id: int
|
|
name: str
|
|
ptype: int
|
|
group: str
|
|
min_val: float = -1e9
|
|
max_val: float = 1e9
|
|
fmt: str = ".4f"
|
|
|
|
|
|
PARAMS = [
|
|
# Compensator
|
|
ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"),
|
|
# Vfly
|
|
ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"),
|
|
ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"),
|
|
ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"),
|
|
ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"),
|
|
ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"),
|
|
# CC
|
|
ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"),
|
|
ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"),
|
|
ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"),
|
|
ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"),
|
|
ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"),
|
|
ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"),
|
|
# MPPT
|
|
ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
|
|
ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
|
|
ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
|
|
ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
|
|
ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"),
|
|
ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"),
|
|
ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
|
|
ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"),
|
|
# Global
|
|
ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"),
|
|
# Deadtime
|
|
ParamDef(0x60, "dt_0_3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
ParamDef(0x61, "dt_3_5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
ParamDef(0x62, "dt_5_10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
ParamDef(0x63, "dt_10_20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
ParamDef(0x64, "dt_20_30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
ParamDef(0x65, "dt_30_45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
|
|
]
|
|
|
|
PARAM_BY_ID: dict[int, ParamDef] = {p.id: p for p in PARAMS}
|
|
PARAM_BY_NAME: dict[str, ParamDef] = {p.name: p for p in PARAMS}
|
|
|
|
# Deadtime brackets — current thresholds in mA matching firmware
|
|
DT_BRACKETS = [
|
|
(0x60, "dt_0_3A", 0, 3000),
|
|
(0x61, "dt_3_5A", 3000, 5000),
|
|
(0x62, "dt_5_10A", 5000, 10000),
|
|
(0x63, "dt_10_20A", 10000, 20000),
|
|
(0x64, "dt_20_30A", 20000, 30000),
|
|
(0x65, "dt_30_45A", 30000, 45000),
|
|
]
|
|
|
|
|
|
# ── Frame building ───────────────────────────────────────────────────
|
|
|
|
def _build_frame(cmd: int, payload: bytes = b"") -> bytes:
|
|
header = bytes([SYNC_BYTE, cmd, len(payload)])
|
|
frame = header + payload
|
|
return frame + bytes([crc8(frame)])
|
|
|
|
|
|
def _build_param_write(param_id: int, ptype: int, value) -> bytes:
|
|
if ptype == PTYPE_FLOAT:
|
|
val_bytes = struct.pack("<f", float(value))
|
|
elif ptype == PTYPE_UINT16:
|
|
val_bytes = struct.pack("<HH", int(value), 0)
|
|
elif ptype == PTYPE_UINT8:
|
|
val_bytes = struct.pack("<Bxxx", int(value))
|
|
elif ptype == PTYPE_INT32:
|
|
val_bytes = struct.pack("<i", int(value))
|
|
else:
|
|
val_bytes = struct.pack("<I", int(value))
|
|
payload = struct.pack("<BBxx", param_id, ptype) + val_bytes
|
|
return _build_frame(CMD_PARAM_WRITE, payload)
|
|
|
|
|
|
def _decode_param_value(payload: bytes) -> Optional[tuple[int, float]]:
|
|
if len(payload) < 8:
|
|
return None
|
|
param_id, ptype = payload[0], payload[1]
|
|
vb = payload[4:8]
|
|
if ptype == PTYPE_FLOAT:
|
|
value = struct.unpack("<f", vb)[0]
|
|
elif ptype == PTYPE_UINT16:
|
|
value = float(struct.unpack("<H", vb[:2])[0])
|
|
elif ptype == PTYPE_UINT8:
|
|
value = float(vb[0])
|
|
elif ptype == PTYPE_INT32:
|
|
value = float(struct.unpack("<i", vb)[0])
|
|
else:
|
|
value = float(struct.unpack("<I", vb)[0])
|
|
return (param_id, value)
|
|
|
|
|
|
# ── Frame parser state machine ───────────────────────────────────────
|
|
|
|
class _FrameParser:
|
|
def __init__(self):
|
|
self.state = 0 # WAIT_SYNC
|
|
self.cmd = 0
|
|
self.length = 0
|
|
self.buf = bytearray()
|
|
self.payload = bytearray()
|
|
self.idx = 0
|
|
|
|
def feed(self, data: bytes):
|
|
for b in data:
|
|
if self.state == 0: # WAIT_SYNC
|
|
if b == SYNC_BYTE:
|
|
self.buf = bytearray([b])
|
|
self.state = 1
|
|
elif self.state == 1: # WAIT_CMD
|
|
self.cmd = b
|
|
self.buf.append(b)
|
|
self.state = 2
|
|
elif self.state == 2: # WAIT_LEN
|
|
self.length = b
|
|
self.buf.append(b)
|
|
self.payload = bytearray()
|
|
self.idx = 0
|
|
if b == 0:
|
|
self.state = 4
|
|
elif b > 128:
|
|
self.state = 0
|
|
else:
|
|
self.state = 3
|
|
elif self.state == 3: # WAIT_PAYLOAD
|
|
self.payload.append(b)
|
|
self.buf.append(b)
|
|
self.idx += 1
|
|
if self.idx >= self.length:
|
|
self.state = 4
|
|
elif self.state == 4: # WAIT_CRC
|
|
expected = crc8(bytes(self.buf))
|
|
self.state = 0
|
|
if b == expected:
|
|
yield (self.cmd, bytes(self.payload))
|
|
|
|
|
|
# ── STM32Link — synchronous serial interface ─────────────────────────
|
|
|
|
class STM32Link:
|
|
"""Blocking serial link to STM32 debug protocol.
|
|
|
|
Usage::
|
|
|
|
link = STM32Link("COM28")
|
|
link.ping()
|
|
t = link.read_telemetry()
|
|
print(f"Vin={t.vin_V:.1f}V Iout={t.iout_A:.1f}A EFF={t.efficiency:.1f}%")
|
|
link.write_param("dt_10_20A", 18)
|
|
link.close()
|
|
"""
|
|
|
|
def __init__(self, port: str, baudrate: int = 460800, timeout: float = 2.0):
|
|
self.ser = serial.Serial(port, baudrate, timeout=timeout)
|
|
self._parser = _FrameParser()
|
|
self._param_cache: dict[int, float] = {}
|
|
|
|
def close(self):
|
|
if self.ser and self.ser.is_open:
|
|
self.ser.close()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *exc):
|
|
self.close()
|
|
|
|
# ── Low-level ────────────────────────────────────────────────────
|
|
|
|
def _send(self, frame: bytes):
|
|
self.ser.write(frame)
|
|
|
|
def _recv_frames(self, timeout: float = 1.0) -> list[tuple[int, bytes]]:
|
|
"""Read available data and return decoded frames."""
|
|
frames = []
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
data = self.ser.read(self.ser.in_waiting or 1)
|
|
if data:
|
|
for cmd, payload in self._parser.feed(data):
|
|
frames.append((cmd, payload))
|
|
if frames:
|
|
# Drain any remaining data
|
|
time.sleep(0.02)
|
|
data = self.ser.read(self.ser.in_waiting)
|
|
if data:
|
|
for cmd, payload in self._parser.feed(data):
|
|
frames.append((cmd, payload))
|
|
break
|
|
return frames
|
|
|
|
def _wait_for(self, target_cmd: int, timeout: float = 2.0) -> Optional[bytes]:
|
|
"""Wait for a specific command response, processing others."""
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
break
|
|
data = self.ser.read(self.ser.in_waiting or 1)
|
|
if data:
|
|
for cmd, payload in self._parser.feed(data):
|
|
if cmd == target_cmd:
|
|
return payload
|
|
# Cache param values seen in passing
|
|
if cmd in (CMD_PARAM_VALUE, CMD_PARAM_WRITE_ACK):
|
|
result = _decode_param_value(payload)
|
|
if result:
|
|
self._param_cache[result[0]] = result[1]
|
|
# Cache telemetry too
|
|
if cmd == CMD_TELEMETRY:
|
|
self._last_telemetry = _decode_telemetry(payload)
|
|
return None
|
|
|
|
# ── Commands ─────────────────────────────────────────────────────
|
|
|
|
def ping(self, timeout: float = 2.0) -> bool:
|
|
"""Send PING, return True if PONG received."""
|
|
self._send(_build_frame(CMD_PING))
|
|
return self._wait_for(CMD_PONG, timeout) is not None
|
|
|
|
def read_telemetry(self, timeout: float = 2.0) -> Optional[Telemetry]:
|
|
"""Wait for next telemetry packet."""
|
|
payload = self._wait_for(CMD_TELEMETRY, timeout)
|
|
if payload:
|
|
return _decode_telemetry(payload)
|
|
return None
|
|
|
|
def read_telemetry_avg(self, n: int = 10, timeout: float = 5.0) -> Optional[Telemetry]:
|
|
"""Read n telemetry packets and return the average."""
|
|
samples: list[Telemetry] = []
|
|
deadline = time.monotonic() + timeout
|
|
while len(samples) < n and time.monotonic() < deadline:
|
|
t = self.read_telemetry(timeout=deadline - time.monotonic())
|
|
if t:
|
|
samples.append(t)
|
|
if not samples:
|
|
return None
|
|
# Average all float fields
|
|
avg = Telemetry()
|
|
for attr in ("vin", "vout", "iin", "iout", "vfly", "etemp",
|
|
"vfly_integral", "vfly_avg_debug", "cc_output_f",
|
|
"mppt_iref", "mppt_last_vin", "mppt_last_iin",
|
|
"p_in", "p_out"):
|
|
setattr(avg, attr, sum(getattr(s, attr) for s in samples) / len(samples))
|
|
avg.seq = samples[-1].seq
|
|
return avg
|
|
|
|
def request_all_params(self):
|
|
"""Request all parameter values from the STM32."""
|
|
self._send(_build_frame(CMD_PARAM_READ_ALL))
|
|
|
|
def read_all_params(self, timeout: float = 3.0) -> dict[str, float]:
|
|
"""Request and collect all parameter values."""
|
|
self._param_cache.clear()
|
|
self.request_all_params()
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
data = self.ser.read(self.ser.in_waiting or 1)
|
|
if data:
|
|
for cmd, payload in self._parser.feed(data):
|
|
if cmd == CMD_PARAM_VALUE:
|
|
result = _decode_param_value(payload)
|
|
if result:
|
|
self._param_cache[result[0]] = result[1]
|
|
time.sleep(0.05)
|
|
# Convert to name->value
|
|
return {
|
|
PARAM_BY_ID[pid].name: val
|
|
for pid, val in self._param_cache.items()
|
|
if pid in PARAM_BY_ID
|
|
}
|
|
|
|
def write_param(self, name: str, value: float, wait_ack: bool = True) -> bool:
|
|
"""Write a parameter by name. Returns True if ACK received."""
|
|
pdef = PARAM_BY_NAME.get(name)
|
|
if not pdef:
|
|
raise ValueError(f"Unknown parameter: {name!r}")
|
|
if value < pdef.min_val or value > pdef.max_val:
|
|
raise ValueError(
|
|
f"{name}: {value} out of range [{pdef.min_val}, {pdef.max_val}]"
|
|
)
|
|
frame = _build_param_write(pdef.id, pdef.ptype, value)
|
|
self._send(frame)
|
|
if wait_ack:
|
|
payload = self._wait_for(CMD_PARAM_WRITE_ACK, timeout=2.0)
|
|
if payload:
|
|
result = _decode_param_value(payload)
|
|
if result:
|
|
self._param_cache[result[0]] = result[1]
|
|
return True
|
|
return False
|
|
return True
|
|
|
|
def write_param_by_id(self, param_id: int, value: float) -> bool:
|
|
"""Write a parameter by ID."""
|
|
pdef = PARAM_BY_ID.get(param_id)
|
|
if not pdef:
|
|
raise ValueError(f"Unknown param ID: 0x{param_id:02X}")
|
|
return self.write_param(pdef.name, value)
|