From 1c41910c1e9f25b3c7281c2ce5a3f20f49aca438 Mon Sep 17 00:00:00 2001 From: grabowski Date: Thu, 12 Mar 2026 16:52:09 +0700 Subject: [PATCH] Add STM32 tuning integration and rewrite README with step-by-step guide - stm32_link.py: synchronous serial interface to STM32 debug protocol (ping, telemetry read/avg, param read/write, frame parser) - tuner.py: automated tuning combining HIOKI + STM32 measurements (param sweep, deadtime optimization, multi-point sweep, CSV/plot output) - CLI commands: stm32-read, stm32-write, tune-param, tune-deadtime - README: complete step-by-step guide covering setup, sweeps, analysis, tuning, shade profiles, debug console, and parameter reference Co-Authored-By: Claude Opus 4.6 --- README.md | 341 ++++++++++++++++++++++--------- testbench/cli.py | 168 +++++++++++++++ testbench/stm32_link.py | 440 +++++++++++++++++++++++++++++++++++++++ testbench/tuner.py | 443 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 1297 insertions(+), 95 deletions(-) create mode 100644 testbench/stm32_link.py create mode 100644 testbench/tuner.py diff --git a/README.md b/README.md index d74e4f9..549ffe5 100644 --- a/README.md +++ b/README.md @@ -1,127 +1,247 @@ # MPPT Testbench -Unified CLI tool for testing MPPT (Maximum Power Point Tracking) converters using three coordinated instruments: +Unified tool for testing and tuning MPPT (Maximum Power Point Tracking) converters. Combines three bench instruments with direct STM32 firmware access for closed-loop parameter optimization. | Instrument | Role | Interface | |---|---|---| -| **ITECH IT6500D** | DC power supply (solar panel simulator) | USB-TMC / SCPI via PyVISA | +| **ITECH IT6537D** | DC power supply (solar panel simulator, 80V/120A/6kW) | USB-TMC / SCPI via PyVISA | | **Prodigit 3366G** | DC electronic load (600V/420A/6kW) | RS-232 (115200 8N1 RTS/CTS) | | **HIOKI 3193-10** | Power analyzer (efficiency measurement) | GPIB via UsbGpib / PyVISA | +| **STM32G474** | Converter firmware (LVSolarBuck64) | Serial 460800 baud (debug protocol) | ## Wiring ``` - ┌──────────────────┐ - IT6500D ──(+/-)──> │ MPPT Tracker │ ──(+/-)──> Prodigit 3366G - │ (DUT) │ - HIOKI Ch5 ──(sense)── │ Input Output │ ──(sense)── HIOKI Ch6 - └──────────────────┘ + +------------------+ + IT6500D --(+/-)--> | MPPT Tracker | --(+/-)--> Prodigit 3366G + | (DUT) | + HIOKI Ch5 --(sense)-- | Input Output | --(sense)-- HIOKI Ch6 + +------------------+ + | + STM32 debug + serial (COM28) HIOKI EFF1 = P6 / P5 x 100% (output power / input power) ``` ## Installation -Requires Python 3.12+, NI-VISA runtime, and `uv`. +Requires Python 3.12+, NI-VISA runtime, and [uv](https://docs.astral.sh/uv/). ```bash git clone --recurse-submodules https://git.b4l.co.th/B4L/mppt-testbench.git cd mppt-testbench -uv venv -uv pip install -e . +uv sync ``` -## Quick Start +The debug console (inside `code64/`) has its own environment: ```bash -# 1. Check all instruments are connected -mppt identify - -# 2. Configure everything for MPPT testing -# (sets wiring, coupling, auto-range, efficiency formula, display) -mppt setup - -# 3. Take a single reading from all instruments -mppt measure - -# 4. Continuous monitoring with CSV export -mppt monitor --interval 1.0 --output data.csv - -# 5. Live real-time graph (Power, Efficiency, Voltage, Current) -mppt live --interval 0.5 +cd code64 +uv sync ``` -## Sweep Commands +## Step-by-Step Guide -### Voltage Sweep - -Sweep supply voltage across a range while the load is held constant. Records efficiency at each point. +### 1. Connect and verify instruments ```bash -mppt sweep \ +# Check all bench instruments respond +uv run mppt identify + +# Check STM32 responds (reads params + telemetry) +uv run mppt stm32-read --stm32-port COM28 +``` + +### 2. Configure instruments for MPPT testing + +```bash +uv run mppt setup +``` + +This sets wiring mode (1P2W), DC coupling, auto-ranging, efficiency formula (EFF1 = P6/P5), and display layout on the HIOKI. + +### 3. Basic measurements + +```bash +# Single reading from all instruments +uv run mppt measure + +# Continuous text monitoring with CSV export +uv run mppt monitor --interval 1.0 --output data.csv +``` + +### 4. Launch the GUI + +```bash +uv run mppt-gui +``` + +The GUI provides: +- Real-time readouts from all three instruments +- Supply voltage/current control with ON/OFF indicators +- Load mode (CC/CR/CV/CP) and setpoint control +- HIOKI channel range selectors + degauss buttons +- Meter format selector (scientific/normal) +- 2D sweep panel with time estimate +- Live-updating power, efficiency, voltage, and current plots +- Console log panel + +### 5. Run efficiency sweeps + +#### Voltage sweep (1D) + +```bash +uv run mppt sweep \ --v-start 30 --v-stop 100 --v-step 5 \ - --current-limit 10 \ - --load-mode CC --load-value 3.0 \ - --settle 2.0 \ - -o voltage_sweep.csv + --current-limit 20 \ + --load-mode CP --load-value 200 \ + --settle 2.0 -o voltage_sweep.csv ``` -After the sweep, the supply returns to 75V and stays ON. - -### Load Current Sweep - -Sweep load current (CC mode) at a fixed supply voltage. +#### Load sweep at fixed voltage (1D) ```bash -mppt sweep-load \ - --voltage 75 --current-limit 10 \ - --i-start 0.5 --i-stop 8 --i-step 0.5 \ - --settle 2.0 \ - -o load_sweep.csv +uv run mppt sweep-load \ + --voltage 60 --current-limit 20 \ + --i-start 0.5 --i-stop 15 --i-step 0.5 \ + --settle 2.0 -o load_sweep.csv ``` -After the sweep, the load turns OFF and the supply returns to 75V (stays ON). +#### 2D voltage x load sweep (efficiency map) -### Auto-Range Handling +```bash +# Constant Power mode +uv run mppt sweep-vi \ + --v-start 60 --v-stop 100 --v-step 5 \ + --l-start 50 --l-stop 500 --l-step 50 \ + --load-mode CP --current-limit 20 \ + --settle 2.0 -o map_cp.csv -The HIOKI 3193-10 returns special error values (`+9999.9E+99`) while auto-ranging. The testbench automatically waits for all measurement channels to settle before recording each sweep point. The supply is kept alive with periodic queries during the wait to prevent USB-TMC timeouts. +# Constant Current mode +uv run mppt sweep-vi \ + --v-start 35 --v-stop 100 --v-step 5 \ + --l-start 0.5 --l-stop 15 --l-step 0.5 \ + --load-mode CC --current-limit 20 \ + --settle 2.0 -o map_cc.csv +``` -## Direct Instrument Control +### 6. Analyze sweep results + +```bash +# Generate efficiency overlay, heatmap, and power loss plots (no instruments needed) +uv run mppt plot-sweep map_cp.csv + +# Save plots without displaying +uv run mppt plot-sweep map_cp.csv --no-show -o plots/ +``` + +Produces three PNG files: +- `*_efficiency.png` -- efficiency vs load, one line per voltage, best point marked +- `*_heatmap.png` -- 2D efficiency surface (voltage x load) +- `*_loss.png` -- power loss vs load, all voltages overlaid + +### 7. Tune converter parameters + +The tuning commands combine the testbench instruments (ground truth efficiency from HIOKI) with direct STM32 parameter writes to find optimal settings. + +#### Read current STM32 state + +```bash +uv run mppt stm32-read +``` + +#### Write a single parameter + +```bash +uv run mppt stm32-write --param dt_10_20A --value 20 +``` + +#### Sweep a parameter to find the optimum + +Sweeps a parameter from start to stop, measuring HIOKI efficiency + STM32 telemetry at each step. Plots the result. + +```bash +# Optimize deadtime for the 10-20A bracket at 300W +uv run mppt tune-param \ + --param dt_10_20A --start 14 --stop 40 --step 1 \ + --voltage 60 --current-limit 20 \ + --load-mode CP --load-value 300 \ + --settle 3.0 -o dt_tune.csv + +# Tune Vfly proportional gain +uv run mppt tune-param \ + --param vfly_kp --start -2 --stop 2 --step 0.1 \ + --voltage 60 --current-limit 20 \ + --load-mode CP --load-value 200 \ + --settle 3.0 +``` + +#### Auto-optimize all deadtime brackets + +Automatically sweeps deadtime for each of the 6 current brackets (0-3A, 3-5A, 5-10A, 10-20A, 20-30A, 30-45A), setting an appropriate load for each bracket. + +```bash +# Sweep and report best values +uv run mppt tune-deadtime \ + --voltage 60 --current-limit 20 --load-mode CP \ + --dt-start 14 --dt-stop 50 --dt-step 1 \ + -o deadtime_results.csv + +# Sweep, report, and apply best values to STM32 +uv run mppt tune-deadtime \ + --voltage 60 --current-limit 20 --load-mode CP \ + --load-values 20,50,100,250,400,600 \ + --apply +``` + +### 8. Shade / irradiance profile simulation + +Simulate cloud passing or partial shading with a CSV-driven sequence: + +```bash +uv run mppt shade-profile \ + --profile samples/cloud_pass.csv \ + --settle 2.0 -o shade_results.csv +``` + +Profile CSV format: `time,voltage,current_limit,load_mode,load_value` + +### 9. Real-time debug console (TUI) + +For live monitoring and parameter tuning via the Textual terminal UI: + +```bash +cd code64 +uv run debug-console COM28 +``` + +Keybindings: `p` ping, `r` read params, `f` toggle EMA filter, `l` show log path, `q` quit. + +### 10. Direct instrument control ```bash # Supply -mppt supply set --voltage 48 --current 10 -mppt supply on -mppt supply off +uv run mppt supply set --voltage 48 --current 10 +uv run mppt supply on +uv run mppt supply off # Load -mppt load set --mode CC --value 5.0 -mppt load set --mode CR --value 10.0 -mppt load on -mppt load off +uv run mppt load set --mode CP --value 200 +uv run mppt load on +uv run mppt load off # Emergency shutdown (load first, then supply) -mppt safe-off -``` - -## Efficiency Measurement - -Measure averaged efficiency at a fixed operating point: - -```bash -mppt efficiency \ - --voltage 75 --current-limit 10 \ - --load-mode CC --load-value 3.0 \ - --samples 10 --settle 3.0 +uv run mppt safe-off ``` ## CLI Reference ``` -mppt [-h] [--supply-address ADDR] [--load-port PORT] [--load-baud BAUD] - [--meter-address ADDR] [--timeout MS] - {identify,setup,measure,monitor,live,sweep,sweep-load,efficiency, - supply,load,safe-off} +uv run mppt [-h] [--supply-address ADDR] [--load-port PORT] [--load-baud BAUD] + [--meter-address ADDR] [--timeout MS] + [--stm32-port PORT] [--stm32-baud BAUD] + {command} ``` | Command | Description | @@ -133,7 +253,14 @@ mppt [-h] [--supply-address ADDR] [--load-port PORT] [--load-baud BAUD] | `live` | Real-time 4-panel matplotlib graph | | `sweep` | Voltage sweep with efficiency recording | | `sweep-load` | Load current sweep at fixed voltage | +| `sweep-vi` | 2D voltage x load sweep (efficiency map) | | `efficiency` | Averaged efficiency at a fixed operating point | +| `shade-profile` | Run shade/irradiance profile from CSV | +| `plot-sweep` | Generate analysis plots from sweep CSV (offline) | +| `stm32-read` | Read all STM32 parameters and telemetry | +| `stm32-write` | Write a parameter to the STM32 | +| `tune-param` | Sweep an STM32 parameter while measuring efficiency | +| `tune-deadtime` | Auto-optimize deadtime for each current bracket | | `supply` | Direct IT6500D control (on/off/set) | | `load` | Direct Prodigit 3366G control (on/off/set) | | `safe-off` | Emergency shutdown (load first, then supply) | @@ -147,48 +274,70 @@ mppt [-h] [--supply-address ADDR] [--load-port PORT] [--load-baud BAUD] | `--load-baud` | `115200` | Prodigit 3366G baud rate | | `--meter-address` | auto-detect | HIOKI 3193-10 VISA address | | `--timeout` | `5000` | VISA timeout in milliseconds | +| `--stm32-port` | `COM28` | STM32 debug serial port | +| `--stm32-baud` | `460800` | STM32 debug baud rate | + +## Tunable STM32 Parameters + +| Parameter | Type | Range | Description | +|---|---|---|---| +| `VREF` | uint16 | 3100-3700 | ADC reference voltage | +| `vfly_kp` | float | -10 to 10 | Vfly proportional gain | +| `vfly_ki` | float | -10 to 10 | Vfly integral gain | +| `vfly_clamp` | uint16 | 0-10000 | Vfly integrator clamp | +| `vfly_active` | uint8 | 0-1 | Vfly loop enable | +| `cc_target` | float | 0-60000 | CC target (mA) | +| `cc_gain` | float | -1 to 1 | CC proportional gain | +| `cc_active` | int32 | 0-1 | CC loop enable | +| `mppt_step` | float | 0-10000 | MPPT P&O step size (mA) | +| `mppt_deadband` | float | 0-1 | MPPT deadband | +| `mppt_active` | int32 | 0-1 | MPPT loop enable | +| `dt_0_3A` | uint8 | 14-200 | Deadtime 0-3A (HRTIM ticks) | +| `dt_3_5A` | uint8 | 14-200 | Deadtime 3-5A | +| `dt_5_10A` | uint8 | 14-200 | Deadtime 5-10A | +| `dt_10_20A` | uint8 | 14-200 | Deadtime 10-20A | +| `dt_20_30A` | uint8 | 14-200 | Deadtime 20-30A | +| `dt_30_45A` | uint8 | 14-200 | Deadtime 30-45A | ## CSV Output Format -Sweep CSV files contain the following columns: +Sweep CSV files contain: | Column | Description | |---|---| | `voltage_set` | Supply voltage setpoint (V) | | `current_limit` | Supply current limit (A) | -| `load_setpoint` | Load setpoint value (A for CC mode) | +| `load_setpoint` | Load setpoint value (A for CC, W for CP) | | `supply_V/I/P` | Supply measured voltage, current, power | | `load_V/I/P` | Load measured voltage, current, power | | `input_power` | HIOKI P5 -- power into MPPT tracker (W) | | `output_power` | HIOKI P6 -- power out of MPPT tracker (W) | | `efficiency` | HIOKI EFF1 -- P6/P5 x 100 (%) | -## Setup Details - -`mppt setup` configures the instruments as follows: - -- **Supply**: Remote control mode -- **Load**: Remote control mode -- **Meter**: - - Wiring: 1P2W - - Ch5 (solar input): DC coupling, voltage and current auto-range - - Ch6 (MPPT output): DC coupling, voltage and current auto-range - - Response speed: SLOW (best accuracy) - - EFF1 = P6 / P5 (output / input) - - Display: 16-item SELECT view -- U5, I5, P5, EFF1 (left) | U6, I6, P6 (right) +Tuning CSV files additionally contain `param_name`, `param_value`, and STM32 telemetry columns (`stm_vin`, `stm_vout`, `stm_iin`, `stm_iout`, `stm_eff`, `stm_vfly`, `stm_etemp`). ## Project Structure ``` mppt-testbench/ -├── IT6500D/ git submodule - DC power supply driver -├── PRODIGIT-3366G/ git submodule - electronic load driver -├── HIOKI-3193-10/ git submodule - power analyzer driver -├── testbench/ -│ ├── __init__.py exports MPPTTestbench -│ ├── bench.py orchestrator (sweeps, measurement, auto-range wait) -│ └── cli.py unified CLI entry point -└── pyproject.toml package config, entry point: mppt ++-- IT6500D/ git submodule -- DC power supply driver ++-- PRODIGIT-3366G/ git submodule -- electronic load driver ++-- HIOKI-3193-10/ git submodule -- power analyzer driver ++-- testbench/ +| +-- __init__.py exports MPPTTestbench +| +-- bench.py orchestrator (sweeps, measurement, auto-range wait) +| +-- cli.py unified CLI entry point +| +-- gui.py tkinter GUI with live plots +| +-- gui_workers.py background instrument I/O thread +| +-- stm32_link.py synchronous STM32 debug protocol interface +| +-- tuner.py automated tuning routines (param sweep, deadtime opt) ++-- code64/ +| +-- Core/ STM32G474 firmware (C) +| +-- Drivers/ HAL drivers +| +-- debug_console/ Textual TUI for live debugging +| +-- pyproject.toml uv-compatible package config ++-- samples/ shade profile CSV examples ++-- pyproject.toml package config, entry points: mppt, mppt-gui ``` ## Dependencies @@ -197,4 +346,6 @@ mppt-testbench/ - [PyVISA](https://pyvisa.readthedocs.io/) + [pyvisa-py](https://pyvisa.readthedocs.io/projects/pyvisa-py/) - [pyserial](https://pyserial.readthedocs.io/) - [matplotlib](https://matplotlib.org/) +- [numpy](https://numpy.org/) - NI-VISA runtime (for GPIB/USB-TMC communication) +- [Textual](https://textual.textualize.io/) (debug console only, in code64/) diff --git a/testbench/cli.py b/testbench/cli.py index 9b72ecb..e4b9754 100644 --- a/testbench/cli.py +++ b/testbench/cli.py @@ -550,6 +550,124 @@ def cmd_safe_off(bench: MPPTTestbench, _args: argparse.Namespace) -> None: print("Done.") +# ── Tuning commands (instruments + STM32) ──────────────────────────── + + +def cmd_stm32_read(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Read all STM32 parameters and telemetry.""" + from testbench.stm32_link import STM32Link + + with STM32Link(args.stm32_port, args.stm32_baud) as link: + if not link.ping(): + print("STM32 not responding to ping!") + return + + print("STM32 connected.\n") + + # Read params + params = link.read_all_params() + if params: + print("Parameters:") + for name, val in sorted(params.items()): + print(f" {name:<20s} = {val}") + print() + + # Read telemetry + t = link.read_telemetry_avg(n=20) + if t: + print("Telemetry (20-sample avg):") + print(f" Vin = {t.vin_V:8.2f} V") + print(f" Vout = {t.vout_V:8.2f} V") + print(f" Iin = {t.iin_A:8.2f} A") + print(f" Iout = {t.iout_A:8.2f} A") + print(f" Pin = {t.power_in_W:8.2f} W") + print(f" Pout = {t.power_out_W:8.2f} W") + print(f" EFF = {t.efficiency:8.1f} %") + print(f" Vfly = {t.vfly/1000:8.2f} V") + print(f" Temp = {t.etemp:8.1f} °C") + + +def cmd_stm32_write(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Write a parameter to the STM32.""" + from testbench.stm32_link import STM32Link + + with STM32Link(args.stm32_port, args.stm32_baud) as link: + if not link.ping(): + print("STM32 not responding to ping!") + return + ack = link.write_param(args.param, args.value) + print(f"{args.param} = {args.value} {'ACK' if ack else 'NO ACK'}") + + +def cmd_tune_param(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Sweep an STM32 parameter while measuring efficiency.""" + from testbench.stm32_link import STM32Link + from testbench.tuner import Tuner + + with STM32Link(args.stm32_port, args.stm32_baud) as link: + if not link.ping(): + print("STM32 not responding!") + return + + tuner = Tuner(bench, link, settle_time=args.settle) + results = tuner.sweep_param( + param_name=args.param, + start=args.start, + stop=args.stop, + step=args.step, + voltage=args.voltage, + current_limit=args.current_limit, + load_mode=args.load_mode, + load_value=args.load_value, + settle_time=args.settle, + ) + + tuner.print_results(results) + + if args.output: + tuner.write_csv(results, args.output) + + if not args.no_plot and results: + tuner.plot_sweep(results, show=True) + + +def cmd_tune_deadtime(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Optimize deadtime for each current bracket.""" + from testbench.stm32_link import STM32Link + from testbench.tuner import Tuner + + with STM32Link(args.stm32_port, args.stm32_baud) as link: + if not link.ping(): + print("STM32 not responding!") + return + + tuner = Tuner(bench, link, settle_time=args.settle) + + load_values = None + if args.load_values: + load_values = [float(x) for x in args.load_values.split(",")] + + results = tuner.tune_deadtime( + dt_start=args.dt_start, + dt_stop=args.dt_stop, + dt_step=args.dt_step, + voltage=args.voltage, + current_limit=args.current_limit, + load_mode=args.load_mode, + load_values=load_values, + settle_time=args.settle, + ) + + if args.apply: + tuner.apply_best_deadtimes(results) + + if args.output: + all_pts = [] + for pts in results.values(): + all_pts.extend(pts) + tuner.write_csv(all_pts, args.output) + + # ── Offline plot command (no instruments needed) ───────────────────── @@ -726,6 +844,9 @@ examples: %(prog)s safe-off %(prog)s plot-sweep sweep_vi_20260312_151212.csv %(prog)s plot-sweep sweep_vi_20260312_151212.csv --no-show -o plots/ + %(prog)s stm32-read --stm32-port COM28 + %(prog)s tune-param --stm32-port COM28 --param dt_10_20A --start 14 --stop 40 --step 1 --voltage 60 --current-limit 20 --load-mode CP --load-value 300 + %(prog)s tune-deadtime --stm32-port COM28 --voltage 60 --current-limit 20 --load-mode CP --apply """, ) @@ -750,6 +871,14 @@ examples: "--timeout", type=int, default=5000, help="VISA timeout in ms (default: 5000)", ) + parser.add_argument( + "--stm32-port", default="COM28", + help="STM32 debug serial port (default: COM28)", + ) + parser.add_argument( + "--stm32-baud", type=int, default=460800, + help="STM32 debug baud rate (default: 460800)", + ) sub = parser.add_subparsers(dest="command", required=True) @@ -845,6 +974,41 @@ examples: # safe-off sub.add_parser("safe-off", help="Emergency: turn off load and supply") + # stm32-read + sub.add_parser("stm32-read", help="Read STM32 parameters and telemetry") + + # stm32-write + p_sw = sub.add_parser("stm32-write", help="Write a parameter to the STM32") + p_sw.add_argument("--param", required=True, help="Parameter name") + p_sw.add_argument("--value", type=float, required=True, help="Value to write") + + # tune-param + p_tp = sub.add_parser("tune-param", help="Sweep an STM32 parameter while measuring efficiency") + p_tp.add_argument("--param", required=True, help="Parameter name (e.g. dt_10_20A, vfly_kp)") + p_tp.add_argument("--start", type=float, required=True, help="Start value") + p_tp.add_argument("--stop", type=float, required=True, help="Stop value") + p_tp.add_argument("--step", type=float, required=True, help="Step size") + p_tp.add_argument("--voltage", type=float, required=True, help="Supply voltage (V)") + p_tp.add_argument("--current-limit", type=float, required=True, help="Supply current limit (A)") + p_tp.add_argument("--load-mode", choices=["CC", "CP"], default="CP", help="Load mode") + p_tp.add_argument("--load-value", type=float, default=200.0, help="Load setpoint (A or W)") + p_tp.add_argument("--settle", type=float, default=3.0, help="Settle time per step (s)") + p_tp.add_argument("--no-plot", action="store_true", help="Skip plot") + p_tp.add_argument("-o", "--output", help="CSV output file") + + # tune-deadtime + p_td = sub.add_parser("tune-deadtime", help="Optimize deadtime for each current bracket") + p_td.add_argument("--dt-start", type=int, default=14, help="Min deadtime ticks (default: 14)") + p_td.add_argument("--dt-stop", type=int, default=50, help="Max deadtime ticks (default: 50)") + p_td.add_argument("--dt-step", type=int, default=1, help="Deadtime step (default: 1)") + p_td.add_argument("--voltage", type=float, default=60.0, help="Supply voltage (V)") + p_td.add_argument("--current-limit", type=float, default=20.0, help="Supply current limit (A)") + p_td.add_argument("--load-mode", choices=["CC", "CP"], default="CP", help="Load mode") + p_td.add_argument("--load-values", help="Comma-separated load values per bracket (e.g. 20,50,100,250,400,600)") + p_td.add_argument("--settle", type=float, default=3.0, help="Settle time per step (s)") + p_td.add_argument("--apply", action="store_true", help="Apply best deadtimes after sweep") + p_td.add_argument("-o", "--output", help="CSV output file") + # plot-sweep (offline, no instruments) p_plot = sub.add_parser("plot-sweep", help="Plot efficiency analysis from sweep CSV (no instruments needed)") p_plot.add_argument("csv", help="Sweep CSV file to plot") @@ -872,6 +1036,10 @@ examples: "supply": cmd_supply, "load": cmd_load, "safe-off": cmd_safe_off, + "stm32-read": cmd_stm32_read, + "stm32-write": cmd_stm32_write, + "tune-param": cmd_tune_param, + "tune-deadtime": cmd_tune_deadtime, } bench = connect_bench(args) diff --git a/testbench/stm32_link.py b/testbench/stm32_link.py new file mode 100644 index 0000000..e4b65e7 --- /dev/null +++ b/testbench/stm32_link.py @@ -0,0 +1,440 @@ +"""Synchronous serial link to the STM32 debug protocol. + +Provides blocking read/write of telemetry and parameters, suitable +for automated tuning scripts (not a TUI). Reuses the binary protocol +from code64/debug_console/protocol.py. +""" + +from __future__ import annotations + +import struct +import time +from dataclasses import dataclass, field +from typing import Optional + +import serial + +# ── Protocol constants ─────────────────────────────────────────────── + +SYNC_BYTE = 0xAA + +CMD_TELEMETRY = 0x01 +CMD_PARAM_WRITE = 0x02 +CMD_PARAM_WRITE_ACK = 0x03 +CMD_PARAM_READ_ALL = 0x04 +CMD_PARAM_VALUE = 0x05 +CMD_PING = 0x10 +CMD_PONG = 0x11 +CMD_ERROR_MSG = 0xE0 + +PTYPE_FLOAT = 0 +PTYPE_UINT16 = 1 +PTYPE_UINT8 = 2 +PTYPE_INT32 = 3 + + +# ── CRC8 (poly 0x07) ──────────────────────────────────────────────── + +_CRC8_TABLE = [0] * 256 + +def _init_crc8(): + for i in range(256): + crc = i + for _ in range(8): + crc = ((crc << 1) ^ 0x07) & 0xFF if crc & 0x80 else (crc << 1) & 0xFF + _CRC8_TABLE[i] = crc + +_init_crc8() + + +def crc8(data: bytes) -> int: + crc = 0x00 + for b in data: + crc = _CRC8_TABLE[crc ^ b] + return crc + + +# ── Telemetry ──────────────────────────────────────────────────────── + +@dataclass +class Telemetry: + """Decoded telemetry packet from the STM32.""" + vin: float = 0.0 # mV + vout: float = 0.0 # mV + iin: float = 0.0 # mA (negative = into converter) + iout: float = 0.0 # mA + vfly: float = 0.0 # mV + etemp: float = 0.0 # °C + last_tmp: int = 0 + VREF: int = 0 + vfly_correction: int = 0 + vfly_integral: float = 0.0 + vfly_avg_debug: float = 0.0 + cc_output_f: float = 0.0 + mppt_iref: float = 0.0 + mppt_last_vin: float = 0.0 + mppt_last_iin: float = 0.0 + p_in: float = 0.0 + p_out: float = 0.0 + seq: int = 0 + timestamp: float = field(default_factory=time.time) + + @property + def vin_V(self) -> float: + return self.vin / 1000.0 + + @property + def vout_V(self) -> float: + return self.vout / 1000.0 + + @property + def iin_A(self) -> float: + return self.iin / 1000.0 + + @property + def iout_A(self) -> float: + return self.iout / 1000.0 + + @property + def power_in_W(self) -> float: + return self.vin * (-self.iin) / 1e6 + + @property + def power_out_W(self) -> float: + return self.vout * self.iout / 1e6 + + @property + def efficiency(self) -> float: + p_in = self.power_in_W + return (self.power_out_W / p_in * 100.0) if p_in > 0.1 else 0.0 + + +_TELEM_FMT = "<6f hHh h 6f 2f B3x" # 68 bytes +_TELEM_SIZE = struct.calcsize(_TELEM_FMT) + + +def _decode_telemetry(payload: bytes) -> Optional[Telemetry]: + if len(payload) < _TELEM_SIZE: + return None + v = struct.unpack(_TELEM_FMT, payload[:_TELEM_SIZE]) + return Telemetry( + vin=v[0], vout=v[1], iin=v[2], iout=v[3], vfly=v[4], etemp=v[5], + last_tmp=v[6], VREF=v[7], vfly_correction=v[8], + vfly_integral=v[10], vfly_avg_debug=v[11], + cc_output_f=v[12], mppt_iref=v[13], + mppt_last_vin=v[14], mppt_last_iin=v[15], + p_in=v[16], p_out=v[17], seq=v[18], + ) + + +# ── Parameter definitions ──────────────────────────────────────────── + +@dataclass +class ParamDef: + id: int + name: str + ptype: int + group: str + min_val: float = -1e9 + max_val: float = 1e9 + fmt: str = ".4f" + + +PARAMS = [ + # Compensator + ParamDef(0x25, "VREF", PTYPE_UINT16, "Compensator", 3100, 3700, ".0f"), + # Vfly + ParamDef(0x20, "vfly_kp", PTYPE_FLOAT, "Vfly", -10, 10, ".4f"), + ParamDef(0x21, "vfly_ki", PTYPE_FLOAT, "Vfly", -10, 10, ".6f"), + ParamDef(0x22, "vfly_clamp", PTYPE_UINT16, "Vfly", 0, 10000, ".0f"), + ParamDef(0x23, "vfly_loop_trig", PTYPE_UINT16, "Vfly", 1, 10000, ".0f"), + ParamDef(0x24, "vfly_active", PTYPE_UINT8, "Vfly", 0, 1, ".0f"), + # CC + ParamDef(0x30, "cc_target", PTYPE_FLOAT, "CC", 0, 60000, ".0f"), + ParamDef(0x31, "cc_gain", PTYPE_FLOAT, "CC", -1, 1, ".4f"), + ParamDef(0x32, "cc_min_step", PTYPE_FLOAT, "CC", -1000, 0, ".1f"), + ParamDef(0x33, "cc_max_step", PTYPE_FLOAT, "CC", 0, 1000, ".1f"), + ParamDef(0x34, "cc_loop_trig", PTYPE_UINT16, "CC", 1, 10000, ".0f"), + ParamDef(0x35, "cc_active", PTYPE_INT32, "CC", 0, 1, ".0f"), + # MPPT + ParamDef(0x40, "mppt_step", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), + ParamDef(0x41, "mppt_iref_min", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), + ParamDef(0x42, "mppt_iref_max", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), + ParamDef(0x43, "mppt_dv_thresh", PTYPE_FLOAT, "MPPT", 0, 10000, ".1f"), + ParamDef(0x44, "mppt_loop_trig", PTYPE_UINT16, "MPPT", 1, 10000, ".0f"), + ParamDef(0x45, "mppt_active", PTYPE_INT32, "MPPT", 0, 1, ".0f"), + ParamDef(0x46, "mppt_init_iref", PTYPE_FLOAT, "MPPT", 0, 60000, ".0f"), + ParamDef(0x47, "mppt_deadband", PTYPE_FLOAT, "MPPT", 0, 1, ".4f"), + # Global + ParamDef(0x50, "vin_min_ctrl", PTYPE_FLOAT, "Global", 0, 90000, ".0f"), + # Deadtime + ParamDef(0x60, "dt_0_3A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), + ParamDef(0x61, "dt_3_5A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), + ParamDef(0x62, "dt_5_10A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), + ParamDef(0x63, "dt_10_20A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), + ParamDef(0x64, "dt_20_30A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), + ParamDef(0x65, "dt_30_45A", PTYPE_UINT8, "Deadtime", 14, 200, ".0f"), +] + +PARAM_BY_ID: dict[int, ParamDef] = {p.id: p for p in PARAMS} +PARAM_BY_NAME: dict[str, ParamDef] = {p.name: p for p in PARAMS} + +# Deadtime brackets — current thresholds in mA matching firmware +DT_BRACKETS = [ + (0x60, "dt_0_3A", 0, 3000), + (0x61, "dt_3_5A", 3000, 5000), + (0x62, "dt_5_10A", 5000, 10000), + (0x63, "dt_10_20A", 10000, 20000), + (0x64, "dt_20_30A", 20000, 30000), + (0x65, "dt_30_45A", 30000, 45000), +] + + +# ── Frame building ─────────────────────────────────────────────────── + +def _build_frame(cmd: int, payload: bytes = b"") -> bytes: + header = bytes([SYNC_BYTE, cmd, len(payload)]) + frame = header + payload + return frame + bytes([crc8(frame)]) + + +def _build_param_write(param_id: int, ptype: int, value) -> bytes: + if ptype == PTYPE_FLOAT: + val_bytes = struct.pack(" Optional[tuple[int, float]]: + if len(payload) < 8: + return None + param_id, ptype = payload[0], payload[1] + vb = payload[4:8] + if ptype == PTYPE_FLOAT: + value = struct.unpack(" 128: + self.state = 0 + else: + self.state = 3 + elif self.state == 3: # WAIT_PAYLOAD + self.payload.append(b) + self.buf.append(b) + self.idx += 1 + if self.idx >= self.length: + self.state = 4 + elif self.state == 4: # WAIT_CRC + expected = crc8(bytes(self.buf)) + self.state = 0 + if b == expected: + yield (self.cmd, bytes(self.payload)) + + +# ── STM32Link — synchronous serial interface ───────────────────────── + +class STM32Link: + """Blocking serial link to STM32 debug protocol. + + Usage:: + + link = STM32Link("COM28") + link.ping() + t = link.read_telemetry() + print(f"Vin={t.vin_V:.1f}V Iout={t.iout_A:.1f}A EFF={t.efficiency:.1f}%") + link.write_param("dt_10_20A", 18) + link.close() + """ + + def __init__(self, port: str, baudrate: int = 460800, timeout: float = 2.0): + self.ser = serial.Serial(port, baudrate, timeout=timeout) + self._parser = _FrameParser() + self._param_cache: dict[int, float] = {} + + def close(self): + if self.ser and self.ser.is_open: + self.ser.close() + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + + # ── Low-level ──────────────────────────────────────────────────── + + def _send(self, frame: bytes): + self.ser.write(frame) + + def _recv_frames(self, timeout: float = 1.0) -> list[tuple[int, bytes]]: + """Read available data and return decoded frames.""" + frames = [] + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + data = self.ser.read(self.ser.in_waiting or 1) + if data: + for cmd, payload in self._parser.feed(data): + frames.append((cmd, payload)) + if frames: + # Drain any remaining data + time.sleep(0.02) + data = self.ser.read(self.ser.in_waiting) + if data: + for cmd, payload in self._parser.feed(data): + frames.append((cmd, payload)) + break + return frames + + def _wait_for(self, target_cmd: int, timeout: float = 2.0) -> Optional[bytes]: + """Wait for a specific command response, processing others.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + data = self.ser.read(self.ser.in_waiting or 1) + if data: + for cmd, payload in self._parser.feed(data): + if cmd == target_cmd: + return payload + # Cache param values seen in passing + if cmd in (CMD_PARAM_VALUE, CMD_PARAM_WRITE_ACK): + result = _decode_param_value(payload) + if result: + self._param_cache[result[0]] = result[1] + # Cache telemetry too + if cmd == CMD_TELEMETRY: + self._last_telemetry = _decode_telemetry(payload) + return None + + # ── Commands ───────────────────────────────────────────────────── + + def ping(self, timeout: float = 2.0) -> bool: + """Send PING, return True if PONG received.""" + self._send(_build_frame(CMD_PING)) + return self._wait_for(CMD_PONG, timeout) is not None + + def read_telemetry(self, timeout: float = 2.0) -> Optional[Telemetry]: + """Wait for next telemetry packet.""" + payload = self._wait_for(CMD_TELEMETRY, timeout) + if payload: + return _decode_telemetry(payload) + return None + + def read_telemetry_avg(self, n: int = 10, timeout: float = 5.0) -> Optional[Telemetry]: + """Read n telemetry packets and return the average.""" + samples: list[Telemetry] = [] + deadline = time.monotonic() + timeout + while len(samples) < n and time.monotonic() < deadline: + t = self.read_telemetry(timeout=deadline - time.monotonic()) + if t: + samples.append(t) + if not samples: + return None + # Average all float fields + avg = Telemetry() + for attr in ("vin", "vout", "iin", "iout", "vfly", "etemp", + "vfly_integral", "vfly_avg_debug", "cc_output_f", + "mppt_iref", "mppt_last_vin", "mppt_last_iin", + "p_in", "p_out"): + setattr(avg, attr, sum(getattr(s, attr) for s in samples) / len(samples)) + avg.seq = samples[-1].seq + return avg + + def request_all_params(self): + """Request all parameter values from the STM32.""" + self._send(_build_frame(CMD_PARAM_READ_ALL)) + + def read_all_params(self, timeout: float = 3.0) -> dict[str, float]: + """Request and collect all parameter values.""" + self._param_cache.clear() + self.request_all_params() + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + data = self.ser.read(self.ser.in_waiting or 1) + if data: + for cmd, payload in self._parser.feed(data): + if cmd == CMD_PARAM_VALUE: + result = _decode_param_value(payload) + if result: + self._param_cache[result[0]] = result[1] + time.sleep(0.05) + # Convert to name->value + return { + PARAM_BY_ID[pid].name: val + for pid, val in self._param_cache.items() + if pid in PARAM_BY_ID + } + + def write_param(self, name: str, value: float, wait_ack: bool = True) -> bool: + """Write a parameter by name. Returns True if ACK received.""" + pdef = PARAM_BY_NAME.get(name) + if not pdef: + raise ValueError(f"Unknown parameter: {name!r}") + if value < pdef.min_val or value > pdef.max_val: + raise ValueError( + f"{name}: {value} out of range [{pdef.min_val}, {pdef.max_val}]" + ) + frame = _build_param_write(pdef.id, pdef.ptype, value) + self._send(frame) + if wait_ack: + payload = self._wait_for(CMD_PARAM_WRITE_ACK, timeout=2.0) + if payload: + result = _decode_param_value(payload) + if result: + self._param_cache[result[0]] = result[1] + return True + return False + return True + + def write_param_by_id(self, param_id: int, value: float) -> bool: + """Write a parameter by ID.""" + pdef = PARAM_BY_ID.get(param_id) + if not pdef: + raise ValueError(f"Unknown param ID: 0x{param_id:02X}") + return self.write_param(pdef.name, value) diff --git a/testbench/tuner.py b/testbench/tuner.py new file mode 100644 index 0000000..a0cb2ba --- /dev/null +++ b/testbench/tuner.py @@ -0,0 +1,443 @@ +"""Automated tuning routines combining testbench instruments + STM32 link. + +Uses the power analyzer (HIOKI) as ground truth for efficiency while +adjusting converter parameters via the STM32 debug protocol. +""" + +from __future__ import annotations + +import csv +import time +from dataclasses import dataclass, field +from pathlib import Path + +from testbench.bench import MPPTTestbench, IDLE_VOLTAGE +from testbench.stm32_link import ( + STM32Link, Telemetry, PARAM_BY_NAME, DT_BRACKETS, +) + + +@dataclass +class TunePoint: + """One measurement during a tuning sweep.""" + param_name: str + param_value: float + voltage_set: float + load_setpoint: float + load_mode: str + + # HIOKI measurements (ground truth) + meter_pin: float = 0.0 + meter_pout: float = 0.0 + meter_eff: float = 0.0 + + # STM32 telemetry + stm_vin: float = 0.0 + stm_vout: float = 0.0 + stm_iin: float = 0.0 + stm_iout: float = 0.0 + stm_eff: float = 0.0 + stm_vfly: float = 0.0 + stm_etemp: float = 0.0 + + timestamp: float = field(default_factory=time.time) + + +class Tuner: + """Combines MPPTTestbench + STM32Link for automated tuning. + + Usage:: + + tuner = Tuner(bench, link) + results = tuner.sweep_param( + "dt_10_20A", start=14, stop=40, step=1, + voltage=60.0, current_limit=20.0, + load_mode="CP", load_value=300.0, + ) + tuner.print_results(results) + """ + + def __init__( + self, + bench: MPPTTestbench, + link: STM32Link, + settle_time: float = 3.0, + stm_avg_samples: int = 10, + ): + self.bench = bench + self.link = link + self.settle_time = settle_time + self.stm_avg_samples = stm_avg_samples + + def _measure( + self, + param_name: str, + param_value: float, + voltage: float, + load_value: float, + load_mode: str, + ) -> TunePoint: + """Take one combined measurement from HIOKI + STM32.""" + point = TunePoint( + param_name=param_name, + param_value=param_value, + voltage_set=voltage, + load_setpoint=load_value, + load_mode=load_mode, + ) + + # HIOKI measurement + meter_vals = self.bench._wait_meter_ready(max_retries=10, retry_delay=1.0) + point.meter_pin = meter_vals.get("P5", 0.0) + point.meter_pout = meter_vals.get("P6", 0.0) + point.meter_eff = meter_vals.get("EFF1", 0.0) + + # STM32 telemetry (averaged) + t = self.link.read_telemetry_avg(n=self.stm_avg_samples) + if t: + point.stm_vin = t.vin_V + point.stm_vout = t.vout_V + point.stm_iin = t.iin_A + point.stm_iout = t.iout_A + point.stm_eff = t.efficiency + point.stm_vfly = t.vfly / 1000.0 + point.stm_etemp = t.etemp + + return point + + # ── Parameter sweep ────────────────────────────────────────────── + + def sweep_param( + self, + param_name: str, + start: float, + stop: float, + step: float, + voltage: float, + current_limit: float, + load_mode: str = "CC", + load_value: float = 5.0, + settle_time: float | None = None, + ) -> list[TunePoint]: + """Sweep a single STM32 parameter while measuring efficiency. + + Sets up the testbench at the given operating point, then steps + the parameter from start to stop, measuring at each step. + + Returns list of TunePoints with both HIOKI and STM32 data. + """ + if param_name not in PARAM_BY_NAME: + raise ValueError(f"Unknown parameter: {param_name!r}") + + settle = settle_time or self.settle_time + + if step == 0: + raise ValueError("step cannot be zero") + if start > stop and step > 0: + step = -step + elif start < stop and step < 0: + step = -step + + # Count steps + n_steps = int(abs(stop - start) / abs(step)) + 1 + unit = "A" if load_mode == "CC" else "W" + print(f"Parameter sweep: {param_name} = {start} → {stop} (step {step})") + print(f" Operating point: V={voltage:.1f}V, {load_mode}={load_value:.1f}{unit}") + print(f" {n_steps} points, settle={settle:.1f}s") + print() + + # Set up testbench + self.bench.supply.set_current(current_limit) + self.bench.supply.set_voltage(voltage) + self.bench.supply.output_on() + self.bench.load.set_mode(load_mode) + self.bench._apply_load_value(load_mode, load_value) + self.bench.load.load_on() + + # Initial settle + print(" Settling...") + time.sleep(settle * 2) + + results: list[TunePoint] = [] + val = start + n = 0 + + try: + while True: + if step > 0 and val > stop + step / 2: + break + if step < 0 and val < stop + step / 2: + break + + # Write parameter + ack = self.link.write_param(param_name, val) + if not ack: + print(f" WARNING: No ACK for {param_name}={val}") + + time.sleep(settle) + + # Measure + point = self._measure(param_name, val, voltage, load_value, load_mode) + results.append(point) + n += 1 + + print( + f" [{n:>3d}/{n_steps}] {param_name}={val:>6.1f} " + f"HIOKI: Pin={point.meter_pin:7.1f}W Pout={point.meter_pout:7.1f}W " + f"EFF={point.meter_eff:5.2f}% " + f"STM32: EFF={point.stm_eff:5.1f}% T={point.stm_etemp:.0f}°C" + ) + + val += step + + finally: + self.bench.load.load_off() + self.bench.supply.set_voltage(IDLE_VOLTAGE) + print(f"\n Load OFF. Supply at {IDLE_VOLTAGE:.0f}V.") + + return results + + # ── Deadtime optimization ──────────────────────────────────────── + + def tune_deadtime( + self, + dt_start: int = 14, + dt_stop: int = 50, + dt_step: int = 1, + voltage: float = 60.0, + current_limit: float = 20.0, + load_mode: str = "CP", + load_values: list[float] | None = None, + settle_time: float | None = None, + ) -> dict[str, list[TunePoint]]: + """Optimize deadtime for each current bracket. + + For each deadtime bracket, sets a load that puts the converter + in that current range, then sweeps deadtime values to find the + optimum. + + Args: + load_values: Load setpoints to test (one per DT bracket). + If None, auto-selects based on bracket midpoints. + """ + settle = settle_time or self.settle_time + + if load_values is None: + # Auto-select load values targeting the middle of each bracket + # Using CP mode: power ≈ voltage × current + load_values = [] + for _, _, i_lo, i_hi in DT_BRACKETS: + mid_i_A = (i_lo + i_hi) / 2 / 1000.0 # mA → A + target_power = voltage * mid_i_A * 0.4 # rough vout/vin ratio + load_values.append(max(10.0, target_power)) + + print("=" * 80) + print("DEADTIME OPTIMIZATION") + print(f" DT range: {dt_start} → {dt_stop} (step {dt_step})") + print(f" V={voltage:.0f}V, I_limit={current_limit:.0f}A, mode={load_mode}") + print("=" * 80) + + all_results: dict[str, list[TunePoint]] = {} + + for i, (param_id, param_name, i_lo, i_hi) in enumerate(DT_BRACKETS): + load_val = load_values[i] if i < len(load_values) else load_values[-1] + unit = "A" if load_mode == "CC" else "W" + + print(f"\n── Bracket: {param_name} ({i_lo/1000:.0f}-{i_hi/1000:.0f}A) " + f"@ {load_mode}={load_val:.0f}{unit} ──") + + results = self.sweep_param( + param_name=param_name, + start=dt_start, + stop=dt_stop, + step=dt_step, + voltage=voltage, + current_limit=current_limit, + load_mode=load_mode, + load_value=load_val, + settle_time=settle, + ) + all_results[param_name] = results + + # Find and report best + if results: + valid = [p for p in results if 0 < p.meter_eff < 110] + if valid: + best = max(valid, key=lambda p: p.meter_eff) + print(f" ★ Best: {param_name}={best.param_value:.0f} → " + f"EFF={best.meter_eff:.2f}%") + + # Summary + print("\n" + "=" * 80) + print("DEADTIME OPTIMIZATION SUMMARY") + print(f"{'Bracket':<15} {'Best DT':>8} {'Efficiency':>12} {'Temp':>8}") + print("-" * 45) + for param_name, results in all_results.items(): + valid = [p for p in results if 0 < p.meter_eff < 110] + if valid: + best = max(valid, key=lambda p: p.meter_eff) + print(f"{param_name:<15} {best.param_value:>8.0f} " + f"{best.meter_eff:>11.2f}% {best.stm_etemp:>7.0f}°C") + else: + print(f"{param_name:<15} {'N/A':>8} {'N/A':>12} {'N/A':>8}") + print("=" * 80) + + return all_results + + def apply_best_deadtimes(self, results: dict[str, list[TunePoint]]): + """Apply the best deadtime from each bracket to the STM32.""" + print("\nApplying optimal deadtimes:") + for param_name, points in results.items(): + valid = [p for p in points if 0 < p.meter_eff < 110] + if valid: + best = max(valid, key=lambda p: p.meter_eff) + val = int(best.param_value) + ack = self.link.write_param(param_name, val) + status = "OK" if ack else "NO ACK" + print(f" {param_name} = {val} ({status})") + + # ── Multi-point sweep ──────────────────────────────────────────── + + def sweep_param_multi( + self, + param_name: str, + start: float, + stop: float, + step: float, + voltages: list[float], + current_limit: float, + load_mode: str = "CP", + load_values: list[float] | None = None, + settle_time: float | None = None, + ) -> list[TunePoint]: + """Sweep a parameter across multiple voltage/load combinations. + + Produces a comprehensive dataset showing how the parameter + affects efficiency across the full operating range. + """ + if load_values is None: + load_values = [200.0] # default: 200W + + all_results: list[TunePoint] = [] + + for v in voltages: + for lv in load_values: + unit = "A" if load_mode == "CC" else "W" + print(f"\n── V={v:.0f}V, {load_mode}={lv:.0f}{unit} ──") + results = self.sweep_param( + param_name=param_name, + start=start, stop=stop, step=step, + voltage=v, current_limit=current_limit, + load_mode=load_mode, load_value=lv, + settle_time=settle_time, + ) + all_results.extend(results) + + return all_results + + # ── Output ─────────────────────────────────────────────────────── + + @staticmethod + def print_results(results: list[TunePoint]): + """Print a summary table of tuning results.""" + if not results: + print("No results.") + return + + valid = [p for p in results if 0 < p.meter_eff < 110] + if valid: + best = max(valid, key=lambda p: p.meter_eff) + print(f"\nBest: {best.param_name}={best.param_value:.1f} → " + f"EFF={best.meter_eff:.2f}% " + f"(Pin={best.meter_pin:.1f}W Pout={best.meter_pout:.1f}W)") + + @staticmethod + def write_csv(results: list[TunePoint], path: str): + """Write tuning results to CSV.""" + if not results: + return + + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow([ + "param_name", "param_value", + "voltage_set", "load_setpoint", "load_mode", + "meter_pin", "meter_pout", "meter_eff", + "stm_vin", "stm_vout", "stm_iin", "stm_iout", + "stm_eff", "stm_vfly", "stm_etemp", + ]) + for p in results: + w.writerow([ + p.param_name, f"{p.param_value:.4f}", + f"{p.voltage_set:.4f}", f"{p.load_setpoint:.4f}", p.load_mode, + f"{p.meter_pin:.4f}", f"{p.meter_pout:.4f}", f"{p.meter_eff:.4f}", + f"{p.stm_vin:.4f}", f"{p.stm_vout:.4f}", + f"{p.stm_iin:.4f}", f"{p.stm_iout:.4f}", + f"{p.stm_eff:.4f}", f"{p.stm_vfly:.4f}", f"{p.stm_etemp:.4f}", + ]) + print(f"Results saved to {path}") + + @staticmethod + def plot_sweep(results: list[TunePoint], show: bool = True): + """Plot parameter sweep results.""" + import numpy as np + import matplotlib.pyplot as plt + + if not results: + return + + param_name = results[0].param_name + vals = np.array([p.param_value for p in results]) + eff_hioki = np.array([p.meter_eff for p in results]) + eff_stm = np.array([p.stm_eff for p in results]) + temp = np.array([p.stm_etemp for p in results]) + + # Filter valid + valid = (eff_hioki > 0) & (eff_hioki < 110) + + # Group by operating point + ops = sorted(set((p.voltage_set, p.load_setpoint) for p in results)) + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True) + + cmap = plt.cm.viridis + for i, (v, l) in enumerate(ops): + color = cmap(i / max(len(ops) - 1, 1)) + mask = np.array([ + (p.voltage_set == v and p.load_setpoint == l and + 0 < p.meter_eff < 110) + for p in results + ]) + if not np.any(mask): + continue + x = vals[mask] + order = np.argsort(x) + unit = results[0].load_mode + ax1.plot(x[order], eff_hioki[mask][order], "o-", color=color, + markersize=4, label=f"{v:.0f}V/{l:.0f}{unit}") + ax2.plot(x[order], temp[mask][order], "o-", color=color, + markersize=4, label=f"{v:.0f}V/{l:.0f}{unit}") + + # Mark best + valid_pts = [p for p in results if 0 < p.meter_eff < 110] + if valid_pts: + best = max(valid_pts, key=lambda p: p.meter_eff) + ax1.axvline(best.param_value, color="red", linestyle="--", alpha=0.5) + ax1.plot(best.param_value, best.meter_eff, "*", color="red", + markersize=15, zorder=10, + label=f"Best: {best.param_value:.0f} → {best.meter_eff:.2f}%") + + ax1.set_ylabel("Efficiency (%)", fontsize=12) + ax1.set_title(f"Parameter Sweep: {param_name}", fontsize=14) + ax1.legend(fontsize=8) + ax1.grid(True, alpha=0.3) + + ax2.set_xlabel(param_name, fontsize=12) + ax2.set_ylabel("Temperature (°C)", fontsize=12) + ax2.legend(fontsize=8) + ax2.grid(True, alpha=0.3) + + fig.tight_layout() + if show: + plt.show() + return fig