Add GUI control panel with live measurements and full instrument control

New `mppt-gui` command launches a tkinter app with:
- Live numeric readouts for all 13 measurement channels
- 4-panel real-time matplotlib graphs (power, efficiency, voltage, current)
- Full supply control: voltage, current, OVP, slew, output on/off
- Full load control: mode (CC/CR/CV/CP), setpoint, slew, load on/off
- Meter settings: wiring mode, response speed, coupling per channel
- CSV data logging with start/stop control
- Red SAFE OFF button (also Escape key) bypasses command queue
- Threaded architecture: instrument I/O never blocks the GUI
- Fix apply() silently failing: use set_current + set_voltage instead

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 13:42:07 +07:00
parent c50c09a148
commit e2be7109b7
4 changed files with 909 additions and 2 deletions

714
testbench/gui.py Normal file
View File

@@ -0,0 +1,714 @@
"""MPPT Testbench GUI -- live measurements + full instrument control.
Tkinter app with embedded matplotlib graphs and threaded instrument I/O.
"""
from __future__ import annotations
import csv
import sys
import time
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from collections import deque
import matplotlib
matplotlib.use("TkAgg")
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
from matplotlib.figure import Figure
from it6500.driver import IT6500
from prodigit3366g.driver import Prodigit3366G
from hioki3193.driver import Hioki3193
from testbench.bench import MPPTTestbench
from testbench.gui_workers import InstrumentWorker, Cmd
ERROR_THRESHOLD = 1e90
POLL_MS = 200 # GUI poll interval for checking worker data
def _fmt(val: float, decimals: int = 4) -> str:
"""Format a measurement value, showing '---' for error codes."""
if abs(val) >= ERROR_THRESHOLD:
return "---"
return f"{val:+.{decimals}f}"
def _fmt_eng(val: float) -> str:
"""Format in engineering notation, showing '---' for error codes."""
if abs(val) >= ERROR_THRESHOLD:
return "---"
return f"{val:+.4E}"
def _clean(val: float) -> float:
"""Replace HIOKI error codes with NaN for plotting."""
return float("nan") if abs(val) >= ERROR_THRESHOLD else val
class TestbenchGUI(tk.Tk):
"""Main GUI window."""
def __init__(self) -> None:
super().__init__()
self.title("MPPT Testbench Control Panel")
self.geometry("1500x950")
self.minsize(1200, 800)
self.bench: MPPTTestbench | None = None
self.worker: InstrumentWorker | None = None
self._log_file = None
self._log_writer = None
self._log_count = 0
self._point_count = 0
self._t0 = time.time()
# Graph data
self._history = 300
self._timestamps: deque[float] = deque(maxlen=self._history)
self._series: dict[str, deque[float]] = {
k: deque(maxlen=self._history)
for k in [
"supply_P", "load_P",
"meter_P5", "meter_P6", "meter_EFF1",
"meter_U5", "meter_U6",
"meter_I5", "meter_I6",
]
}
self._build_ui()
self.protocol("WM_DELETE_WINDOW", self._on_close)
# ── UI Construction ───────────────────────────────────────────────
def _build_ui(self) -> None:
# Top bar
self._build_connection_bar()
# Main content: left controls + right graphs
content = ttk.PanedWindow(self, orient=tk.HORIZONTAL)
content.pack(fill=tk.BOTH, expand=True, padx=4, pady=4)
# Left panel (controls + readout) with scrollbar
left_outer = ttk.Frame(content, width=340)
left_canvas = tk.Canvas(left_outer, width=320, highlightthickness=0)
left_scrollbar = ttk.Scrollbar(left_outer, orient=tk.VERTICAL, command=left_canvas.yview)
self._left_frame = ttk.Frame(left_canvas)
self._left_frame.bind("<Configure>", lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all")))
left_canvas.create_window((0, 0), window=self._left_frame, anchor="nw")
left_canvas.configure(yscrollcommand=left_scrollbar.set)
left_canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
left_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Mouse wheel scrolling
def _on_mousewheel(event):
left_canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
left_canvas.bind_all("<MouseWheel>", _on_mousewheel)
content.add(left_outer, weight=0)
self._build_supply_controls(self._left_frame)
self._build_load_controls(self._left_frame)
self._build_meter_readout(self._left_frame)
self._build_logging_controls(self._left_frame)
# Right panel (graphs)
right_frame = ttk.Frame(content)
content.add(right_frame, weight=1)
self._build_graphs(right_frame)
# Status bar
self._build_status_bar()
def _build_connection_bar(self) -> None:
bar = ttk.Frame(self)
bar.pack(fill=tk.X, padx=4, pady=(4, 0))
# Connection fields
ttk.Label(bar, text="Supply:").pack(side=tk.LEFT)
self._supply_addr = ttk.Entry(bar, width=20)
self._supply_addr.insert(0, "auto")
self._supply_addr.pack(side=tk.LEFT, padx=(2, 8))
ttk.Label(bar, text="Load:").pack(side=tk.LEFT)
self._load_port = ttk.Entry(bar, width=8)
self._load_port.insert(0, "COM1")
self._load_port.pack(side=tk.LEFT, padx=(2, 4))
ttk.Label(bar, text="@").pack(side=tk.LEFT)
self._load_baud = ttk.Entry(bar, width=7)
self._load_baud.insert(0, "115200")
self._load_baud.pack(side=tk.LEFT, padx=(2, 8))
ttk.Label(bar, text="Meter:").pack(side=tk.LEFT)
self._meter_addr = ttk.Entry(bar, width=20)
self._meter_addr.insert(0, "auto")
self._meter_addr.pack(side=tk.LEFT, padx=(2, 8))
self._btn_connect = ttk.Button(bar, text="Connect", command=self._connect)
self._btn_connect.pack(side=tk.LEFT, padx=2)
self._btn_setup = ttk.Button(bar, text="Setup All", command=self._setup_all, state=tk.DISABLED)
self._btn_setup.pack(side=tk.LEFT, padx=2)
self._btn_disconnect = ttk.Button(bar, text="Disconnect", command=self._disconnect, state=tk.DISABLED)
self._btn_disconnect.pack(side=tk.LEFT, padx=2)
# SAFE OFF - always visible, right side
self._btn_safe_off = tk.Button(
bar, text="SAFE OFF", bg="#cc0000", fg="white",
font=("TkDefaultFont", 13, "bold"), width=10,
command=self._safe_off, activebackground="#ff0000",
)
self._btn_safe_off.pack(side=tk.RIGHT, padx=4)
self.bind("<Escape>", lambda e: self._safe_off())
def _build_supply_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="DC Supply (IT6500D)", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
# Setpoints
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Label(row, text="Voltage (V):", width=12).pack(side=tk.LEFT)
self._sup_voltage = ttk.Entry(row, width=10)
self._sup_voltage.insert(0, "75.0")
self._sup_voltage.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set V", width=6, command=self._set_supply_voltage).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Label(row, text="Current (A):", width=12).pack(side=tk.LEFT)
self._sup_current = ttk.Entry(row, width=10)
self._sup_current.insert(0, "10.0")
self._sup_current.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set I", width=6, command=self._set_supply_current).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Button(row, text="Apply V+I", command=self._apply_supply).pack(side=tk.LEFT, padx=2)
self._btn_sup_on = ttk.Button(row, text="Output ON", command=lambda: self._send(Cmd.OUTPUT_ON))
self._btn_sup_on.pack(side=tk.LEFT, padx=2)
self._btn_sup_off = ttk.Button(row, text="Output OFF", command=lambda: self._send(Cmd.OUTPUT_OFF))
self._btn_sup_off.pack(side=tk.LEFT, padx=2)
# OVP / Slew
adv = ttk.LabelFrame(frame, text="Protection / Slew", padding=4)
adv.pack(fill=tk.X, pady=4)
row = ttk.Frame(adv)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="OVP (V):", width=12).pack(side=tk.LEFT)
self._sup_ovp = ttk.Entry(row, width=10)
self._sup_ovp.insert(0, "120.0")
self._sup_ovp.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4, command=self._set_ovp).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(adv)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Rise (s):", width=12).pack(side=tk.LEFT)
self._sup_rise = ttk.Entry(row, width=10)
self._sup_rise.insert(0, "0.1")
self._sup_rise.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send_float(Cmd.SET_RISE_TIME, self._sup_rise)).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(adv)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Fall (s):", width=12).pack(side=tk.LEFT)
self._sup_fall = ttk.Entry(row, width=10)
self._sup_fall.insert(0, "0.1")
self._sup_fall.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send_float(Cmd.SET_FALL_TIME, self._sup_fall)).pack(side=tk.LEFT, padx=2)
# Live readout
readout = ttk.LabelFrame(frame, text="Measured", padding=4)
readout.pack(fill=tk.X, pady=4)
self._sup_v_label = ttk.Label(readout, text="V: ---", font=("Consolas", 11))
self._sup_v_label.pack(anchor=tk.W)
self._sup_i_label = ttk.Label(readout, text="I: ---", font=("Consolas", 11))
self._sup_i_label.pack(anchor=tk.W)
self._sup_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11))
self._sup_p_label.pack(anchor=tk.W)
def _build_load_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="DC Load (Prodigit 3366G)", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
# Mode
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Label(row, text="Mode:", width=12).pack(side=tk.LEFT)
self._load_mode = ttk.Combobox(row, values=["CC", "CR", "CV", "CP"], width=6, state="readonly")
self._load_mode.set("CC")
self._load_mode.pack(side=tk.LEFT, padx=2)
self._load_mode.bind("<<ComboboxSelected>>", self._on_mode_change)
ttk.Button(row, text="Set Mode", command=self._set_load_mode).pack(side=tk.LEFT, padx=2)
# Value
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
self._load_unit_label = ttk.Label(row, text="Value (A):", width=12)
self._load_unit_label.pack(side=tk.LEFT)
self._load_value = ttk.Entry(row, width=10)
self._load_value.insert(0, "5.0")
self._load_value.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=6, command=self._set_load_value).pack(side=tk.LEFT, padx=2)
# On/Off
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Button(row, text="Load ON", command=lambda: self._send(Cmd.LOAD_ON)).pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Load OFF", command=lambda: self._send(Cmd.LOAD_OFF)).pack(side=tk.LEFT, padx=2)
# Slew rate
adv = ttk.LabelFrame(frame, text="Slew Rate", padding=4)
adv.pack(fill=tk.X, pady=4)
row = ttk.Frame(adv)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Rise (A/us):", width=12).pack(side=tk.LEFT)
self._load_rise = ttk.Entry(row, width=10)
self._load_rise.insert(0, "1.0")
self._load_rise.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send_float(Cmd.SET_SLEW_RISE, self._load_rise)).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(adv)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Fall (A/us):", width=12).pack(side=tk.LEFT)
self._load_fall = ttk.Entry(row, width=10)
self._load_fall.insert(0, "1.0")
self._load_fall.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send_float(Cmd.SET_SLEW_FALL, self._load_fall)).pack(side=tk.LEFT, padx=2)
# Live readout
readout = ttk.LabelFrame(frame, text="Measured", padding=4)
readout.pack(fill=tk.X, pady=4)
self._load_v_label = ttk.Label(readout, text="V: ---", font=("Consolas", 11))
self._load_v_label.pack(anchor=tk.W)
self._load_i_label = ttk.Label(readout, text="I: ---", font=("Consolas", 11))
self._load_i_label.pack(anchor=tk.W)
self._load_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11))
self._load_p_label.pack(anchor=tk.W)
def _build_meter_readout(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="Power Analyzer (HIOKI 3193-10)", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
# Meter controls
ctrl = ttk.LabelFrame(frame, text="Settings", padding=4)
ctrl.pack(fill=tk.X, pady=4)
row = ttk.Frame(ctrl)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Wiring:", width=10).pack(side=tk.LEFT)
self._meter_wiring = ttk.Combobox(row, values=["1P2W", "1P3W", "3P3W", "3V3A", "3P4W"], width=6, state="readonly")
self._meter_wiring.set("1P2W")
self._meter_wiring.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send(Cmd.SET_WIRING, self._meter_wiring.get())).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(ctrl)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Speed:", width=10).pack(side=tk.LEFT)
self._meter_speed = ttk.Combobox(row, values=["FAST", "MID", "SLOW"], width=6, state="readonly")
self._meter_speed.set("SLOW")
self._meter_speed.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send(Cmd.SET_RESPONSE_SPEED, self._meter_speed.get())).pack(side=tk.LEFT, padx=2)
# Ch5 coupling
row = ttk.Frame(ctrl)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Ch5 coup:", width=10).pack(side=tk.LEFT)
self._ch5_coupling = ttk.Combobox(row, values=["DC", "AC", "ACDC"], width=6, state="readonly")
self._ch5_coupling.set("DC")
self._ch5_coupling.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send(Cmd.SET_COUPLING, 5, self._ch5_coupling.get())).pack(side=tk.LEFT, padx=2)
# Ch6 coupling
row = ttk.Frame(ctrl)
row.pack(fill=tk.X, pady=1)
ttk.Label(row, text="Ch6 coup:", width=10).pack(side=tk.LEFT)
self._ch6_coupling = ttk.Combobox(row, values=["DC", "AC", "ACDC"], width=6, state="readonly")
self._ch6_coupling.set("DC")
self._ch6_coupling.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4,
command=lambda: self._send(Cmd.SET_COUPLING, 6, self._ch6_coupling.get())).pack(side=tk.LEFT, padx=2)
# Input side
input_frame = ttk.LabelFrame(frame, text="Input (Ch5 - Solar)", padding=4)
input_frame.pack(fill=tk.X, pady=2)
self._m_u5 = ttk.Label(input_frame, text="U5: ---", font=("Consolas", 11))
self._m_u5.pack(anchor=tk.W)
self._m_i5 = ttk.Label(input_frame, text="I5: ---", font=("Consolas", 11))
self._m_i5.pack(anchor=tk.W)
self._m_p5 = ttk.Label(input_frame, text="P5: ---", font=("Consolas", 11))
self._m_p5.pack(anchor=tk.W)
# Output side
output_frame = ttk.LabelFrame(frame, text="Output (Ch6 - MPPT)", padding=4)
output_frame.pack(fill=tk.X, pady=2)
self._m_u6 = ttk.Label(output_frame, text="U6: ---", font=("Consolas", 11))
self._m_u6.pack(anchor=tk.W)
self._m_i6 = ttk.Label(output_frame, text="I6: ---", font=("Consolas", 11))
self._m_i6.pack(anchor=tk.W)
self._m_p6 = ttk.Label(output_frame, text="P6: ---", font=("Consolas", 11))
self._m_p6.pack(anchor=tk.W)
# Efficiency - big and bold
eff_frame = ttk.Frame(frame)
eff_frame.pack(fill=tk.X, pady=4)
self._m_eff = ttk.Label(eff_frame, text="EFF1: --- %", font=("Consolas", 16, "bold"))
self._m_eff.pack(anchor=tk.CENTER)
def _build_logging_controls(self, parent) -> None:
frame = ttk.LabelFrame(parent, text="Data Logging", padding=8)
frame.pack(fill=tk.X, padx=4, pady=4)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
ttk.Label(row, text="Interval (s):", width=12).pack(side=tk.LEFT)
self._poll_interval = ttk.Entry(row, width=6)
self._poll_interval.insert(0, "1.0")
self._poll_interval.pack(side=tk.LEFT, padx=2)
ttk.Button(row, text="Set", width=4, command=self._set_interval).pack(side=tk.LEFT, padx=2)
row = ttk.Frame(frame)
row.pack(fill=tk.X, pady=2)
self._btn_log_start = ttk.Button(row, text="Start Log", command=self._start_log)
self._btn_log_start.pack(side=tk.LEFT, padx=2)
self._btn_log_stop = ttk.Button(row, text="Stop Log", command=self._stop_log, state=tk.DISABLED)
self._btn_log_stop.pack(side=tk.LEFT, padx=2)
self._log_status = ttk.Label(frame, text="Not logging", font=("Consolas", 9))
self._log_status.pack(anchor=tk.W, pady=2)
def _build_graphs(self, parent) -> None:
self._fig = Figure(figsize=(10, 8), dpi=100)
self._fig.suptitle("MPPT Testbench Live", fontsize=12, fontweight="bold")
self._ax_power = self._fig.add_subplot(4, 1, 1)
self._ax_eff = self._fig.add_subplot(4, 1, 2)
self._ax_volt = self._fig.add_subplot(4, 1, 3)
self._ax_curr = self._fig.add_subplot(4, 1, 4)
# Power
self._ax_power.set_ylabel("Power (W)")
self._ax_power.set_title("Power", fontsize=10)
self._ax_power.grid(True, alpha=0.3)
self._ln_p5, = self._ax_power.plot([], [], label="P_in (meter)", linewidth=1.5)
self._ln_p6, = self._ax_power.plot([], [], label="P_out (meter)", linewidth=1.5)
self._ln_sp, = self._ax_power.plot([], [], label="Supply P", linewidth=1, ls="--", alpha=0.6)
self._ln_lp, = self._ax_power.plot([], [], label="Load P", linewidth=1, ls="--", alpha=0.6)
self._ax_power.legend(loc="upper left", fontsize=8)
# Efficiency
self._ax_eff.set_ylabel("Efficiency (%)")
self._ax_eff.set_title("Efficiency", fontsize=10)
self._ax_eff.grid(True, alpha=0.3)
self._ln_eff, = self._ax_eff.plot([], [], label="EFF1", linewidth=1.5, color="green")
self._ax_eff.legend(loc="upper left", fontsize=8)
# Voltage
self._ax_volt.set_ylabel("Voltage (V)")
self._ax_volt.set_title("Voltage", fontsize=10)
self._ax_volt.grid(True, alpha=0.3)
self._ln_u5, = self._ax_volt.plot([], [], label="U5 (input)", linewidth=1.5)
self._ln_u6, = self._ax_volt.plot([], [], label="U6 (output)", linewidth=1.5)
self._ax_volt.legend(loc="upper left", fontsize=8)
# Current
self._ax_curr.set_ylabel("Current (A)")
self._ax_curr.set_xlabel("Time (s)")
self._ax_curr.set_title("Current", fontsize=10)
self._ax_curr.grid(True, alpha=0.3)
self._ln_i5, = self._ax_curr.plot([], [], label="I5 (input)", linewidth=1.5)
self._ln_i6, = self._ax_curr.plot([], [], label="I6 (output)", linewidth=1.5)
self._ax_curr.legend(loc="upper left", fontsize=8)
self._fig.tight_layout()
self._canvas = FigureCanvasTkAgg(self._fig, master=parent)
self._canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
toolbar = NavigationToolbar2Tk(self._canvas, parent)
toolbar.update()
def _build_status_bar(self) -> None:
bar = ttk.Frame(self)
bar.pack(fill=tk.X, padx=4, pady=(0, 4))
self._status_label = ttk.Label(bar, text="Disconnected", font=("Consolas", 9))
self._status_label.pack(side=tk.LEFT)
# ── Connection ────────────────────────────────────────────────────
def _connect(self) -> None:
try:
supply_addr = self._supply_addr.get().strip()
if supply_addr == "auto":
supply_addr = MPPTTestbench.find_supply(None)
meter_addr = self._meter_addr.get().strip()
if meter_addr == "auto":
meter_addr = MPPTTestbench.find_meter(None)
load_port = self._load_port.get().strip()
load_baud = int(self._load_baud.get().strip())
supply = IT6500(supply_addr)
load = Prodigit3366G(load_port, baudrate=load_baud)
meter = Hioki3193(meter_addr)
self.bench = MPPTTestbench(supply, load, meter)
self.bench.supply.remote()
self.bench.load.remote()
self.worker = InstrumentWorker(self.bench, interval=1.0)
self.worker.start()
self._t0 = time.time()
self._point_count = 0
# Update UI state
self._btn_connect.config(state=tk.DISABLED)
self._btn_setup.config(state=tk.NORMAL)
self._btn_disconnect.config(state=tk.NORMAL)
self._supply_addr.config(state=tk.DISABLED)
self._load_port.config(state=tk.DISABLED)
self._load_baud.config(state=tk.DISABLED)
self._meter_addr.config(state=tk.DISABLED)
self._status_label.config(text="Connected")
self._poll()
except Exception as e:
messagebox.showerror("Connection Error", str(e))
def _disconnect(self) -> None:
self._stop_log()
if self.worker:
self.worker.stop()
self.worker = None
if self.bench:
try:
self.bench.close()
except Exception:
pass
self.bench = None
self._btn_connect.config(state=tk.NORMAL)
self._btn_setup.config(state=tk.DISABLED)
self._btn_disconnect.config(state=tk.DISABLED)
self._supply_addr.config(state=tk.NORMAL)
self._load_port.config(state=tk.NORMAL)
self._load_baud.config(state=tk.NORMAL)
self._meter_addr.config(state=tk.NORMAL)
self._status_label.config(text="Disconnected")
def _setup_all(self) -> None:
self._send(Cmd.SETUP_ALL)
def _safe_off(self) -> None:
"""Emergency stop -- bypasses command queue for minimum latency."""
if self.bench:
try:
self.bench.safe_off()
except Exception:
pass
if self.worker:
self.worker.send(Cmd.SAFE_OFF)
def _on_close(self) -> None:
self._stop_log()
if self.bench:
try:
self.bench.safe_off()
except Exception:
pass
self._disconnect()
self.destroy()
# ── Polling / Data Update ─────────────────────────────────────────
def _poll(self) -> None:
"""Called periodically to pull data from the worker and update UI."""
if not self.worker:
return
data = self.worker.get_data()
if data:
error = data.get("_error")
if error:
self._status_label.config(text=f"Error: {error}")
else:
self._update_readouts(data)
self._update_graphs(data)
self._log_data(data)
self._point_count += 1
elapsed = time.time() - self._t0
mins, secs = divmod(int(elapsed), 60)
hrs, mins = divmod(mins, 60)
self._status_label.config(
text=f"Connected | {self._point_count} pts | {hrs:02d}:{mins:02d}:{secs:02d}"
)
self.after(POLL_MS, self._poll)
def _update_readouts(self, data: dict) -> None:
"""Update all numeric labels from measurement data."""
# Supply
self._sup_v_label.config(text=f"V: {_fmt(data['supply_V'])} V")
self._sup_i_label.config(text=f"I: {_fmt(data['supply_I'])} A")
self._sup_p_label.config(text=f"P: {_fmt(data['supply_P'])} W")
# Load
self._load_v_label.config(text=f"V: {_fmt(data['load_V'])} V")
self._load_i_label.config(text=f"I: {_fmt(data['load_I'])} A")
self._load_p_label.config(text=f"P: {_fmt(data['load_P'])} W")
# Meter
self._m_u5.config(text=f"U5: {_fmt_eng(data['meter_U5'])} V")
self._m_i5.config(text=f"I5: {_fmt_eng(data['meter_I5'])} A")
self._m_p5.config(text=f"P5: {_fmt_eng(data['meter_P5'])} W")
self._m_u6.config(text=f"U6: {_fmt_eng(data['meter_U6'])} V")
self._m_i6.config(text=f"I6: {_fmt_eng(data['meter_I6'])} A")
self._m_p6.config(text=f"P6: {_fmt_eng(data['meter_P6'])} W")
eff = data['meter_EFF1']
if abs(eff) >= ERROR_THRESHOLD:
self._m_eff.config(text="EFF1: --- %")
else:
self._m_eff.config(text=f"EFF1: {eff:.2f} %")
def _update_graphs(self, data: dict) -> None:
"""Append data to series and redraw graphs."""
now = time.time() - self._t0
self._timestamps.append(now)
for key in self._series:
self._series[key].append(_clean(data.get(key, 0.0)))
t = list(self._timestamps)
self._ln_p5.set_data(t, list(self._series["meter_P5"]))
self._ln_p6.set_data(t, list(self._series["meter_P6"]))
self._ln_sp.set_data(t, list(self._series["supply_P"]))
self._ln_lp.set_data(t, list(self._series["load_P"]))
self._ln_eff.set_data(t, list(self._series["meter_EFF1"]))
self._ln_u5.set_data(t, list(self._series["meter_U5"]))
self._ln_u6.set_data(t, list(self._series["meter_U6"]))
self._ln_i5.set_data(t, list(self._series["meter_I5"]))
self._ln_i6.set_data(t, list(self._series["meter_I6"]))
for ax in [self._ax_power, self._ax_eff, self._ax_volt, self._ax_curr]:
ax.relim()
ax.autoscale_view()
self._canvas.draw_idle()
# ── Command Helpers ───────────────────────────────────────────────
def _send(self, cmd: Cmd, *args) -> None:
"""Send a command to the worker thread."""
if self.worker:
self.worker.send(cmd, *args)
def _send_float(self, cmd: Cmd, entry: ttk.Entry) -> None:
"""Parse an entry as float and send to worker."""
try:
val = float(entry.get())
self._send(cmd, val)
except ValueError:
entry.config(foreground="red")
self.after(1000, lambda: entry.config(foreground=""))
def _set_supply_voltage(self) -> None:
self._send_float(Cmd.SET_VOLTAGE, self._sup_voltage)
def _set_supply_current(self) -> None:
self._send_float(Cmd.SET_CURRENT, self._sup_current)
def _apply_supply(self) -> None:
try:
v = float(self._sup_voltage.get())
i = float(self._sup_current.get())
self._send(Cmd.APPLY, v, i)
except ValueError:
pass
def _set_ovp(self) -> None:
self._send_float(Cmd.SET_OVP, self._sup_ovp)
def _set_load_mode(self) -> None:
self._send(Cmd.SET_MODE, self._load_mode.get())
def _set_load_value(self) -> None:
try:
val = float(self._load_value.get())
mode = self._load_mode.get()
self._send(Cmd.SET_MODE_VALUE, mode, val)
except ValueError:
self._load_value.config(foreground="red")
self.after(1000, lambda: self._load_value.config(foreground=""))
def _on_mode_change(self, _event=None) -> None:
"""Update the value label units when load mode changes."""
units = {"CC": "A", "CR": "\u03a9", "CV": "V", "CP": "W"}
mode = self._load_mode.get()
self._load_unit_label.config(text=f"Value ({units.get(mode, '?')}):")
def _set_interval(self) -> None:
try:
val = float(self._poll_interval.get())
if val < 0.2:
val = 0.2
self._send(Cmd.SET_INTERVAL, val)
except ValueError:
pass
# ── Logging ───────────────────────────────────────────────────────
_LOG_COLUMNS = [
"timestamp",
"supply_V", "supply_I", "supply_P",
"load_V", "load_I", "load_P",
"meter_U5", "meter_I5", "meter_P5",
"meter_U6", "meter_I6", "meter_P6",
"meter_EFF1",
]
def _start_log(self) -> None:
default_name = time.strftime("data_%Y%m%d_%H%M%S.csv")
path = filedialog.asksaveasfilename(
defaultextension=".csv",
filetypes=[("CSV files", "*.csv")],
initialfile=default_name,
)
if not path:
return
self._log_file = open(path, "w", newline="")
self._log_writer = csv.writer(self._log_file)
self._log_writer.writerow(self._LOG_COLUMNS)
self._log_count = 0
self._btn_log_start.config(state=tk.DISABLED)
self._btn_log_stop.config(state=tk.NORMAL)
self._log_status.config(text=f"Logging to {path}")
def _stop_log(self) -> None:
if self._log_file:
self._log_file.close()
self._log_file = None
self._log_writer = None
self._btn_log_start.config(state=tk.NORMAL)
self._btn_log_stop.config(state=tk.DISABLED)
self._log_status.config(text=f"Stopped ({self._log_count} samples)")
def _log_data(self, data: dict) -> None:
if not self._log_writer:
return
row = [time.strftime("%Y-%m-%d %H:%M:%S")]
for col in self._LOG_COLUMNS[1:]:
row.append(f"{data.get(col, 0.0):.6f}")
self._log_writer.writerow(row)
self._log_file.flush()
self._log_count += 1
self._log_status.config(text=f"Logging... {self._log_count} samples")
def main() -> None:
app = TestbenchGUI()
app.mainloop()
if __name__ == "__main__":
main()