Add shade profiles, 2D V×I sweep, meter format toggle, ON/OFF indicators, and GUI console

- Shade profile: CSV-driven irradiance/voltage sequences with load control
  (bench.run_shade_profile, CLI shade-profile command, GUI profile panel)
- 2D sweep: voltage × load current efficiency map with live graph updates
  (bench.sweep_vi, CLI sweep-vi command, GUI sweep panel with background thread)
- GUI: meter format selector (scientific/normal), supply/load ON/OFF indicators,
  console log with stdout redirect and color-coded messages
- Sample profiles: cloud_pass, partial_shade, intermittent_clouds

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 15:27:48 +07:00
parent 74917e05f2
commit 956be4b77a
8 changed files with 953 additions and 14 deletions

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@ dist/
build/
.venv/
*.csv
!samples/*.csv

11
samples/cloud_pass.csv Normal file
View File

@@ -0,0 +1,11 @@
time,voltage,current_limit,load_mode,load_value
0,75,10.0,CC,3.0
10,75,8.0,CC,3.0
20,75,6.0,CC,3.0
30,75,4.0,CC,2.0
40,75,2.0,CC,1.0
50,75,2.0,CC,1.0
60,75,4.0,CC,2.0
70,75,6.0,CC,3.0
80,75,8.0,CC,3.0
90,75,10.0,CC,3.0
1 time voltage current_limit load_mode load_value
2 0 75 10.0 CC 3.0
3 10 75 8.0 CC 3.0
4 20 75 6.0 CC 3.0
5 30 75 4.0 CC 2.0
6 40 75 2.0 CC 1.0
7 50 75 2.0 CC 1.0
8 60 75 4.0 CC 2.0
9 70 75 6.0 CC 3.0
10 80 75 8.0 CC 3.0
11 90 75 10.0 CC 3.0

View File

@@ -0,0 +1,15 @@
time,voltage,current_limit,load_mode,load_value
0,75,10.0,CC,3.0
10,75,10.0,CC,3.0
15,75,4.0,CC,2.0
25,75,4.0,CC,2.0
30,75,10.0,CC,3.0
40,75,10.0,CC,3.0
45,75,6.0,CC,2.5
55,75,6.0,CC,2.5
60,75,10.0,CC,3.0
70,75,10.0,CC,3.0
75,75,2.0,CC,1.0
90,75,2.0,CC,1.0
95,75,10.0,CC,3.0
110,75,10.0,CC,3.0
1 time voltage current_limit load_mode load_value
2 0 75 10.0 CC 3.0
3 10 75 10.0 CC 3.0
4 15 75 4.0 CC 2.0
5 25 75 4.0 CC 2.0
6 30 75 10.0 CC 3.0
7 40 75 10.0 CC 3.0
8 45 75 6.0 CC 2.5
9 55 75 6.0 CC 2.5
10 60 75 10.0 CC 3.0
11 70 75 10.0 CC 3.0
12 75 75 2.0 CC 1.0
13 90 75 2.0 CC 1.0
14 95 75 10.0 CC 3.0
15 110 75 10.0 CC 3.0

11
samples/partial_shade.csv Normal file
View File

@@ -0,0 +1,11 @@
time,voltage,current_limit,load_mode,load_value
0,75,10.0,CC,3.0
15,75,10.0,CC,3.0
30,75,7.0,CC,3.0
45,65,5.0,CC,2.5
60,55,3.0,CC,2.0
75,55,3.0,CC,2.0
90,65,5.0,CC,2.5
105,75,7.0,CC,3.0
120,75,10.0,CC,3.0
135,75,10.0,CC,3.0
1 time voltage current_limit load_mode load_value
2 0 75 10.0 CC 3.0
3 15 75 10.0 CC 3.0
4 30 75 7.0 CC 3.0
5 45 65 5.0 CC 2.5
6 60 55 3.0 CC 2.0
7 75 55 3.0 CC 2.0
8 90 65 5.0 CC 2.5
9 105 75 7.0 CC 3.0
10 120 75 10.0 CC 3.0
11 135 75 10.0 CC 3.0

View File

@@ -417,6 +417,251 @@ class MPPTTestbench:
return results
# ── Load value helper ─────────────────────────────────────────────
def _apply_load_value(self, mode: str, value: float) -> None:
"""Set the load setpoint for the given mode."""
if mode == "CC":
self.load.set_cc_current(value)
elif mode == "CR":
self.load.set_cr_resistance(value)
elif mode == "CV":
self.load.set_cv_voltage(value)
elif mode == "CP":
self.load.set_cp_power(value)
# ── Shade Profile ────────────────────────────────────────────────
@staticmethod
def load_shade_profile(path: str) -> list[dict]:
"""Load a shade profile CSV.
Required columns: time, voltage, current_limit
Optional columns: load_mode, load_value
Returns:
Sorted list of step dicts.
"""
import csv as _csv
steps = []
with open(path, newline="") as f:
reader = _csv.DictReader(f)
for row in reader:
step = {
"time": float(row["time"]),
"voltage": float(row["voltage"]),
"current_limit": float(row["current_limit"]),
}
if "load_mode" in row and row["load_mode"].strip():
step["load_mode"] = row["load_mode"].strip().upper()
if "load_value" in row and row["load_value"].strip():
step["load_value"] = float(row["load_value"])
steps.append(step)
steps.sort(key=lambda s: s["time"])
return steps
def run_shade_profile(
self,
steps: list[dict],
settle_time: float = 2.0,
) -> list[SweepPoint]:
"""Run a shade / irradiance profile sequence.
Steps through the profile, applying voltage/current/load settings
at the scheduled times, recording a measurement at each step.
Args:
steps: Profile steps from :meth:`load_shade_profile`.
settle_time: Seconds to wait after applying each step before
recording a measurement.
Returns:
List of SweepPoint measurements, one per step.
"""
if not steps:
return []
# Apply first step and turn outputs on
first = steps[0]
self.supply.set_current(first["current_limit"])
self.supply.set_voltage(first["voltage"])
self.supply.output_on()
if "load_mode" in first:
self.load.set_mode(first["load_mode"])
if "load_value" in first:
self._apply_load_value(
first.get("load_mode", "CC"), first["load_value"]
)
self.load.load_on()
results: list[SweepPoint] = []
t0 = time.time()
try:
for i, step in enumerate(steps):
# Wait until scheduled time, keepalive supply while waiting
target = t0 + step["time"]
while True:
remaining = target - time.time()
if remaining <= 0:
break
if remaining > 2.0:
try:
self.supply.measure_voltage()
except Exception:
pass
time.sleep(min(2.0, remaining))
else:
time.sleep(remaining)
# Apply settings
self.supply.set_current(step["current_limit"])
self.supply.set_voltage(step["voltage"])
if "load_mode" in step:
self.load.set_mode(step["load_mode"])
if "load_value" in step:
self._apply_load_value(
step.get("load_mode", "CC"), step["load_value"]
)
time.sleep(settle_time)
# Record
load_val = step.get("load_value", 0.0) or 0.0
point = self._record_point(
step["voltage"], step["current_limit"], load_val
)
results.append(point)
elapsed = time.time() - t0
print(
f" [{i + 1}/{len(steps)}] t={elapsed:6.1f}s "
f"V={step['voltage']:.1f}V I_lim={step['current_limit']:.1f}A "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
finally:
self.load.load_off()
self.supply.set_voltage(IDLE_VOLTAGE)
print(
f"\n Profile complete. Load OFF. "
f"Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)"
)
return results
# ── 2D Voltage × Current Sweep ─────────────────────────────────────
def sweep_vi(
self,
v_start: float,
v_stop: float,
v_step: float,
i_start: float,
i_stop: float,
i_step: float,
current_limit: float,
settle_time: float = 2.0,
) -> list[SweepPoint]:
"""2D sweep: voltage (outer) × load current (inner).
At each supply voltage, sweeps the load current through the full
range, recording efficiency at every (V, I) combination. Produces
a complete efficiency map of the DUT.
After the sweep the load turns OFF and the supply returns to
IDLE_VOLTAGE (75 V, output stays ON).
Args:
v_start: Starting supply voltage (V).
v_stop: Final supply voltage (V).
v_step: Voltage step size (V). Sign is auto-corrected.
i_start: Starting load current (A).
i_stop: Final load current (A).
i_step: Current step size (A). Sign is auto-corrected.
current_limit: Supply current limit (A) for the entire sweep.
settle_time: Seconds to wait after each setpoint change.
"""
if v_step == 0:
raise ValueError("v_step cannot be zero")
if i_step == 0:
raise ValueError("i_step cannot be zero")
# Auto-correct step directions
if v_start < v_stop and v_step < 0:
v_step = -v_step
elif v_start > v_stop and v_step > 0:
v_step = -v_step
if i_start < i_stop and i_step < 0:
i_step = -i_step
elif i_start > i_stop and i_step > 0:
i_step = -i_step
# Count steps for progress display
v_count = int(abs(v_stop - v_start) / abs(v_step)) + 1
i_count = int(abs(i_stop - i_start) / abs(i_step)) + 1
total = v_count * i_count
print(f" Grid: {v_count} voltage × {i_count} current = {total} points")
self.supply.set_current(current_limit)
self.supply.output_on()
self.load.set_mode("CC")
self.load.set_cc_current(i_start)
self.load.load_on()
results: list[SweepPoint] = []
n = 0
v = v_start
try:
while True:
if v_step > 0 and v > v_stop + v_step / 2:
break
if v_step < 0 and v < v_stop + v_step / 2:
break
self.supply.set_voltage(v)
i = i_start
while True:
if i_step > 0 and i > i_stop + i_step / 2:
break
if i_step < 0 and i < i_stop + i_step / 2:
break
self.load.set_cc_current(i)
time.sleep(settle_time)
point = self._record_point(v, current_limit, load_setpoint=i)
results.append(point)
n += 1
print(
f" [{n:>4d}/{total}] "
f"V={v:6.1f}V I_load={i:6.2f}A "
f"P_in={point.input_power:8.2f}W "
f"P_out={point.output_power:8.2f}W "
f"EFF={point.efficiency:6.2f}%"
)
i += i_step
v += v_step
finally:
self.load.load_off()
self.supply.set_voltage(IDLE_VOLTAGE)
print(
f"\n Load OFF. Supply returning to {IDLE_VOLTAGE:.0f}V "
f"(output stays ON)"
)
return results
# ── Efficiency at fixed operating point ───────────────────────────
def measure_efficiency(

View File

@@ -438,6 +438,55 @@ def cmd_efficiency(bench: MPPTTestbench, args: argparse.Namespace) -> None:
print(f"Average efficiency: {result['avg_efficiency']:.2f} %")
def cmd_sweep_vi(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Run a 2D voltage × load current sweep."""
print(
f"2D sweep: V={args.v_start:.1f}-{args.v_stop:.1f}V (step {args.v_step:.1f}), "
f"I_load={args.i_start:.2f}-{args.i_stop:.2f}A (step {args.i_step:.2f}), "
f"I_limit={args.current_limit:.1f}A, settle={args.settle:.1f}s"
)
print()
results = bench.sweep_vi(
v_start=args.v_start,
v_stop=args.v_stop,
v_step=args.v_step,
i_start=args.i_start,
i_stop=args.i_stop,
i_step=args.i_step,
current_limit=args.current_limit,
settle_time=args.settle,
)
_write_sweep_csv(results, args.output)
_print_sweep_summary(results)
def cmd_shade_profile(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Run a shade / irradiance profile from a CSV file."""
steps = MPPTTestbench.load_shade_profile(args.profile)
duration = steps[-1]["time"] if steps else 0
print(f"Shade profile: {args.profile}")
print(f" {len(steps)} steps over {duration:.0f}s, settle={args.settle:.1f}s")
# Show what modes/values are used
modes = {s.get("load_mode", "?") for s in steps}
voltages = [s["voltage"] for s in steps]
currents = [s["current_limit"] for s in steps]
print(
f" Voltage: {min(voltages):.1f} - {max(voltages):.1f}V, "
f"I_limit: {min(currents):.1f} - {max(currents):.1f}A, "
f"Load modes: {', '.join(sorted(modes))}"
)
print()
results = bench.run_shade_profile(steps, settle_time=args.settle)
_write_sweep_csv(results, args.output)
_print_sweep_summary(results)
def cmd_supply(bench: MPPTTestbench, args: argparse.Namespace) -> None:
"""Control the DC supply directly."""
if args.action == "on":
@@ -515,6 +564,8 @@ examples:
%(prog)s sweep --v-start 10 --v-stop 50 --v-step 1 --current-limit 10 -o sweep.csv
%(prog)s sweep-load --voltage 75 --current-limit 10 --i-start 1 --i-stop 20 --i-step 1 -o load.csv
%(prog)s efficiency --voltage 36 --current-limit 10 --samples 10
%(prog)s sweep-vi --v-start 35 --v-stop 100 --v-step 5 --i-start 0.5 --i-stop 30 --i-step 1 --current-limit 35 -o map.csv
%(prog)s shade-profile --profile cloud_pass.csv --settle 2.0 -o shade_results.csv
%(prog)s supply set --voltage 24 --current 10
%(prog)s supply on
%(prog)s load set --mode CC --value 5.0
@@ -599,6 +650,24 @@ examples:
p_eff.add_argument("--load-mode", choices=["CC", "CR", "CV", "CP"])
p_eff.add_argument("--load-value", type=float)
# sweep-vi (2D)
p_svi = sub.add_parser("sweep-vi", help="2D voltage × load current sweep (efficiency map)")
p_svi.add_argument("--v-start", type=float, required=True, help="Start voltage (V)")
p_svi.add_argument("--v-stop", type=float, required=True, help="Stop voltage (V)")
p_svi.add_argument("--v-step", type=float, required=True, help="Voltage step (V)")
p_svi.add_argument("--i-start", type=float, required=True, help="Start load current (A)")
p_svi.add_argument("--i-stop", type=float, required=True, help="Stop load current (A)")
p_svi.add_argument("--i-step", type=float, required=True, help="Current step (A)")
p_svi.add_argument("--current-limit", type=float, required=True, help="Supply current limit (A)")
p_svi.add_argument("--settle", type=float, default=2.0, help="Settle time per step (s)")
p_svi.add_argument("-o", "--output", help="CSV output file")
# shade-profile
p_shade = sub.add_parser("shade-profile", help="Run a shade/irradiance profile from CSV")
p_shade.add_argument("--profile", required=True, help="Profile CSV file (time,voltage,current_limit,...)")
p_shade.add_argument("--settle", type=float, default=2.0, help="Settle time per step (s)")
p_shade.add_argument("-o", "--output", help="CSV output file for results")
# supply (direct control)
p_sup = sub.add_parser("supply", help="Direct supply control")
p_sup_sub = p_sup.add_subparsers(dest="action", required=True)
@@ -631,6 +700,8 @@ examples:
"sweep": cmd_sweep,
"sweep-load": cmd_sweep_load,
"efficiency": cmd_efficiency,
"sweep-vi": cmd_sweep_vi,
"shade-profile": cmd_shade_profile,
"supply": cmd_supply,
"load": cmd_load,
"safe-off": cmd_safe_off,

View File

@@ -6,7 +6,9 @@ Tkinter app with embedded matplotlib graphs and threaded instrument I/O.
from __future__ import annotations
import csv
import queue
import sys
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
@@ -47,6 +49,30 @@ def _clean(val: float) -> float:
return float("nan") if abs(val) >= ERROR_THRESHOLD else val
class _ConsoleRedirector:
"""Redirects stdout writes to the GUI console widget."""
def __init__(self, gui: "TestbenchGUI") -> None:
self._gui = gui
self._orig = sys.stdout
self._buf = ""
def write(self, text: str) -> None:
if self._orig:
self._orig.write(text)
# Buffer partial lines, flush on newline
self._buf += text
while "\n" in self._buf:
line, self._buf = self._buf.split("\n", 1)
line = line.strip()
if line:
self._gui._console(line)
def flush(self) -> None:
if self._orig:
self._orig.flush()
class TestbenchGUI(tk.Tk):
"""Main GUI window."""
@@ -77,6 +103,9 @@ class TestbenchGUI(tk.Tk):
]
}
self._meter_fmt_sci = True # True=scientific, False=normal
self._sweep_data_queue: queue.Queue = queue.Queue(maxsize=100)
self._build_ui()
self.protocol("WM_DELETE_WINDOW", self._on_close)
@@ -99,14 +128,21 @@ class TestbenchGUI(tk.Tk):
self._build_supply_controls(left_col0)
self._build_load_controls(left_col0)
self._build_logging_controls(left_col0)
self._build_profile_controls(left_col0)
# Column 1: Meter
left_col1 = ttk.Frame(left_outer)
left_col1.grid(row=0, column=1, sticky="nsew", padx=(2, 0))
self._build_meter_readout(left_col1)
# Right panel (graphs)
right_frame = ttk.Frame(content)
content.add(right_frame, weight=1)
self._build_graphs(right_frame)
self._build_sweep_vi_controls(left_col1)
# Right panel: graphs + console
right_pane = ttk.PanedWindow(content, orient=tk.VERTICAL)
content.add(right_pane, weight=1)
graph_frame = ttk.Frame(right_pane)
right_pane.add(graph_frame, weight=3)
self._build_graphs(graph_frame)
console_frame = ttk.Frame(right_pane)
right_pane.add(console_frame, weight=1)
self._build_console(console_frame)
# Status bar
self._build_status_bar()
@@ -217,6 +253,11 @@ class TestbenchGUI(tk.Tk):
self._sup_i_label.pack(anchor=tk.W)
self._sup_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11))
self._sup_p_label.pack(anchor=tk.W)
self._sup_state_label = tk.Label(
readout, text="OUTPUT: ---", font=("Consolas", 11, "bold"),
fg="#888888",
)
self._sup_state_label.pack(anchor=tk.W, pady=(4, 0))
def _build_load_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="DC Load (Prodigit 3366G)", padding=8)
@@ -277,6 +318,11 @@ class TestbenchGUI(tk.Tk):
self._load_i_label.pack(anchor=tk.W)
self._load_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11))
self._load_p_label.pack(anchor=tk.W)
self._load_state_label = tk.Label(
readout, text="LOAD: ---", font=("Consolas", 11, "bold"),
fg="#888888",
)
self._load_state_label.pack(anchor=tk.W, pady=(4, 0))
def _build_meter_readout(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="Power Analyzer (HIOKI 3193-10)", padding=8)
@@ -324,6 +370,17 @@ class TestbenchGUI(tk.Tk):
ttk.Button(row, text="Set", width=4,
command=lambda: self._send(Cmd.SET_COUPLING, 6, self._ch6_coupling.get())).pack(side=tk.LEFT, padx=2)
# Format selector
fmt_row = ttk.Frame(frame)
fmt_row.pack(fill=tk.X, pady=2)
ttk.Label(fmt_row, text="Format:", width=10).pack(side=tk.LEFT)
self._meter_fmt_combo = ttk.Combobox(
fmt_row, values=["Scientific", "Normal"], width=10, state="readonly"
)
self._meter_fmt_combo.set("Scientific")
self._meter_fmt_combo.pack(side=tk.LEFT, padx=2)
self._meter_fmt_combo.bind("<<ComboboxSelected>>", self._on_meter_fmt_change)
# Input side
input_frame = ttk.LabelFrame(frame, text="Input (Ch5 - Solar)", padding=4)
input_frame.pack(fill=tk.X, pady=2)
@@ -372,6 +429,102 @@ class TestbenchGUI(tk.Tk):
self._log_status = ttk.Label(frame, text="Not logging", font=("Consolas", 9))
self._log_status.pack(anchor=tk.W, pady=2)
def _build_profile_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="Shade Profile", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
self._profile_path_label = ttk.Label(row, text="No file selected", font=("Consolas", 9))
self._profile_path_label.pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Button(row, text="Browse", command=self._browse_profile).pack(side=tk.RIGHT, padx=2)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Label(row, text="Settle (s):", width=12).pack(side=tk.LEFT)
self._profile_settle = ttk.Entry(row, width=6)
self._profile_settle.insert(0, "2.0")
self._profile_settle.pack(side=tk.LEFT, padx=2)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
self._btn_profile_run = ttk.Button(row, text="Run Profile", command=self._run_profile)
self._btn_profile_run.pack(side=tk.LEFT, padx=2)
self._btn_profile_stop = ttk.Button(row, text="Stop", command=self._stop_profile, state=tk.DISABLED)
self._btn_profile_stop.pack(side=tk.LEFT, padx=2)
self._profile_status = ttk.Label(frame, text="Idle", font=("Consolas", 9))
self._profile_status.pack(anchor=tk.W, pady=2)
self._profile_steps: list[dict] = []
self._profile_index = 0
self._profile_running = False
self._profile_t0 = 0.0
self._profile_after_id = None
def _build_sweep_vi_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="2D Sweep (V × I)", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
# Voltage range
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="V start:", width=10).pack(side=tk.LEFT)
self._svi_v_start = ttk.Entry(row, width=7)
self._svi_v_start.insert(0, "35")
self._svi_v_start.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="stop:").pack(side=tk.LEFT)
self._svi_v_stop = ttk.Entry(row, width=7)
self._svi_v_stop.insert(0, "100")
self._svi_v_stop.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="step:").pack(side=tk.LEFT)
self._svi_v_step = ttk.Entry(row, width=5)
self._svi_v_step.insert(0, "5")
self._svi_v_step.pack(side=tk.LEFT, padx=2)
# Current range
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="I start:", width=10).pack(side=tk.LEFT)
self._svi_i_start = ttk.Entry(row, width=7)
self._svi_i_start.insert(0, "0.5")
self._svi_i_start.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="stop:").pack(side=tk.LEFT)
self._svi_i_stop = ttk.Entry(row, width=7)
self._svi_i_stop.insert(0, "30")
self._svi_i_stop.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="step:").pack(side=tk.LEFT)
self._svi_i_step = ttk.Entry(row, width=5)
self._svi_i_step.insert(0, "1")
self._svi_i_step.pack(side=tk.LEFT, padx=2)
# Supply current limit + settle
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="I limit:", width=10).pack(side=tk.LEFT)
self._svi_ilimit = ttk.Entry(row, width=7)
self._svi_ilimit.insert(0, "35")
self._svi_ilimit.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="settle:").pack(side=tk.LEFT)
self._svi_settle = ttk.Entry(row, width=5)
self._svi_settle.insert(0, "2.0")
self._svi_settle.pack(side=tk.LEFT, padx=2)
ttk.Label(row, text="s").pack(side=tk.LEFT)
# Run / Stop
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
self._btn_svi_run = ttk.Button(row, text="Run Sweep", command=self._run_sweep_vi)
self._btn_svi_run.pack(side=tk.LEFT, padx=2)
self._btn_svi_stop = ttk.Button(row, text="Stop", command=self._stop_sweep_vi, state=tk.DISABLED)
self._btn_svi_stop.pack(side=tk.LEFT, padx=2)
self._svi_status = ttk.Label(frame, text="Idle", font=("Consolas", 9))
self._svi_status.pack(anchor=tk.W, pady=2)
self._svi_thread = None
self._svi_stop_event = None
def _build_graphs(self, parent) -> None:
self._fig = Figure(figsize=(10, 8), dpi=100)
self._fig.suptitle("MPPT Testbench Live", fontsize=12, fontweight="bold")
@@ -422,6 +575,52 @@ class TestbenchGUI(tk.Tk):
toolbar = NavigationToolbar2Tk(self._canvas, parent)
toolbar.update()
def _build_console(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="Console", padding=4)
frame.pack(fill=tk.BOTH, expand=True, padx=0, pady=0)
self._console_text = tk.Text(
frame, height=8, font=("Consolas", 9),
wrap=tk.WORD, state=tk.DISABLED,
bg="#1e1e1e", fg="#cccccc", insertbackground="#cccccc",
)
scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self._console_text.yview)
self._console_text.config(yscrollcommand=scrollbar.set)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self._console_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# Tag for error messages
self._console_text.tag_configure("error", foreground="#ff6b6b")
self._console_text.tag_configure("success", foreground="#69db7c")
self._console_text.tag_configure("warn", foreground="#ffd43b")
# Redirect stdout to console
self._orig_stdout = sys.stdout
sys.stdout = _ConsoleRedirector(self)
def _console(self, msg: str, tag: str = "") -> None:
"""Append a message to the console log. Thread-safe."""
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {msg}\n"
def _append():
self._console_text.config(state=tk.NORMAL)
if tag:
self._console_text.insert(tk.END, line, tag)
else:
self._console_text.insert(tk.END, line)
self._console_text.see(tk.END)
self._console_text.config(state=tk.DISABLED)
# If called from a non-main thread, schedule on main thread
try:
if threading.current_thread() is threading.main_thread():
_append()
else:
self.after(0, _append)
except RuntimeError:
pass
def _build_status_bar(self) -> None:
bar = ttk.Frame(self)
bar.pack(fill=tk.X, padx=4, pady=(0, 4))
@@ -465,9 +664,11 @@ class TestbenchGUI(tk.Tk):
self._meter_addr.config(state=tk.DISABLED)
self._status_label.config(text="Connected")
self._console(f"Connected: supply={supply_addr}, load={load_port}, meter={meter_addr}", "success")
self._poll()
except Exception as e:
self._console(f"Connection failed: {e}", "error")
messagebox.showerror("Connection Error", str(e))
def _disconnect(self) -> None:
@@ -490,12 +691,15 @@ class TestbenchGUI(tk.Tk):
self._load_baud.config(state=tk.NORMAL)
self._meter_addr.config(state=tk.NORMAL)
self._status_label.config(text="Disconnected")
self._console("Disconnected")
def _setup_all(self) -> None:
self._send(Cmd.SETUP_ALL)
self._console("Setup All sent")
def _safe_off(self) -> None:
"""Emergency stop -- bypasses command queue for minimum latency."""
self._console("SAFE OFF triggered", "error")
if self.bench:
try:
self.bench.safe_off()
@@ -512,19 +716,31 @@ class TestbenchGUI(tk.Tk):
except Exception:
pass
self._disconnect()
sys.stdout = self._orig_stdout
self.destroy()
# ── Polling / Data Update ─────────────────────────────────────────
def _poll(self) -> None:
"""Called periodically to pull data from the worker and update UI."""
if not self.worker:
return
data = None
if self.worker:
data = self.worker.get_data()
else:
# During 2D sweep the worker is stopped; drain sweep queue
latest = None
while True:
try:
latest = self._sweep_data_queue.get_nowait()
except queue.Empty:
break
data = latest
data = self.worker.get_data()
if data:
error = data.get("_error")
if error:
self._console(f"Error: {error}", "error")
self._status_label.config(text=f"Error: {error}")
else:
self._update_readouts(data)
@@ -539,7 +755,9 @@ class TestbenchGUI(tk.Tk):
text=f"Connected | {self._point_count} pts | {hrs:02d}:{mins:02d}:{secs:02d}"
)
self.after(POLL_MS, self._poll)
# Keep polling as long as connected (bench exists)
if self.worker or self.bench:
self.after(POLL_MS, self._poll)
def _update_readouts(self, data: dict) -> None:
"""Update all numeric labels from measurement data."""
@@ -554,12 +772,13 @@ class TestbenchGUI(tk.Tk):
self._load_p_label.config(text=f"P: {_fmt(data['load_P'])} W")
# Meter
self._m_u5.config(text=f"U5: {_fmt_eng(data['meter_U5'])} V")
self._m_i5.config(text=f"I5: {_fmt_eng(data['meter_I5'])} A")
self._m_p5.config(text=f"P5: {_fmt_eng(data['meter_P5'])} W")
self._m_u6.config(text=f"U6: {_fmt_eng(data['meter_U6'])} V")
self._m_i6.config(text=f"I6: {_fmt_eng(data['meter_I6'])} A")
self._m_p6.config(text=f"P6: {_fmt_eng(data['meter_P6'])} W")
fm = self._fmt_meter
self._m_u5.config(text=f"U5: {fm(data['meter_U5'])} V")
self._m_i5.config(text=f"I5: {fm(data['meter_I5'])} A")
self._m_p5.config(text=f"P5: {fm(data['meter_P5'])} W")
self._m_u6.config(text=f"U6: {fm(data['meter_U6'])} V")
self._m_i6.config(text=f"I6: {fm(data['meter_I6'])} A")
self._m_p6.config(text=f"P6: {fm(data['meter_P6'])} W")
eff = data['meter_EFF1']
if abs(eff) >= ERROR_THRESHOLD:
@@ -567,6 +786,23 @@ class TestbenchGUI(tk.Tk):
else:
self._m_eff.config(text=f"EFF1: {eff:.2f} %")
# ON/OFF indicators
supply_on = data.get("supply_on")
if supply_on is True:
self._sup_state_label.config(text="OUTPUT: ON", fg="#00aa00")
elif supply_on is False:
self._sup_state_label.config(text="OUTPUT: OFF", fg="#cc0000")
else:
self._sup_state_label.config(text="OUTPUT: ---", fg="#888888")
load_on = data.get("load_on")
if load_on is True:
self._load_state_label.config(text="LOAD: ON", fg="#00aa00")
elif load_on is False:
self._load_state_label.config(text="LOAD: OFF", fg="#cc0000")
else:
self._load_state_label.config(text="LOAD: ---", fg="#888888")
def _update_graphs(self, data: dict) -> None:
"""Append data to series and redraw graphs."""
now = time.time() - self._t0
@@ -592,12 +828,27 @@ class TestbenchGUI(tk.Tk):
self._canvas.draw_idle()
# ── Format Helpers ─────────────────────────────────────────────────
def _on_meter_fmt_change(self, _event=None) -> None:
self._meter_fmt_sci = self._meter_fmt_combo.get() == "Scientific"
def _fmt_meter(self, val: float) -> str:
"""Format a meter value based on the current format selection."""
if self._meter_fmt_sci:
return _fmt_eng(val)
return _fmt(val, decimals=4)
# ── Command Helpers ───────────────────────────────────────────────
def _send(self, cmd: Cmd, *args) -> None:
"""Send a command to the worker thread."""
if self.worker:
self.worker.send(cmd, *args)
if args:
self._console(f"{cmd.name} {' '.join(str(a) for a in args)}")
else:
self._console(cmd.name)
def _send_float(self, cmd: Cmd, entry: ttk.Entry) -> None:
"""Parse an entry as float and send to worker."""
@@ -652,6 +903,329 @@ class TestbenchGUI(tk.Tk):
except ValueError:
pass
# ── Shade Profile ──────────────────────────────────────────────────
def _browse_profile(self) -> None:
path = filedialog.askopenfilename(
filetypes=[("CSV files", "*.csv"), ("All files", "*.*")],
)
if not path:
return
try:
self._profile_steps = MPPTTestbench.load_shade_profile(path)
n = len(self._profile_steps)
dur = self._profile_steps[-1]["time"] if self._profile_steps else 0
self._profile_path_label.config(text=path.split("/")[-1].split("\\")[-1])
self._profile_status.config(text=f"Loaded: {n} steps, {dur:.0f}s")
except Exception as e:
messagebox.showerror("Profile Error", str(e))
def _run_profile(self) -> None:
if not self._profile_steps:
messagebox.showwarning("No Profile", "Load a profile CSV first.")
return
if not self.worker:
return
try:
settle = float(self._profile_settle.get())
except ValueError:
settle = 2.0
n = len(self._profile_steps)
dur = self._profile_steps[-1]["time"] if self._profile_steps else 0
self._console(f"Shade profile started: {n} steps, {dur:.0f}s", "success")
self._profile_running = True
self._profile_index = 0
self._profile_t0 = time.time()
self._btn_profile_run.config(state=tk.DISABLED)
self._btn_profile_stop.config(state=tk.NORMAL)
# Turn outputs on with first step
first = self._profile_steps[0]
self._send(Cmd.SET_CURRENT, first["current_limit"])
self._send(Cmd.SET_VOLTAGE, first["voltage"])
self._send(Cmd.OUTPUT_ON)
if "load_mode" in first:
self._send(Cmd.SET_MODE, first["load_mode"])
if "load_value" in first:
self._send(Cmd.SET_MODE_VALUE, first.get("load_mode", "CC"), first["load_value"])
self._send(Cmd.LOAD_ON)
self._profile_schedule_next()
def _profile_schedule_next(self) -> None:
if not self._profile_running:
return
if self._profile_index >= len(self._profile_steps):
self._finish_profile()
return
step = self._profile_steps[self._profile_index]
target = self._profile_t0 + step["time"]
delay_ms = max(0, int((target - time.time()) * 1000))
self._profile_after_id = self.after(delay_ms, self._apply_profile_step)
def _apply_profile_step(self) -> None:
if not self._profile_running:
return
step = self._profile_steps[self._profile_index]
self._send(Cmd.SET_CURRENT, step["current_limit"])
self._send(Cmd.SET_VOLTAGE, step["voltage"])
if "load_mode" in step:
self._send(Cmd.SET_MODE, step["load_mode"])
if "load_value" in step:
self._send(Cmd.SET_MODE_VALUE, step.get("load_mode", "CC"), step["load_value"])
elapsed = time.time() - self._profile_t0
n = len(self._profile_steps)
self._profile_status.config(
text=f"Step {self._profile_index + 1}/{n} "
f"t={elapsed:.0f}s V={step['voltage']:.1f}V "
f"I_lim={step['current_limit']:.1f}A"
)
self._profile_index += 1
self._profile_schedule_next()
def _stop_profile(self) -> None:
self._profile_running = False
if self._profile_after_id:
self.after_cancel(self._profile_after_id)
self._profile_after_id = None
self._btn_profile_run.config(state=tk.NORMAL)
self._btn_profile_stop.config(state=tk.DISABLED)
self._profile_status.config(text="Stopped")
self._console("Shade profile stopped", "warn")
def _finish_profile(self) -> None:
self._profile_running = False
self._btn_profile_run.config(state=tk.NORMAL)
self._btn_profile_stop.config(state=tk.DISABLED)
elapsed = time.time() - self._profile_t0
self._profile_status.config(text=f"Done ({elapsed:.0f}s)")
self._console(f"Shade profile complete ({elapsed:.0f}s)", "success")
# ── 2D Sweep (V × I) ───────────────────────────────────────────────
def _run_sweep_vi(self) -> None:
if not self.bench:
return
try:
params = {
"v_start": float(self._svi_v_start.get()),
"v_stop": float(self._svi_v_stop.get()),
"v_step": float(self._svi_v_step.get()),
"i_start": float(self._svi_i_start.get()),
"i_stop": float(self._svi_i_stop.get()),
"i_step": float(self._svi_i_step.get()),
"current_limit": float(self._svi_ilimit.get()),
"settle_time": float(self._svi_settle.get()),
}
except ValueError:
messagebox.showerror("Invalid Input", "Check sweep parameters.")
return
# Ask for output file
default_name = time.strftime("sweep_vi_%Y%m%d_%H%M%S.csv")
path = filedialog.asksaveasfilename(
defaultextension=".csv",
filetypes=[("CSV files", "*.csv")],
initialfile=default_name,
)
if not path:
return
self._console(
f"2D sweep: V={params['v_start']}-{params['v_stop']}V "
f"I={params['i_start']}-{params['i_stop']}A",
"success",
)
# Stop the normal worker so the sweep owns the instruments
if self.worker:
self.worker.stop()
self.worker = None
# Restart _poll so it drains the sweep data queue
self.after(POLL_MS, self._poll)
self._btn_svi_run.config(state=tk.DISABLED)
self._btn_svi_stop.config(state=tk.NORMAL)
stop_event = threading.Event()
self._svi_stop_event = stop_event
def _sweep_thread():
try:
self.after(0, lambda: self._svi_status.config(text="Running..."))
results = self._sweep_vi_loop(params, stop_event)
# Write CSV
if results:
self._write_sweep_vi_csv(results, path)
fname = path.split('/')[-1].split(chr(92))[-1]
msg = f"Done: {len(results)} pts saved to {fname}"
self._console(msg, "success")
else:
msg = "Sweep stopped (no data)"
self._console(msg, "warn")
self.after(0, lambda: self._svi_status.config(text=msg))
except Exception as e:
self._console(f"Sweep error: {e}", "error")
self.after(0, lambda: self._svi_status.config(text=f"Error: {e}"))
finally:
self.after(0, self._sweep_vi_done)
self._svi_thread = threading.Thread(target=_sweep_thread, daemon=True)
self._svi_thread.start()
def _sweep_vi_loop(self, p: dict, stop: threading.Event) -> list:
"""Run the 2D sweep on a background thread. Returns list of SweepPoint."""
from testbench.bench import SweepPoint, IDLE_VOLTAGE
bench = self.bench
v_start, v_stop, v_step = p["v_start"], p["v_stop"], p["v_step"]
i_start, i_stop, i_step = p["i_start"], p["i_stop"], p["i_step"]
current_limit = p["current_limit"]
settle = p["settle_time"]
# Auto-correct directions
if v_start < v_stop and v_step < 0:
v_step = -v_step
elif v_start > v_stop and v_step > 0:
v_step = -v_step
if i_start < i_stop and i_step < 0:
i_step = -i_step
elif i_start > i_stop and i_step > 0:
i_step = -i_step
bench.supply.set_current(current_limit)
bench.supply.output_on()
bench.load.set_mode("CC")
bench.load.set_cc_current(i_start)
bench.load.load_on()
results = []
n = 0
v = v_start
try:
while not stop.is_set():
if v_step > 0 and v > v_stop + v_step / 2:
break
if v_step < 0 and v < v_stop + v_step / 2:
break
bench.supply.set_voltage(v)
i = i_start
while not stop.is_set():
if i_step > 0 and i > i_stop + i_step / 2:
break
if i_step < 0 and i < i_stop + i_step / 2:
break
bench.load.set_cc_current(i)
time.sleep(settle)
if stop.is_set():
break
point = bench._record_point(v, current_limit, load_setpoint=i)
results.append(point)
n += 1
# Push data for live graph/readout updates
gui_data = {
"supply_V": point.supply_voltage,
"supply_I": point.supply_current,
"supply_P": point.supply_power,
"load_V": point.load_voltage,
"load_I": point.load_current,
"load_P": point.load_power,
"meter_U5": point.supply_voltage,
"meter_I5": point.supply_current,
"meter_P5": point.input_power,
"meter_U6": point.load_voltage,
"meter_I6": point.load_current,
"meter_P6": point.output_power,
"meter_EFF1": point.efficiency,
"supply_on": True,
"load_on": True,
"_error": None,
"_timestamp": time.time(),
}
try:
self._sweep_data_queue.put_nowait(gui_data)
except queue.Full:
try:
self._sweep_data_queue.get_nowait()
except queue.Empty:
pass
self._sweep_data_queue.put_nowait(gui_data)
self.after(
0,
lambda v=v, i=i, pt=point, n=n: self._svi_status.config(
text=f"[{n}] V={v:.1f}V I={i:.1f}A "
f"EFF={pt.efficiency:.1f}%"
),
)
i += i_step
v += v_step
finally:
bench.load.load_off()
bench.supply.set_voltage(IDLE_VOLTAGE)
return results
@staticmethod
def _write_sweep_vi_csv(results, path: str) -> None:
import csv as _csv
with open(path, "w", newline="") as f:
w = _csv.writer(f)
w.writerow([
"voltage_set", "current_limit", "load_setpoint",
"supply_V", "supply_I", "supply_P",
"load_V", "load_I", "load_P",
"input_power", "output_power", "efficiency",
])
for pt in results:
w.writerow([
f"{pt.voltage_set:.4f}", f"{pt.current_limit:.4f}",
f"{pt.load_setpoint:.4f}",
f"{pt.supply_voltage:.4f}", f"{pt.supply_current:.4f}",
f"{pt.supply_power:.4f}",
f"{pt.load_voltage:.4f}", f"{pt.load_current:.4f}",
f"{pt.load_power:.4f}",
f"{pt.input_power:.4f}", f"{pt.output_power:.4f}",
f"{pt.efficiency:.4f}",
])
def _stop_sweep_vi(self) -> None:
if self._svi_stop_event:
self._svi_stop_event.set()
def _sweep_vi_done(self) -> None:
self._btn_svi_run.config(state=tk.NORMAL)
self._btn_svi_stop.config(state=tk.DISABLED)
self._svi_thread = None
self._svi_stop_event = None
# Restart normal worker polling
if self.bench:
interval = 1.0
try:
interval = float(self._poll_interval.get())
except ValueError:
pass
self.worker = InstrumentWorker(self.bench, interval=interval)
self.worker.start()
self._poll()
# ── Logging ───────────────────────────────────────────────────────
_LOG_COLUMNS = [
@@ -679,9 +1253,11 @@ class TestbenchGUI(tk.Tk):
self._btn_log_start.config(state=tk.DISABLED)
self._btn_log_stop.config(state=tk.NORMAL)
self._log_status.config(text=f"Logging to {path}")
self._console(f"CSV logging started: {path}")
def _stop_log(self) -> None:
if self._log_file:
self._console(f"CSV logging stopped ({self._log_count} samples)")
self._log_file.close()
self._log_file = None
self._log_writer = None

View File

@@ -99,6 +99,15 @@ class InstrumentWorker(threading.Thread):
data = self.bench.measure_all()
data["_error"] = None
data["_timestamp"] = time.time()
# Query output states for GUI indicators
try:
data["supply_on"] = self.bench.supply.get_output_state()
except Exception:
data["supply_on"] = None
try:
data["load_on"] = self.bench.load.get_load_state()
except Exception:
data["load_on"] = None
except Exception as e:
data = {"_error": str(e), "_timestamp": time.time()}