Files
mppt-testbench/testbench/stm32_link.py
grabowski 1c41910c1e Add STM32 tuning integration and rewrite README with step-by-step guide
- stm32_link.py: synchronous serial interface to STM32 debug protocol
  (ping, telemetry read/avg, param read/write, frame parser)
- tuner.py: automated tuning combining HIOKI + STM32 measurements
  (param sweep, deadtime optimization, multi-point sweep, CSV/plot output)
- CLI commands: stm32-read, stm32-write, tune-param, tune-deadtime
- README: complete step-by-step guide covering setup, sweeps, analysis,
  tuning, shade profiles, debug console, and parameter reference

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 16:52:09 +07:00

441 lines
16 KiB
Python

"""Synchronous serial link to the STM32 debug protocol.
Provides blocking read/write of telemetry and parameters, suitable
for automated tuning scripts (not a TUI). Reuses the binary protocol
from code64/debug_console/protocol.py.
"""
from __future__ import annotations
import struct
import time
from dataclasses import dataclass, field
from typing import Optional
import serial
# ── Protocol constants ───────────────────────────────────────────────
SYNC_BYTE = 0xAA
CMD_TELEMETRY = 0x01
CMD_PARAM_WRITE = 0x02
CMD_PARAM_WRITE_ACK = 0x03
CMD_PARAM_READ_ALL = 0x04
CMD_PARAM_VALUE = 0x05
CMD_PING = 0x10
CMD_PONG = 0x11
CMD_ERROR_MSG = 0xE0
PTYPE_FLOAT = 0
PTYPE_UINT16 = 1
PTYPE_UINT8 = 2
PTYPE_INT32 = 3
# ── CRC8 (poly 0x07) ────────────────────────────────────────────────
_CRC8_TABLE = [0] * 256
def _init_crc8():
for i in range(256):
crc = i
for _ in range(8):
crc = ((crc << 1) ^ 0x07) & 0xFF if crc & 0x80 else (crc << 1) & 0xFF
_CRC8_TABLE[i] = crc
_init_crc8()
def crc8(data: bytes) -> int:
crc = 0x00
for b in data:
crc = _CRC8_TABLE[crc ^ b]
return crc
# ── Telemetry ────────────────────────────────────────────────────────
@dataclass
class Telemetry:
"""Decoded telemetry packet from the STM32."""
vin: float = 0.0 # mV
vout: float = 0.0 # mV
iin: float = 0.0 # mA (negative = into converter)
iout: float = 0.0 # mA
vfly: float = 0.0 # mV
etemp: float = 0.0 # °C
last_tmp: int = 0
VREF: int = 0
vfly_correction: int = 0
vfly_integral: float = 0.0
vfly_avg_debug: float = 0.0
cc_output_f: float = 0.0
mppt_iref: float = 0.0
mppt_last_vin: float = 0.0
mppt_last_iin: float = 0.0
p_in: float = 0.0
p_out: float = 0.0
seq: int = 0
timestamp: float = field(default_factory=time.time)
@property
def vin_V(self) -> float:
return self.vin / 1000.0
@property
def vout_V(self) -> float:
return self.vout / 1000.0
@property
def iin_A(self) -> float:
return self.iin / 1000.0
@property
def iout_A(self) -> float:
return self.iout / 1000.0
@property
def power_in_W(self) -> float:
return self.vin * (-self.iin) / 1e6
@property
def power_out_W(self) -> float:
return self.vout * self.iout / 1e6
@property
def efficiency(self) -> float:
p_in = self.power_in_W
return (self.power_out_W / p_in * 100.0) if p_in > 0.1 else 0.0
_TELEM_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes
_TELEM_SIZE = struct.calcsize(_TELEM_FMT)
def _decode_telemetry(payload: bytes) -> Optional[Telemetry]:
if len(payload) < _TELEM_SIZE:
return None
v = struct.unpack(_TELEM_FMT, payload[:_TELEM_SIZE])
return Telemetry(
vin=v[0], vout=v[1], iin=v[2], iout=v[3], vfly=v[4], etemp=v[5],
last_tmp=v[6], VREF=v[7], vfly_correction=v[8],
vfly_integral=v[10], vfly_avg_debug=v[11],
cc_output_f=v[12], mppt_iref=v[13],
mppt_last_vin=v[14], mppt_last_iin=v[15],
p_in=v[16], p_out=v[17], seq=v[18],
)
# ── Parameter definitions ────────────────────────────────────────────
@dataclass
class ParamDef:
id: int
name: str
ptype: int
group: str
min_val: float = -1e9
max_val: float = 1e9
fmt: str = ".4f"
PARAMS = [
# Compensator
ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"),
# Vfly
ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"),
ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"),
ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"),
ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"),
ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"),
# CC
ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"),
ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"),
ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"),
ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"),
ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"),
ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"),
# MPPT
ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"),
ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"),
ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"),
# Global
ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"),
# Deadtime
ParamDef(0x60, "dt_0_3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x61, "dt_3_5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x62, "dt_5_10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x63, "dt_10_20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x64, "dt_20_30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x65, "dt_30_45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
]
PARAM_BY_ID: dict[int, ParamDef] = {p.id: p for p in PARAMS}
PARAM_BY_NAME: dict[str, ParamDef] = {p.name: p for p in PARAMS}
# Deadtime brackets — current thresholds in mA matching firmware
DT_BRACKETS = [
(0x60, "dt_0_3A", 0, 3000),
(0x61, "dt_3_5A", 3000, 5000),
(0x62, "dt_5_10A", 5000, 10000),
(0x63, "dt_10_20A", 10000, 20000),
(0x64, "dt_20_30A", 20000, 30000),
(0x65, "dt_30_45A", 30000, 45000),
]
# ── Frame building ───────────────────────────────────────────────────
def _build_frame(cmd: int, payload: bytes = b"") -> bytes:
header = bytes([SYNC_BYTE, cmd, len(payload)])
frame = header + payload
return frame + bytes([crc8(frame)])
def _build_param_write(param_id: int, ptype: int, value) -> bytes:
if ptype == PTYPE_FLOAT:
val_bytes = struct.pack("<f", float(value))
elif ptype == PTYPE_UINT16:
val_bytes = struct.pack("<HH", int(value), 0)
elif ptype == PTYPE_UINT8:
val_bytes = struct.pack("<Bxxx", int(value))
elif ptype == PTYPE_INT32:
val_bytes = struct.pack("<i", int(value))
else:
val_bytes = struct.pack("<I", int(value))
payload = struct.pack("<BBxx", param_id, ptype) + val_bytes
return _build_frame(CMD_PARAM_WRITE, payload)
def _decode_param_value(payload: bytes) -> Optional[tuple[int, float]]:
if len(payload) < 8:
return None
param_id, ptype = payload[0], payload[1]
vb = payload[4:8]
if ptype == PTYPE_FLOAT:
value = struct.unpack("<f", vb)[0]
elif ptype == PTYPE_UINT16:
value = float(struct.unpack("<H", vb[:2])[0])
elif ptype == PTYPE_UINT8:
value = float(vb[0])
elif ptype == PTYPE_INT32:
value = float(struct.unpack("<i", vb)[0])
else:
value = float(struct.unpack("<I", vb)[0])
return (param_id, value)
# ── Frame parser state machine ───────────────────────────────────────
class _FrameParser:
def __init__(self):
self.state = 0 # WAIT_SYNC
self.cmd = 0
self.length = 0
self.buf = bytearray()
self.payload = bytearray()
self.idx = 0
def feed(self, data: bytes):
for b in data:
if self.state == 0: # WAIT_SYNC
if b == SYNC_BYTE:
self.buf = bytearray([b])
self.state = 1
elif self.state == 1: # WAIT_CMD
self.cmd = b
self.buf.append(b)
self.state = 2
elif self.state == 2: # WAIT_LEN
self.length = b
self.buf.append(b)
self.payload = bytearray()
self.idx = 0
if b == 0:
self.state = 4
elif b > 128:
self.state = 0
else:
self.state = 3
elif self.state == 3: # WAIT_PAYLOAD
self.payload.append(b)
self.buf.append(b)
self.idx += 1
if self.idx >= self.length:
self.state = 4
elif self.state == 4: # WAIT_CRC
expected = crc8(bytes(self.buf))
self.state = 0
if b == expected:
yield (self.cmd, bytes(self.payload))
# ── STM32Link — synchronous serial interface ─────────────────────────
class STM32Link:
"""Blocking serial link to STM32 debug protocol.
Usage::
link = STM32Link("COM28")
link.ping()
t = link.read_telemetry()
print(f"Vin={t.vin_V:.1f}V Iout={t.iout_A:.1f}A EFF={t.efficiency:.1f}%")
link.write_param("dt_10_20A", 18)
link.close()
"""
def __init__(self, port: str, baudrate: int = 460800, timeout: float = 2.0):
self.ser = serial.Serial(port, baudrate, timeout=timeout)
self._parser = _FrameParser()
self._param_cache: dict[int, float] = {}
def close(self):
if self.ser and self.ser.is_open:
self.ser.close()
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
# ── Low-level ────────────────────────────────────────────────────
def _send(self, frame: bytes):
self.ser.write(frame)
def _recv_frames(self, timeout: float = 1.0) -> list[tuple[int, bytes]]:
"""Read available data and return decoded frames."""
frames = []
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
frames.append((cmd, payload))
if frames:
# Drain any remaining data
time.sleep(0.02)
data = self.ser.read(self.ser.in_waiting)
if data:
for cmd, payload in self._parser.feed(data):
frames.append((cmd, payload))
break
return frames
def _wait_for(self, target_cmd: int, timeout: float = 2.0) -> Optional[bytes]:
"""Wait for a specific command response, processing others."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
if cmd == target_cmd:
return payload
# Cache param values seen in passing
if cmd in (CMD_PARAM_VALUE, CMD_PARAM_WRITE_ACK):
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
# Cache telemetry too
if cmd == CMD_TELEMETRY:
self._last_telemetry = _decode_telemetry(payload)
return None
# ── Commands ─────────────────────────────────────────────────────
def ping(self, timeout: float = 2.0) -> bool:
"""Send PING, return True if PONG received."""
self._send(_build_frame(CMD_PING))
return self._wait_for(CMD_PONG, timeout) is not None
def read_telemetry(self, timeout: float = 2.0) -> Optional[Telemetry]:
"""Wait for next telemetry packet."""
payload = self._wait_for(CMD_TELEMETRY, timeout)
if payload:
return _decode_telemetry(payload)
return None
def read_telemetry_avg(self, n: int = 10, timeout: float = 5.0) -> Optional[Telemetry]:
"""Read n telemetry packets and return the average."""
samples: list[Telemetry] = []
deadline = time.monotonic() + timeout
while len(samples) < n and time.monotonic() < deadline:
t = self.read_telemetry(timeout=deadline - time.monotonic())
if t:
samples.append(t)
if not samples:
return None
# Average all float fields
avg = Telemetry()
for attr in ("vin", "vout", "iin", "iout", "vfly", "etemp",
"vfly_integral", "vfly_avg_debug", "cc_output_f",
"mppt_iref", "mppt_last_vin", "mppt_last_iin",
"p_in", "p_out"):
setattr(avg, attr, sum(getattr(s, attr) for s in samples) / len(samples))
avg.seq = samples[-1].seq
return avg
def request_all_params(self):
"""Request all parameter values from the STM32."""
self._send(_build_frame(CMD_PARAM_READ_ALL))
def read_all_params(self, timeout: float = 3.0) -> dict[str, float]:
"""Request and collect all parameter values."""
self._param_cache.clear()
self.request_all_params()
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
if cmd == CMD_PARAM_VALUE:
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
time.sleep(0.05)
# Convert to name->value
return {
PARAM_BY_ID[pid].name: val
for pid, val in self._param_cache.items()
if pid in PARAM_BY_ID
}
def write_param(self, name: str, value: float, wait_ack: bool = True) -> bool:
"""Write a parameter by name. Returns True if ACK received."""
pdef = PARAM_BY_NAME.get(name)
if not pdef:
raise ValueError(f"Unknown parameter: {name!r}")
if value < pdef.min_val or value > pdef.max_val:
raise ValueError(
f"{name}: {value} out of range [{pdef.min_val}, {pdef.max_val}]"
)
frame = _build_param_write(pdef.id, pdef.ptype, value)
self._send(frame)
if wait_ack:
payload = self._wait_for(CMD_PARAM_WRITE_ACK, timeout=2.0)
if payload:
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
return True
return False
return True
def write_param_by_id(self, param_id: int, value: float) -> bool:
"""Write a parameter by ID."""
pdef = PARAM_BY_ID.get(param_id)
if not pdef:
raise ValueError(f"Unknown param ID: 0x{param_id:02X}")
return self.write_param(pdef.name, value)