Files
mppt-testbench/testbench/bench.py
grabowski a0795302a9 Add CP mode sanity check: validate current limit vs power at min voltage
CP=500W at V=50V needs 10A on the load side — if the supply I_limit
is only 20A and the MPPT has conversion losses, this can cause the
supply to current-limit and timeout. The check now catches this upfront:

  CP check: 500W / 50V = 10.0A (limit 20.0A) OK

Also raises ValueError if the worst-case current exceeds the limit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 16:06:48 +07:00

799 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MPPT Testbench orchestrator -- coordinates all three instruments."""
from __future__ import annotations
import sys
import time
from dataclasses import dataclass, field
from it6500.driver import IT6500
from prodigit3366g.driver import Prodigit3366G
from hioki3193.driver import Hioki3193
# Values above this are HIOKI error codes (blank, over-range, scaling error)
ERROR_THRESHOLD = 1e90
# Default voltage to return to after a sweep
IDLE_VOLTAGE = 75.0
@dataclass
class SweepPoint:
"""A single point in an IV / efficiency sweep."""
# Supply setpoints
voltage_set: float
current_limit: float
# Load setpoint (for current sweeps)
load_setpoint: float = 0.0
# Supply measurements
supply_voltage: float = 0.0
supply_current: float = 0.0
supply_power: float = 0.0
# Load measurements
load_voltage: float = 0.0
load_current: float = 0.0
load_power: float = 0.0
# Power analyzer measurements
input_power: float = 0.0 # P5 (solar side)
output_power: float = 0.0 # P6 (load side)
efficiency: float = 0.0 # EFF1 (%)
timestamp: float = field(default_factory=time.time)
class MPPTTestbench:
"""Orchestrates IT6500D + Prodigit 3366G + HIOKI 3193-10 for MPPT testing.
Typical wiring:
IT6500D ──(+/-)──> MPPT Tracker Input ──(sense)──> HIOKI Ch5
MPPT Tracker Output ──(sense)──> HIOKI Ch6 ──(+/-)──> Prodigit 3366G
The HIOKI measures both sides and computes efficiency.
"""
METER_ITEMS = ("U5", "I5", "P5", "U6", "I6", "P6", "EFF1")
def __init__(
self,
supply: IT6500,
load: Prodigit3366G,
meter: Hioki3193,
) -> None:
self.supply = supply
self.load = load
self.meter = meter
def close(self) -> None:
"""Close all instrument connections."""
for inst in (self.supply, self.load, self.meter):
try:
inst.close()
except Exception:
pass
def __enter__(self) -> MPPTTestbench:
return self
def __exit__(self, *exc) -> None:
self.close()
# ── Instrument discovery helpers ──────────────────────────────────
@staticmethod
def find_supply(address: str | None) -> str:
"""Find the IT6500D VISA address."""
if address:
return address
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
rm.close()
itech = [r for r in resources if "2EC7" in r.upper() or "6522" in r.upper()]
if itech:
return itech[0]
usb = [r for r in resources if "USB" in r]
if usb:
return usb[0]
print("ERROR: No IT6500D supply found. Available:", list(resources))
sys.exit(1)
@staticmethod
def find_meter(address: str | None) -> str:
"""Find the HIOKI 3193-10 VISA address."""
if address:
return address
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
rm.close()
hioki = [r for r in resources if "3193" in r or "03EB" in r or "GPIB" in r]
if hioki:
return hioki[0]
print("ERROR: No HIOKI 3193-10 found. Available:", list(resources))
sys.exit(1)
# ── Setup ─────────────────────────────────────────────────────────
def setup_all(self) -> None:
"""Configure all instruments for MPPT testing."""
# Supply: remote mode
self.supply.remote()
# Load: remote mode, CC mode default
self.load.remote()
# Meter: 1P2W, DC coupling, auto-range, efficiency P6/P5
self.meter.set_wiring_mode("1P2W")
self.meter.set_coupling(5, "DC")
self.meter.set_coupling(6, "DC")
self.meter.set_voltage_auto(5, True)
self.meter.set_current_auto(5, True)
self.meter.set_voltage_auto(6, True)
self.meter.set_current_auto(6, True)
self.meter.set_response_speed("SLOW")
self.meter.set_efficiency(1, "P6", "P5")
# Display: 16-item SELECT view
# Left side (1-8): U5, I5, P5, EFF1, OFF, OFF, OFF, OFF
# Right side (9-16): U6, I6, P6, OFF, OFF, OFF, OFF, OFF
display_items = "U5,I5,P5,EFF1,OFF,OFF,OFF,OFF,U6,I6,P6,OFF,OFF,OFF,OFF,OFF"
self.meter.write(f":DISPlay:SELect16 {display_items}")
# ── Safe shutdown ─────────────────────────────────────────────────
def safe_off(self) -> None:
"""Turn off all outputs in safe order: load first, then supply."""
try:
self.load.load_off()
except Exception:
pass
try:
self.supply.output_off()
except Exception:
pass
# ── HIOKI auto-range wait ─────────────────────────────────────────
def _wait_meter_ready(
self,
max_retries: int = 30,
retry_delay: float = 2.0,
) -> dict[str, float]:
"""Read the meter, retrying until all values are valid (not over-range).
The HIOKI returns special error values (+9999.9E+99, +6666.6E+99)
while auto-ranging. This method keeps polling until all requested
items return real data.
Returns:
The MeasurementResult.values dict once all values are valid.
Raises:
TimeoutError: If still auto-ranging after max_retries.
"""
for attempt in range(max_retries):
result = self.meter.measure(*self.METER_ITEMS)
values = result.values
# Check if any value is an error code
if all(abs(v) < ERROR_THRESHOLD for v in values.values()):
return values
bad = [k for k, v in values.items() if abs(v) >= ERROR_THRESHOLD]
print(f" (auto-ranging: {', '.join(bad)} -- retrying {attempt + 1}/{max_retries})")
# Keep supply alive during the wait so it doesn't
# time out and show a front-panel error / beep
try:
self.supply.measure_voltage()
except Exception:
pass
time.sleep(retry_delay)
raise TimeoutError(
f"HIOKI still auto-ranging after {max_retries} retries"
)
# ── Measurement ───────────────────────────────────────────────────
def measure_all(self) -> dict[str, float]:
"""Take a synchronized reading from all three instruments.
Returns dict with keys:
supply_V, supply_I, supply_P,
load_V, load_I, load_P,
meter_U5, meter_I5, meter_P5,
meter_U6, meter_I6, meter_P6,
meter_EFF1
"""
supply = self.supply.measure_all()
load = self.load.measure_all()
meter = self.meter.measure(*self.METER_ITEMS)
return {
"supply_V": supply.voltage,
"supply_I": supply.current,
"supply_P": supply.power,
"load_V": load.voltage,
"load_I": load.current,
"load_P": load.power,
"meter_U5": meter.values.get("U5", 0.0),
"meter_I5": meter.values.get("I5", 0.0),
"meter_P5": meter.values.get("P5", 0.0),
"meter_U6": meter.values.get("U6", 0.0),
"meter_I6": meter.values.get("I6", 0.0),
"meter_P6": meter.values.get("P6", 0.0),
"meter_EFF1": meter.values.get("EFF1", 0.0),
}
# ── Sweep helper: record one point ────────────────────────────────
@staticmethod
def _query_safe(fn, retries: int = 3, delay: float = 0.5):
"""Call a measurement function with retries on VISA timeout."""
for attempt in range(retries):
try:
return fn()
except Exception:
if attempt == retries - 1:
raise
time.sleep(delay)
def _record_point(
self,
voltage_set: float,
current_limit: float,
load_setpoint: float = 0.0,
) -> SweepPoint:
"""Wait for meter auto-range, then record from all instruments.
Waits for the meter first (which keeps the supply alive via
keepalive pings), then reads supply and load with retry logic.
"""
meter_vals = self._wait_meter_ready()
supply = self._query_safe(self.supply.measure_all)
load = self._query_safe(self.load.measure_all)
return SweepPoint(
voltage_set=voltage_set,
current_limit=current_limit,
load_setpoint=load_setpoint,
supply_voltage=supply.voltage,
supply_current=supply.current,
supply_power=supply.power,
load_voltage=load.voltage,
load_current=load.current,
load_power=load.power,
input_power=meter_vals.get("P5", 0.0),
output_power=meter_vals.get("P6", 0.0),
efficiency=meter_vals.get("EFF1", 0.0),
)
@staticmethod
def _print_point(point: SweepPoint, label: str, value: float, unit: str) -> None:
"""Print a single sweep point to the console."""
print(
f" {label}={value:7.2f}{unit} "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
# ── IV Curve / Voltage Sweep ──────────────────────────────────────
def sweep_voltage(
self,
v_start: float,
v_stop: float,
v_step: float,
current_limit: float,
settle_time: float = 1.0,
load_setpoint: float = 0.0,
) -> list[SweepPoint]:
"""Sweep supply voltage and record measurements at each point.
After the sweep completes, the supply returns to IDLE_VOLTAGE (75V)
and stays ON.
Args:
v_start: Starting voltage (V).
v_stop: Final voltage (V).
v_step: Voltage step size (V). Sign is auto-corrected.
current_limit: Supply current limit (A) for the entire sweep.
settle_time: Seconds to wait after each voltage step before
polling the meter.
load_setpoint: The load setpoint value (for CSV recording).
"""
if v_step == 0:
raise ValueError("v_step cannot be zero")
max_v = max(abs(v_start), abs(v_stop))
self.check_supply_capability(max_v, current_limit)
# Auto-correct step direction
if v_start < v_stop and v_step < 0:
v_step = -v_step
elif v_start > v_stop and v_step > 0:
v_step = -v_step
self.supply.set_current(current_limit)
self.supply.output_on()
results: list[SweepPoint] = []
v = v_start
try:
while True:
if v_step > 0 and v > v_stop + v_step / 2:
break
if v_step < 0 and v < v_stop + v_step / 2:
break
self.supply.set_voltage(v)
time.sleep(settle_time)
point = self._record_point(v, current_limit, load_setpoint)
results.append(point)
self._print_point(point, "V_set", v, "V")
v += v_step
finally:
# Return to idle voltage and keep supply ON
self.supply.set_voltage(IDLE_VOLTAGE)
print(f"\n Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)")
return results
# ── Load Current Sweep ────────────────────────────────────────────
def sweep_load_current(
self,
voltage: float,
current_limit: float,
i_start: float,
i_stop: float,
i_step: float,
settle_time: float = 1.0,
) -> list[SweepPoint]:
"""Sweep load current (CC mode) at a fixed supply voltage.
After the sweep completes, the supply returns to IDLE_VOLTAGE (75V)
and stays ON. The load is turned OFF.
Args:
voltage: Fixed supply voltage (V).
current_limit: Supply current limit (A).
i_start: Starting load current (A).
i_stop: Final load current (A).
i_step: Current step size (A). Sign is auto-corrected.
settle_time: Seconds to wait after each current step before
polling the meter.
"""
if i_step == 0:
raise ValueError("i_step cannot be zero")
self.check_supply_capability(voltage, current_limit)
# Auto-correct step direction
if i_start < i_stop and i_step < 0:
i_step = -i_step
elif i_start > i_stop and i_step > 0:
i_step = -i_step
self.supply.set_current(current_limit)
self.supply.set_voltage(voltage)
self.supply.output_on()
self.load.set_mode("CC")
self.load.set_cc_current(i_start)
self.load.load_on()
results: list[SweepPoint] = []
i = i_start
try:
while True:
if i_step > 0 and i > i_stop + i_step / 2:
break
if i_step < 0 and i < i_stop + i_step / 2:
break
self.load.set_cc_current(i)
time.sleep(settle_time)
point = self._record_point(voltage, current_limit, load_setpoint=i)
results.append(point)
self._print_point(point, "I_load", i, "A")
i += i_step
finally:
self.load.load_off()
# Return to idle voltage and keep supply ON
self.supply.set_voltage(IDLE_VOLTAGE)
print(f"\n Load OFF. Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)")
return results
# ── Load value helper ─────────────────────────────────────────────
def _apply_load_value(self, mode: str, value: float) -> None:
"""Set the load setpoint for the given mode."""
if mode == "CC":
self.load.set_cc_current(value)
elif mode == "CR":
self.load.set_cr_resistance(value)
elif mode == "CV":
self.load.set_cv_voltage(value)
elif mode == "CP":
self.load.set_cp_power(value)
# ── Shade Profile ────────────────────────────────────────────────
@staticmethod
def load_shade_profile(path: str) -> list[dict]:
"""Load a shade profile CSV.
Required columns: time, voltage, current_limit
Optional columns: load_mode, load_value
Returns:
Sorted list of step dicts.
"""
import csv as _csv
steps = []
with open(path, newline="") as f:
reader = _csv.DictReader(f)
for row in reader:
step = {
"time": float(row["time"]),
"voltage": float(row["voltage"]),
"current_limit": float(row["current_limit"]),
}
if "load_mode" in row and row["load_mode"].strip():
step["load_mode"] = row["load_mode"].strip().upper()
if "load_value" in row and row["load_value"].strip():
step["load_value"] = float(row["load_value"])
steps.append(step)
steps.sort(key=lambda s: s["time"])
return steps
def run_shade_profile(
self,
steps: list[dict],
settle_time: float = 2.0,
) -> list[SweepPoint]:
"""Run a shade / irradiance profile sequence.
Steps through the profile, applying voltage/current/load settings
at the scheduled times, recording a measurement at each step.
Args:
steps: Profile steps from :meth:`load_shade_profile`.
settle_time: Seconds to wait after applying each step before
recording a measurement.
Returns:
List of SweepPoint measurements, one per step.
"""
if not steps:
return []
# Apply first step and turn outputs on
first = steps[0]
self.supply.set_current(first["current_limit"])
self.supply.set_voltage(first["voltage"])
self.supply.output_on()
if "load_mode" in first:
self.load.set_mode(first["load_mode"])
if "load_value" in first:
self._apply_load_value(
first.get("load_mode", "CC"), first["load_value"]
)
self.load.load_on()
results: list[SweepPoint] = []
t0 = time.time()
try:
for i, step in enumerate(steps):
# Wait until scheduled time, keepalive supply while waiting
target = t0 + step["time"]
while True:
remaining = target - time.time()
if remaining <= 0:
break
if remaining > 2.0:
try:
self.supply.measure_voltage()
except Exception:
pass
time.sleep(min(2.0, remaining))
else:
time.sleep(remaining)
# Apply settings
self.supply.set_current(step["current_limit"])
self.supply.set_voltage(step["voltage"])
if "load_mode" in step:
self.load.set_mode(step["load_mode"])
if "load_value" in step:
self._apply_load_value(
step.get("load_mode", "CC"), step["load_value"]
)
time.sleep(settle_time)
# Record
load_val = step.get("load_value", 0.0) or 0.0
point = self._record_point(
step["voltage"], step["current_limit"], load_val
)
results.append(point)
elapsed = time.time() - t0
print(
f" [{i + 1}/{len(steps)}] t={elapsed:6.1f}s "
f"V={step['voltage']:.1f}V I_lim={step['current_limit']:.1f}A "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
finally:
self.load.load_off()
self.supply.set_voltage(IDLE_VOLTAGE)
print(
f"\n Profile complete. Load OFF. "
f"Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)"
)
return results
# ── Sanity checks ────────────────────────────────────────────────
def check_supply_capability(
self,
max_voltage: float,
current_limit: float,
*,
min_voltage: float | None = None,
load_mode: str = "CC",
max_load_setpoint: float = 0.0,
) -> None:
"""Check that the supply can deliver the requested V and I.
Queries the supply's voltage range (max V) and verifies the
requested parameters are within bounds. For CP mode, also
checks that the supply current limit can deliver the requested
power at the minimum voltage.
Raises ValueError if the configuration is invalid.
"""
supply_max_v = self.supply.get_voltage_range()
if max_voltage > supply_max_v:
raise ValueError(
f"Requested voltage {max_voltage:.1f}V exceeds supply "
f"range {supply_max_v:.1f}V"
)
max_power = max_voltage * current_limit
print(
f" Supply check: V_max={max_voltage:.1f}V, "
f"I_limit={current_limit:.1f}A, "
f"P_max={max_power:.0f}W "
f"(range {supply_max_v:.0f}V)"
)
# CP mode: at min voltage the load draws max current
if load_mode == "CP" and max_load_setpoint > 0 and min_voltage:
worst_case_i = max_load_setpoint / min_voltage
if worst_case_i > current_limit:
raise ValueError(
f"CP mode: {max_load_setpoint:.0f}W at "
f"{min_voltage:.0f}V needs {worst_case_i:.1f}A "
f"but supply I_limit is {current_limit:.1f}A"
)
print(
f" CP check: {max_load_setpoint:.0f}W / "
f"{min_voltage:.0f}V = {worst_case_i:.1f}A "
f"(limit {current_limit:.1f}A) OK"
)
# ── 2D Voltage × Current Sweep ─────────────────────────────────────
def sweep_vi(
self,
v_start: float,
v_stop: float,
v_step: float,
l_start: float,
l_stop: float,
l_step: float,
current_limit: float,
settle_time: float = 2.0,
load_mode: str = "CC",
) -> list[SweepPoint]:
"""2D sweep: voltage (outer) × load setpoint (inner).
At each supply voltage, sweeps the load through the full
range, recording efficiency at every combination. Produces
a complete efficiency map of the DUT.
After the sweep the load turns OFF and the supply returns to
IDLE_VOLTAGE (75 V, output stays ON).
Args:
v_start: Starting supply voltage (V).
v_stop: Final supply voltage (V).
v_step: Voltage step size (V). Sign is auto-corrected.
l_start: Starting load setpoint (A for CC, W for CP).
l_stop: Final load setpoint.
l_step: Load step size. Sign is auto-corrected.
current_limit: Supply current limit (A) for the entire sweep.
settle_time: Seconds to wait after each setpoint change.
load_mode: Load mode — "CC" (constant current) or "CP"
(constant power).
"""
if v_step == 0:
raise ValueError("v_step cannot be zero")
if l_step == 0:
raise ValueError("l_step cannot be zero")
load_mode = load_mode.upper()
if load_mode not in ("CC", "CP"):
raise ValueError(f"load_mode must be CC or CP, got {load_mode!r}")
# Sanity check
max_v = max(abs(v_start), abs(v_stop))
min_v = min(abs(v_start), abs(v_stop))
max_l = max(abs(l_start), abs(l_stop))
self.check_supply_capability(
max_v, current_limit,
min_voltage=min_v,
load_mode=load_mode,
max_load_setpoint=max_l,
)
# Auto-correct step directions
if v_start < v_stop and v_step < 0:
v_step = -v_step
elif v_start > v_stop and v_step > 0:
v_step = -v_step
if l_start < l_stop and l_step < 0:
l_step = -l_step
elif l_start > l_stop and l_step > 0:
l_step = -l_step
# Count steps for progress display
v_count = int(abs(v_stop - v_start) / abs(v_step)) + 1
l_count = int(abs(l_stop - l_start) / abs(l_step)) + 1
total = v_count * l_count
unit = "A" if load_mode == "CC" else "W"
print(
f" Grid: {v_count} voltage × {l_count} {load_mode} "
f"= {total} points"
)
self.supply.set_current(current_limit)
self.supply.output_on()
self.load.set_mode(load_mode)
self._apply_load_value(load_mode, l_start)
self.load.load_on()
results: list[SweepPoint] = []
n = 0
v = v_start
try:
while True:
if v_step > 0 and v > v_stop + v_step / 2:
break
if v_step < 0 and v < v_stop + v_step / 2:
break
self.supply.set_voltage(v)
ll = l_start
while True:
if l_step > 0 and ll > l_stop + l_step / 2:
break
if l_step < 0 and ll < l_stop + l_step / 2:
break
self._apply_load_value(load_mode, ll)
time.sleep(settle_time)
point = self._record_point(v, current_limit, load_setpoint=ll)
results.append(point)
n += 1
print(
f" [{n:>4d}/{total}] "
f"V={v:6.1f}V {load_mode}={ll:6.2f}{unit} "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
ll += l_step
v += v_step
finally:
self.load.load_off()
self.supply.set_voltage(IDLE_VOLTAGE)
print(
f"\n Load OFF. Supply returning to {IDLE_VOLTAGE:.0f}V "
f"(output stays ON)"
)
return results
# ── Efficiency at fixed operating point ───────────────────────────
def measure_efficiency(
self,
voltage: float,
current_limit: float,
settle_time: float = 2.0,
samples: int = 5,
sample_interval: float = 1.0,
) -> dict[str, float]:
"""Measure average efficiency at a fixed operating point.
Args:
voltage: Supply voltage (V).
current_limit: Supply current limit (A).
settle_time: Seconds to wait before first measurement.
samples: Number of readings to average.
sample_interval: Seconds between readings.
Returns:
Dict with avg_input_power, avg_output_power, avg_efficiency,
plus individual supply/load readings.
"""
self.supply.set_current(current_limit)
self.supply.set_voltage(voltage)
self.supply.output_on()
time.sleep(settle_time)
p_in_sum = 0.0
p_out_sum = 0.0
eff_sum = 0.0
try:
for i in range(samples):
meter_vals = self._wait_meter_ready()
p_in = meter_vals.get("P5", 0.0)
p_out = meter_vals.get("P6", 0.0)
eff = meter_vals.get("EFF1", 0.0)
p_in_sum += p_in
p_out_sum += p_out
eff_sum += eff
print(
f" [{i + 1}/{samples}] "
f"P_in={p_in:8.2f}W P_out={p_out:8.2f}W EFF={eff:6.2f}%"
)
if i < samples - 1:
time.sleep(sample_interval)
finally:
self.supply.output_off()
return {
"avg_input_power": p_in_sum / samples,
"avg_output_power": p_out_sum / samples,
"avg_efficiency": eff_sum / samples,
"voltage_set": voltage,
"current_limit": current_limit,
"samples": samples,
}