From e2be7109b75b6d969cbd4c1bdbcdf31038d4c707 Mon Sep 17 00:00:00 2001 From: grabowski Date: Wed, 11 Mar 2026 13:42:07 +0700 Subject: [PATCH] Add GUI control panel with live measurements and full instrument control New `mppt-gui` command launches a tkinter app with: - Live numeric readouts for all 13 measurement channels - 4-panel real-time matplotlib graphs (power, efficiency, voltage, current) - Full supply control: voltage, current, OVP, slew, output on/off - Full load control: mode (CC/CR/CV/CP), setpoint, slew, load on/off - Meter settings: wiring mode, response speed, coupling per channel - CSV data logging with start/stop control - Red SAFE OFF button (also Escape key) bypasses command queue - Threaded architecture: instrument I/O never blocks the GUI - Fix apply() silently failing: use set_current + set_voltage instead Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 1 + testbench/bench.py | 6 +- testbench/gui.py | 714 +++++++++++++++++++++++++++++++++++++++ testbench/gui_workers.py | 190 +++++++++++ 4 files changed, 909 insertions(+), 2 deletions(-) create mode 100644 testbench/gui.py create mode 100644 testbench/gui_workers.py diff --git a/pyproject.toml b/pyproject.toml index 45c605a..315bcfe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,3 +25,4 @@ packages = ["testbench"] [project.scripts] mppt = "testbench.cli:main" +mppt-gui = "testbench.gui:main" diff --git a/testbench/bench.py b/testbench/bench.py index c79e023..8dbd8d7 100644 --- a/testbench/bench.py +++ b/testbench/bench.py @@ -384,7 +384,8 @@ class MPPTTestbench: elif i_start > i_stop and i_step > 0: i_step = -i_step - self.supply.apply(voltage, current_limit) + self.supply.set_current(current_limit) + self.supply.set_voltage(voltage) self.supply.output_on() self.load.set_mode("CC") self.load.set_cc_current(i_start) @@ -439,7 +440,8 @@ class MPPTTestbench: Dict with avg_input_power, avg_output_power, avg_efficiency, plus individual supply/load readings. """ - self.supply.apply(voltage, current_limit) + self.supply.set_current(current_limit) + self.supply.set_voltage(voltage) self.supply.output_on() time.sleep(settle_time) diff --git a/testbench/gui.py b/testbench/gui.py new file mode 100644 index 0000000..2f107ba --- /dev/null +++ b/testbench/gui.py @@ -0,0 +1,714 @@ +"""MPPT Testbench GUI -- live measurements + full instrument control. + +Tkinter app with embedded matplotlib graphs and threaded instrument I/O. +""" + +from __future__ import annotations + +import csv +import sys +import time +import tkinter as tk +from tkinter import ttk, messagebox, filedialog +from collections import deque + +import matplotlib +matplotlib.use("TkAgg") +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk +from matplotlib.figure import Figure + +from it6500.driver import IT6500 +from prodigit3366g.driver import Prodigit3366G +from hioki3193.driver import Hioki3193 +from testbench.bench import MPPTTestbench +from testbench.gui_workers import InstrumentWorker, Cmd + + +ERROR_THRESHOLD = 1e90 +POLL_MS = 200 # GUI poll interval for checking worker data + + +def _fmt(val: float, decimals: int = 4) -> str: + """Format a measurement value, showing '---' for error codes.""" + if abs(val) >= ERROR_THRESHOLD: + return "---" + return f"{val:+.{decimals}f}" + + +def _fmt_eng(val: float) -> str: + """Format in engineering notation, showing '---' for error codes.""" + if abs(val) >= ERROR_THRESHOLD: + return "---" + return f"{val:+.4E}" + + +def _clean(val: float) -> float: + """Replace HIOKI error codes with NaN for plotting.""" + return float("nan") if abs(val) >= ERROR_THRESHOLD else val + + +class TestbenchGUI(tk.Tk): + """Main GUI window.""" + + def __init__(self) -> None: + super().__init__() + self.title("MPPT Testbench Control Panel") + self.geometry("1500x950") + self.minsize(1200, 800) + + self.bench: MPPTTestbench | None = None + self.worker: InstrumentWorker | None = None + self._log_file = None + self._log_writer = None + self._log_count = 0 + self._point_count = 0 + self._t0 = time.time() + + # Graph data + self._history = 300 + self._timestamps: deque[float] = deque(maxlen=self._history) + self._series: dict[str, deque[float]] = { + k: deque(maxlen=self._history) + for k in [ + "supply_P", "load_P", + "meter_P5", "meter_P6", "meter_EFF1", + "meter_U5", "meter_U6", + "meter_I5", "meter_I6", + ] + } + + self._build_ui() + self.protocol("WM_DELETE_WINDOW", self._on_close) + + # ── UI Construction ─────────────────────────────────────────────── + + def _build_ui(self) -> None: + # Top bar + self._build_connection_bar() + # Main content: left controls + right graphs + content = ttk.PanedWindow(self, orient=tk.HORIZONTAL) + content.pack(fill=tk.BOTH, expand=True, padx=4, pady=4) + # Left panel (controls + readout) with scrollbar + left_outer = ttk.Frame(content, width=340) + left_canvas = tk.Canvas(left_outer, width=320, highlightthickness=0) + left_scrollbar = ttk.Scrollbar(left_outer, orient=tk.VERTICAL, command=left_canvas.yview) + self._left_frame = ttk.Frame(left_canvas) + self._left_frame.bind("", lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all"))) + left_canvas.create_window((0, 0), window=self._left_frame, anchor="nw") + left_canvas.configure(yscrollcommand=left_scrollbar.set) + left_canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + left_scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + # Mouse wheel scrolling + def _on_mousewheel(event): + left_canvas.yview_scroll(int(-1 * (event.delta / 120)), "units") + left_canvas.bind_all("", _on_mousewheel) + content.add(left_outer, weight=0) + self._build_supply_controls(self._left_frame) + self._build_load_controls(self._left_frame) + self._build_meter_readout(self._left_frame) + self._build_logging_controls(self._left_frame) + # Right panel (graphs) + right_frame = ttk.Frame(content) + content.add(right_frame, weight=1) + self._build_graphs(right_frame) + # Status bar + self._build_status_bar() + + def _build_connection_bar(self) -> None: + bar = ttk.Frame(self) + bar.pack(fill=tk.X, padx=4, pady=(4, 0)) + + # Connection fields + ttk.Label(bar, text="Supply:").pack(side=tk.LEFT) + self._supply_addr = ttk.Entry(bar, width=20) + self._supply_addr.insert(0, "auto") + self._supply_addr.pack(side=tk.LEFT, padx=(2, 8)) + + ttk.Label(bar, text="Load:").pack(side=tk.LEFT) + self._load_port = ttk.Entry(bar, width=8) + self._load_port.insert(0, "COM1") + self._load_port.pack(side=tk.LEFT, padx=(2, 4)) + ttk.Label(bar, text="@").pack(side=tk.LEFT) + self._load_baud = ttk.Entry(bar, width=7) + self._load_baud.insert(0, "115200") + self._load_baud.pack(side=tk.LEFT, padx=(2, 8)) + + ttk.Label(bar, text="Meter:").pack(side=tk.LEFT) + self._meter_addr = ttk.Entry(bar, width=20) + self._meter_addr.insert(0, "auto") + self._meter_addr.pack(side=tk.LEFT, padx=(2, 8)) + + self._btn_connect = ttk.Button(bar, text="Connect", command=self._connect) + self._btn_connect.pack(side=tk.LEFT, padx=2) + self._btn_setup = ttk.Button(bar, text="Setup All", command=self._setup_all, state=tk.DISABLED) + self._btn_setup.pack(side=tk.LEFT, padx=2) + self._btn_disconnect = ttk.Button(bar, text="Disconnect", command=self._disconnect, state=tk.DISABLED) + self._btn_disconnect.pack(side=tk.LEFT, padx=2) + + # SAFE OFF - always visible, right side + self._btn_safe_off = tk.Button( + bar, text="SAFE OFF", bg="#cc0000", fg="white", + font=("TkDefaultFont", 13, "bold"), width=10, + command=self._safe_off, activebackground="#ff0000", + ) + self._btn_safe_off.pack(side=tk.RIGHT, padx=4) + self.bind("", lambda e: self._safe_off()) + + def _build_supply_controls(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="DC Supply (IT6500D)", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + # Setpoints + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text="Voltage (V):", width=12).pack(side=tk.LEFT) + self._sup_voltage = ttk.Entry(row, width=10) + self._sup_voltage.insert(0, "75.0") + self._sup_voltage.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set V", width=6, command=self._set_supply_voltage).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text="Current (A):", width=12).pack(side=tk.LEFT) + self._sup_current = ttk.Entry(row, width=10) + self._sup_current.insert(0, "10.0") + self._sup_current.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set I", width=6, command=self._set_supply_current).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Button(row, text="Apply V+I", command=self._apply_supply).pack(side=tk.LEFT, padx=2) + self._btn_sup_on = ttk.Button(row, text="Output ON", command=lambda: self._send(Cmd.OUTPUT_ON)) + self._btn_sup_on.pack(side=tk.LEFT, padx=2) + self._btn_sup_off = ttk.Button(row, text="Output OFF", command=lambda: self._send(Cmd.OUTPUT_OFF)) + self._btn_sup_off.pack(side=tk.LEFT, padx=2) + + # OVP / Slew + adv = ttk.LabelFrame(frame, text="Protection / Slew", padding=4) + adv.pack(fill=tk.X, pady=4) + row = ttk.Frame(adv) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="OVP (V):", width=12).pack(side=tk.LEFT) + self._sup_ovp = ttk.Entry(row, width=10) + self._sup_ovp.insert(0, "120.0") + self._sup_ovp.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, command=self._set_ovp).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(adv) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Rise (s):", width=12).pack(side=tk.LEFT) + self._sup_rise = ttk.Entry(row, width=10) + self._sup_rise.insert(0, "0.1") + self._sup_rise.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send_float(Cmd.SET_RISE_TIME, self._sup_rise)).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(adv) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Fall (s):", width=12).pack(side=tk.LEFT) + self._sup_fall = ttk.Entry(row, width=10) + self._sup_fall.insert(0, "0.1") + self._sup_fall.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send_float(Cmd.SET_FALL_TIME, self._sup_fall)).pack(side=tk.LEFT, padx=2) + + # Live readout + readout = ttk.LabelFrame(frame, text="Measured", padding=4) + readout.pack(fill=tk.X, pady=4) + self._sup_v_label = ttk.Label(readout, text="V: ---", font=("Consolas", 11)) + self._sup_v_label.pack(anchor=tk.W) + self._sup_i_label = ttk.Label(readout, text="I: ---", font=("Consolas", 11)) + self._sup_i_label.pack(anchor=tk.W) + self._sup_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11)) + self._sup_p_label.pack(anchor=tk.W) + + def _build_load_controls(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="DC Load (Prodigit 3366G)", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + # Mode + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text="Mode:", width=12).pack(side=tk.LEFT) + self._load_mode = ttk.Combobox(row, values=["CC", "CR", "CV", "CP"], width=6, state="readonly") + self._load_mode.set("CC") + self._load_mode.pack(side=tk.LEFT, padx=2) + self._load_mode.bind("<>", self._on_mode_change) + ttk.Button(row, text="Set Mode", command=self._set_load_mode).pack(side=tk.LEFT, padx=2) + + # Value + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + self._load_unit_label = ttk.Label(row, text="Value (A):", width=12) + self._load_unit_label.pack(side=tk.LEFT) + self._load_value = ttk.Entry(row, width=10) + self._load_value.insert(0, "5.0") + self._load_value.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=6, command=self._set_load_value).pack(side=tk.LEFT, padx=2) + + # On/Off + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Button(row, text="Load ON", command=lambda: self._send(Cmd.LOAD_ON)).pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Load OFF", command=lambda: self._send(Cmd.LOAD_OFF)).pack(side=tk.LEFT, padx=2) + + # Slew rate + adv = ttk.LabelFrame(frame, text="Slew Rate", padding=4) + adv.pack(fill=tk.X, pady=4) + row = ttk.Frame(adv) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Rise (A/us):", width=12).pack(side=tk.LEFT) + self._load_rise = ttk.Entry(row, width=10) + self._load_rise.insert(0, "1.0") + self._load_rise.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send_float(Cmd.SET_SLEW_RISE, self._load_rise)).pack(side=tk.LEFT, padx=2) + row = ttk.Frame(adv) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Fall (A/us):", width=12).pack(side=tk.LEFT) + self._load_fall = ttk.Entry(row, width=10) + self._load_fall.insert(0, "1.0") + self._load_fall.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send_float(Cmd.SET_SLEW_FALL, self._load_fall)).pack(side=tk.LEFT, padx=2) + + # Live readout + readout = ttk.LabelFrame(frame, text="Measured", padding=4) + readout.pack(fill=tk.X, pady=4) + self._load_v_label = ttk.Label(readout, text="V: ---", font=("Consolas", 11)) + self._load_v_label.pack(anchor=tk.W) + self._load_i_label = ttk.Label(readout, text="I: ---", font=("Consolas", 11)) + self._load_i_label.pack(anchor=tk.W) + self._load_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11)) + self._load_p_label.pack(anchor=tk.W) + + def _build_meter_readout(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="Power Analyzer (HIOKI 3193-10)", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + # Meter controls + ctrl = ttk.LabelFrame(frame, text="Settings", padding=4) + ctrl.pack(fill=tk.X, pady=4) + + row = ttk.Frame(ctrl) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Wiring:", width=10).pack(side=tk.LEFT) + self._meter_wiring = ttk.Combobox(row, values=["1P2W", "1P3W", "3P3W", "3V3A", "3P4W"], width=6, state="readonly") + self._meter_wiring.set("1P2W") + self._meter_wiring.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send(Cmd.SET_WIRING, self._meter_wiring.get())).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(ctrl) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Speed:", width=10).pack(side=tk.LEFT) + self._meter_speed = ttk.Combobox(row, values=["FAST", "MID", "SLOW"], width=6, state="readonly") + self._meter_speed.set("SLOW") + self._meter_speed.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send(Cmd.SET_RESPONSE_SPEED, self._meter_speed.get())).pack(side=tk.LEFT, padx=2) + + # Ch5 coupling + row = ttk.Frame(ctrl) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Ch5 coup:", width=10).pack(side=tk.LEFT) + self._ch5_coupling = ttk.Combobox(row, values=["DC", "AC", "ACDC"], width=6, state="readonly") + self._ch5_coupling.set("DC") + self._ch5_coupling.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send(Cmd.SET_COUPLING, 5, self._ch5_coupling.get())).pack(side=tk.LEFT, padx=2) + + # Ch6 coupling + row = ttk.Frame(ctrl) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="Ch6 coup:", width=10).pack(side=tk.LEFT) + self._ch6_coupling = ttk.Combobox(row, values=["DC", "AC", "ACDC"], width=6, state="readonly") + self._ch6_coupling.set("DC") + self._ch6_coupling.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, + command=lambda: self._send(Cmd.SET_COUPLING, 6, self._ch6_coupling.get())).pack(side=tk.LEFT, padx=2) + + # Input side + input_frame = ttk.LabelFrame(frame, text="Input (Ch5 - Solar)", padding=4) + input_frame.pack(fill=tk.X, pady=2) + self._m_u5 = ttk.Label(input_frame, text="U5: ---", font=("Consolas", 11)) + self._m_u5.pack(anchor=tk.W) + self._m_i5 = ttk.Label(input_frame, text="I5: ---", font=("Consolas", 11)) + self._m_i5.pack(anchor=tk.W) + self._m_p5 = ttk.Label(input_frame, text="P5: ---", font=("Consolas", 11)) + self._m_p5.pack(anchor=tk.W) + + # Output side + output_frame = ttk.LabelFrame(frame, text="Output (Ch6 - MPPT)", padding=4) + output_frame.pack(fill=tk.X, pady=2) + self._m_u6 = ttk.Label(output_frame, text="U6: ---", font=("Consolas", 11)) + self._m_u6.pack(anchor=tk.W) + self._m_i6 = ttk.Label(output_frame, text="I6: ---", font=("Consolas", 11)) + self._m_i6.pack(anchor=tk.W) + self._m_p6 = ttk.Label(output_frame, text="P6: ---", font=("Consolas", 11)) + self._m_p6.pack(anchor=tk.W) + + # Efficiency - big and bold + eff_frame = ttk.Frame(frame) + eff_frame.pack(fill=tk.X, pady=4) + self._m_eff = ttk.Label(eff_frame, text="EFF1: --- %", font=("Consolas", 16, "bold")) + self._m_eff.pack(anchor=tk.CENTER) + + def _build_logging_controls(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="Data Logging", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text="Interval (s):", width=12).pack(side=tk.LEFT) + self._poll_interval = ttk.Entry(row, width=6) + self._poll_interval.insert(0, "1.0") + self._poll_interval.pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Set", width=4, command=self._set_interval).pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + self._btn_log_start = ttk.Button(row, text="Start Log", command=self._start_log) + self._btn_log_start.pack(side=tk.LEFT, padx=2) + self._btn_log_stop = ttk.Button(row, text="Stop Log", command=self._stop_log, state=tk.DISABLED) + self._btn_log_stop.pack(side=tk.LEFT, padx=2) + + self._log_status = ttk.Label(frame, text="Not logging", font=("Consolas", 9)) + self._log_status.pack(anchor=tk.W, pady=2) + + def _build_graphs(self, parent) -> None: + self._fig = Figure(figsize=(10, 8), dpi=100) + self._fig.suptitle("MPPT Testbench Live", fontsize=12, fontweight="bold") + + self._ax_power = self._fig.add_subplot(4, 1, 1) + self._ax_eff = self._fig.add_subplot(4, 1, 2) + self._ax_volt = self._fig.add_subplot(4, 1, 3) + self._ax_curr = self._fig.add_subplot(4, 1, 4) + + # Power + self._ax_power.set_ylabel("Power (W)") + self._ax_power.set_title("Power", fontsize=10) + self._ax_power.grid(True, alpha=0.3) + self._ln_p5, = self._ax_power.plot([], [], label="P_in (meter)", linewidth=1.5) + self._ln_p6, = self._ax_power.plot([], [], label="P_out (meter)", linewidth=1.5) + self._ln_sp, = self._ax_power.plot([], [], label="Supply P", linewidth=1, ls="--", alpha=0.6) + self._ln_lp, = self._ax_power.plot([], [], label="Load P", linewidth=1, ls="--", alpha=0.6) + self._ax_power.legend(loc="upper left", fontsize=8) + + # Efficiency + self._ax_eff.set_ylabel("Efficiency (%)") + self._ax_eff.set_title("Efficiency", fontsize=10) + self._ax_eff.grid(True, alpha=0.3) + self._ln_eff, = self._ax_eff.plot([], [], label="EFF1", linewidth=1.5, color="green") + self._ax_eff.legend(loc="upper left", fontsize=8) + + # Voltage + self._ax_volt.set_ylabel("Voltage (V)") + self._ax_volt.set_title("Voltage", fontsize=10) + self._ax_volt.grid(True, alpha=0.3) + self._ln_u5, = self._ax_volt.plot([], [], label="U5 (input)", linewidth=1.5) + self._ln_u6, = self._ax_volt.plot([], [], label="U6 (output)", linewidth=1.5) + self._ax_volt.legend(loc="upper left", fontsize=8) + + # Current + self._ax_curr.set_ylabel("Current (A)") + self._ax_curr.set_xlabel("Time (s)") + self._ax_curr.set_title("Current", fontsize=10) + self._ax_curr.grid(True, alpha=0.3) + self._ln_i5, = self._ax_curr.plot([], [], label="I5 (input)", linewidth=1.5) + self._ln_i6, = self._ax_curr.plot([], [], label="I6 (output)", linewidth=1.5) + self._ax_curr.legend(loc="upper left", fontsize=8) + + self._fig.tight_layout() + + self._canvas = FigureCanvasTkAgg(self._fig, master=parent) + self._canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) + toolbar = NavigationToolbar2Tk(self._canvas, parent) + toolbar.update() + + def _build_status_bar(self) -> None: + bar = ttk.Frame(self) + bar.pack(fill=tk.X, padx=4, pady=(0, 4)) + self._status_label = ttk.Label(bar, text="Disconnected", font=("Consolas", 9)) + self._status_label.pack(side=tk.LEFT) + + # ── Connection ──────────────────────────────────────────────────── + + def _connect(self) -> None: + try: + supply_addr = self._supply_addr.get().strip() + if supply_addr == "auto": + supply_addr = MPPTTestbench.find_supply(None) + meter_addr = self._meter_addr.get().strip() + if meter_addr == "auto": + meter_addr = MPPTTestbench.find_meter(None) + load_port = self._load_port.get().strip() + load_baud = int(self._load_baud.get().strip()) + + supply = IT6500(supply_addr) + load = Prodigit3366G(load_port, baudrate=load_baud) + meter = Hioki3193(meter_addr) + + self.bench = MPPTTestbench(supply, load, meter) + self.bench.supply.remote() + self.bench.load.remote() + + self.worker = InstrumentWorker(self.bench, interval=1.0) + self.worker.start() + + self._t0 = time.time() + self._point_count = 0 + + # Update UI state + self._btn_connect.config(state=tk.DISABLED) + self._btn_setup.config(state=tk.NORMAL) + self._btn_disconnect.config(state=tk.NORMAL) + self._supply_addr.config(state=tk.DISABLED) + self._load_port.config(state=tk.DISABLED) + self._load_baud.config(state=tk.DISABLED) + self._meter_addr.config(state=tk.DISABLED) + + self._status_label.config(text="Connected") + self._poll() + + except Exception as e: + messagebox.showerror("Connection Error", str(e)) + + def _disconnect(self) -> None: + self._stop_log() + if self.worker: + self.worker.stop() + self.worker = None + if self.bench: + try: + self.bench.close() + except Exception: + pass + self.bench = None + + self._btn_connect.config(state=tk.NORMAL) + self._btn_setup.config(state=tk.DISABLED) + self._btn_disconnect.config(state=tk.DISABLED) + self._supply_addr.config(state=tk.NORMAL) + self._load_port.config(state=tk.NORMAL) + self._load_baud.config(state=tk.NORMAL) + self._meter_addr.config(state=tk.NORMAL) + self._status_label.config(text="Disconnected") + + def _setup_all(self) -> None: + self._send(Cmd.SETUP_ALL) + + def _safe_off(self) -> None: + """Emergency stop -- bypasses command queue for minimum latency.""" + if self.bench: + try: + self.bench.safe_off() + except Exception: + pass + if self.worker: + self.worker.send(Cmd.SAFE_OFF) + + def _on_close(self) -> None: + self._stop_log() + if self.bench: + try: + self.bench.safe_off() + except Exception: + pass + self._disconnect() + self.destroy() + + # ── Polling / Data Update ───────────────────────────────────────── + + def _poll(self) -> None: + """Called periodically to pull data from the worker and update UI.""" + if not self.worker: + return + + data = self.worker.get_data() + if data: + error = data.get("_error") + if error: + self._status_label.config(text=f"Error: {error}") + else: + self._update_readouts(data) + self._update_graphs(data) + self._log_data(data) + self._point_count += 1 + + elapsed = time.time() - self._t0 + mins, secs = divmod(int(elapsed), 60) + hrs, mins = divmod(mins, 60) + self._status_label.config( + text=f"Connected | {self._point_count} pts | {hrs:02d}:{mins:02d}:{secs:02d}" + ) + + self.after(POLL_MS, self._poll) + + def _update_readouts(self, data: dict) -> None: + """Update all numeric labels from measurement data.""" + # Supply + self._sup_v_label.config(text=f"V: {_fmt(data['supply_V'])} V") + self._sup_i_label.config(text=f"I: {_fmt(data['supply_I'])} A") + self._sup_p_label.config(text=f"P: {_fmt(data['supply_P'])} W") + + # Load + self._load_v_label.config(text=f"V: {_fmt(data['load_V'])} V") + self._load_i_label.config(text=f"I: {_fmt(data['load_I'])} A") + self._load_p_label.config(text=f"P: {_fmt(data['load_P'])} W") + + # Meter + self._m_u5.config(text=f"U5: {_fmt_eng(data['meter_U5'])} V") + self._m_i5.config(text=f"I5: {_fmt_eng(data['meter_I5'])} A") + self._m_p5.config(text=f"P5: {_fmt_eng(data['meter_P5'])} W") + self._m_u6.config(text=f"U6: {_fmt_eng(data['meter_U6'])} V") + self._m_i6.config(text=f"I6: {_fmt_eng(data['meter_I6'])} A") + self._m_p6.config(text=f"P6: {_fmt_eng(data['meter_P6'])} W") + + eff = data['meter_EFF1'] + if abs(eff) >= ERROR_THRESHOLD: + self._m_eff.config(text="EFF1: --- %") + else: + self._m_eff.config(text=f"EFF1: {eff:.2f} %") + + def _update_graphs(self, data: dict) -> None: + """Append data to series and redraw graphs.""" + now = time.time() - self._t0 + self._timestamps.append(now) + + for key in self._series: + self._series[key].append(_clean(data.get(key, 0.0))) + + t = list(self._timestamps) + self._ln_p5.set_data(t, list(self._series["meter_P5"])) + self._ln_p6.set_data(t, list(self._series["meter_P6"])) + self._ln_sp.set_data(t, list(self._series["supply_P"])) + self._ln_lp.set_data(t, list(self._series["load_P"])) + self._ln_eff.set_data(t, list(self._series["meter_EFF1"])) + self._ln_u5.set_data(t, list(self._series["meter_U5"])) + self._ln_u6.set_data(t, list(self._series["meter_U6"])) + self._ln_i5.set_data(t, list(self._series["meter_I5"])) + self._ln_i6.set_data(t, list(self._series["meter_I6"])) + + for ax in [self._ax_power, self._ax_eff, self._ax_volt, self._ax_curr]: + ax.relim() + ax.autoscale_view() + + self._canvas.draw_idle() + + # ── Command Helpers ─────────────────────────────────────────────── + + def _send(self, cmd: Cmd, *args) -> None: + """Send a command to the worker thread.""" + if self.worker: + self.worker.send(cmd, *args) + + def _send_float(self, cmd: Cmd, entry: ttk.Entry) -> None: + """Parse an entry as float and send to worker.""" + try: + val = float(entry.get()) + self._send(cmd, val) + except ValueError: + entry.config(foreground="red") + self.after(1000, lambda: entry.config(foreground="")) + + def _set_supply_voltage(self) -> None: + self._send_float(Cmd.SET_VOLTAGE, self._sup_voltage) + + def _set_supply_current(self) -> None: + self._send_float(Cmd.SET_CURRENT, self._sup_current) + + def _apply_supply(self) -> None: + try: + v = float(self._sup_voltage.get()) + i = float(self._sup_current.get()) + self._send(Cmd.APPLY, v, i) + except ValueError: + pass + + def _set_ovp(self) -> None: + self._send_float(Cmd.SET_OVP, self._sup_ovp) + + def _set_load_mode(self) -> None: + self._send(Cmd.SET_MODE, self._load_mode.get()) + + def _set_load_value(self) -> None: + try: + val = float(self._load_value.get()) + mode = self._load_mode.get() + self._send(Cmd.SET_MODE_VALUE, mode, val) + except ValueError: + self._load_value.config(foreground="red") + self.after(1000, lambda: self._load_value.config(foreground="")) + + def _on_mode_change(self, _event=None) -> None: + """Update the value label units when load mode changes.""" + units = {"CC": "A", "CR": "\u03a9", "CV": "V", "CP": "W"} + mode = self._load_mode.get() + self._load_unit_label.config(text=f"Value ({units.get(mode, '?')}):") + + def _set_interval(self) -> None: + try: + val = float(self._poll_interval.get()) + if val < 0.2: + val = 0.2 + self._send(Cmd.SET_INTERVAL, val) + except ValueError: + pass + + # ── Logging ─────────────────────────────────────────────────────── + + _LOG_COLUMNS = [ + "timestamp", + "supply_V", "supply_I", "supply_P", + "load_V", "load_I", "load_P", + "meter_U5", "meter_I5", "meter_P5", + "meter_U6", "meter_I6", "meter_P6", + "meter_EFF1", + ] + + def _start_log(self) -> None: + default_name = time.strftime("data_%Y%m%d_%H%M%S.csv") + path = filedialog.asksaveasfilename( + defaultextension=".csv", + filetypes=[("CSV files", "*.csv")], + initialfile=default_name, + ) + if not path: + return + self._log_file = open(path, "w", newline="") + self._log_writer = csv.writer(self._log_file) + self._log_writer.writerow(self._LOG_COLUMNS) + self._log_count = 0 + self._btn_log_start.config(state=tk.DISABLED) + self._btn_log_stop.config(state=tk.NORMAL) + self._log_status.config(text=f"Logging to {path}") + + def _stop_log(self) -> None: + if self._log_file: + self._log_file.close() + self._log_file = None + self._log_writer = None + self._btn_log_start.config(state=tk.NORMAL) + self._btn_log_stop.config(state=tk.DISABLED) + self._log_status.config(text=f"Stopped ({self._log_count} samples)") + + def _log_data(self, data: dict) -> None: + if not self._log_writer: + return + row = [time.strftime("%Y-%m-%d %H:%M:%S")] + for col in self._LOG_COLUMNS[1:]: + row.append(f"{data.get(col, 0.0):.6f}") + self._log_writer.writerow(row) + self._log_file.flush() + self._log_count += 1 + self._log_status.config(text=f"Logging... {self._log_count} samples") + + +def main() -> None: + app = TestbenchGUI() + app.mainloop() + + +if __name__ == "__main__": + main() diff --git a/testbench/gui_workers.py b/testbench/gui_workers.py new file mode 100644 index 0000000..1cd0aa1 --- /dev/null +++ b/testbench/gui_workers.py @@ -0,0 +1,190 @@ +"""Background worker thread for instrument I/O. + +Keeps all VISA/serial communication off the GUI thread so tkinter +never freezes during instrument queries. +""" + +from __future__ import annotations + +import queue +import threading +import time +from enum import Enum, auto + + +class Cmd(Enum): + """Commands sent from GUI to worker thread.""" + + # Supply + SET_VOLTAGE = auto() + SET_CURRENT = auto() + APPLY = auto() + OUTPUT_ON = auto() + OUTPUT_OFF = auto() + SET_OVP = auto() + SET_RISE_TIME = auto() + SET_FALL_TIME = auto() + + # Load + SET_MODE = auto() + SET_MODE_VALUE = auto() + LOAD_ON = auto() + LOAD_OFF = auto() + SET_SLEW_RISE = auto() + SET_SLEW_FALL = auto() + + # Meter + SETUP_ALL = auto() + SET_WIRING = auto() + SET_COUPLING = auto() + SET_RESPONSE_SPEED = auto() + SET_AVERAGING = auto() + + # System + SET_INTERVAL = auto() + SAFE_OFF = auto() + + +class InstrumentWorker(threading.Thread): + """Daemon thread that polls instruments and processes commands. + + Usage: + worker = InstrumentWorker(bench) + worker.start() + worker.send(Cmd.SET_VOLTAGE, 48.0) + data = worker.get_data() # dict or None + worker.stop() + """ + + def __init__(self, bench, interval: float = 1.0) -> None: + super().__init__(daemon=True) + self.bench = bench + self.interval = interval + self.cmd_queue: queue.Queue = queue.Queue() + self.data_queue: queue.Queue = queue.Queue(maxsize=100) + self._stop_event = threading.Event() + + def send(self, cmd: Cmd, *args) -> None: + """Queue a command for execution on the worker thread.""" + self.cmd_queue.put((cmd, args)) + + def get_data(self) -> dict | None: + """Get the latest measurement data (non-blocking). Returns None if empty.""" + result = None + # Drain queue, keep only the latest + while True: + try: + result = self.data_queue.get_nowait() + except queue.Empty: + break + return result + + def stop(self) -> None: + """Signal the worker to stop.""" + self._stop_event.set() + + def run(self) -> None: + """Main worker loop: process commands, then read instruments.""" + while not self._stop_event.is_set(): + # Process all pending commands + while True: + try: + cmd, args = self.cmd_queue.get_nowait() + self._execute(cmd, args) + except queue.Empty: + break + + # Read all instruments + try: + data = self.bench.measure_all() + data["_error"] = None + data["_timestamp"] = time.time() + except Exception as e: + data = {"_error": str(e), "_timestamp": time.time()} + + # Push to GUI (drop oldest if full) + try: + self.data_queue.put_nowait(data) + except queue.Full: + try: + self.data_queue.get_nowait() + except queue.Empty: + pass + self.data_queue.put_nowait(data) + + self._stop_event.wait(timeout=self.interval) + + def _execute(self, cmd: Cmd, args: tuple) -> None: + """Execute a single command. Exceptions are swallowed and reported.""" + try: + bench = self.bench + match cmd: + # Supply + case Cmd.SET_VOLTAGE: + bench.supply.set_voltage(args[0]) + case Cmd.SET_CURRENT: + bench.supply.set_current(args[0]) + case Cmd.APPLY: + bench.supply.set_current(args[1]) + bench.supply.set_voltage(args[0]) + case Cmd.OUTPUT_ON: + bench.supply.output_on() + case Cmd.OUTPUT_OFF: + bench.supply.output_off() + case Cmd.SET_OVP: + bench.supply.set_ovp_level(args[0]) + bench.supply.set_ovp_state(True) + case Cmd.SET_RISE_TIME: + bench.supply.set_rise_time(args[0]) + case Cmd.SET_FALL_TIME: + bench.supply.set_fall_time(args[0]) + + # Load + case Cmd.SET_MODE: + bench.load.set_mode(args[0]) + case Cmd.SET_MODE_VALUE: + mode, value = args[0], args[1] + if mode == "CC": + bench.load.set_cc_current(value) + elif mode == "CR": + bench.load.set_cr_resistance(value) + elif mode == "CV": + bench.load.set_cv_voltage(value) + elif mode == "CP": + bench.load.set_cp_power(value) + case Cmd.LOAD_ON: + bench.load.load_on() + case Cmd.LOAD_OFF: + bench.load.load_off() + case Cmd.SET_SLEW_RISE: + bench.load.set_rise_slew(args[0]) + case Cmd.SET_SLEW_FALL: + bench.load.set_fall_slew(args[0]) + + # Meter + case Cmd.SETUP_ALL: + bench.setup_all() + case Cmd.SET_WIRING: + bench.meter.set_wiring_mode(args[0]) + case Cmd.SET_COUPLING: + bench.meter.set_coupling(args[0], args[1]) + case Cmd.SET_RESPONSE_SPEED: + bench.meter.set_response_speed(args[0]) + case Cmd.SET_AVERAGING: + bench.meter.set_averaging(args[0], args[1] if len(args) > 1 else None) + + # System + case Cmd.SET_INTERVAL: + self.interval = args[0] + case Cmd.SAFE_OFF: + bench.safe_off() + + except Exception as e: + # Push error to data queue so GUI can display it + try: + self.data_queue.put_nowait({ + "_error": f"Command {cmd.name} failed: {e}", + "_timestamp": time.time(), + }) + except queue.Full: + pass