Add STM32 tuning integration and rewrite README with step-by-step guide

- stm32_link.py: synchronous serial interface to STM32 debug protocol
  (ping, telemetry read/avg, param read/write, frame parser)
- tuner.py: automated tuning combining HIOKI + STM32 measurements
  (param sweep, deadtime optimization, multi-point sweep, CSV/plot output)
- CLI commands: stm32-read, stm32-write, tune-param, tune-deadtime
- README: complete step-by-step guide covering setup, sweeps, analysis,
  tuning, shade profiles, debug console, and parameter reference

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 16:52:09 +07:00
parent e7a23a3c7e
commit 1c41910c1e
4 changed files with 1297 additions and 95 deletions

View File

@@ -550,6 +550,124 @@ def cmd_safe_off(bench: MPPTTestbench, _args: argparse.Namespace) -> None:
print("Done.")
# ── Tuning commands (instruments + STM32) ────────────────────────────
def cmd_stm32_read(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Read all STM32 parameters and telemetry."""
from testbench.stm32_link import STM32Link
with STM32Link(args.stm32_port, args.stm32_baud) as link:
if not link.ping():
print("STM32 not responding to ping!")
return
print("STM32 connected.\n")
# Read params
params = link.read_all_params()
if params:
print("Parameters:")
for name, val in sorted(params.items()):
print(f" {name:<20s} = {val}")
print()
# Read telemetry
t = link.read_telemetry_avg(n=20)
if t:
print("Telemetry (20-sample avg):")
print(f" Vin = {t.vin_V:8.2f} V")
print(f" Vout = {t.vout_V:8.2f} V")
print(f" Iin = {t.iin_A:8.2f} A")
print(f" Iout = {t.iout_A:8.2f} A")
print(f" Pin = {t.power_in_W:8.2f} W")
print(f" Pout = {t.power_out_W:8.2f} W")
print(f" EFF = {t.efficiency:8.1f} %")
print(f" Vfly = {t.vfly/1000:8.2f} V")
print(f" Temp = {t.etemp:8.1f} °C")
def cmd_stm32_write(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Write a parameter to the STM32."""
from testbench.stm32_link import STM32Link
with STM32Link(args.stm32_port, args.stm32_baud) as link:
if not link.ping():
print("STM32 not responding to ping!")
return
ack = link.write_param(args.param, args.value)
print(f"{args.param} = {args.value} {'ACK' if ack else 'NO ACK'}")
def cmd_tune_param(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Sweep an STM32 parameter while measuring efficiency."""
from testbench.stm32_link import STM32Link
from testbench.tuner import Tuner
with STM32Link(args.stm32_port, args.stm32_baud) as link:
if not link.ping():
print("STM32 not responding!")
return
tuner = Tuner(bench, link, settle_time=args.settle)
results = tuner.sweep_param(
param_name=args.param,
start=args.start,
stop=args.stop,
step=args.step,
voltage=args.voltage,
current_limit=args.current_limit,
load_mode=args.load_mode,
load_value=args.load_value,
settle_time=args.settle,
)
tuner.print_results(results)
if args.output:
tuner.write_csv(results, args.output)
if not args.no_plot and results:
tuner.plot_sweep(results, show=True)
def cmd_tune_deadtime(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Optimize deadtime for each current bracket."""
from testbench.stm32_link import STM32Link
from testbench.tuner import Tuner
with STM32Link(args.stm32_port, args.stm32_baud) as link:
if not link.ping():
print("STM32 not responding!")
return
tuner = Tuner(bench, link, settle_time=args.settle)
load_values = None
if args.load_values:
load_values = [float(x) for x in args.load_values.split(",")]
results = tuner.tune_deadtime(
dt_start=args.dt_start,
dt_stop=args.dt_stop,
dt_step=args.dt_step,
voltage=args.voltage,
current_limit=args.current_limit,
load_mode=args.load_mode,
load_values=load_values,
settle_time=args.settle,
)
if args.apply:
tuner.apply_best_deadtimes(results)
if args.output:
all_pts = []
for pts in results.values():
all_pts.extend(pts)
tuner.write_csv(all_pts, args.output)
# ── Offline plot command (no instruments needed) ─────────────────────
@@ -726,6 +844,9 @@ examples:
%(prog)s safe-off
%(prog)s plot-sweep sweep_vi_20260312_151212.csv
%(prog)s plot-sweep sweep_vi_20260312_151212.csv --no-show -o plots/
%(prog)s stm32-read --stm32-port COM28
%(prog)s tune-param --stm32-port COM28 --param dt_10_20A --start 14 --stop 40 --step 1 --voltage 60 --current-limit 20 --load-mode CP --load-value 300
%(prog)s tune-deadtime --stm32-port COM28 --voltage 60 --current-limit 20 --load-mode CP --apply
""",
)
@@ -750,6 +871,14 @@ examples:
"--timeout", type=int, default=5000,
help="VISA timeout in ms (default: 5000)",
)
parser.add_argument(
"--stm32-port", default="COM28",
help="STM32 debug serial port (default: COM28)",
)
parser.add_argument(
"--stm32-baud", type=int, default=460800,
help="STM32 debug baud rate (default: 460800)",
)
sub = parser.add_subparsers(dest="command", required=True)
@@ -845,6 +974,41 @@ examples:
# safe-off
sub.add_parser("safe-off", help="Emergency: turn off load and supply")
# stm32-read
sub.add_parser("stm32-read", help="Read STM32 parameters and telemetry")
# stm32-write
p_sw = sub.add_parser("stm32-write", help="Write a parameter to the STM32")
p_sw.add_argument("--param", required=True, help="Parameter name")
p_sw.add_argument("--value", type=float, required=True, help="Value to write")
# tune-param
p_tp = sub.add_parser("tune-param", help="Sweep an STM32 parameter while measuring efficiency")
p_tp.add_argument("--param", required=True, help="Parameter name (e.g. dt_10_20A, vfly_kp)")
p_tp.add_argument("--start", type=float, required=True, help="Start value")
p_tp.add_argument("--stop", type=float, required=True, help="Stop value")
p_tp.add_argument("--step", type=float, required=True, help="Step size")
p_tp.add_argument("--voltage", type=float, required=True, help="Supply voltage (V)")
p_tp.add_argument("--current-limit", type=float, required=True, help="Supply current limit (A)")
p_tp.add_argument("--load-mode", choices=["CC", "CP"], default="CP", help="Load mode")
p_tp.add_argument("--load-value", type=float, default=200.0, help="Load setpoint (A or W)")
p_tp.add_argument("--settle", type=float, default=3.0, help="Settle time per step (s)")
p_tp.add_argument("--no-plot", action="store_true", help="Skip plot")
p_tp.add_argument("-o", "--output", help="CSV output file")
# tune-deadtime
p_td = sub.add_parser("tune-deadtime", help="Optimize deadtime for each current bracket")
p_td.add_argument("--dt-start", type=int, default=14, help="Min deadtime ticks (default: 14)")
p_td.add_argument("--dt-stop", type=int, default=50, help="Max deadtime ticks (default: 50)")
p_td.add_argument("--dt-step", type=int, default=1, help="Deadtime step (default: 1)")
p_td.add_argument("--voltage", type=float, default=60.0, help="Supply voltage (V)")
p_td.add_argument("--current-limit", type=float, default=20.0, help="Supply current limit (A)")
p_td.add_argument("--load-mode", choices=["CC", "CP"], default="CP", help="Load mode")
p_td.add_argument("--load-values", help="Comma-separated load values per bracket (e.g. 20,50,100,250,400,600)")
p_td.add_argument("--settle", type=float, default=3.0, help="Settle time per step (s)")
p_td.add_argument("--apply", action="store_true", help="Apply best deadtimes after sweep")
p_td.add_argument("-o", "--output", help="CSV output file")
# plot-sweep (offline, no instruments)
p_plot = sub.add_parser("plot-sweep", help="Plot efficiency analysis from sweep CSV (no instruments needed)")
p_plot.add_argument("csv", help="Sweep CSV file to plot")
@@ -872,6 +1036,10 @@ examples:
"supply": cmd_supply,
"load": cmd_load,
"safe-off": cmd_safe_off,
"stm32-read": cmd_stm32_read,
"stm32-write": cmd_stm32_write,
"tune-param": cmd_tune_param,
"tune-deadtime": cmd_tune_deadtime,
}
bench = connect_bench(args)

440
testbench/stm32_link.py Normal file
View File

@@ -0,0 +1,440 @@
"""Synchronous serial link to the STM32 debug protocol.
Provides blocking read/write of telemetry and parameters, suitable
for automated tuning scripts (not a TUI). Reuses the binary protocol
from code64/debug_console/protocol.py.
"""
from __future__ import annotations
import struct
import time
from dataclasses import dataclass, field
from typing import Optional
import serial
# ── Protocol constants ───────────────────────────────────────────────
SYNC_BYTE = 0xAA
CMD_TELEMETRY = 0x01
CMD_PARAM_WRITE = 0x02
CMD_PARAM_WRITE_ACK = 0x03
CMD_PARAM_READ_ALL = 0x04
CMD_PARAM_VALUE = 0x05
CMD_PING = 0x10
CMD_PONG = 0x11
CMD_ERROR_MSG = 0xE0
PTYPE_FLOAT = 0
PTYPE_UINT16 = 1
PTYPE_UINT8 = 2
PTYPE_INT32 = 3
# ── CRC8 (poly 0x07) ────────────────────────────────────────────────
_CRC8_TABLE = [0] * 256
def _init_crc8():
for i in range(256):
crc = i
for _ in range(8):
crc = ((crc << 1) ^ 0x07) & 0xFF if crc & 0x80 else (crc << 1) & 0xFF
_CRC8_TABLE[i] = crc
_init_crc8()
def crc8(data: bytes) -> int:
crc = 0x00
for b in data:
crc = _CRC8_TABLE[crc ^ b]
return crc
# ── Telemetry ────────────────────────────────────────────────────────
@dataclass
class Telemetry:
"""Decoded telemetry packet from the STM32."""
vin: float = 0.0 # mV
vout: float = 0.0 # mV
iin: float = 0.0 # mA (negative = into converter)
iout: float = 0.0 # mA
vfly: float = 0.0 # mV
etemp: float = 0.0 # °C
last_tmp: int = 0
VREF: int = 0
vfly_correction: int = 0
vfly_integral: float = 0.0
vfly_avg_debug: float = 0.0
cc_output_f: float = 0.0
mppt_iref: float = 0.0
mppt_last_vin: float = 0.0
mppt_last_iin: float = 0.0
p_in: float = 0.0
p_out: float = 0.0
seq: int = 0
timestamp: float = field(default_factory=time.time)
@property
def vin_V(self) -> float:
return self.vin / 1000.0
@property
def vout_V(self) -> float:
return self.vout / 1000.0
@property
def iin_A(self) -> float:
return self.iin / 1000.0
@property
def iout_A(self) -> float:
return self.iout / 1000.0
@property
def power_in_W(self) -> float:
return self.vin * (-self.iin) / 1e6
@property
def power_out_W(self) -> float:
return self.vout * self.iout / 1e6
@property
def efficiency(self) -> float:
p_in = self.power_in_W
return (self.power_out_W / p_in * 100.0) if p_in > 0.1 else 0.0
_TELEM_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes
_TELEM_SIZE = struct.calcsize(_TELEM_FMT)
def _decode_telemetry(payload: bytes) -> Optional[Telemetry]:
if len(payload) < _TELEM_SIZE:
return None
v = struct.unpack(_TELEM_FMT, payload[:_TELEM_SIZE])
return Telemetry(
vin=v[0], vout=v[1], iin=v[2], iout=v[3], vfly=v[4], etemp=v[5],
last_tmp=v[6], VREF=v[7], vfly_correction=v[8],
vfly_integral=v[10], vfly_avg_debug=v[11],
cc_output_f=v[12], mppt_iref=v[13],
mppt_last_vin=v[14], mppt_last_iin=v[15],
p_in=v[16], p_out=v[17], seq=v[18],
)
# ── Parameter definitions ────────────────────────────────────────────
@dataclass
class ParamDef:
id: int
name: str
ptype: int
group: str
min_val: float = -1e9
max_val: float = 1e9
fmt: str = ".4f"
PARAMS = [
# Compensator
ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"),
# Vfly
ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"),
ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"),
ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"),
ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"),
ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"),
# CC
ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"),
ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"),
ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"),
ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"),
ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"),
ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"),
# MPPT
ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"),
ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"),
ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"),
ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"),
ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"),
# Global
ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"),
# Deadtime
ParamDef(0x60, "dt_0_3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x61, "dt_3_5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x62, "dt_5_10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x63, "dt_10_20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x64, "dt_20_30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
ParamDef(0x65, "dt_30_45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"),
]
PARAM_BY_ID: dict[int, ParamDef] = {p.id: p for p in PARAMS}
PARAM_BY_NAME: dict[str, ParamDef] = {p.name: p for p in PARAMS}
# Deadtime brackets — current thresholds in mA matching firmware
DT_BRACKETS = [
(0x60, "dt_0_3A", 0, 3000),
(0x61, "dt_3_5A", 3000, 5000),
(0x62, "dt_5_10A", 5000, 10000),
(0x63, "dt_10_20A", 10000, 20000),
(0x64, "dt_20_30A", 20000, 30000),
(0x65, "dt_30_45A", 30000, 45000),
]
# ── Frame building ───────────────────────────────────────────────────
def _build_frame(cmd: int, payload: bytes = b"") -> bytes:
header = bytes([SYNC_BYTE, cmd, len(payload)])
frame = header + payload
return frame + bytes([crc8(frame)])
def _build_param_write(param_id: int, ptype: int, value) -> bytes:
if ptype == PTYPE_FLOAT:
val_bytes = struct.pack("<f", float(value))
elif ptype == PTYPE_UINT16:
val_bytes = struct.pack("<HH", int(value), 0)
elif ptype == PTYPE_UINT8:
val_bytes = struct.pack("<Bxxx", int(value))
elif ptype == PTYPE_INT32:
val_bytes = struct.pack("<i", int(value))
else:
val_bytes = struct.pack("<I", int(value))
payload = struct.pack("<BBxx", param_id, ptype) + val_bytes
return _build_frame(CMD_PARAM_WRITE, payload)
def _decode_param_value(payload: bytes) -> Optional[tuple[int, float]]:
if len(payload) < 8:
return None
param_id, ptype = payload[0], payload[1]
vb = payload[4:8]
if ptype == PTYPE_FLOAT:
value = struct.unpack("<f", vb)[0]
elif ptype == PTYPE_UINT16:
value = float(struct.unpack("<H", vb[:2])[0])
elif ptype == PTYPE_UINT8:
value = float(vb[0])
elif ptype == PTYPE_INT32:
value = float(struct.unpack("<i", vb)[0])
else:
value = float(struct.unpack("<I", vb)[0])
return (param_id, value)
# ── Frame parser state machine ───────────────────────────────────────
class _FrameParser:
def __init__(self):
self.state = 0 # WAIT_SYNC
self.cmd = 0
self.length = 0
self.buf = bytearray()
self.payload = bytearray()
self.idx = 0
def feed(self, data: bytes):
for b in data:
if self.state == 0: # WAIT_SYNC
if b == SYNC_BYTE:
self.buf = bytearray([b])
self.state = 1
elif self.state == 1: # WAIT_CMD
self.cmd = b
self.buf.append(b)
self.state = 2
elif self.state == 2: # WAIT_LEN
self.length = b
self.buf.append(b)
self.payload = bytearray()
self.idx = 0
if b == 0:
self.state = 4
elif b > 128:
self.state = 0
else:
self.state = 3
elif self.state == 3: # WAIT_PAYLOAD
self.payload.append(b)
self.buf.append(b)
self.idx += 1
if self.idx >= self.length:
self.state = 4
elif self.state == 4: # WAIT_CRC
expected = crc8(bytes(self.buf))
self.state = 0
if b == expected:
yield (self.cmd, bytes(self.payload))
# ── STM32Link — synchronous serial interface ─────────────────────────
class STM32Link:
"""Blocking serial link to STM32 debug protocol.
Usage::
link = STM32Link("COM28")
link.ping()
t = link.read_telemetry()
print(f"Vin={t.vin_V:.1f}V Iout={t.iout_A:.1f}A EFF={t.efficiency:.1f}%")
link.write_param("dt_10_20A", 18)
link.close()
"""
def __init__(self, port: str, baudrate: int = 460800, timeout: float = 2.0):
self.ser = serial.Serial(port, baudrate, timeout=timeout)
self._parser = _FrameParser()
self._param_cache: dict[int, float] = {}
def close(self):
if self.ser and self.ser.is_open:
self.ser.close()
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
# ── Low-level ────────────────────────────────────────────────────
def _send(self, frame: bytes):
self.ser.write(frame)
def _recv_frames(self, timeout: float = 1.0) -> list[tuple[int, bytes]]:
"""Read available data and return decoded frames."""
frames = []
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
frames.append((cmd, payload))
if frames:
# Drain any remaining data
time.sleep(0.02)
data = self.ser.read(self.ser.in_waiting)
if data:
for cmd, payload in self._parser.feed(data):
frames.append((cmd, payload))
break
return frames
def _wait_for(self, target_cmd: int, timeout: float = 2.0) -> Optional[bytes]:
"""Wait for a specific command response, processing others."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
if cmd == target_cmd:
return payload
# Cache param values seen in passing
if cmd in (CMD_PARAM_VALUE, CMD_PARAM_WRITE_ACK):
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
# Cache telemetry too
if cmd == CMD_TELEMETRY:
self._last_telemetry = _decode_telemetry(payload)
return None
# ── Commands ─────────────────────────────────────────────────────
def ping(self, timeout: float = 2.0) -> bool:
"""Send PING, return True if PONG received."""
self._send(_build_frame(CMD_PING))
return self._wait_for(CMD_PONG, timeout) is not None
def read_telemetry(self, timeout: float = 2.0) -> Optional[Telemetry]:
"""Wait for next telemetry packet."""
payload = self._wait_for(CMD_TELEMETRY, timeout)
if payload:
return _decode_telemetry(payload)
return None
def read_telemetry_avg(self, n: int = 10, timeout: float = 5.0) -> Optional[Telemetry]:
"""Read n telemetry packets and return the average."""
samples: list[Telemetry] = []
deadline = time.monotonic() + timeout
while len(samples) < n and time.monotonic() < deadline:
t = self.read_telemetry(timeout=deadline - time.monotonic())
if t:
samples.append(t)
if not samples:
return None
# Average all float fields
avg = Telemetry()
for attr in ("vin", "vout", "iin", "iout", "vfly", "etemp",
"vfly_integral", "vfly_avg_debug", "cc_output_f",
"mppt_iref", "mppt_last_vin", "mppt_last_iin",
"p_in", "p_out"):
setattr(avg, attr, sum(getattr(s, attr) for s in samples) / len(samples))
avg.seq = samples[-1].seq
return avg
def request_all_params(self):
"""Request all parameter values from the STM32."""
self._send(_build_frame(CMD_PARAM_READ_ALL))
def read_all_params(self, timeout: float = 3.0) -> dict[str, float]:
"""Request and collect all parameter values."""
self._param_cache.clear()
self.request_all_params()
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
data = self.ser.read(self.ser.in_waiting or 1)
if data:
for cmd, payload in self._parser.feed(data):
if cmd == CMD_PARAM_VALUE:
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
time.sleep(0.05)
# Convert to name->value
return {
PARAM_BY_ID[pid].name: val
for pid, val in self._param_cache.items()
if pid in PARAM_BY_ID
}
def write_param(self, name: str, value: float, wait_ack: bool = True) -> bool:
"""Write a parameter by name. Returns True if ACK received."""
pdef = PARAM_BY_NAME.get(name)
if not pdef:
raise ValueError(f"Unknown parameter: {name!r}")
if value < pdef.min_val or value > pdef.max_val:
raise ValueError(
f"{name}: {value} out of range [{pdef.min_val}, {pdef.max_val}]"
)
frame = _build_param_write(pdef.id, pdef.ptype, value)
self._send(frame)
if wait_ack:
payload = self._wait_for(CMD_PARAM_WRITE_ACK, timeout=2.0)
if payload:
result = _decode_param_value(payload)
if result:
self._param_cache[result[0]] = result[1]
return True
return False
return True
def write_param_by_id(self, param_id: int, value: float) -> bool:
"""Write a parameter by ID."""
pdef = PARAM_BY_ID.get(param_id)
if not pdef:
raise ValueError(f"Unknown param ID: 0x{param_id:02X}")
return self.write_param(pdef.name, value)

443
testbench/tuner.py Normal file
View File

@@ -0,0 +1,443 @@
"""Automated tuning routines combining testbench instruments + STM32 link.
Uses the power analyzer (HIOKI) as ground truth for efficiency while
adjusting converter parameters via the STM32 debug protocol.
"""
from __future__ import annotations
import csv
import time
from dataclasses import dataclass, field
from pathlib import Path
from testbench.bench import MPPTTestbench, IDLE_VOLTAGE
from testbench.stm32_link import (
STM32Link, Telemetry, PARAM_BY_NAME, DT_BRACKETS,
)
@dataclass
class TunePoint:
"""One measurement during a tuning sweep."""
param_name: str
param_value: float
voltage_set: float
load_setpoint: float
load_mode: str
# HIOKI measurements (ground truth)
meter_pin: float = 0.0
meter_pout: float = 0.0
meter_eff: float = 0.0
# STM32 telemetry
stm_vin: float = 0.0
stm_vout: float = 0.0
stm_iin: float = 0.0
stm_iout: float = 0.0
stm_eff: float = 0.0
stm_vfly: float = 0.0
stm_etemp: float = 0.0
timestamp: float = field(default_factory=time.time)
class Tuner:
"""Combines MPPTTestbench + STM32Link for automated tuning.
Usage::
tuner = Tuner(bench, link)
results = tuner.sweep_param(
"dt_10_20A", start=14, stop=40, step=1,
voltage=60.0, current_limit=20.0,
load_mode="CP", load_value=300.0,
)
tuner.print_results(results)
"""
def __init__(
self,
bench: MPPTTestbench,
link: STM32Link,
settle_time: float = 3.0,
stm_avg_samples: int = 10,
):
self.bench = bench
self.link = link
self.settle_time = settle_time
self.stm_avg_samples = stm_avg_samples
def _measure(
self,
param_name: str,
param_value: float,
voltage: float,
load_value: float,
load_mode: str,
) -> TunePoint:
"""Take one combined measurement from HIOKI + STM32."""
point = TunePoint(
param_name=param_name,
param_value=param_value,
voltage_set=voltage,
load_setpoint=load_value,
load_mode=load_mode,
)
# HIOKI measurement
meter_vals = self.bench._wait_meter_ready(max_retries=10, retry_delay=1.0)
point.meter_pin = meter_vals.get("P5", 0.0)
point.meter_pout = meter_vals.get("P6", 0.0)
point.meter_eff = meter_vals.get("EFF1", 0.0)
# STM32 telemetry (averaged)
t = self.link.read_telemetry_avg(n=self.stm_avg_samples)
if t:
point.stm_vin = t.vin_V
point.stm_vout = t.vout_V
point.stm_iin = t.iin_A
point.stm_iout = t.iout_A
point.stm_eff = t.efficiency
point.stm_vfly = t.vfly / 1000.0
point.stm_etemp = t.etemp
return point
# ── Parameter sweep ──────────────────────────────────────────────
def sweep_param(
self,
param_name: str,
start: float,
stop: float,
step: float,
voltage: float,
current_limit: float,
load_mode: str = "CC",
load_value: float = 5.0,
settle_time: float | None = None,
) -> list[TunePoint]:
"""Sweep a single STM32 parameter while measuring efficiency.
Sets up the testbench at the given operating point, then steps
the parameter from start to stop, measuring at each step.
Returns list of TunePoints with both HIOKI and STM32 data.
"""
if param_name not in PARAM_BY_NAME:
raise ValueError(f"Unknown parameter: {param_name!r}")
settle = settle_time or self.settle_time
if step == 0:
raise ValueError("step cannot be zero")
if start > stop and step > 0:
step = -step
elif start < stop and step < 0:
step = -step
# Count steps
n_steps = int(abs(stop - start) / abs(step)) + 1
unit = "A" if load_mode == "CC" else "W"
print(f"Parameter sweep: {param_name} = {start}{stop} (step {step})")
print(f" Operating point: V={voltage:.1f}V, {load_mode}={load_value:.1f}{unit}")
print(f" {n_steps} points, settle={settle:.1f}s")
print()
# Set up testbench
self.bench.supply.set_current(current_limit)
self.bench.supply.set_voltage(voltage)
self.bench.supply.output_on()
self.bench.load.set_mode(load_mode)
self.bench._apply_load_value(load_mode, load_value)
self.bench.load.load_on()
# Initial settle
print(" Settling...")
time.sleep(settle * 2)
results: list[TunePoint] = []
val = start
n = 0
try:
while True:
if step > 0 and val > stop + step / 2:
break
if step < 0 and val < stop + step / 2:
break
# Write parameter
ack = self.link.write_param(param_name, val)
if not ack:
print(f" WARNING: No ACK for {param_name}={val}")
time.sleep(settle)
# Measure
point = self._measure(param_name, val, voltage, load_value, load_mode)
results.append(point)
n += 1
print(
f" [{n:>3d}/{n_steps}] {param_name}={val:>6.1f} "
f"HIOKI: Pin={point.meter_pin:7.1f}W Pout={point.meter_pout:7.1f}W "
f"EFF={point.meter_eff:5.2f}% "
f"STM32: EFF={point.stm_eff:5.1f}% T={point.stm_etemp:.0f}°C"
)
val += step
finally:
self.bench.load.load_off()
self.bench.supply.set_voltage(IDLE_VOLTAGE)
print(f"\n Load OFF. Supply at {IDLE_VOLTAGE:.0f}V.")
return results
# ── Deadtime optimization ────────────────────────────────────────
def tune_deadtime(
self,
dt_start: int = 14,
dt_stop: int = 50,
dt_step: int = 1,
voltage: float = 60.0,
current_limit: float = 20.0,
load_mode: str = "CP",
load_values: list[float] | None = None,
settle_time: float | None = None,
) -> dict[str, list[TunePoint]]:
"""Optimize deadtime for each current bracket.
For each deadtime bracket, sets a load that puts the converter
in that current range, then sweeps deadtime values to find the
optimum.
Args:
load_values: Load setpoints to test (one per DT bracket).
If None, auto-selects based on bracket midpoints.
"""
settle = settle_time or self.settle_time
if load_values is None:
# Auto-select load values targeting the middle of each bracket
# Using CP mode: power ≈ voltage × current
load_values = []
for _, _, i_lo, i_hi in DT_BRACKETS:
mid_i_A = (i_lo + i_hi) / 2 / 1000.0 # mA → A
target_power = voltage * mid_i_A * 0.4 # rough vout/vin ratio
load_values.append(max(10.0, target_power))
print("=" * 80)
print("DEADTIME OPTIMIZATION")
print(f" DT range: {dt_start}{dt_stop} (step {dt_step})")
print(f" V={voltage:.0f}V, I_limit={current_limit:.0f}A, mode={load_mode}")
print("=" * 80)
all_results: dict[str, list[TunePoint]] = {}
for i, (param_id, param_name, i_lo, i_hi) in enumerate(DT_BRACKETS):
load_val = load_values[i] if i < len(load_values) else load_values[-1]
unit = "A" if load_mode == "CC" else "W"
print(f"\n── Bracket: {param_name} ({i_lo/1000:.0f}-{i_hi/1000:.0f}A) "
f"@ {load_mode}={load_val:.0f}{unit} ──")
results = self.sweep_param(
param_name=param_name,
start=dt_start,
stop=dt_stop,
step=dt_step,
voltage=voltage,
current_limit=current_limit,
load_mode=load_mode,
load_value=load_val,
settle_time=settle,
)
all_results[param_name] = results
# Find and report best
if results:
valid = [p for p in results if 0 < p.meter_eff < 110]
if valid:
best = max(valid, key=lambda p: p.meter_eff)
print(f" ★ Best: {param_name}={best.param_value:.0f}"
f"EFF={best.meter_eff:.2f}%")
# Summary
print("\n" + "=" * 80)
print("DEADTIME OPTIMIZATION SUMMARY")
print(f"{'Bracket':<15} {'Best DT':>8} {'Efficiency':>12} {'Temp':>8}")
print("-" * 45)
for param_name, results in all_results.items():
valid = [p for p in results if 0 < p.meter_eff < 110]
if valid:
best = max(valid, key=lambda p: p.meter_eff)
print(f"{param_name:<15} {best.param_value:>8.0f} "
f"{best.meter_eff:>11.2f}% {best.stm_etemp:>7.0f}°C")
else:
print(f"{param_name:<15} {'N/A':>8} {'N/A':>12} {'N/A':>8}")
print("=" * 80)
return all_results
def apply_best_deadtimes(self, results: dict[str, list[TunePoint]]):
"""Apply the best deadtime from each bracket to the STM32."""
print("\nApplying optimal deadtimes:")
for param_name, points in results.items():
valid = [p for p in points if 0 < p.meter_eff < 110]
if valid:
best = max(valid, key=lambda p: p.meter_eff)
val = int(best.param_value)
ack = self.link.write_param(param_name, val)
status = "OK" if ack else "NO ACK"
print(f" {param_name} = {val} ({status})")
# ── Multi-point sweep ────────────────────────────────────────────
def sweep_param_multi(
self,
param_name: str,
start: float,
stop: float,
step: float,
voltages: list[float],
current_limit: float,
load_mode: str = "CP",
load_values: list[float] | None = None,
settle_time: float | None = None,
) -> list[TunePoint]:
"""Sweep a parameter across multiple voltage/load combinations.
Produces a comprehensive dataset showing how the parameter
affects efficiency across the full operating range.
"""
if load_values is None:
load_values = [200.0] # default: 200W
all_results: list[TunePoint] = []
for v in voltages:
for lv in load_values:
unit = "A" if load_mode == "CC" else "W"
print(f"\n── V={v:.0f}V, {load_mode}={lv:.0f}{unit} ──")
results = self.sweep_param(
param_name=param_name,
start=start, stop=stop, step=step,
voltage=v, current_limit=current_limit,
load_mode=load_mode, load_value=lv,
settle_time=settle_time,
)
all_results.extend(results)
return all_results
# ── Output ───────────────────────────────────────────────────────
@staticmethod
def print_results(results: list[TunePoint]):
"""Print a summary table of tuning results."""
if not results:
print("No results.")
return
valid = [p for p in results if 0 < p.meter_eff < 110]
if valid:
best = max(valid, key=lambda p: p.meter_eff)
print(f"\nBest: {best.param_name}={best.param_value:.1f}"
f"EFF={best.meter_eff:.2f}% "
f"(Pin={best.meter_pin:.1f}W Pout={best.meter_pout:.1f}W)")
@staticmethod
def write_csv(results: list[TunePoint], path: str):
"""Write tuning results to CSV."""
if not results:
return
with open(path, "w", newline="") as f:
w = csv.writer(f)
w.writerow([
"param_name", "param_value",
"voltage_set", "load_setpoint", "load_mode",
"meter_pin", "meter_pout", "meter_eff",
"stm_vin", "stm_vout", "stm_iin", "stm_iout",
"stm_eff", "stm_vfly", "stm_etemp",
])
for p in results:
w.writerow([
p.param_name, f"{p.param_value:.4f}",
f"{p.voltage_set:.4f}", f"{p.load_setpoint:.4f}", p.load_mode,
f"{p.meter_pin:.4f}", f"{p.meter_pout:.4f}", f"{p.meter_eff:.4f}",
f"{p.stm_vin:.4f}", f"{p.stm_vout:.4f}",
f"{p.stm_iin:.4f}", f"{p.stm_iout:.4f}",
f"{p.stm_eff:.4f}", f"{p.stm_vfly:.4f}", f"{p.stm_etemp:.4f}",
])
print(f"Results saved to {path}")
@staticmethod
def plot_sweep(results: list[TunePoint], show: bool = True):
"""Plot parameter sweep results."""
import numpy as np
import matplotlib.pyplot as plt
if not results:
return
param_name = results[0].param_name
vals = np.array([p.param_value for p in results])
eff_hioki = np.array([p.meter_eff for p in results])
eff_stm = np.array([p.stm_eff for p in results])
temp = np.array([p.stm_etemp for p in results])
# Filter valid
valid = (eff_hioki > 0) & (eff_hioki < 110)
# Group by operating point
ops = sorted(set((p.voltage_set, p.load_setpoint) for p in results))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
cmap = plt.cm.viridis
for i, (v, l) in enumerate(ops):
color = cmap(i / max(len(ops) - 1, 1))
mask = np.array([
(p.voltage_set == v and p.load_setpoint == l and
0 < p.meter_eff < 110)
for p in results
])
if not np.any(mask):
continue
x = vals[mask]
order = np.argsort(x)
unit = results[0].load_mode
ax1.plot(x[order], eff_hioki[mask][order], "o-", color=color,
markersize=4, label=f"{v:.0f}V/{l:.0f}{unit}")
ax2.plot(x[order], temp[mask][order], "o-", color=color,
markersize=4, label=f"{v:.0f}V/{l:.0f}{unit}")
# Mark best
valid_pts = [p for p in results if 0 < p.meter_eff < 110]
if valid_pts:
best = max(valid_pts, key=lambda p: p.meter_eff)
ax1.axvline(best.param_value, color="red", linestyle="--", alpha=0.5)
ax1.plot(best.param_value, best.meter_eff, "*", color="red",
markersize=15, zorder=10,
label=f"Best: {best.param_value:.0f}{best.meter_eff:.2f}%")
ax1.set_ylabel("Efficiency (%)", fontsize=12)
ax1.set_title(f"Parameter Sweep: {param_name}", fontsize=14)
ax1.legend(fontsize=8)
ax1.grid(True, alpha=0.3)
ax2.set_xlabel(param_name, fontsize=12)
ax2.set_ylabel("Temperature (°C)", fontsize=12)
ax2.legend(fontsize=8)
ax2.grid(True, alpha=0.3)
fig.tight_layout()
if show:
plt.show()
return fig