"""HIOKI 3193-10 Power Analyzer GPIB driver using PyVISA."""

from __future__ import annotations

import time
from dataclasses import dataclass, field

import pyvisa


@dataclass
class MeasurementResult:
    """Container for a single measurement snapshot.

    Attributes:
        values: Measurement item name mapped to its parsed float value.
        timestamp: ``time.time()`` at construction unless supplied explicitly.
    """

    values: dict[str, float]
    timestamp: float = field(default_factory=time.time)

    def __repr__(self) -> str:
        """Return a compact repr; the timestamp is intentionally omitted."""
        items = ", ".join(f"{k}={v:+.4E}" for k, v in self.values.items())
        return f"MeasurementResult({items})"


class Hioki3193:
    """Driver for HIOKI 3193-10 Power Analyzer over GPIB.

    Can be used as a context manager; leaving the ``with`` block calls
    :meth:`close`.

    Args:
        address: VISA resource string, e.g. "GPIB0::1::INSTR"
        timeout_ms: Communication timeout in milliseconds.
    """

    # Special error values from the instrument
    DISPLAY_BLANK = 6666.6e99
    SCALING_ERROR = 7777.7e99
    INPUT_OVER = 9999.9e99

    def __init__(self, address: str, timeout_ms: int = 5000) -> None:
        """Open the VISA resource and switch it to parse-friendly output.

        If opening or configuring the instrument fails, everything opened so
        far is closed again before the exception propagates.
        """
        self._address = address
        self._rm = pyvisa.ResourceManager()
        try:
            self._inst = self._rm.open_resource(address)
        except BaseException:
            # No instrument session exists yet; release only the manager.
            self._rm.close()
            raise
        try:
            self._inst.timeout = timeout_ms
            self._inst.read_termination = "\n"
            self._inst.write_termination = "\n"
            self._configure_for_parsing()
        except BaseException:
            self.close()
            raise

    def _configure_for_parsing(self) -> None:
        """Set instrument to machine-friendly output format."""
        self.write(":HEADer OFF")
        self.write(":TRANsmit:SEParator 1")  # comma separator
        self.write(":TRANsmit:COLumn 1")  # fixed-width NR3

    @staticmethod
    def _on_off(flag: bool) -> str:
        """Return the command argument 'ON' for a truthy flag, else 'OFF'."""
        return "ON" if flag else "OFF"

    @staticmethod
    def _check_formula(formula: int) -> None:
        """Raise ValueError unless ``formula`` is 1, 2, or 3."""
        if formula not in (1, 2, 3):
            raise ValueError("formula must be 1, 2, or 3")

    # -- Low-level communication --

    def write(self, command: str) -> None:
        """Send a command to the instrument."""
        self._inst.write(command)

    def query(self, command: str) -> str:
        """Send a query and return the response string, whitespace-stripped."""
        return self._inst.query(command).strip()

    def close(self) -> None:
        """Close the VISA connection.

        The resource manager is closed even if closing the instrument
        session raises.
        """
        try:
            self._inst.close()
        finally:
            self._rm.close()

    def __enter__(self) -> Hioki3193:
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    # -- Identity & Status --

    def idn(self) -> str:
        """Query instrument identity."""
        return self.query("*IDN?")

    def reset(self) -> None:
        """Reset instrument to factory defaults and reconfigure for parsing."""
        self.write("*RST")
        # Pause before sending the output-format commands again.
        time.sleep(1)
        self._configure_for_parsing()

    def self_test(self) -> int:
        """Run self-test. Returns 0 if all OK, bitmask of errors otherwise."""
        return int(self.query("*TST?"))

    def status_byte(self) -> int:
        """Read the status byte register."""
        return int(self.query("*STB?"))

    def clear_status(self) -> None:
        """Clear status byte and all event registers."""
        self.write("*CLS")

    def operation_complete(self) -> bool:
        """Query ``*OPC?``; return True when the instrument answers "1"."""
        return self.query("*OPC?") == "1"

    def options(self) -> str:
        """Query installed options/input units."""
        return self.query("*OPT?")

    # -- Wiring Mode --

    def set_wiring_mode(self, mode: str) -> None:
        """Set wiring mode: '1P2W', '1P3W', '3P3W', '3V3A', '3P4W'.

        Raises:
            ValueError: If ``mode`` is not one of the listed modes.
        """
        valid = {"1P2W", "1P3W", "3P3W", "3V3A", "3P4W"}
        if mode not in valid:
            # Sorted so the message is identical from run to run.
            raise ValueError(
                f"Invalid wiring mode '{mode}'. Must be one of {sorted(valid)}"
            )
        self.write(f":MODE {mode}")

    def get_wiring_mode(self) -> str:
        """Query current wiring mode."""
        return self.query(":MODE?")

    # -- Voltage Settings --

    def set_voltage_auto(self, channel: int, on: bool = True) -> None:
        """Enable/disable voltage auto-ranging for a channel (1-6)."""
        self.write(f":VOLTage{channel}:AUTO {self._on_off(on)}")

    def set_voltage_range(self, channel: int, range_v: int) -> None:
        """Set voltage range for a channel.

        Values: 6,15,30,60,150,300,600,1000.
        """
        self.write(f":VOLTage{channel}:RANGe {range_v}")

    def get_voltage_range(self, channel: int) -> str:
        """Query voltage range for a channel."""
        return self.query(f":VOLTage{channel}:RANGe?")

    # -- Current Settings --

    def set_current_auto(self, channel: int, on: bool = True) -> None:
        """Enable/disable current auto-ranging for a channel (1-6)."""
        self.write(f":CURRent{channel}:AUTO {self._on_off(on)}")

    def set_current_range(self, channel: int, range_a: float) -> None:
        """Set current range for a channel."""
        self.write(f":CURRent{channel}:RANGe {range_a}")

    def get_current_range(self, channel: int) -> str:
        """Query current range for a channel."""
        return self.query(f":CURRent{channel}:RANGe?")

    # -- Input Coupling --

    def set_coupling(self, channel: int, mode: str) -> None:
        """Set input coupling: 'AC', 'DC', or 'ACDC'."""
        self.write(f":COUPling{channel} {mode}")

    def get_coupling(self, channel: int) -> str:
        """Query input coupling mode."""
        return self.query(f":COUPling{channel}?")

    # -- Frequency --

    def set_frequency_source(self, freq_ch: str, source: str) -> None:
        """Set frequency source. freq_ch='A','B','C'; source='U1','I1', etc."""
        self.write(f":FREQuency{freq_ch}:SOURce {source}")

    def get_frequency_source(self, freq_ch: str) -> str:
        """Query frequency source."""
        return self.query(f":FREQuency{freq_ch}:SOURce?")

    # -- Low-Pass Filter --

    def set_lpf(self, channel: int, freq: int) -> None:
        """Set LPF cutoff: 0=off, 500, 5000, 300000 Hz."""
        self.write(f":LPF{channel} {freq}")

    # -- Averaging --

    def set_averaging(self, mode: str, coefficient: int | None = None) -> None:
        """Set averaging mode: 'TIM','LIN','EXP','OFF'.

        Coefficient: 8,16,32,64. The coefficient command is only sent when
        ``coefficient`` is not None.
        """
        self.write(f":AVEraging:MODE {mode}")
        if coefficient is not None:
            self.write(f":AVEraging:COEFficient {coefficient}")

    def get_averaging(self) -> str:
        """Query averaging settings."""
        return self.query(":AVEraging?")

    # -- Response Speed --

    def set_response_speed(self, speed: str) -> None:
        """Set response speed: 'FAST', 'MID', 'SLOW'."""
        self.write(f":RESPonse {speed}")

    def get_response_speed(self) -> str:
        """Query response speed."""
        return self.query(":RESPonse?")

    # -- Hold / Trigger --

    def set_hold(self, on: bool) -> None:
        """Enable/disable display hold mode."""
        self.write(f":HOLD {self._on_off(on)}")

    def trigger(self) -> None:
        """Trigger a single measurement (same as GET/*TRG)."""
        self.write("*TRG")

    def wait_complete(self) -> None:
        """Wait until current sampling is completed."""
        self.write("*WAI")

    # -- Efficiency Calculation --

    def set_efficiency(
        self,
        formula: int,
        numerator: str,
        denominator: str,
    ) -> None:
        """Configure an efficiency formula (1-3).

        Args:
            formula: 1, 2, or 3
            numerator: Power items for output, e.g. "P2" or "P2,P4"
            denominator: Power items for input, e.g. "P1" or "P1,P3"

        Raises:
            ValueError: If ``formula`` is not 1, 2, or 3.
        """
        self._check_formula(formula)
        self.write(f":CALCulate{formula}:NUMerator {numerator}")
        self.write(f":CALCulate{formula}:DENominator {denominator}")

    def get_efficiency_config(self, formula: int) -> str:
        """Query efficiency formula configuration.

        Raises:
            ValueError: If ``formula`` is not 1, 2, or 3.
        """
        self._check_formula(formula)
        return self.query(f":CALCulate{formula}?")

    # -- Scaling (PT/CT/SC) --

    def set_scaling_control(
        self, channel: int, pt: bool, ct: bool, sc: bool
    ) -> None:
        """Enable/disable PT, CT, SC scaling for a channel."""
        flags = ",".join(self._on_off(flag) for flag in (pt, ct, sc))
        self.write(f":SCALe{channel}:CONTrol {flags}")

    def set_pt_ratio(self, channel: int, ratio: float) -> None:
        """Set PT (potential transformer) ratio. 0.0001-10000."""
        self.write(f":SCALe{channel}:PT {ratio}")

    def set_ct_ratio(self, channel: int, ratio: float) -> None:
        """Set CT (current transformer) ratio. 0.0001-10000."""
        self.write(f":SCALe{channel}:CT {ratio}")

    # -- Integration --

    def integration_reset(self) -> None:
        """Reset all integration values."""
        self.write(":INTEGrate:RESEt")

    def integration_start(self, channel: int | None = None) -> None:
        """Start integration. None = all channels."""
        cmd = ":INTEGrate:STARt"
        if channel is not None:
            cmd += f" {channel}"
        self.write(cmd)

    def integration_stop(self, channel: int | None = None) -> None:
        """Stop integration. None = all channels."""
        cmd = ":INTEGrate:STOP"
        if channel is not None:
            cmd += f" {channel}"
        self.write(cmd)

    def integration_status(self) -> str:
        """Query which channels are currently integrating. '0' = none."""
        return self.query(":INTEGrate?")

    # -- Display --

    def get_display(self) -> str:
        """Query current display screen."""
        return self.query(":DISPlay?")

    # -- Key Lock --

    def set_keylock(self, on: bool) -> None:
        """Lock/unlock front panel keys."""
        self.write(f":KEYLock {self._on_off(on)}")

    # -- Clock --

    def get_clock(self) -> str:
        """Query system clock."""
        return self.query(":CLOCK?")

    def set_clock(
        self,
        year: int,
        month: int,
        day: int,
        hour: int,
        minute: int,
        second: int,
    ) -> None:
        """Set system clock."""
        self.write(f":CLOCK {year},{month},{day},{hour},{minute},{second}")

    # -- Measurement Query (THE KEY METHOD) --

    def measure(self, *items: str) -> MeasurementResult:
        """Query measurement data.

        Args:
            *items: Measurement item codes. If none given, uses default
                items set by :MEASure:ITEM commands.
                Examples: "U1", "I1", "P1", "EFF1", "WP1", etc.

        Returns:
            MeasurementResult with item names mapped to float values. When
            no items are given the keys are "item_0", "item_1", ...

        Raises:
            ValueError: If a field of the response cannot be parsed as a
                float (this includes an empty response).

        Example:
            result = meter.measure("U1", "I1", "P1", "U2", "I2", "P2", "EFF1")
            print(result.values["P1"])    # Input power
            print(result.values["EFF1"])  # Efficiency %
        """
        if items:
            item_str = ",".join(items)
            response = self.query(f":MEASure? {item_str}")
            keys = list(items)
        else:
            response = self.query(":MEASure?")
            keys = None

        try:
            # float() ignores surrounding whitespace, so no explicit strip.
            float_values = [float(v) for v in response.split(",")]
        except ValueError as err:
            raise ValueError(
                f"Cannot parse measurement response {response!r}"
            ) from err

        if keys is None:
            keys = [f"item_{i}" for i in range(len(float_values))]

        # NOTE(review): zip() stops at the shorter sequence, so a response
        # whose field count differs from the number of requested items is
        # truncated silently. Confirm against the instrument manual whether
        # every item code yields exactly one field before making this strict.
        values = dict(zip(keys, float_values))
        return MeasurementResult(values=values)

    def set_measure_items(
        self,
        normal: list[int] | None = None,
        efficiency: int | None = None,
        frequency: int | None = None,
        integrate: list[int] | None = None,
    ) -> None:
        """Configure default items for :MEASure? (default mode).

        All default items are cleared first; each group is then sent only
        when its argument is not None.

        Args:
            normal: 8 bitmask values for U,I,P,S,Q,PF,DEG,PK per channel.
            efficiency: Bitmask for EFF1-EFF3 (0-7).
            frequency: Bitmask for F1-F3 (0-7).
            integrate: 10 bitmask values for integration items.
        """
        self.write(":MEASure:ITEM:ALLClear")
        if normal is not None:
            vals = ",".join(str(v) for v in normal)
            self.write(f":MEASure:ITEM:NORMal {vals}")
        if efficiency is not None:
            self.write(f":MEASure:ITEM:EFFiciency {efficiency}")
        if frequency is not None:
            self.write(f":MEASure:ITEM:FREQuency {frequency}")
        if integrate is not None:
            vals = ",".join(str(v) for v in integrate)
            self.write(f":MEASure:ITEM:INTEGrate {vals}")

    # -- Sampling Count (RTC) --

    def set_sampling_count(self, count: int) -> None:
        """Set RTC sampling count for event generation. 0=disabled, 8=1sec."""
        self.write(f":RTC:COUNt {count}")

    # -- Convenience: Trigger and Read --

    def trigger_and_read(self, *items: str) -> MeasurementResult:
        """Trigger a single measurement, wait for completion, then read.

        Useful in HOLD mode for synchronized single-shot readings.
        """
        self.trigger()
        self.wait_complete()
        return self.measure(*items)