Initial MPPT testbench: unified CLI for IT6500D + Prodigit 3366G + HIOKI 3193-10

Combines three instrument drivers (as git submodules) into a single
testbench for MPPT tracker efficiency testing. Features:
- Voltage sweep and load current sweep with CSV export
- Auto-range aware meter polling (waits for HIOKI to settle)
- Supply keepalive during long meter waits to prevent USB-TMC timeouts
- Live monitoring with real-time 4-panel matplotlib graphs
- Safe shutdown (load first, then supply)
- Post-sweep returns to 75V idle with supply ON

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 13:19:18 +07:00
commit e55caa59b1
10 changed files with 1672 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
"""MPPT Tracker Testbench.
Combines three instruments for solar MPPT converter testing:
- IT6500D: DC power supply (solar panel simulator)
- Prodigit 3366G: DC electronic load
- HIOKI 3193-10: Power analyzer (efficiency measurement)
"""
from testbench.bench import MPPTTestbench
__all__ = ["MPPTTestbench"]
+475
View File
@@ -0,0 +1,475 @@
"""MPPT Testbench orchestrator -- coordinates all three instruments."""
from __future__ import annotations
import sys
import time
from dataclasses import dataclass, field
from it6500.driver import IT6500
from prodigit3366g.driver import Prodigit3366G
from hioki3193.driver import Hioki3193
# Values above this are HIOKI error codes (blank, over-range, scaling error)
ERROR_THRESHOLD = 1e90
# Default voltage to return to after a sweep
IDLE_VOLTAGE = 75.0
@dataclass
class SweepPoint:
"""A single point in an IV / efficiency sweep."""
# Supply setpoints
voltage_set: float
current_limit: float
# Load setpoint (for current sweeps)
load_setpoint: float = 0.0
# Supply measurements
supply_voltage: float = 0.0
supply_current: float = 0.0
supply_power: float = 0.0
# Load measurements
load_voltage: float = 0.0
load_current: float = 0.0
load_power: float = 0.0
# Power analyzer measurements
input_power: float = 0.0 # P5 (solar side)
output_power: float = 0.0 # P6 (load side)
efficiency: float = 0.0 # EFF1 (%)
timestamp: float = field(default_factory=time.time)
class MPPTTestbench:
"""Orchestrates IT6500D + Prodigit 3366G + HIOKI 3193-10 for MPPT testing.
Typical wiring:
IT6500D ──(+/-)──> MPPT Tracker Input ──(sense)──> HIOKI Ch5
MPPT Tracker Output ──(sense)──> HIOKI Ch6 ──(+/-)──> Prodigit 3366G
The HIOKI measures both sides and computes efficiency.
"""
METER_ITEMS = ("U5", "I5", "P5", "U6", "I6", "P6", "EFF1")
def __init__(
self,
supply: IT6500,
load: Prodigit3366G,
meter: Hioki3193,
) -> None:
self.supply = supply
self.load = load
self.meter = meter
def close(self) -> None:
"""Close all instrument connections."""
for inst in (self.supply, self.load, self.meter):
try:
inst.close()
except Exception:
pass
def __enter__(self) -> MPPTTestbench:
return self
def __exit__(self, *exc) -> None:
self.close()
# ── Instrument discovery helpers ──────────────────────────────────
@staticmethod
def find_supply(address: str | None) -> str:
"""Find the IT6500D VISA address."""
if address:
return address
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
rm.close()
itech = [r for r in resources if "2EC7" in r.upper() or "6522" in r.upper()]
if itech:
return itech[0]
usb = [r for r in resources if "USB" in r]
if usb:
return usb[0]
print("ERROR: No IT6500D supply found. Available:", list(resources))
sys.exit(1)
@staticmethod
def find_meter(address: str | None) -> str:
"""Find the HIOKI 3193-10 VISA address."""
if address:
return address
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
rm.close()
hioki = [r for r in resources if "3193" in r or "03EB" in r or "GPIB" in r]
if hioki:
return hioki[0]
print("ERROR: No HIOKI 3193-10 found. Available:", list(resources))
sys.exit(1)
# ── Setup ─────────────────────────────────────────────────────────
def setup_all(self) -> None:
"""Configure all instruments for MPPT testing."""
# Supply: remote mode
self.supply.remote()
# Load: remote mode, CC mode default
self.load.remote()
# Meter: 1P2W, DC coupling, auto-range, efficiency P6/P5
self.meter.set_wiring_mode("1P2W")
self.meter.set_coupling(5, "DC")
self.meter.set_coupling(6, "DC")
self.meter.set_voltage_auto(5, True)
self.meter.set_current_auto(5, True)
self.meter.set_voltage_auto(6, True)
self.meter.set_current_auto(6, True)
self.meter.set_response_speed("SLOW")
self.meter.set_efficiency(1, "P6", "P5")
# Display: 16-item SELECT view
# Left side (1-8): U5, I5, P5, EFF1, OFF, OFF, OFF, OFF
# Right side (9-16): U6, I6, P6, OFF, OFF, OFF, OFF, OFF
display_items = "U5,I5,P5,EFF1,OFF,OFF,OFF,OFF,U6,I6,P6,OFF,OFF,OFF,OFF,OFF"
self.meter.write(f":DISPlay:SELect16 {display_items}")
# ── Safe shutdown ─────────────────────────────────────────────────
def safe_off(self) -> None:
"""Turn off all outputs in safe order: load first, then supply."""
try:
self.load.load_off()
except Exception:
pass
try:
self.supply.output_off()
except Exception:
pass
# ── HIOKI auto-range wait ─────────────────────────────────────────
def _wait_meter_ready(
self,
max_retries: int = 30,
retry_delay: float = 2.0,
) -> dict[str, float]:
"""Read the meter, retrying until all values are valid (not over-range).
The HIOKI returns special error values (+9999.9E+99, +6666.6E+99)
while auto-ranging. This method keeps polling until all requested
items return real data.
Returns:
The MeasurementResult.values dict once all values are valid.
Raises:
TimeoutError: If still auto-ranging after max_retries.
"""
for attempt in range(max_retries):
result = self.meter.measure(*self.METER_ITEMS)
values = result.values
# Check if any value is an error code
if all(abs(v) < ERROR_THRESHOLD for v in values.values()):
return values
bad = [k for k, v in values.items() if abs(v) >= ERROR_THRESHOLD]
print(f" (auto-ranging: {', '.join(bad)} -- retrying {attempt + 1}/{max_retries})")
# Keep supply alive during the wait so it doesn't
# time out and show a front-panel error / beep
try:
self.supply.measure_voltage()
except Exception:
pass
time.sleep(retry_delay)
raise TimeoutError(
f"HIOKI still auto-ranging after {max_retries} retries"
)
# ── Measurement ───────────────────────────────────────────────────
def measure_all(self) -> dict[str, float]:
"""Take a synchronized reading from all three instruments.
Returns dict with keys:
supply_V, supply_I, supply_P,
load_V, load_I, load_P,
meter_U5, meter_I5, meter_P5,
meter_U6, meter_I6, meter_P6,
meter_EFF1
"""
supply = self.supply.measure_all()
load = self.load.measure_all()
meter = self.meter.measure(*self.METER_ITEMS)
return {
"supply_V": supply.voltage,
"supply_I": supply.current,
"supply_P": supply.power,
"load_V": load.voltage,
"load_I": load.current,
"load_P": load.power,
"meter_U5": meter.values.get("U5", 0.0),
"meter_I5": meter.values.get("I5", 0.0),
"meter_P5": meter.values.get("P5", 0.0),
"meter_U6": meter.values.get("U6", 0.0),
"meter_I6": meter.values.get("I6", 0.0),
"meter_P6": meter.values.get("P6", 0.0),
"meter_EFF1": meter.values.get("EFF1", 0.0),
}
# ── Sweep helper: record one point ────────────────────────────────
@staticmethod
def _query_safe(fn, retries: int = 3, delay: float = 0.5):
"""Call a measurement function with retries on VISA timeout."""
for attempt in range(retries):
try:
return fn()
except Exception:
if attempt == retries - 1:
raise
time.sleep(delay)
def _record_point(
self,
voltage_set: float,
current_limit: float,
load_setpoint: float = 0.0,
) -> SweepPoint:
"""Wait for meter auto-range, then record from all instruments.
Waits for the meter first (which keeps the supply alive via
keepalive pings), then reads supply and load with retry logic.
"""
meter_vals = self._wait_meter_ready()
supply = self._query_safe(self.supply.measure_all)
load = self._query_safe(self.load.measure_all)
return SweepPoint(
voltage_set=voltage_set,
current_limit=current_limit,
load_setpoint=load_setpoint,
supply_voltage=supply.voltage,
supply_current=supply.current,
supply_power=supply.power,
load_voltage=load.voltage,
load_current=load.current,
load_power=load.power,
input_power=meter_vals.get("P5", 0.0),
output_power=meter_vals.get("P6", 0.0),
efficiency=meter_vals.get("EFF1", 0.0),
)
@staticmethod
def _print_point(point: SweepPoint, label: str, value: float, unit: str) -> None:
"""Print a single sweep point to the console."""
print(
f" {label}={value:7.2f}{unit} "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
# ── IV Curve / Voltage Sweep ──────────────────────────────────────
def sweep_voltage(
self,
v_start: float,
v_stop: float,
v_step: float,
current_limit: float,
settle_time: float = 1.0,
load_setpoint: float = 0.0,
) -> list[SweepPoint]:
"""Sweep supply voltage and record measurements at each point.
After the sweep completes, the supply returns to IDLE_VOLTAGE (75V)
and stays ON.
Args:
v_start: Starting voltage (V).
v_stop: Final voltage (V).
v_step: Voltage step size (V). Sign is auto-corrected.
current_limit: Supply current limit (A) for the entire sweep.
settle_time: Seconds to wait after each voltage step before
polling the meter.
load_setpoint: The load setpoint value (for CSV recording).
"""
if v_step == 0:
raise ValueError("v_step cannot be zero")
# Auto-correct step direction
if v_start < v_stop and v_step < 0:
v_step = -v_step
elif v_start > v_stop and v_step > 0:
v_step = -v_step
self.supply.set_current(current_limit)
self.supply.output_on()
results: list[SweepPoint] = []
v = v_start
try:
while True:
if v_step > 0 and v > v_stop + v_step / 2:
break
if v_step < 0 and v < v_stop + v_step / 2:
break
self.supply.set_voltage(v)
time.sleep(settle_time)
point = self._record_point(v, current_limit, load_setpoint)
results.append(point)
self._print_point(point, "V_set", v, "V")
v += v_step
finally:
# Return to idle voltage and keep supply ON
self.supply.set_voltage(IDLE_VOLTAGE)
print(f"\n Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)")
return results
# ── Load Current Sweep ────────────────────────────────────────────
def sweep_load_current(
self,
voltage: float,
current_limit: float,
i_start: float,
i_stop: float,
i_step: float,
settle_time: float = 1.0,
) -> list[SweepPoint]:
"""Sweep load current (CC mode) at a fixed supply voltage.
After the sweep completes, the supply returns to IDLE_VOLTAGE (75V)
and stays ON. The load is turned OFF.
Args:
voltage: Fixed supply voltage (V).
current_limit: Supply current limit (A).
i_start: Starting load current (A).
i_stop: Final load current (A).
i_step: Current step size (A). Sign is auto-corrected.
settle_time: Seconds to wait after each current step before
polling the meter.
"""
if i_step == 0:
raise ValueError("i_step cannot be zero")
# Auto-correct step direction
if i_start < i_stop and i_step < 0:
i_step = -i_step
elif i_start > i_stop and i_step > 0:
i_step = -i_step
self.supply.apply(voltage, current_limit)
self.supply.output_on()
self.load.set_mode("CC")
self.load.set_cc_current(i_start)
self.load.load_on()
results: list[SweepPoint] = []
i = i_start
try:
while True:
if i_step > 0 and i > i_stop + i_step / 2:
break
if i_step < 0 and i < i_stop + i_step / 2:
break
self.load.set_cc_current(i)
time.sleep(settle_time)
point = self._record_point(voltage, current_limit, load_setpoint=i)
results.append(point)
self._print_point(point, "I_load", i, "A")
i += i_step
finally:
self.load.load_off()
# Return to idle voltage and keep supply ON
self.supply.set_voltage(IDLE_VOLTAGE)
print(f"\n Load OFF. Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)")
return results
# ── Efficiency at fixed operating point ───────────────────────────
def measure_efficiency(
self,
voltage: float,
current_limit: float,
settle_time: float = 2.0,
samples: int = 5,
sample_interval: float = 1.0,
) -> dict[str, float]:
"""Measure average efficiency at a fixed operating point.
Args:
voltage: Supply voltage (V).
current_limit: Supply current limit (A).
settle_time: Seconds to wait before first measurement.
samples: Number of readings to average.
sample_interval: Seconds between readings.
Returns:
Dict with avg_input_power, avg_output_power, avg_efficiency,
plus individual supply/load readings.
"""
self.supply.apply(voltage, current_limit)
self.supply.output_on()
time.sleep(settle_time)
p_in_sum = 0.0
p_out_sum = 0.0
eff_sum = 0.0
try:
for i in range(samples):
meter_vals = self._wait_meter_ready()
p_in = meter_vals.get("P5", 0.0)
p_out = meter_vals.get("P6", 0.0)
eff = meter_vals.get("EFF1", 0.0)
p_in_sum += p_in
p_out_sum += p_out
eff_sum += eff
print(
f" [{i + 1}/{samples}] "
f"P_in={p_in:8.2f}W P_out={p_out:8.2f}W EFF={eff:6.2f}%"
)
if i < samples - 1:
time.sleep(sample_interval)
finally:
self.supply.output_off()
return {
"avg_input_power": p_in_sum / samples,
"avg_output_power": p_out_sum / samples,
"avg_efficiency": eff_sum / samples,
"voltage_set": voltage,
"current_limit": current_limit,
"samples": samples,
}
+651
View File
@@ -0,0 +1,651 @@
"""CLI for the MPPT Tracker Testbench.
Orchestrates IT6500D (supply) + Prodigit 3366G (load) + HIOKI 3193-10 (meter).
"""
from __future__ import annotations
import argparse
import csv
import sys
import time
from it6500.driver import IT6500
from prodigit3366g.driver import Prodigit3366G
from hioki3193.driver import Hioki3193
from testbench.bench import MPPTTestbench
# ── Instrument connection ─────────────────────────────────────────────
def connect_bench(args: argparse.Namespace) -> MPPTTestbench:
"""Create and return a connected MPPTTestbench from CLI args."""
supply_addr = MPPTTestbench.find_supply(args.supply_address)
meter_addr = MPPTTestbench.find_meter(args.meter_address)
load_port = args.load_port
print(f"Supply: {supply_addr}")
print(f"Load: {load_port}")
print(f"Meter: {meter_addr}")
print()
supply = IT6500(supply_addr, timeout_ms=args.timeout)
load = Prodigit3366G(load_port, baudrate=args.load_baud)
meter = Hioki3193(meter_addr, timeout_ms=args.timeout)
return MPPTTestbench(supply, load, meter)
# ── Commands ──────────────────────────────────────────────────────────
def cmd_identify(bench: MPPTTestbench, _args: argparse.Namespace) -> None:
"""Identify all three instruments."""
print("=== DC Power Supply (IT6500D) ===")
print(f" Identity: {bench.supply.idn()}")
print(f" Output: {'ON' if bench.supply.get_output_state() else 'OFF'}")
print(f" V set: {bench.supply.get_voltage():.4f} V")
print(f" I set: {bench.supply.get_current():.4f} A")
print()
print("=== DC Electronic Load (Prodigit 3366G) ===")
print(f" Model: {bench.load.name()}")
print(f" Load: {'ON' if bench.load.get_load_state() else 'OFF'}")
print(f" Mode: {bench.load.get_mode()}")
print()
print("=== Power Analyzer (HIOKI 3193-10) ===")
print(f" Identity: {bench.meter.idn()}")
print(f" Options: {bench.meter.options()}")
print(f" Wiring: {bench.meter.get_wiring_mode()}")
def cmd_setup(bench: MPPTTestbench, _args: argparse.Namespace) -> None:
"""Configure all instruments for MPPT testing."""
print("Setting up all instruments for MPPT testing...")
bench.setup_all()
print()
print("Supply: remote mode")
print("Load: remote mode")
print("Meter: 1P2W, DC coupling, auto-range, EFF1=P6/P5")
print("Display: SELECT16 — U5,I5,P5,EFF1 (left) | U6,I6,P6 (right)")
print()
print("Ready. Use 'measure' or 'monitor' to start reading data.")
def cmd_measure(bench: MPPTTestbench, _args: argparse.Namespace) -> None:
"""Take a single measurement from all instruments."""
data = bench.measure_all()
print("=== Supply (IT6500D) ===")
print(f" Voltage = {data['supply_V']:>10.4f} V")
print(f" Current = {data['supply_I']:>10.4f} A")
print(f" Power = {data['supply_P']:>10.4f} W")
print()
print("=== Load (Prodigit 3366G) ===")
print(f" Voltage = {data['load_V']:>10.4f} V")
print(f" Current = {data['load_I']:>10.4f} A")
print(f" Power = {data['load_P']:>10.4f} W")
print()
print("=== Meter (HIOKI 3193-10) ===")
print(f" Input: U5={data['meter_U5']:>+12.4E} V "
f"I5={data['meter_I5']:>+12.4E} A "
f"P5={data['meter_P5']:>+12.4E} W")
print(f" Output: U6={data['meter_U6']:>+12.4E} V "
f"I6={data['meter_I6']:>+12.4E} A "
f"P6={data['meter_P6']:>+12.4E} W")
print(f" EFF1 = {data['meter_EFF1']:>+12.4E} %")
def cmd_monitor(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Continuously monitor all instruments."""
interval = args.interval
writer = None
outfile = None
columns = [
"timestamp",
"supply_V", "supply_I", "supply_P",
"load_V", "load_I", "load_P",
"meter_U5", "meter_I5", "meter_P5",
"meter_U6", "meter_I6", "meter_P6",
"meter_EFF1",
]
if args.output:
outfile = open(args.output, "w", newline="")
writer = csv.writer(outfile)
writer.writerow(columns)
print(f"Logging to {args.output}")
print(
f"{'Time':>10s} "
f"{'Sup_V':>8s} {'Sup_I':>8s} {'Sup_P':>8s} "
f"{'Ld_V':>8s} {'Ld_I':>8s} {'Ld_P':>8s} "
f"{'P_in':>10s} {'P_out':>10s} {'EFF%':>8s}"
)
print("-" * 105)
try:
count = 0
while args.count == 0 or count < args.count:
data = bench.measure_all()
ts = time.strftime("%H:%M:%S")
print(
f"{ts:>10s} "
f"{data['supply_V']:8.3f} {data['supply_I']:8.3f} {data['supply_P']:8.2f} "
f"{data['load_V']:8.3f} {data['load_I']:8.3f} {data['load_P']:8.2f} "
f"{data['meter_P5']:>+10.3E} {data['meter_P6']:>+10.3E} {data['meter_EFF1']:8.2f}"
)
if writer:
writer.writerow(
[time.strftime("%Y-%m-%d %H:%M:%S")]
+ [f"{data[c]:.6f}" for c in columns[1:]]
)
outfile.flush()
count += 1
if args.count == 0 or count < args.count:
time.sleep(interval)
except KeyboardInterrupt:
print("\nMonitoring stopped.")
finally:
if outfile:
outfile.close()
print(f"Data saved to {args.output}")
def cmd_live(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Live monitor with real-time graphs."""
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import deque
max_points = args.history
interval_ms = int(args.interval * 1000)
timestamps: deque[float] = deque(maxlen=max_points)
series: dict[str, deque[float]] = {
k: deque(maxlen=max_points)
for k in [
"supply_P", "load_P",
"meter_P5", "meter_P6", "meter_EFF1",
"meter_U5", "meter_U6",
"meter_I5", "meter_I6",
]
}
t0 = time.time()
writer = None
outfile = None
columns = [
"timestamp",
"supply_V", "supply_I", "supply_P",
"load_V", "load_I", "load_P",
"meter_U5", "meter_I5", "meter_P5",
"meter_U6", "meter_I6", "meter_P6",
"meter_EFF1",
]
if args.output:
outfile = open(args.output, "w", newline="")
writer = csv.writer(outfile)
writer.writerow(columns)
print(f"Logging to {args.output}")
fig, axes = plt.subplots(4, 1, figsize=(14, 12), squeeze=False)
fig.suptitle("MPPT Testbench Live Monitor", fontsize=14, fontweight="bold")
axes = axes.flatten()
# Subplot 0: Power
axes[0].set_ylabel("Power (W)")
axes[0].set_title("Power")
axes[0].grid(True, alpha=0.3)
line_p5, = axes[0].plot([], [], label="P_in (meter)", linewidth=1.5)
line_p6, = axes[0].plot([], [], label="P_out (meter)", linewidth=1.5)
line_sp, = axes[0].plot([], [], label="Supply P", linewidth=1, linestyle="--", alpha=0.6)
line_lp, = axes[0].plot([], [], label="Load P", linewidth=1, linestyle="--", alpha=0.6)
axes[0].legend(loc="upper left", fontsize=9)
# Subplot 1: Efficiency
axes[1].set_ylabel("Efficiency (%)")
axes[1].set_title("Efficiency")
axes[1].grid(True, alpha=0.3)
line_eff, = axes[1].plot([], [], label="EFF1", linewidth=1.5, color="green")
axes[1].legend(loc="upper left", fontsize=9)
# Subplot 2: Voltage
axes[2].set_ylabel("Voltage (V)")
axes[2].set_title("Voltage")
axes[2].grid(True, alpha=0.3)
line_u5, = axes[2].plot([], [], label="U5 (input)", linewidth=1.5)
line_u6, = axes[2].plot([], [], label="U6 (output)", linewidth=1.5)
axes[2].legend(loc="upper left", fontsize=9)
# Subplot 3: Current
axes[3].set_ylabel("Current (A)")
axes[3].set_title("Current")
axes[3].set_xlabel("Time (s)")
axes[3].grid(True, alpha=0.3)
line_i5, = axes[3].plot([], [], label="I5 (input)", linewidth=1.5)
line_i6, = axes[3].plot([], [], label="I6 (output)", linewidth=1.5)
axes[3].legend(loc="upper left", fontsize=9)
fig.tight_layout()
ERROR_THRESHOLD = 1e90
all_lines = [line_p5, line_p6, line_sp, line_lp, line_eff, line_u5, line_u6, line_i5, line_i6]
def _clean(val: float) -> float:
return float("nan") if abs(val) > ERROR_THRESHOLD else val
def update(_frame):
try:
data = bench.measure_all()
except Exception as e:
print(f"Read error: {e}")
return all_lines
now = time.time() - t0
timestamps.append(now)
for key in series:
series[key].append(_clean(data[key]))
ts = time.strftime("%H:%M:%S")
print(
f"{ts} P_in={data['meter_P5']:+.2E} "
f"P_out={data['meter_P6']:+.2E} "
f"EFF={data['meter_EFF1']:.1f}%"
)
if writer:
writer.writerow(
[time.strftime("%Y-%m-%d %H:%M:%S")]
+ [f"{data[c]:.6f}" for c in columns[1:]]
)
outfile.flush()
t_list = list(timestamps)
line_p5.set_data(t_list, list(series["meter_P5"]))
line_p6.set_data(t_list, list(series["meter_P6"]))
line_sp.set_data(t_list, list(series["supply_P"]))
line_lp.set_data(t_list, list(series["load_P"]))
line_eff.set_data(t_list, list(series["meter_EFF1"]))
line_u5.set_data(t_list, list(series["meter_U5"]))
line_u6.set_data(t_list, list(series["meter_U6"]))
line_i5.set_data(t_list, list(series["meter_I5"]))
line_i6.set_data(t_list, list(series["meter_I6"]))
for ax in axes:
ax.relim()
ax.autoscale_view()
return all_lines
_anim = FuncAnimation(
fig, update, interval=interval_ms, blit=False, cache_frame_data=False
)
try:
plt.show()
except KeyboardInterrupt:
pass
finally:
if outfile:
outfile.close()
print(f"Data saved to {args.output}")
def cmd_sweep(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Run a voltage sweep to characterize MPPT tracking."""
print(
f"Voltage sweep: {args.v_start:.1f}V -> {args.v_stop:.1f}V, "
f"step={args.v_step:.2f}V, I_limit={args.current_limit:.1f}A, "
f"settle={args.settle:.1f}s"
)
# Ensure load is configured
if args.load_mode:
bench.load.set_mode(args.load_mode)
if args.load_value is not None:
mode = bench.load.get_mode()
if mode == "CC":
bench.load.set_cc_current(args.load_value)
elif mode == "CR":
bench.load.set_cr_resistance(args.load_value)
elif mode == "CV":
bench.load.set_cv_voltage(args.load_value)
elif mode == "CP":
bench.load.set_cp_power(args.load_value)
bench.load.load_on()
print()
results = bench.sweep_voltage(
v_start=args.v_start,
v_stop=args.v_stop,
v_step=args.v_step,
current_limit=args.current_limit,
settle_time=args.settle,
load_setpoint=args.load_value if args.load_value is not None else 0.0,
)
bench.load.load_off()
_write_sweep_csv(results, args.output)
_print_sweep_summary(results)
def _write_sweep_csv(results: list, output: str | None) -> None:
"""Write sweep results to CSV."""
if not output:
return
with open(output, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"voltage_set", "current_limit", "load_setpoint",
"supply_V", "supply_I", "supply_P",
"load_V", "load_I", "load_P",
"input_power", "output_power", "efficiency",
])
for pt in results:
writer.writerow([
f"{pt.voltage_set:.4f}",
f"{pt.current_limit:.4f}",
f"{pt.load_setpoint:.4f}",
f"{pt.supply_voltage:.4f}",
f"{pt.supply_current:.4f}",
f"{pt.supply_power:.4f}",
f"{pt.load_voltage:.4f}",
f"{pt.load_current:.4f}",
f"{pt.load_power:.4f}",
f"{pt.input_power:.4f}",
f"{pt.output_power:.4f}",
f"{pt.efficiency:.4f}",
])
print(f"\nSweep data saved to {output}")
def _print_sweep_summary(results: list) -> None:
"""Print best-efficiency point from sweep results."""
if results:
best = max(results, key=lambda p: p.efficiency)
print(f"\nBest efficiency: {best.efficiency:.2f}% "
f"at V_set={best.voltage_set:.2f}V "
f"I_load={best.load_setpoint:.2f}A "
f"(P_in={best.input_power:.2f}W, P_out={best.output_power:.2f}W)")
def cmd_sweep_load(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Run a load current sweep at a fixed supply voltage."""
print(
f"Load current sweep: {args.i_start:.2f}A -> {args.i_stop:.2f}A, "
f"step={args.i_step:.2f}A, V={args.voltage:.1f}V, "
f"I_limit={args.current_limit:.1f}A, settle={args.settle:.1f}s"
)
print()
results = bench.sweep_load_current(
voltage=args.voltage,
current_limit=args.current_limit,
i_start=args.i_start,
i_stop=args.i_stop,
i_step=args.i_step,
settle_time=args.settle,
)
_write_sweep_csv(results, args.output)
_print_sweep_summary(results)
def cmd_efficiency(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Measure efficiency at a fixed operating point."""
# Configure load
if args.load_mode:
bench.load.set_mode(args.load_mode)
if args.load_value is not None:
mode = bench.load.get_mode()
if mode == "CC":
bench.load.set_cc_current(args.load_value)
elif mode == "CR":
bench.load.set_cr_resistance(args.load_value)
elif mode == "CV":
bench.load.set_cv_voltage(args.load_value)
elif mode == "CP":
bench.load.set_cp_power(args.load_value)
bench.load.load_on()
print(
f"Measuring efficiency at {args.voltage:.1f}V, "
f"{args.current_limit:.1f}A limit, "
f"{args.samples} samples..."
)
print()
result = bench.measure_efficiency(
voltage=args.voltage,
current_limit=args.current_limit,
settle_time=args.settle,
samples=args.samples,
sample_interval=args.interval,
)
bench.load.load_off()
print()
print(f"Average input power: {result['avg_input_power']:.4f} W")
print(f"Average output power: {result['avg_output_power']:.4f} W")
print(f"Average efficiency: {result['avg_efficiency']:.2f} %")
def cmd_supply(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Control the DC supply directly."""
if args.action == "on":
bench.supply.output_on()
print("Supply output ON")
elif args.action == "off":
bench.supply.output_off()
print("Supply output OFF")
elif args.action == "set":
if args.voltage is not None and args.current is not None:
bench.supply.apply(args.voltage, args.current)
print(f"Supply set: {args.voltage:.4f} V, {args.current:.4f} A")
elif args.voltage is not None:
bench.supply.set_voltage(args.voltage)
print(f"Supply voltage: {args.voltage:.4f} V")
elif args.current is not None:
bench.supply.set_current(args.current)
print(f"Supply current: {args.current:.4f} A")
else:
print("Specify --voltage and/or --current")
def cmd_load(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Control the electronic load directly."""
if args.action == "on":
bench.load.load_on()
print("Load ON")
elif args.action == "off":
bench.load.load_off()
print("Load OFF")
elif args.action == "set":
if args.mode:
bench.load.set_mode(args.mode)
if args.value is not None:
mode = bench.load.get_mode()
if mode == "CC":
bench.load.set_cc_current(args.value)
print(f"Load CC: {args.value:.4f} A")
elif mode == "CR":
bench.load.set_cr_resistance(args.value)
print(f"Load CR: {args.value:.4f} ohm")
elif mode == "CV":
bench.load.set_cv_voltage(args.value)
print(f"Load CV: {args.value:.4f} V")
elif mode == "CP":
bench.load.set_cp_power(args.value)
print(f"Load CP: {args.value:.4f} W")
elif args.mode:
print(f"Load mode: {args.mode}")
def cmd_safe_off(bench: MPPTTestbench, _args: argparse.Namespace) -> None:
"""Emergency shutdown: turn off load, then supply."""
print("Shutting down...")
bench.safe_off()
print(" Load OFF")
print(" Supply OFF")
print("Done.")
# ── Main ──────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="MPPT Tracker Testbench: IT6500D + Prodigit 3366G + HIOKI 3193-10",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s identify
%(prog)s setup
%(prog)s measure
%(prog)s monitor --interval 1.0 --output data.csv
%(prog)s live --interval 0.5
%(prog)s sweep --v-start 10 --v-stop 50 --v-step 1 --current-limit 10 -o sweep.csv
%(prog)s sweep-load --voltage 75 --current-limit 10 --i-start 1 --i-stop 20 --i-step 1 -o load.csv
%(prog)s efficiency --voltage 36 --current-limit 10 --samples 10
%(prog)s supply set --voltage 24 --current 10
%(prog)s supply on
%(prog)s load set --mode CC --value 5.0
%(prog)s load on
%(prog)s safe-off
""",
)
# Global instrument connection args
parser.add_argument(
"--supply-address",
help="IT6500D VISA address (auto-detect if omitted)",
)
parser.add_argument(
"--load-port", default="COM1",
help="Prodigit 3366G serial port (default: COM1)",
)
parser.add_argument(
"--load-baud", type=int, default=115200,
help="Prodigit 3366G baud rate (default: 115200)",
)
parser.add_argument(
"--meter-address",
help="HIOKI 3193-10 VISA address (auto-detect if omitted)",
)
parser.add_argument(
"--timeout", type=int, default=5000,
help="VISA timeout in ms (default: 5000)",
)
sub = parser.add_subparsers(dest="command", required=True)
# identify
sub.add_parser("identify", help="Identify all connected instruments")
# setup
sub.add_parser("setup", help="Configure all instruments for MPPT testing")
# measure
sub.add_parser("measure", help="Single measurement from all instruments")
# monitor
p_mon = sub.add_parser("monitor", help="Continuous monitoring of all instruments")
p_mon.add_argument("-i", "--interval", type=float, default=1.0)
p_mon.add_argument("-n", "--count", type=int, default=0, help="0=infinite")
p_mon.add_argument("-o", "--output", help="CSV output file")
# live
p_live = sub.add_parser("live", help="Live real-time graph of all instruments")
p_live.add_argument("-i", "--interval", type=float, default=1.0)
p_live.add_argument("-o", "--output", help="CSV output file")
p_live.add_argument("--history", type=int, default=300)
# sweep
p_sweep = sub.add_parser("sweep", help="Voltage sweep to characterize MPPT tracking")
p_sweep.add_argument("--v-start", type=float, required=True, help="Start voltage (V)")
p_sweep.add_argument("--v-stop", type=float, required=True, help="Stop voltage (V)")
p_sweep.add_argument("--v-step", type=float, required=True, help="Voltage step (V)")
p_sweep.add_argument("--current-limit", type=float, required=True, help="Current limit (A)")
p_sweep.add_argument("--settle", type=float, default=1.0, help="Settle time per step (s)")
p_sweep.add_argument("--load-mode", choices=["CC", "CR", "CV", "CP"], help="Set load mode before sweep")
p_sweep.add_argument("--load-value", type=float, help="Set load value before sweep")
p_sweep.add_argument("-o", "--output", help="CSV output file")
# sweep-load
p_swl = sub.add_parser("sweep-load", help="Load current sweep at fixed supply voltage")
p_swl.add_argument("--voltage", type=float, required=True, help="Fixed supply voltage (V)")
p_swl.add_argument("--current-limit", type=float, required=True, help="Supply current limit (A)")
p_swl.add_argument("--i-start", type=float, required=True, help="Start load current (A)")
p_swl.add_argument("--i-stop", type=float, required=True, help="Stop load current (A)")
p_swl.add_argument("--i-step", type=float, required=True, help="Current step (A)")
p_swl.add_argument("--settle", type=float, default=1.0, help="Settle time per step (s)")
p_swl.add_argument("-o", "--output", help="CSV output file")
# efficiency
p_eff = sub.add_parser("efficiency", help="Measure efficiency at fixed operating point")
p_eff.add_argument("--voltage", type=float, required=True, help="Supply voltage (V)")
p_eff.add_argument("--current-limit", type=float, required=True, help="Current limit (A)")
p_eff.add_argument("--samples", type=int, default=5, help="Number of readings to average")
p_eff.add_argument("--settle", type=float, default=2.0, help="Initial settle time (s)")
p_eff.add_argument("-i", "--interval", type=float, default=1.0, help="Interval between samples")
p_eff.add_argument("--load-mode", choices=["CC", "CR", "CV", "CP"])
p_eff.add_argument("--load-value", type=float)
# supply (direct control)
p_sup = sub.add_parser("supply", help="Direct supply control")
p_sup_sub = p_sup.add_subparsers(dest="action", required=True)
p_sup_sub.add_parser("on", help="Turn supply output ON")
p_sup_sub.add_parser("off", help="Turn supply output OFF")
p_sup_set = p_sup_sub.add_parser("set", help="Set supply voltage/current")
p_sup_set.add_argument("-v", "--voltage", type=float)
p_sup_set.add_argument("-c", "--current", type=float)
# load (direct control)
p_ld = sub.add_parser("load", help="Direct load control")
p_ld_sub = p_ld.add_subparsers(dest="action", required=True)
p_ld_sub.add_parser("on", help="Turn load ON")
p_ld_sub.add_parser("off", help="Turn load OFF")
p_ld_set = p_ld_sub.add_parser("set", help="Set load mode and value")
p_ld_set.add_argument("-m", "--mode", choices=["CC", "CR", "CV", "CP"])
p_ld_set.add_argument("-v", "--value", type=float, help="Setpoint value")
# safe-off
sub.add_parser("safe-off", help="Emergency: turn off load and supply")
args = parser.parse_args()
dispatch = {
"identify": cmd_identify,
"setup": cmd_setup,
"measure": cmd_measure,
"monitor": cmd_monitor,
"live": cmd_live,
"sweep": cmd_sweep,
"sweep-load": cmd_sweep_load,
"efficiency": cmd_efficiency,
"supply": cmd_supply,
"load": cmd_load,
"safe-off": cmd_safe_off,
}
bench = connect_bench(args)
try:
bench.supply.remote()
bench.load.remote()
dispatch[args.command](bench, args)
except KeyboardInterrupt:
print("\nInterrupted.")
finally:
bench.close()
if __name__ == "__main__":
main()