From 956be4b77afdde7dd9d6e89334220f35f58bdcd9 Mon Sep 17 00:00:00 2001 From: grabowski Date: Wed, 11 Mar 2026 15:27:48 +0700 Subject: [PATCH] =?UTF-8?q?Add=20shade=20profiles,=202D=20V=C3=97I=20sweep?= =?UTF-8?q?,=20meter=20format=20toggle,=20ON/OFF=20indicators,=20and=20GUI?= =?UTF-8?q?=20console?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Shade profile: CSV-driven irradiance/voltage sequences with load control (bench.run_shade_profile, CLI shade-profile command, GUI profile panel) - 2D sweep: voltage × load current efficiency map with live graph updates (bench.sweep_vi, CLI sweep-vi command, GUI sweep panel with background thread) - GUI: meter format selector (scientific/normal), supply/load ON/OFF indicators, console log with stdout redirect and color-coded messages - Sample profiles: cloud_pass, partial_shade, intermittent_clouds Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + samples/cloud_pass.csv | 11 + samples/intermittent_clouds.csv | 15 + samples/partial_shade.csv | 11 + testbench/bench.py | 245 +++++++++++++ testbench/cli.py | 71 ++++ testbench/gui.py | 604 +++++++++++++++++++++++++++++++- testbench/gui_workers.py | 9 + 8 files changed, 953 insertions(+), 14 deletions(-) create mode 100644 samples/cloud_pass.csv create mode 100644 samples/intermittent_clouds.csv create mode 100644 samples/partial_shade.csv diff --git a/.gitignore b/.gitignore index 457490f..2f257d6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dist/ build/ .venv/ *.csv +!samples/*.csv diff --git a/samples/cloud_pass.csv b/samples/cloud_pass.csv new file mode 100644 index 0000000..1f7fcc6 --- /dev/null +++ b/samples/cloud_pass.csv @@ -0,0 +1,11 @@ +time,voltage,current_limit,load_mode,load_value +0,75,10.0,CC,3.0 +10,75,8.0,CC,3.0 +20,75,6.0,CC,3.0 +30,75,4.0,CC,2.0 +40,75,2.0,CC,1.0 +50,75,2.0,CC,1.0 +60,75,4.0,CC,2.0 +70,75,6.0,CC,3.0 +80,75,8.0,CC,3.0 +90,75,10.0,CC,3.0 diff --git a/samples/intermittent_clouds.csv b/samples/intermittent_clouds.csv new file mode 100644 index 0000000..fd6fda2 --- /dev/null +++ b/samples/intermittent_clouds.csv @@ -0,0 +1,15 @@ +time,voltage,current_limit,load_mode,load_value +0,75,10.0,CC,3.0 +10,75,10.0,CC,3.0 +15,75,4.0,CC,2.0 +25,75,4.0,CC,2.0 +30,75,10.0,CC,3.0 +40,75,10.0,CC,3.0 +45,75,6.0,CC,2.5 +55,75,6.0,CC,2.5 +60,75,10.0,CC,3.0 +70,75,10.0,CC,3.0 +75,75,2.0,CC,1.0 +90,75,2.0,CC,1.0 +95,75,10.0,CC,3.0 +110,75,10.0,CC,3.0 diff --git a/samples/partial_shade.csv b/samples/partial_shade.csv new file mode 100644 index 0000000..28a3373 --- /dev/null +++ b/samples/partial_shade.csv @@ -0,0 +1,11 @@ +time,voltage,current_limit,load_mode,load_value +0,75,10.0,CC,3.0 +15,75,10.0,CC,3.0 +30,75,7.0,CC,3.0 +45,65,5.0,CC,2.5 +60,55,3.0,CC,2.0 +75,55,3.0,CC,2.0 +90,65,5.0,CC,2.5 +105,75,7.0,CC,3.0 +120,75,10.0,CC,3.0 +135,75,10.0,CC,3.0 diff --git a/testbench/bench.py b/testbench/bench.py index 8dbd8d7..2b47184 100644 --- a/testbench/bench.py +++ b/testbench/bench.py @@ -417,6 +417,251 @@ class MPPTTestbench: return results + # ── Load value helper ───────────────────────────────────────────── + + def _apply_load_value(self, mode: str, value: float) -> None: + """Set the load setpoint for the given mode.""" + if mode == "CC": + self.load.set_cc_current(value) + elif mode == "CR": + self.load.set_cr_resistance(value) + elif mode == "CV": + self.load.set_cv_voltage(value) + elif mode == "CP": + self.load.set_cp_power(value) + + # ── Shade Profile ──────────────────────────────────────────────── + + @staticmethod + def load_shade_profile(path: str) -> list[dict]: + """Load a shade profile CSV. + + Required columns: time, voltage, current_limit + Optional columns: load_mode, load_value + + Returns: + Sorted list of step dicts. + """ + import csv as _csv + + steps = [] + with open(path, newline="") as f: + reader = _csv.DictReader(f) + for row in reader: + step = { + "time": float(row["time"]), + "voltage": float(row["voltage"]), + "current_limit": float(row["current_limit"]), + } + if "load_mode" in row and row["load_mode"].strip(): + step["load_mode"] = row["load_mode"].strip().upper() + if "load_value" in row and row["load_value"].strip(): + step["load_value"] = float(row["load_value"]) + steps.append(step) + steps.sort(key=lambda s: s["time"]) + return steps + + def run_shade_profile( + self, + steps: list[dict], + settle_time: float = 2.0, + ) -> list[SweepPoint]: + """Run a shade / irradiance profile sequence. + + Steps through the profile, applying voltage/current/load settings + at the scheduled times, recording a measurement at each step. + + Args: + steps: Profile steps from :meth:`load_shade_profile`. + settle_time: Seconds to wait after applying each step before + recording a measurement. + + Returns: + List of SweepPoint measurements, one per step. + """ + if not steps: + return [] + + # Apply first step and turn outputs on + first = steps[0] + self.supply.set_current(first["current_limit"]) + self.supply.set_voltage(first["voltage"]) + self.supply.output_on() + + if "load_mode" in first: + self.load.set_mode(first["load_mode"]) + if "load_value" in first: + self._apply_load_value( + first.get("load_mode", "CC"), first["load_value"] + ) + self.load.load_on() + + results: list[SweepPoint] = [] + t0 = time.time() + + try: + for i, step in enumerate(steps): + # Wait until scheduled time, keepalive supply while waiting + target = t0 + step["time"] + while True: + remaining = target - time.time() + if remaining <= 0: + break + if remaining > 2.0: + try: + self.supply.measure_voltage() + except Exception: + pass + time.sleep(min(2.0, remaining)) + else: + time.sleep(remaining) + + # Apply settings + self.supply.set_current(step["current_limit"]) + self.supply.set_voltage(step["voltage"]) + + if "load_mode" in step: + self.load.set_mode(step["load_mode"]) + if "load_value" in step: + self._apply_load_value( + step.get("load_mode", "CC"), step["load_value"] + ) + + time.sleep(settle_time) + + # Record + load_val = step.get("load_value", 0.0) or 0.0 + point = self._record_point( + step["voltage"], step["current_limit"], load_val + ) + results.append(point) + elapsed = time.time() - t0 + print( + f" [{i + 1}/{len(steps)}] t={elapsed:6.1f}s " + f"V={step['voltage']:.1f}V I_lim={step['current_limit']:.1f}A " + f"P_in={point.input_power:8.2f}W " + f"P_out={point.output_power:8.2f}W " + f"EFF={point.efficiency:6.2f}%" + ) + finally: + self.load.load_off() + self.supply.set_voltage(IDLE_VOLTAGE) + print( + f"\n Profile complete. Load OFF. " + f"Supply returning to {IDLE_VOLTAGE:.0f}V (output stays ON)" + ) + + return results + + # ── 2D Voltage × Current Sweep ───────────────────────────────────── + + def sweep_vi( + self, + v_start: float, + v_stop: float, + v_step: float, + i_start: float, + i_stop: float, + i_step: float, + current_limit: float, + settle_time: float = 2.0, + ) -> list[SweepPoint]: + """2D sweep: voltage (outer) × load current (inner). + + At each supply voltage, sweeps the load current through the full + range, recording efficiency at every (V, I) combination. Produces + a complete efficiency map of the DUT. + + After the sweep the load turns OFF and the supply returns to + IDLE_VOLTAGE (75 V, output stays ON). + + Args: + v_start: Starting supply voltage (V). + v_stop: Final supply voltage (V). + v_step: Voltage step size (V). Sign is auto-corrected. + i_start: Starting load current (A). + i_stop: Final load current (A). + i_step: Current step size (A). Sign is auto-corrected. + current_limit: Supply current limit (A) for the entire sweep. + settle_time: Seconds to wait after each setpoint change. + """ + if v_step == 0: + raise ValueError("v_step cannot be zero") + if i_step == 0: + raise ValueError("i_step cannot be zero") + + # Auto-correct step directions + if v_start < v_stop and v_step < 0: + v_step = -v_step + elif v_start > v_stop and v_step > 0: + v_step = -v_step + + if i_start < i_stop and i_step < 0: + i_step = -i_step + elif i_start > i_stop and i_step > 0: + i_step = -i_step + + # Count steps for progress display + v_count = int(abs(v_stop - v_start) / abs(v_step)) + 1 + i_count = int(abs(i_stop - i_start) / abs(i_step)) + 1 + total = v_count * i_count + print(f" Grid: {v_count} voltage × {i_count} current = {total} points") + + self.supply.set_current(current_limit) + self.supply.output_on() + self.load.set_mode("CC") + self.load.set_cc_current(i_start) + self.load.load_on() + + results: list[SweepPoint] = [] + n = 0 + v = v_start + + try: + while True: + if v_step > 0 and v > v_stop + v_step / 2: + break + if v_step < 0 and v < v_stop + v_step / 2: + break + + self.supply.set_voltage(v) + i = i_start + + while True: + if i_step > 0 and i > i_stop + i_step / 2: + break + if i_step < 0 and i < i_stop + i_step / 2: + break + + self.load.set_cc_current(i) + time.sleep(settle_time) + + point = self._record_point(v, current_limit, load_setpoint=i) + results.append(point) + n += 1 + + print( + f" [{n:>4d}/{total}] " + f"V={v:6.1f}V I_load={i:6.2f}A " + f"P_in={point.input_power:8.2f}W " + f"P_out={point.output_power:8.2f}W " + f"EFF={point.efficiency:6.2f}%" + ) + + i += i_step + + v += v_step + + finally: + self.load.load_off() + self.supply.set_voltage(IDLE_VOLTAGE) + print( + f"\n Load OFF. Supply returning to {IDLE_VOLTAGE:.0f}V " + f"(output stays ON)" + ) + + return results + # ── Efficiency at fixed operating point ─────────────────────────── def measure_efficiency( diff --git a/testbench/cli.py b/testbench/cli.py index 64e730e..236dd77 100644 --- a/testbench/cli.py +++ b/testbench/cli.py @@ -438,6 +438,55 @@ def cmd_efficiency(bench: MPPTTestbench, args: argparse.Namespace) -> None: print(f"Average efficiency: {result['avg_efficiency']:.2f} %") +def cmd_sweep_vi(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Run a 2D voltage × load current sweep.""" + print( + f"2D sweep: V={args.v_start:.1f}-{args.v_stop:.1f}V (step {args.v_step:.1f}), " + f"I_load={args.i_start:.2f}-{args.i_stop:.2f}A (step {args.i_step:.2f}), " + f"I_limit={args.current_limit:.1f}A, settle={args.settle:.1f}s" + ) + print() + + results = bench.sweep_vi( + v_start=args.v_start, + v_stop=args.v_stop, + v_step=args.v_step, + i_start=args.i_start, + i_stop=args.i_stop, + i_step=args.i_step, + current_limit=args.current_limit, + settle_time=args.settle, + ) + + _write_sweep_csv(results, args.output) + _print_sweep_summary(results) + + +def cmd_shade_profile(bench: MPPTTestbench, args: argparse.Namespace) -> None: + """Run a shade / irradiance profile from a CSV file.""" + steps = MPPTTestbench.load_shade_profile(args.profile) + duration = steps[-1]["time"] if steps else 0 + + print(f"Shade profile: {args.profile}") + print(f" {len(steps)} steps over {duration:.0f}s, settle={args.settle:.1f}s") + + # Show what modes/values are used + modes = {s.get("load_mode", "?") for s in steps} + voltages = [s["voltage"] for s in steps] + currents = [s["current_limit"] for s in steps] + print( + f" Voltage: {min(voltages):.1f} - {max(voltages):.1f}V, " + f"I_limit: {min(currents):.1f} - {max(currents):.1f}A, " + f"Load modes: {', '.join(sorted(modes))}" + ) + print() + + results = bench.run_shade_profile(steps, settle_time=args.settle) + + _write_sweep_csv(results, args.output) + _print_sweep_summary(results) + + def cmd_supply(bench: MPPTTestbench, args: argparse.Namespace) -> None: """Control the DC supply directly.""" if args.action == "on": @@ -515,6 +564,8 @@ examples: %(prog)s sweep --v-start 10 --v-stop 50 --v-step 1 --current-limit 10 -o sweep.csv %(prog)s sweep-load --voltage 75 --current-limit 10 --i-start 1 --i-stop 20 --i-step 1 -o load.csv %(prog)s efficiency --voltage 36 --current-limit 10 --samples 10 + %(prog)s sweep-vi --v-start 35 --v-stop 100 --v-step 5 --i-start 0.5 --i-stop 30 --i-step 1 --current-limit 35 -o map.csv + %(prog)s shade-profile --profile cloud_pass.csv --settle 2.0 -o shade_results.csv %(prog)s supply set --voltage 24 --current 10 %(prog)s supply on %(prog)s load set --mode CC --value 5.0 @@ -599,6 +650,24 @@ examples: p_eff.add_argument("--load-mode", choices=["CC", "CR", "CV", "CP"]) p_eff.add_argument("--load-value", type=float) + # sweep-vi (2D) + p_svi = sub.add_parser("sweep-vi", help="2D voltage × load current sweep (efficiency map)") + p_svi.add_argument("--v-start", type=float, required=True, help="Start voltage (V)") + p_svi.add_argument("--v-stop", type=float, required=True, help="Stop voltage (V)") + p_svi.add_argument("--v-step", type=float, required=True, help="Voltage step (V)") + p_svi.add_argument("--i-start", type=float, required=True, help="Start load current (A)") + p_svi.add_argument("--i-stop", type=float, required=True, help="Stop load current (A)") + p_svi.add_argument("--i-step", type=float, required=True, help="Current step (A)") + p_svi.add_argument("--current-limit", type=float, required=True, help="Supply current limit (A)") + p_svi.add_argument("--settle", type=float, default=2.0, help="Settle time per step (s)") + p_svi.add_argument("-o", "--output", help="CSV output file") + + # shade-profile + p_shade = sub.add_parser("shade-profile", help="Run a shade/irradiance profile from CSV") + p_shade.add_argument("--profile", required=True, help="Profile CSV file (time,voltage,current_limit,...)") + p_shade.add_argument("--settle", type=float, default=2.0, help="Settle time per step (s)") + p_shade.add_argument("-o", "--output", help="CSV output file for results") + # supply (direct control) p_sup = sub.add_parser("supply", help="Direct supply control") p_sup_sub = p_sup.add_subparsers(dest="action", required=True) @@ -631,6 +700,8 @@ examples: "sweep": cmd_sweep, "sweep-load": cmd_sweep_load, "efficiency": cmd_efficiency, + "sweep-vi": cmd_sweep_vi, + "shade-profile": cmd_shade_profile, "supply": cmd_supply, "load": cmd_load, "safe-off": cmd_safe_off, diff --git a/testbench/gui.py b/testbench/gui.py index 793692f..2972fec 100644 --- a/testbench/gui.py +++ b/testbench/gui.py @@ -6,7 +6,9 @@ Tkinter app with embedded matplotlib graphs and threaded instrument I/O. from __future__ import annotations import csv +import queue import sys +import threading import time import tkinter as tk from tkinter import ttk, messagebox, filedialog @@ -47,6 +49,30 @@ def _clean(val: float) -> float: return float("nan") if abs(val) >= ERROR_THRESHOLD else val +class _ConsoleRedirector: + """Redirects stdout writes to the GUI console widget.""" + + def __init__(self, gui: "TestbenchGUI") -> None: + self._gui = gui + self._orig = sys.stdout + self._buf = "" + + def write(self, text: str) -> None: + if self._orig: + self._orig.write(text) + # Buffer partial lines, flush on newline + self._buf += text + while "\n" in self._buf: + line, self._buf = self._buf.split("\n", 1) + line = line.strip() + if line: + self._gui._console(line) + + def flush(self) -> None: + if self._orig: + self._orig.flush() + + class TestbenchGUI(tk.Tk): """Main GUI window.""" @@ -77,6 +103,9 @@ class TestbenchGUI(tk.Tk): ] } + self._meter_fmt_sci = True # True=scientific, False=normal + self._sweep_data_queue: queue.Queue = queue.Queue(maxsize=100) + self._build_ui() self.protocol("WM_DELETE_WINDOW", self._on_close) @@ -99,14 +128,21 @@ class TestbenchGUI(tk.Tk): self._build_supply_controls(left_col0) self._build_load_controls(left_col0) self._build_logging_controls(left_col0) + self._build_profile_controls(left_col0) # Column 1: Meter left_col1 = ttk.Frame(left_outer) left_col1.grid(row=0, column=1, sticky="nsew", padx=(2, 0)) self._build_meter_readout(left_col1) - # Right panel (graphs) - right_frame = ttk.Frame(content) - content.add(right_frame, weight=1) - self._build_graphs(right_frame) + self._build_sweep_vi_controls(left_col1) + # Right panel: graphs + console + right_pane = ttk.PanedWindow(content, orient=tk.VERTICAL) + content.add(right_pane, weight=1) + graph_frame = ttk.Frame(right_pane) + right_pane.add(graph_frame, weight=3) + self._build_graphs(graph_frame) + console_frame = ttk.Frame(right_pane) + right_pane.add(console_frame, weight=1) + self._build_console(console_frame) # Status bar self._build_status_bar() @@ -217,6 +253,11 @@ class TestbenchGUI(tk.Tk): self._sup_i_label.pack(anchor=tk.W) self._sup_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11)) self._sup_p_label.pack(anchor=tk.W) + self._sup_state_label = tk.Label( + readout, text="OUTPUT: ---", font=("Consolas", 11, "bold"), + fg="#888888", + ) + self._sup_state_label.pack(anchor=tk.W, pady=(4, 0)) def _build_load_controls(self, parent) -> None: frame = ttk.LabelFrame(parent, text="DC Load (Prodigit 3366G)", padding=8) @@ -277,6 +318,11 @@ class TestbenchGUI(tk.Tk): self._load_i_label.pack(anchor=tk.W) self._load_p_label = ttk.Label(readout, text="P: ---", font=("Consolas", 11)) self._load_p_label.pack(anchor=tk.W) + self._load_state_label = tk.Label( + readout, text="LOAD: ---", font=("Consolas", 11, "bold"), + fg="#888888", + ) + self._load_state_label.pack(anchor=tk.W, pady=(4, 0)) def _build_meter_readout(self, parent) -> None: frame = ttk.LabelFrame(parent, text="Power Analyzer (HIOKI 3193-10)", padding=8) @@ -324,6 +370,17 @@ class TestbenchGUI(tk.Tk): ttk.Button(row, text="Set", width=4, command=lambda: self._send(Cmd.SET_COUPLING, 6, self._ch6_coupling.get())).pack(side=tk.LEFT, padx=2) + # Format selector + fmt_row = ttk.Frame(frame) + fmt_row.pack(fill=tk.X, pady=2) + ttk.Label(fmt_row, text="Format:", width=10).pack(side=tk.LEFT) + self._meter_fmt_combo = ttk.Combobox( + fmt_row, values=["Scientific", "Normal"], width=10, state="readonly" + ) + self._meter_fmt_combo.set("Scientific") + self._meter_fmt_combo.pack(side=tk.LEFT, padx=2) + self._meter_fmt_combo.bind("<>", self._on_meter_fmt_change) + # Input side input_frame = ttk.LabelFrame(frame, text="Input (Ch5 - Solar)", padding=4) input_frame.pack(fill=tk.X, pady=2) @@ -372,6 +429,102 @@ class TestbenchGUI(tk.Tk): self._log_status = ttk.Label(frame, text="Not logging", font=("Consolas", 9)) self._log_status.pack(anchor=tk.W, pady=2) + def _build_profile_controls(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="Shade Profile", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + self._profile_path_label = ttk.Label(row, text="No file selected", font=("Consolas", 9)) + self._profile_path_label.pack(side=tk.LEFT, fill=tk.X, expand=True) + ttk.Button(row, text="Browse", command=self._browse_profile).pack(side=tk.RIGHT, padx=2) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + ttk.Label(row, text="Settle (s):", width=12).pack(side=tk.LEFT) + self._profile_settle = ttk.Entry(row, width=6) + self._profile_settle.insert(0, "2.0") + self._profile_settle.pack(side=tk.LEFT, padx=2) + + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + self._btn_profile_run = ttk.Button(row, text="Run Profile", command=self._run_profile) + self._btn_profile_run.pack(side=tk.LEFT, padx=2) + self._btn_profile_stop = ttk.Button(row, text="Stop", command=self._stop_profile, state=tk.DISABLED) + self._btn_profile_stop.pack(side=tk.LEFT, padx=2) + + self._profile_status = ttk.Label(frame, text="Idle", font=("Consolas", 9)) + self._profile_status.pack(anchor=tk.W, pady=2) + + self._profile_steps: list[dict] = [] + self._profile_index = 0 + self._profile_running = False + self._profile_t0 = 0.0 + self._profile_after_id = None + + def _build_sweep_vi_controls(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="2D Sweep (V × I)", padding=8) + frame.pack(fill=tk.X, padx=4, pady=4) + + # Voltage range + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="V start:", width=10).pack(side=tk.LEFT) + self._svi_v_start = ttk.Entry(row, width=7) + self._svi_v_start.insert(0, "35") + self._svi_v_start.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="stop:").pack(side=tk.LEFT) + self._svi_v_stop = ttk.Entry(row, width=7) + self._svi_v_stop.insert(0, "100") + self._svi_v_stop.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="step:").pack(side=tk.LEFT) + self._svi_v_step = ttk.Entry(row, width=5) + self._svi_v_step.insert(0, "5") + self._svi_v_step.pack(side=tk.LEFT, padx=2) + + # Current range + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="I start:", width=10).pack(side=tk.LEFT) + self._svi_i_start = ttk.Entry(row, width=7) + self._svi_i_start.insert(0, "0.5") + self._svi_i_start.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="stop:").pack(side=tk.LEFT) + self._svi_i_stop = ttk.Entry(row, width=7) + self._svi_i_stop.insert(0, "30") + self._svi_i_stop.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="step:").pack(side=tk.LEFT) + self._svi_i_step = ttk.Entry(row, width=5) + self._svi_i_step.insert(0, "1") + self._svi_i_step.pack(side=tk.LEFT, padx=2) + + # Supply current limit + settle + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=1) + ttk.Label(row, text="I limit:", width=10).pack(side=tk.LEFT) + self._svi_ilimit = ttk.Entry(row, width=7) + self._svi_ilimit.insert(0, "35") + self._svi_ilimit.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="settle:").pack(side=tk.LEFT) + self._svi_settle = ttk.Entry(row, width=5) + self._svi_settle.insert(0, "2.0") + self._svi_settle.pack(side=tk.LEFT, padx=2) + ttk.Label(row, text="s").pack(side=tk.LEFT) + + # Run / Stop + row = ttk.Frame(frame) + row.pack(fill=tk.X, pady=2) + self._btn_svi_run = ttk.Button(row, text="Run Sweep", command=self._run_sweep_vi) + self._btn_svi_run.pack(side=tk.LEFT, padx=2) + self._btn_svi_stop = ttk.Button(row, text="Stop", command=self._stop_sweep_vi, state=tk.DISABLED) + self._btn_svi_stop.pack(side=tk.LEFT, padx=2) + + self._svi_status = ttk.Label(frame, text="Idle", font=("Consolas", 9)) + self._svi_status.pack(anchor=tk.W, pady=2) + + self._svi_thread = None + self._svi_stop_event = None + def _build_graphs(self, parent) -> None: self._fig = Figure(figsize=(10, 8), dpi=100) self._fig.suptitle("MPPT Testbench Live", fontsize=12, fontweight="bold") @@ -422,6 +575,52 @@ class TestbenchGUI(tk.Tk): toolbar = NavigationToolbar2Tk(self._canvas, parent) toolbar.update() + def _build_console(self, parent) -> None: + frame = ttk.LabelFrame(parent, text="Console", padding=4) + frame.pack(fill=tk.BOTH, expand=True, padx=0, pady=0) + + self._console_text = tk.Text( + frame, height=8, font=("Consolas", 9), + wrap=tk.WORD, state=tk.DISABLED, + bg="#1e1e1e", fg="#cccccc", insertbackground="#cccccc", + ) + scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self._console_text.yview) + self._console_text.config(yscrollcommand=scrollbar.set) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + self._console_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + + # Tag for error messages + self._console_text.tag_configure("error", foreground="#ff6b6b") + self._console_text.tag_configure("success", foreground="#69db7c") + self._console_text.tag_configure("warn", foreground="#ffd43b") + + # Redirect stdout to console + self._orig_stdout = sys.stdout + sys.stdout = _ConsoleRedirector(self) + + def _console(self, msg: str, tag: str = "") -> None: + """Append a message to the console log. Thread-safe.""" + ts = time.strftime("%H:%M:%S") + line = f"[{ts}] {msg}\n" + + def _append(): + self._console_text.config(state=tk.NORMAL) + if tag: + self._console_text.insert(tk.END, line, tag) + else: + self._console_text.insert(tk.END, line) + self._console_text.see(tk.END) + self._console_text.config(state=tk.DISABLED) + + # If called from a non-main thread, schedule on main thread + try: + if threading.current_thread() is threading.main_thread(): + _append() + else: + self.after(0, _append) + except RuntimeError: + pass + def _build_status_bar(self) -> None: bar = ttk.Frame(self) bar.pack(fill=tk.X, padx=4, pady=(0, 4)) @@ -465,9 +664,11 @@ class TestbenchGUI(tk.Tk): self._meter_addr.config(state=tk.DISABLED) self._status_label.config(text="Connected") + self._console(f"Connected: supply={supply_addr}, load={load_port}, meter={meter_addr}", "success") self._poll() except Exception as e: + self._console(f"Connection failed: {e}", "error") messagebox.showerror("Connection Error", str(e)) def _disconnect(self) -> None: @@ -490,12 +691,15 @@ class TestbenchGUI(tk.Tk): self._load_baud.config(state=tk.NORMAL) self._meter_addr.config(state=tk.NORMAL) self._status_label.config(text="Disconnected") + self._console("Disconnected") def _setup_all(self) -> None: self._send(Cmd.SETUP_ALL) + self._console("Setup All sent") def _safe_off(self) -> None: """Emergency stop -- bypasses command queue for minimum latency.""" + self._console("SAFE OFF triggered", "error") if self.bench: try: self.bench.safe_off() @@ -512,19 +716,31 @@ class TestbenchGUI(tk.Tk): except Exception: pass self._disconnect() + sys.stdout = self._orig_stdout self.destroy() # ── Polling / Data Update ───────────────────────────────────────── def _poll(self) -> None: """Called periodically to pull data from the worker and update UI.""" - if not self.worker: - return + data = None + + if self.worker: + data = self.worker.get_data() + else: + # During 2D sweep the worker is stopped; drain sweep queue + latest = None + while True: + try: + latest = self._sweep_data_queue.get_nowait() + except queue.Empty: + break + data = latest - data = self.worker.get_data() if data: error = data.get("_error") if error: + self._console(f"Error: {error}", "error") self._status_label.config(text=f"Error: {error}") else: self._update_readouts(data) @@ -539,7 +755,9 @@ class TestbenchGUI(tk.Tk): text=f"Connected | {self._point_count} pts | {hrs:02d}:{mins:02d}:{secs:02d}" ) - self.after(POLL_MS, self._poll) + # Keep polling as long as connected (bench exists) + if self.worker or self.bench: + self.after(POLL_MS, self._poll) def _update_readouts(self, data: dict) -> None: """Update all numeric labels from measurement data.""" @@ -554,12 +772,13 @@ class TestbenchGUI(tk.Tk): self._load_p_label.config(text=f"P: {_fmt(data['load_P'])} W") # Meter - self._m_u5.config(text=f"U5: {_fmt_eng(data['meter_U5'])} V") - self._m_i5.config(text=f"I5: {_fmt_eng(data['meter_I5'])} A") - self._m_p5.config(text=f"P5: {_fmt_eng(data['meter_P5'])} W") - self._m_u6.config(text=f"U6: {_fmt_eng(data['meter_U6'])} V") - self._m_i6.config(text=f"I6: {_fmt_eng(data['meter_I6'])} A") - self._m_p6.config(text=f"P6: {_fmt_eng(data['meter_P6'])} W") + fm = self._fmt_meter + self._m_u5.config(text=f"U5: {fm(data['meter_U5'])} V") + self._m_i5.config(text=f"I5: {fm(data['meter_I5'])} A") + self._m_p5.config(text=f"P5: {fm(data['meter_P5'])} W") + self._m_u6.config(text=f"U6: {fm(data['meter_U6'])} V") + self._m_i6.config(text=f"I6: {fm(data['meter_I6'])} A") + self._m_p6.config(text=f"P6: {fm(data['meter_P6'])} W") eff = data['meter_EFF1'] if abs(eff) >= ERROR_THRESHOLD: @@ -567,6 +786,23 @@ class TestbenchGUI(tk.Tk): else: self._m_eff.config(text=f"EFF1: {eff:.2f} %") + # ON/OFF indicators + supply_on = data.get("supply_on") + if supply_on is True: + self._sup_state_label.config(text="OUTPUT: ON", fg="#00aa00") + elif supply_on is False: + self._sup_state_label.config(text="OUTPUT: OFF", fg="#cc0000") + else: + self._sup_state_label.config(text="OUTPUT: ---", fg="#888888") + + load_on = data.get("load_on") + if load_on is True: + self._load_state_label.config(text="LOAD: ON", fg="#00aa00") + elif load_on is False: + self._load_state_label.config(text="LOAD: OFF", fg="#cc0000") + else: + self._load_state_label.config(text="LOAD: ---", fg="#888888") + def _update_graphs(self, data: dict) -> None: """Append data to series and redraw graphs.""" now = time.time() - self._t0 @@ -592,12 +828,27 @@ class TestbenchGUI(tk.Tk): self._canvas.draw_idle() + # ── Format Helpers ───────────────────────────────────────────────── + + def _on_meter_fmt_change(self, _event=None) -> None: + self._meter_fmt_sci = self._meter_fmt_combo.get() == "Scientific" + + def _fmt_meter(self, val: float) -> str: + """Format a meter value based on the current format selection.""" + if self._meter_fmt_sci: + return _fmt_eng(val) + return _fmt(val, decimals=4) + # ── Command Helpers ─────────────────────────────────────────────── def _send(self, cmd: Cmd, *args) -> None: """Send a command to the worker thread.""" if self.worker: self.worker.send(cmd, *args) + if args: + self._console(f"{cmd.name} {' '.join(str(a) for a in args)}") + else: + self._console(cmd.name) def _send_float(self, cmd: Cmd, entry: ttk.Entry) -> None: """Parse an entry as float and send to worker.""" @@ -652,6 +903,329 @@ class TestbenchGUI(tk.Tk): except ValueError: pass + # ── Shade Profile ────────────────────────────────────────────────── + + def _browse_profile(self) -> None: + path = filedialog.askopenfilename( + filetypes=[("CSV files", "*.csv"), ("All files", "*.*")], + ) + if not path: + return + try: + self._profile_steps = MPPTTestbench.load_shade_profile(path) + n = len(self._profile_steps) + dur = self._profile_steps[-1]["time"] if self._profile_steps else 0 + self._profile_path_label.config(text=path.split("/")[-1].split("\\")[-1]) + self._profile_status.config(text=f"Loaded: {n} steps, {dur:.0f}s") + except Exception as e: + messagebox.showerror("Profile Error", str(e)) + + def _run_profile(self) -> None: + if not self._profile_steps: + messagebox.showwarning("No Profile", "Load a profile CSV first.") + return + if not self.worker: + return + + try: + settle = float(self._profile_settle.get()) + except ValueError: + settle = 2.0 + + n = len(self._profile_steps) + dur = self._profile_steps[-1]["time"] if self._profile_steps else 0 + self._console(f"Shade profile started: {n} steps, {dur:.0f}s", "success") + + self._profile_running = True + self._profile_index = 0 + self._profile_t0 = time.time() + self._btn_profile_run.config(state=tk.DISABLED) + self._btn_profile_stop.config(state=tk.NORMAL) + + # Turn outputs on with first step + first = self._profile_steps[0] + self._send(Cmd.SET_CURRENT, first["current_limit"]) + self._send(Cmd.SET_VOLTAGE, first["voltage"]) + self._send(Cmd.OUTPUT_ON) + if "load_mode" in first: + self._send(Cmd.SET_MODE, first["load_mode"]) + if "load_value" in first: + self._send(Cmd.SET_MODE_VALUE, first.get("load_mode", "CC"), first["load_value"]) + self._send(Cmd.LOAD_ON) + + self._profile_schedule_next() + + def _profile_schedule_next(self) -> None: + if not self._profile_running: + return + if self._profile_index >= len(self._profile_steps): + self._finish_profile() + return + + step = self._profile_steps[self._profile_index] + target = self._profile_t0 + step["time"] + delay_ms = max(0, int((target - time.time()) * 1000)) + + self._profile_after_id = self.after(delay_ms, self._apply_profile_step) + + def _apply_profile_step(self) -> None: + if not self._profile_running: + return + + step = self._profile_steps[self._profile_index] + self._send(Cmd.SET_CURRENT, step["current_limit"]) + self._send(Cmd.SET_VOLTAGE, step["voltage"]) + if "load_mode" in step: + self._send(Cmd.SET_MODE, step["load_mode"]) + if "load_value" in step: + self._send(Cmd.SET_MODE_VALUE, step.get("load_mode", "CC"), step["load_value"]) + + elapsed = time.time() - self._profile_t0 + n = len(self._profile_steps) + self._profile_status.config( + text=f"Step {self._profile_index + 1}/{n} " + f"t={elapsed:.0f}s V={step['voltage']:.1f}V " + f"I_lim={step['current_limit']:.1f}A" + ) + + self._profile_index += 1 + self._profile_schedule_next() + + def _stop_profile(self) -> None: + self._profile_running = False + if self._profile_after_id: + self.after_cancel(self._profile_after_id) + self._profile_after_id = None + self._btn_profile_run.config(state=tk.NORMAL) + self._btn_profile_stop.config(state=tk.DISABLED) + self._profile_status.config(text="Stopped") + self._console("Shade profile stopped", "warn") + + def _finish_profile(self) -> None: + self._profile_running = False + self._btn_profile_run.config(state=tk.NORMAL) + self._btn_profile_stop.config(state=tk.DISABLED) + elapsed = time.time() - self._profile_t0 + self._profile_status.config(text=f"Done ({elapsed:.0f}s)") + self._console(f"Shade profile complete ({elapsed:.0f}s)", "success") + + # ── 2D Sweep (V × I) ─────────────────────────────────────────────── + + def _run_sweep_vi(self) -> None: + if not self.bench: + return + try: + params = { + "v_start": float(self._svi_v_start.get()), + "v_stop": float(self._svi_v_stop.get()), + "v_step": float(self._svi_v_step.get()), + "i_start": float(self._svi_i_start.get()), + "i_stop": float(self._svi_i_stop.get()), + "i_step": float(self._svi_i_step.get()), + "current_limit": float(self._svi_ilimit.get()), + "settle_time": float(self._svi_settle.get()), + } + except ValueError: + messagebox.showerror("Invalid Input", "Check sweep parameters.") + return + + # Ask for output file + default_name = time.strftime("sweep_vi_%Y%m%d_%H%M%S.csv") + path = filedialog.asksaveasfilename( + defaultextension=".csv", + filetypes=[("CSV files", "*.csv")], + initialfile=default_name, + ) + if not path: + return + + self._console( + f"2D sweep: V={params['v_start']}-{params['v_stop']}V " + f"I={params['i_start']}-{params['i_stop']}A", + "success", + ) + + # Stop the normal worker so the sweep owns the instruments + if self.worker: + self.worker.stop() + self.worker = None + + # Restart _poll so it drains the sweep data queue + self.after(POLL_MS, self._poll) + + self._btn_svi_run.config(state=tk.DISABLED) + self._btn_svi_stop.config(state=tk.NORMAL) + + stop_event = threading.Event() + self._svi_stop_event = stop_event + + def _sweep_thread(): + try: + self.after(0, lambda: self._svi_status.config(text="Running...")) + results = self._sweep_vi_loop(params, stop_event) + # Write CSV + if results: + self._write_sweep_vi_csv(results, path) + fname = path.split('/')[-1].split(chr(92))[-1] + msg = f"Done: {len(results)} pts saved to {fname}" + self._console(msg, "success") + else: + msg = "Sweep stopped (no data)" + self._console(msg, "warn") + self.after(0, lambda: self._svi_status.config(text=msg)) + except Exception as e: + self._console(f"Sweep error: {e}", "error") + self.after(0, lambda: self._svi_status.config(text=f"Error: {e}")) + finally: + self.after(0, self._sweep_vi_done) + + self._svi_thread = threading.Thread(target=_sweep_thread, daemon=True) + self._svi_thread.start() + + def _sweep_vi_loop(self, p: dict, stop: threading.Event) -> list: + """Run the 2D sweep on a background thread. Returns list of SweepPoint.""" + from testbench.bench import SweepPoint, IDLE_VOLTAGE + + bench = self.bench + v_start, v_stop, v_step = p["v_start"], p["v_stop"], p["v_step"] + i_start, i_stop, i_step = p["i_start"], p["i_stop"], p["i_step"] + current_limit = p["current_limit"] + settle = p["settle_time"] + + # Auto-correct directions + if v_start < v_stop and v_step < 0: + v_step = -v_step + elif v_start > v_stop and v_step > 0: + v_step = -v_step + if i_start < i_stop and i_step < 0: + i_step = -i_step + elif i_start > i_stop and i_step > 0: + i_step = -i_step + + bench.supply.set_current(current_limit) + bench.supply.output_on() + bench.load.set_mode("CC") + bench.load.set_cc_current(i_start) + bench.load.load_on() + + results = [] + n = 0 + v = v_start + + try: + while not stop.is_set(): + if v_step > 0 and v > v_stop + v_step / 2: + break + if v_step < 0 and v < v_stop + v_step / 2: + break + + bench.supply.set_voltage(v) + i = i_start + + while not stop.is_set(): + if i_step > 0 and i > i_stop + i_step / 2: + break + if i_step < 0 and i < i_stop + i_step / 2: + break + + bench.load.set_cc_current(i) + time.sleep(settle) + if stop.is_set(): + break + + point = bench._record_point(v, current_limit, load_setpoint=i) + results.append(point) + n += 1 + + # Push data for live graph/readout updates + gui_data = { + "supply_V": point.supply_voltage, + "supply_I": point.supply_current, + "supply_P": point.supply_power, + "load_V": point.load_voltage, + "load_I": point.load_current, + "load_P": point.load_power, + "meter_U5": point.supply_voltage, + "meter_I5": point.supply_current, + "meter_P5": point.input_power, + "meter_U6": point.load_voltage, + "meter_I6": point.load_current, + "meter_P6": point.output_power, + "meter_EFF1": point.efficiency, + "supply_on": True, + "load_on": True, + "_error": None, + "_timestamp": time.time(), + } + try: + self._sweep_data_queue.put_nowait(gui_data) + except queue.Full: + try: + self._sweep_data_queue.get_nowait() + except queue.Empty: + pass + self._sweep_data_queue.put_nowait(gui_data) + + self.after( + 0, + lambda v=v, i=i, pt=point, n=n: self._svi_status.config( + text=f"[{n}] V={v:.1f}V I={i:.1f}A " + f"EFF={pt.efficiency:.1f}%" + ), + ) + + i += i_step + v += v_step + finally: + bench.load.load_off() + bench.supply.set_voltage(IDLE_VOLTAGE) + + return results + + @staticmethod + def _write_sweep_vi_csv(results, path: str) -> None: + import csv as _csv + + with open(path, "w", newline="") as f: + w = _csv.writer(f) + w.writerow([ + "voltage_set", "current_limit", "load_setpoint", + "supply_V", "supply_I", "supply_P", + "load_V", "load_I", "load_P", + "input_power", "output_power", "efficiency", + ]) + for pt in results: + w.writerow([ + f"{pt.voltage_set:.4f}", f"{pt.current_limit:.4f}", + f"{pt.load_setpoint:.4f}", + f"{pt.supply_voltage:.4f}", f"{pt.supply_current:.4f}", + f"{pt.supply_power:.4f}", + f"{pt.load_voltage:.4f}", f"{pt.load_current:.4f}", + f"{pt.load_power:.4f}", + f"{pt.input_power:.4f}", f"{pt.output_power:.4f}", + f"{pt.efficiency:.4f}", + ]) + + def _stop_sweep_vi(self) -> None: + if self._svi_stop_event: + self._svi_stop_event.set() + + def _sweep_vi_done(self) -> None: + self._btn_svi_run.config(state=tk.NORMAL) + self._btn_svi_stop.config(state=tk.DISABLED) + self._svi_thread = None + self._svi_stop_event = None + # Restart normal worker polling + if self.bench: + interval = 1.0 + try: + interval = float(self._poll_interval.get()) + except ValueError: + pass + self.worker = InstrumentWorker(self.bench, interval=interval) + self.worker.start() + self._poll() + # ── Logging ─────────────────────────────────────────────────────── _LOG_COLUMNS = [ @@ -679,9 +1253,11 @@ class TestbenchGUI(tk.Tk): self._btn_log_start.config(state=tk.DISABLED) self._btn_log_stop.config(state=tk.NORMAL) self._log_status.config(text=f"Logging to {path}") + self._console(f"CSV logging started: {path}") def _stop_log(self) -> None: if self._log_file: + self._console(f"CSV logging stopped ({self._log_count} samples)") self._log_file.close() self._log_file = None self._log_writer = None diff --git a/testbench/gui_workers.py b/testbench/gui_workers.py index 1cd0aa1..4f38fe4 100644 --- a/testbench/gui_workers.py +++ b/testbench/gui_workers.py @@ -99,6 +99,15 @@ class InstrumentWorker(threading.Thread): data = self.bench.measure_all() data["_error"] = None data["_timestamp"] = time.time() + # Query output states for GUI indicators + try: + data["supply_on"] = self.bench.supply.get_output_state() + except Exception: + data["supply_on"] = None + try: + data["load_on"] = self.bench.load.get_load_state() + except Exception: + data["load_on"] = None except Exception as e: data = {"_error": str(e), "_timestamp": time.time()}