"""Background worker thread for instrument I/O. Keeps all VISA/serial communication off the GUI thread so tkinter never freezes during instrument queries. """ from __future__ import annotations import queue import threading import time from enum import Enum, auto class Cmd(Enum): """Commands sent from GUI to worker thread.""" # Supply SET_VOLTAGE = auto() SET_CURRENT = auto() APPLY = auto() OUTPUT_ON = auto() OUTPUT_OFF = auto() SET_OVP = auto() SET_RISE_TIME = auto() SET_FALL_TIME = auto() # Load SET_MODE = auto() SET_MODE_VALUE = auto() LOAD_ON = auto() LOAD_OFF = auto() SET_SLEW_RISE = auto() SET_SLEW_FALL = auto() # Meter SETUP_ALL = auto() SET_WIRING = auto() SET_COUPLING = auto() SET_RESPONSE_SPEED = auto() SET_AVERAGING = auto() SET_VOLTAGE_RANGE = auto() SET_CURRENT_RANGE = auto() SET_VOLTAGE_AUTO = auto() SET_CURRENT_AUTO = auto() DEGAUSS = auto() # System SET_INTERVAL = auto() SAFE_OFF = auto() class InstrumentWorker(threading.Thread): """Daemon thread that polls instruments and processes commands. Usage: worker = InstrumentWorker(bench) worker.start() worker.send(Cmd.SET_VOLTAGE, 48.0) data = worker.get_data() # dict or None worker.stop() """ def __init__(self, bench, interval: float = 1.0) -> None: super().__init__(daemon=True) self.bench = bench self.interval = interval self.cmd_queue: queue.Queue = queue.Queue() self.data_queue: queue.Queue = queue.Queue(maxsize=100) self._stop_event = threading.Event() def send(self, cmd: Cmd, *args) -> None: """Queue a command for execution on the worker thread.""" self.cmd_queue.put((cmd, args)) def get_data(self) -> dict | None: """Get the latest measurement data (non-blocking). Returns None if empty.""" result = None # Drain queue, keep only the latest while True: try: result = self.data_queue.get_nowait() except queue.Empty: break return result def stop(self) -> None: """Signal the worker to stop.""" self._stop_event.set() def run(self) -> None: """Main worker loop: process commands, then read instruments.""" while not self._stop_event.is_set(): # Process all pending commands while True: try: cmd, args = self.cmd_queue.get_nowait() self._execute(cmd, args) except queue.Empty: break # Read all instruments try: data = self.bench.measure_all() data["_error"] = None data["_timestamp"] = time.time() # Query output states for GUI indicators try: data["supply_on"] = self.bench.supply.get_output_state() except Exception: data["supply_on"] = None try: data["load_on"] = self.bench.load.get_load_state() except Exception: data["load_on"] = None # Query meter channel ranges for ch in (5, 6): try: data[f"v_range_{ch}"] = self.bench.meter.get_voltage_range(ch).strip() except Exception: data[f"v_range_{ch}"] = None try: data[f"i_range_{ch}"] = self.bench.meter.get_current_range(ch).strip() except Exception: data[f"i_range_{ch}"] = None except Exception as e: data = {"_error": str(e), "_timestamp": time.time()} # Push to GUI (drop oldest if full) try: self.data_queue.put_nowait(data) except queue.Full: try: self.data_queue.get_nowait() except queue.Empty: pass self.data_queue.put_nowait(data) self._stop_event.wait(timeout=self.interval) def _execute(self, cmd: Cmd, args: tuple) -> None: """Execute a single command. Exceptions are swallowed and reported.""" try: bench = self.bench match cmd: # Supply case Cmd.SET_VOLTAGE: bench.supply.set_voltage(args[0]) case Cmd.SET_CURRENT: bench.supply.set_current(args[0]) case Cmd.APPLY: bench.supply.set_current(args[1]) bench.supply.set_voltage(args[0]) case Cmd.OUTPUT_ON: bench.supply.output_on() case Cmd.OUTPUT_OFF: bench.supply.output_off() case Cmd.SET_OVP: bench.supply.set_ovp_level(args[0]) bench.supply.set_ovp_state(True) case Cmd.SET_RISE_TIME: bench.supply.set_rise_time(args[0]) case Cmd.SET_FALL_TIME: bench.supply.set_fall_time(args[0]) # Load case Cmd.SET_MODE: bench.load.set_mode(args[0]) case Cmd.SET_MODE_VALUE: mode, value = args[0], args[1] if mode == "CC": bench.load.set_cc_current(value) elif mode == "CR": bench.load.set_cr_resistance(value) elif mode == "CV": bench.load.set_cv_voltage(value) elif mode == "CP": bench.load.set_cp_power(value) case Cmd.LOAD_ON: bench.load.load_on() case Cmd.LOAD_OFF: bench.load.load_off() case Cmd.SET_SLEW_RISE: bench.load.set_rise_slew(args[0]) case Cmd.SET_SLEW_FALL: bench.load.set_fall_slew(args[0]) # Meter case Cmd.SETUP_ALL: bench.setup_all() case Cmd.SET_WIRING: bench.meter.set_wiring_mode(args[0]) case Cmd.SET_COUPLING: bench.meter.set_coupling(args[0], args[1]) case Cmd.SET_RESPONSE_SPEED: bench.meter.set_response_speed(args[0]) case Cmd.SET_AVERAGING: bench.meter.set_averaging(args[0], args[1] if len(args) > 1 else None) case Cmd.SET_VOLTAGE_RANGE: bench.meter.set_voltage_auto(args[0], False) bench.meter.set_voltage_range(args[0], args[1]) case Cmd.SET_CURRENT_RANGE: bench.meter.set_current_auto(args[0], False) bench.meter.set_current_range(args[0], args[1]) case Cmd.SET_VOLTAGE_AUTO: bench.meter.set_voltage_auto(args[0], args[1]) case Cmd.SET_CURRENT_AUTO: bench.meter.set_current_auto(args[0], args[1]) case Cmd.DEGAUSS: channels = args[0] if args else [5, 6] items = ",".join(f"I{ch}" for ch in channels) bench.meter.write(f":DEMAg {items}") # System case Cmd.SET_INTERVAL: self.interval = args[0] case Cmd.SAFE_OFF: bench.safe_off() except Exception as e: # Push error to data queue so GUI can display it try: self.data_queue.put_nowait({ "_error": f"Command {cmd.name} failed: {e}", "_timestamp": time.time(), }) except queue.Full: pass