Files
mppt-testbench/testbench/gui_workers.py
grabowski 05f23d6417 Add HIOKI channel range controls and live range display
- GUI: voltage/current range selectors for Ch5 and Ch6 with AUTO option
- Worker: queries current V/I ranges each poll cycle, pushes to GUI
- Setting a fixed range automatically disables auto-range for that channel
- Live display shows current range values below each channel's readout

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 15:30:10 +07:00

224 lines
7.7 KiB
Python

"""Background worker thread for instrument I/O.
Keeps all VISA/serial communication off the GUI thread so tkinter
never freezes during instrument queries.
"""
from __future__ import annotations
import queue
import threading
import time
from enum import Enum, auto
class Cmd(Enum):
"""Commands sent from GUI to worker thread."""
# Supply
SET_VOLTAGE = auto()
SET_CURRENT = auto()
APPLY = auto()
OUTPUT_ON = auto()
OUTPUT_OFF = auto()
SET_OVP = auto()
SET_RISE_TIME = auto()
SET_FALL_TIME = auto()
# Load
SET_MODE = auto()
SET_MODE_VALUE = auto()
LOAD_ON = auto()
LOAD_OFF = auto()
SET_SLEW_RISE = auto()
SET_SLEW_FALL = auto()
# Meter
SETUP_ALL = auto()
SET_WIRING = auto()
SET_COUPLING = auto()
SET_RESPONSE_SPEED = auto()
SET_AVERAGING = auto()
SET_VOLTAGE_RANGE = auto()
SET_CURRENT_RANGE = auto()
SET_VOLTAGE_AUTO = auto()
SET_CURRENT_AUTO = auto()
# System
SET_INTERVAL = auto()
SAFE_OFF = auto()
class InstrumentWorker(threading.Thread):
"""Daemon thread that polls instruments and processes commands.
Usage:
worker = InstrumentWorker(bench)
worker.start()
worker.send(Cmd.SET_VOLTAGE, 48.0)
data = worker.get_data() # dict or None
worker.stop()
"""
def __init__(self, bench, interval: float = 1.0) -> None:
super().__init__(daemon=True)
self.bench = bench
self.interval = interval
self.cmd_queue: queue.Queue = queue.Queue()
self.data_queue: queue.Queue = queue.Queue(maxsize=100)
self._stop_event = threading.Event()
def send(self, cmd: Cmd, *args) -> None:
"""Queue a command for execution on the worker thread."""
self.cmd_queue.put((cmd, args))
def get_data(self) -> dict | None:
"""Get the latest measurement data (non-blocking). Returns None if empty."""
result = None
# Drain queue, keep only the latest
while True:
try:
result = self.data_queue.get_nowait()
except queue.Empty:
break
return result
def stop(self) -> None:
"""Signal the worker to stop."""
self._stop_event.set()
def run(self) -> None:
"""Main worker loop: process commands, then read instruments."""
while not self._stop_event.is_set():
# Process all pending commands
while True:
try:
cmd, args = self.cmd_queue.get_nowait()
self._execute(cmd, args)
except queue.Empty:
break
# Read all instruments
try:
data = self.bench.measure_all()
data["_error"] = None
data["_timestamp"] = time.time()
# Query output states for GUI indicators
try:
data["supply_on"] = self.bench.supply.get_output_state()
except Exception:
data["supply_on"] = None
try:
data["load_on"] = self.bench.load.get_load_state()
except Exception:
data["load_on"] = None
# Query meter channel ranges
for ch in (5, 6):
try:
data[f"v_range_{ch}"] = self.bench.meter.get_voltage_range(ch).strip()
except Exception:
data[f"v_range_{ch}"] = None
try:
data[f"i_range_{ch}"] = self.bench.meter.get_current_range(ch).strip()
except Exception:
data[f"i_range_{ch}"] = None
except Exception as e:
data = {"_error": str(e), "_timestamp": time.time()}
# Push to GUI (drop oldest if full)
try:
self.data_queue.put_nowait(data)
except queue.Full:
try:
self.data_queue.get_nowait()
except queue.Empty:
pass
self.data_queue.put_nowait(data)
self._stop_event.wait(timeout=self.interval)
def _execute(self, cmd: Cmd, args: tuple) -> None:
"""Execute a single command. Exceptions are swallowed and reported."""
try:
bench = self.bench
match cmd:
# Supply
case Cmd.SET_VOLTAGE:
bench.supply.set_voltage(args[0])
case Cmd.SET_CURRENT:
bench.supply.set_current(args[0])
case Cmd.APPLY:
bench.supply.set_current(args[1])
bench.supply.set_voltage(args[0])
case Cmd.OUTPUT_ON:
bench.supply.output_on()
case Cmd.OUTPUT_OFF:
bench.supply.output_off()
case Cmd.SET_OVP:
bench.supply.set_ovp_level(args[0])
bench.supply.set_ovp_state(True)
case Cmd.SET_RISE_TIME:
bench.supply.set_rise_time(args[0])
case Cmd.SET_FALL_TIME:
bench.supply.set_fall_time(args[0])
# Load
case Cmd.SET_MODE:
bench.load.set_mode(args[0])
case Cmd.SET_MODE_VALUE:
mode, value = args[0], args[1]
if mode == "CC":
bench.load.set_cc_current(value)
elif mode == "CR":
bench.load.set_cr_resistance(value)
elif mode == "CV":
bench.load.set_cv_voltage(value)
elif mode == "CP":
bench.load.set_cp_power(value)
case Cmd.LOAD_ON:
bench.load.load_on()
case Cmd.LOAD_OFF:
bench.load.load_off()
case Cmd.SET_SLEW_RISE:
bench.load.set_rise_slew(args[0])
case Cmd.SET_SLEW_FALL:
bench.load.set_fall_slew(args[0])
# Meter
case Cmd.SETUP_ALL:
bench.setup_all()
case Cmd.SET_WIRING:
bench.meter.set_wiring_mode(args[0])
case Cmd.SET_COUPLING:
bench.meter.set_coupling(args[0], args[1])
case Cmd.SET_RESPONSE_SPEED:
bench.meter.set_response_speed(args[0])
case Cmd.SET_AVERAGING:
bench.meter.set_averaging(args[0], args[1] if len(args) > 1 else None)
case Cmd.SET_VOLTAGE_RANGE:
bench.meter.set_voltage_auto(args[0], False)
bench.meter.set_voltage_range(args[0], args[1])
case Cmd.SET_CURRENT_RANGE:
bench.meter.set_current_auto(args[0], False)
bench.meter.set_current_range(args[0], args[1])
case Cmd.SET_VOLTAGE_AUTO:
bench.meter.set_voltage_auto(args[0], args[1])
case Cmd.SET_CURRENT_AUTO:
bench.meter.set_current_auto(args[0], args[1])
# System
case Cmd.SET_INTERVAL:
self.interval = args[0]
case Cmd.SAFE_OFF:
bench.safe_off()
except Exception as e:
# Push error to data queue so GUI can display it
try:
self.data_queue.put_nowait({
"_error": f"Command {cmd.name} failed: {e}",
"_timestamp": time.time(),
})
except queue.Full:
pass