HIOKI 3193-10 power analyzer GPIB/USB control tools

Python driver and CLI for solar MPPT converter efficiency testing.
Features: measure, monitor, live graph, auto-ranging V/I on both
channels, efficiency calculation, display configuration, integration
control, and CSV logging with voltage/current range tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-10 17:11:50 +07:00
commit 3ed39b2ac7
10 changed files with 2687 additions and 0 deletions

3
hioki3193/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from hioki3193.driver import Hioki3193
__all__ = ["Hioki3193"]

466
hioki3193/cli.py Normal file
View File

@@ -0,0 +1,466 @@
"""CLI tool for querying the HIOKI 3193-10 Power Analyzer via GPIB."""
from __future__ import annotations
import argparse
import csv
import sys
import time
from hioki3193.driver import Hioki3193
def find_instrument(address: str | None) -> str:
"""Find the GPIB instrument address or use the provided one."""
if address:
return address
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
rm.close()
# Look for HIOKI 3193 by USB descriptor first, then fall back to GPIB
hioki = [r for r in resources if "3193" in r or "03EB" in r]
if hioki:
print(f"Found HIOKI 3193: {hioki[0]}")
return hioki[0]
gpib = [r for r in resources if "GPIB" in r]
if not gpib:
print("No HIOKI 3193 or GPIB instruments found. Available resources:")
for r in resources:
print(f" {r}")
sys.exit(1)
if len(gpib) == 1:
print(f"Found GPIB instrument: {gpib[0]}")
return gpib[0]
print("Multiple GPIB instruments found:")
for i, r in enumerate(gpib):
print(f" [{i}] {r}")
choice = input("Select instrument number: ")
return gpib[int(choice)]
def cmd_identify(meter: Hioki3193, _args: argparse.Namespace) -> None:
"""Print instrument identity and options."""
print(f"Identity: {meter.idn()}")
print(f"Options: {meter.options()}")
print(f"Clock: {meter.get_clock()}")
print(f"Mode: {meter.get_wiring_mode()}")
print(f"Speed: {meter.get_response_speed()}")
def cmd_measure(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Take a single measurement."""
items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"]
result = meter.measure(*items)
for name, value in result.values.items():
print(f" {name:>8s} = {value:+.6E}")
def cmd_monitor(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Continuously monitor measurements at an interval."""
items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"]
interval = args.interval
# Detect which channels to log ranges for
voltage_channels = sorted({int(i[1:]) for i in items if i.startswith("U") and i[1:].isdigit()})
current_channels = sorted({int(i[1:]) for i in items if i.startswith("I") and i[1:].isdigit()})
# CSV output setup
writer = None
outfile = None
range_cols = [f"U{ch}_range" for ch in voltage_channels] + [f"I{ch}_range" for ch in current_channels]
if args.output:
outfile = open(args.output, "w", newline="")
writer = csv.writer(outfile)
writer.writerow(["timestamp"] + list(items) + range_cols)
print(f"Logging to {args.output}")
# Print header
header = " ".join(f"{item:>12s}" for item in items)
print(f"{'Time':>10s} {header}")
print("-" * (12 + 14 * len(items)))
try:
count = 0
while args.count == 0 or count < args.count:
result = meter.measure(*items)
ts = time.strftime("%H:%M:%S")
vals = " ".join(f"{v:>+12.4E}" for v in result.values.values())
print(f"{ts:>10s} {vals}")
if writer:
ranges = (
[meter.get_voltage_range(ch) for ch in voltage_channels]
+ [meter.get_current_range(ch) for ch in current_channels]
)
writer.writerow(
[time.strftime("%Y-%m-%d %H:%M:%S")]
+ [f"{v:.6E}" for v in result.values.values()]
+ ranges
)
outfile.flush()
count += 1
if args.count == 0 or count < args.count:
time.sleep(interval)
except KeyboardInterrupt:
print("\nMonitoring stopped.")
finally:
if outfile:
outfile.close()
print(f"Data saved to {args.output}")
def cmd_efficiency_setup(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Configure efficiency calculation for MPPT testing."""
num = args.numerator or "P2"
den = args.denominator or "P1"
formula = args.formula
meter.set_efficiency(formula, num, den)
print(f"EFF{formula} = ({num}) / ({den}) x 100%")
print(f"Config: {meter.get_efficiency_config(formula)}")
def cmd_setup_mppt(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Quick setup for solar MPPT efficiency testing.
Ch5 = Solar panel input (V-auto, I-auto, DC, SLOW)
Ch6 = MPPT converter output (V-auto, I-auto, DC, SLOW)
EFF1 = P6/P5 x 100%
"""
print("Configuring for MPPT efficiency testing...")
meter.set_wiring_mode("1P2W")
print(" Wiring: 1P2W")
# Ch5: Solar input - auto-range voltage & current, DC
meter.set_coupling(5, "DC")
meter.set_voltage_auto(5, True)
meter.set_current_auto(5, True)
print(" Ch5 (Solar input): V-auto, I-auto, DC")
# Ch6: MPPT output - auto-range voltage & current, DC
meter.set_coupling(6, "DC")
meter.set_voltage_auto(6, True)
meter.set_current_auto(6, True)
print(" Ch6 (MPPT output): V-auto, I-auto, DC")
# Response speed
meter.set_response_speed("SLOW")
print(" Response speed: SLOW")
# Efficiency: output / input
meter.set_efficiency(1, "P6", "P5")
print(" EFF1 = P6/P5 x 100% (output/input)")
# Display: Ch5 on left (1-8), Ch6 on right (9-16)
display_items = "U5,I5,P5,EFF1,OFF,OFF,OFF,OFF,U6,I6,P6,OFF,OFF,OFF,OFF,OFF"
meter.write(f":DISPlay:SELect16 {display_items}")
print(" Display: Ch5 (left) | Ch6 (right) + EFF1")
print(" Done. Use 'measure' or 'monitor' to read data.")
print(" Example: hioki measure U5 I5 P5 U6 I6 P6 EFF1")
def cmd_display_select(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Switch the instrument display to the selection screen."""
count = args.count
display_items = args.items
# Preset: MPPT channels - Ch5 left, Ch6 right
if args.mppt:
display_items = ["U5", "I5", "P5", "EFF1", "OFF", "OFF", "OFF", "OFF",
"U6", "I6", "P6", "OFF", "OFF", "OFF", "OFF", "OFF"]
count = 16
if display_items:
# Pad with OFF or truncate to match count
if len(display_items) < count:
display_items = display_items + ["OFF"] * (count - len(display_items))
elif len(display_items) > count:
display_items = display_items[:count]
item_str = ",".join(display_items)
meter.write(f":DISPlay:SELect{count} {item_str}")
print(f"Display items set to {count}-item selection screen:")
else:
meter.write(f":DISPlay:SELect{count}")
print(f"Display switched to {count}-item selection screen:")
# Query what's currently shown
current_items = meter.query(f":DISPlay:SELect{count}?")
items = [i.strip() for i in current_items.split(",")]
for i, item in enumerate(items, 1):
if item != "OFF":
print(f" [{i:2d}] {item}")
def cmd_send(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Send a raw SCPI command."""
command = " ".join(args.raw_command)
if "?" in command:
response = meter.query(command)
print(f"Response: {response}")
else:
meter.write(command)
print("OK")
def cmd_live(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Live monitor with real-time graph."""
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from collections import deque
items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"]
max_points = args.history
interval_ms = int(args.interval * 1000)
# Separate items into groups for subplots
voltage_items = [i for i in items if i.startswith("U")]
current_items = [i for i in items if i.startswith("I")]
power_items = [i for i in items if i.startswith("P")]
eff_items = [i for i in items if i.startswith("EFF")]
other_items = [i for i in items if not any(i.startswith(p) for p in ("U", "I", "P", "EFF"))]
groups: list[tuple[str, list[str], str]] = []
if voltage_items:
groups.append(("Voltage", voltage_items, "V"))
if current_items:
groups.append(("Current", current_items, "A"))
if power_items:
groups.append(("Power", power_items, "W"))
if eff_items:
groups.append(("Efficiency", eff_items, "%"))
if other_items:
groups.append(("Other", other_items, ""))
n_plots = len(groups)
if n_plots == 0:
print("No items to plot.")
return
# Data storage
timestamps: deque[float] = deque(maxlen=max_points)
data: dict[str, deque[float]] = {item: deque(maxlen=max_points) for item in items}
t0 = time.time()
# Detect which channels to log ranges for
voltage_channels = sorted({int(i[1:]) for i in items if i.startswith("U") and i[1:].isdigit()})
current_channels = sorted({int(i[1:]) for i in items if i.startswith("I") and i[1:].isdigit()})
range_cols = [f"U{ch}_range" for ch in voltage_channels] + [f"I{ch}_range" for ch in current_channels]
# CSV output setup
writer = None
outfile = None
if args.output:
outfile = open(args.output, "w", newline="")
writer = csv.writer(outfile)
writer.writerow(["timestamp"] + list(items) + range_cols)
print(f"Logging to {args.output}")
# Create figure
fig, axes = plt.subplots(n_plots, 1, figsize=(12, 3 * n_plots), squeeze=False)
fig.suptitle("HIOKI 3193-10 Live Monitor", fontsize=14, fontweight="bold")
axes = axes.flatten()
lines: dict[str, object] = {}
for ax, (title, group_items, unit) in zip(axes, groups):
ax.set_ylabel(f"{title} ({unit})" if unit else title)
ax.set_xlabel("Time (s)")
ax.grid(True, alpha=0.3)
ax.set_title(title, fontsize=11)
for item in group_items:
line, = ax.plot([], [], label=item, linewidth=1.5)
lines[item] = line
ax.legend(loc="upper left", fontsize=9)
fig.tight_layout()
# Error threshold - values above this are instrument error codes
ERROR_THRESHOLD = 1e90
def update(_frame):
try:
result = meter.measure(*items)
except Exception as e:
print(f"Read error: {e}")
return list(lines.values())
now = time.time() - t0
timestamps.append(now)
# Print to console
ts = time.strftime("%H:%M:%S")
vals = " ".join(f"{v:>+12.4E}" for v in result.values.values())
print(f"{ts} {vals}")
# CSV logging
if writer:
ranges = (
[meter.get_voltage_range(ch) for ch in voltage_channels]
+ [meter.get_current_range(ch) for ch in current_channels]
)
writer.writerow(
[time.strftime("%Y-%m-%d %H:%M:%S")]
+ [f"{v:.6E}" for v in result.values.values()]
+ ranges
)
outfile.flush()
for item in items:
val = result.values.get(item, 0.0)
# Replace error codes with NaN so they don't wreck the scale
if abs(val) > ERROR_THRESHOLD:
val = float("nan")
data[item].append(val)
t_list = list(timestamps)
for ax, (title, group_items, unit) in zip(axes, groups):
for item in group_items:
lines[item].set_data(t_list, list(data[item]))
ax.relim()
ax.autoscale_view()
return list(lines.values())
anim = FuncAnimation(fig, update, interval=interval_ms, blit=False, cache_frame_data=False)
try:
plt.show()
except KeyboardInterrupt:
pass
finally:
if outfile:
outfile.close()
print(f"Data saved to {args.output}")
def cmd_integration(meter: Hioki3193, args: argparse.Namespace) -> None:
"""Control integration (start/stop/reset/status)."""
action = args.action
if action == "start":
meter.integration_reset()
meter.integration_start()
print("Integration started (all channels).")
elif action == "stop":
meter.integration_stop()
print("Integration stopped.")
result = meter.measure("WP1", "WP2", "IH1", "IH2", "TIME")
print("Results:")
for name, value in result.values.items():
print(f" {name:>8s} = {value:+.6E}")
elif action == "reset":
meter.integration_reset()
print("Integration values reset.")
elif action == "status":
status = meter.integration_status()
if status == "0":
print("No channels integrating.")
else:
print(f"Integrating channels: {status}")
def main() -> None:
parser = argparse.ArgumentParser(
description="HIOKI 3193-10 Power Analyzer GPIB Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s identify
%(prog)s measure U5 I5 P5 U6 I6 P6 EFF1
%(prog)s monitor --interval 1.0 --output data.csv U5 I5 P5 U6 I6 P6 EFF1
%(prog)s monitor --interval 0.5 --count 100
%(prog)s live --interval 1.0 U5 I5 P5 U6 I6 P6 EFF1
%(prog)s live --interval 0.5 -o data.csv P5 P6 EFF1
%(prog)s setup-mppt
%(prog)s efficiency --numerator P2 --denominator P1
%(prog)s integration start
%(prog)s integration stop
%(prog)s send :VOLTage1:RANGe?
%(prog)s send *RST
""",
)
parser.add_argument(
"-a", "--address",
help="VISA resource address (e.g., GPIB0::1::INSTR). Auto-detects if omitted.",
)
parser.add_argument(
"--timeout", type=int, default=5000,
help="Communication timeout in ms (default: 5000)",
)
sub = parser.add_subparsers(dest="command", required=True)
# identify
sub.add_parser("identify", help="Show instrument identity and status")
# measure
p_meas = sub.add_parser("measure", help="Take a single measurement")
p_meas.add_argument("items", nargs="*", help="Item codes (default: U1 I1 P1 U2 I2 P2 EFF1)")
# monitor
p_mon = sub.add_parser("monitor", help="Continuously monitor measurements")
p_mon.add_argument("items", nargs="*", help="Item codes (default: U1 I1 P1 U2 I2 P2 EFF1)")
p_mon.add_argument("-i", "--interval", type=float, default=1.0, help="Seconds between readings (default: 1.0)")
p_mon.add_argument("-n", "--count", type=int, default=0, help="Number of readings (0=infinite)")
p_mon.add_argument("-o", "--output", help="CSV output file path")
# live (monitor with graph)
p_live = sub.add_parser("live", help="Live monitor with real-time graph")
p_live.add_argument("items", nargs="*", help="Item codes (default: U5 I5 P5 U6 I6 P6 EFF1)")
p_live.add_argument("-i", "--interval", type=float, default=1.0, help="Seconds between readings (default: 1.0)")
p_live.add_argument("-o", "--output", help="CSV output file path")
p_live.add_argument("--history", type=int, default=300, help="Max data points to display (default: 300)")
# efficiency
p_eff = sub.add_parser("efficiency", help="Configure efficiency formula")
p_eff.add_argument("-f", "--formula", type=int, default=1, choices=[1, 2, 3], help="Formula number (default: 1)")
p_eff.add_argument("-n", "--numerator", default="P2", help="Numerator items (default: P2)")
p_eff.add_argument("-d", "--denominator", default="P1", help="Denominator items (default: P1)")
# display-select
p_disp = sub.add_parser("display-select", help="Switch display to selection screen")
p_disp.add_argument("items", nargs="*", help="Display item codes (must match count exactly, padded with OFF)")
p_disp.add_argument("-c", "--count", type=int, default=16, choices=[4, 8, 16], help="Number of items on screen (default: 16)")
p_disp.add_argument("--mppt", action="store_true", help="Preset: show Ch5+Ch6 items (U5,I5,P5,...,U6,I6,P6,...)")
# setup-mppt
sub.add_parser("setup-mppt", help="Quick MPPT efficiency test setup (Ch5=solar, Ch6=output)")
# integration
p_int = sub.add_parser("integration", help="Integration control")
p_int.add_argument("action", choices=["start", "stop", "reset", "status"])
# send
p_send = sub.add_parser("send", help="Send raw SCPI/GPIB command")
p_send.add_argument("raw_command", nargs="+", help="Command string (queries auto-detected by '?')")
args = parser.parse_args()
address = find_instrument(args.address)
dispatch = {
"identify": cmd_identify,
"measure": cmd_measure,
"monitor": cmd_monitor,
"live": cmd_live,
"display-select": cmd_display_select,
"efficiency": cmd_efficiency_setup,
"setup-mppt": cmd_setup_mppt,
"integration": cmd_integration,
"send": cmd_send,
}
with Hioki3193(address, timeout_ms=args.timeout) as meter:
dispatch[args.command](meter, args)
if __name__ == "__main__":
main()

373
hioki3193/driver.py Normal file
View File

@@ -0,0 +1,373 @@
"""HIOKI 3193-10 Power Analyzer GPIB driver using PyVISA."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
import pyvisa
@dataclass
class MeasurementResult:
"""Container for a single measurement snapshot."""
values: dict[str, float]
timestamp: float = field(default_factory=time.time)
def __repr__(self) -> str:
items = ", ".join(f"{k}={v:+.4E}" for k, v in self.values.items())
return f"MeasurementResult({items})"
class Hioki3193:
"""Driver for HIOKI 3193-10 Power Analyzer over GPIB.
Args:
address: VISA resource string, e.g. "GPIB0::1::INSTR"
timeout_ms: Communication timeout in milliseconds.
"""
# Special error values from the instrument
DISPLAY_BLANK = 6666.6e99
SCALING_ERROR = 7777.7e99
INPUT_OVER = 9999.9e99
def __init__(self, address: str, timeout_ms: int = 5000) -> None:
self._address = address
self._rm = pyvisa.ResourceManager()
self._inst = self._rm.open_resource(address)
self._inst.timeout = timeout_ms
self._inst.read_termination = "\n"
self._inst.write_termination = "\n"
self._configure_for_parsing()
def _configure_for_parsing(self) -> None:
"""Set instrument to machine-friendly output format."""
self.write(":HEADer OFF")
self.write(":TRANsmit:SEParator 1") # comma separator
self.write(":TRANsmit:COLumn 1") # fixed-width NR3
# -- Low-level communication --
def write(self, command: str) -> None:
"""Send a command to the instrument."""
self._inst.write(command)
def query(self, command: str) -> str:
"""Send a query and return the response string."""
return self._inst.query(command).strip()
def close(self) -> None:
"""Close the VISA connection."""
self._inst.close()
self._rm.close()
def __enter__(self) -> Hioki3193:
return self
def __exit__(self, *exc) -> None:
self.close()
# -- Identity & Status --
def idn(self) -> str:
"""Query instrument identity."""
return self.query("*IDN?")
def reset(self) -> None:
"""Reset instrument to factory defaults and reconfigure for parsing."""
self.write("*RST")
time.sleep(1)
self._configure_for_parsing()
def self_test(self) -> int:
"""Run self-test. Returns 0 if all OK, bitmask of errors otherwise."""
return int(self.query("*TST?"))
def status_byte(self) -> int:
"""Read the status byte register."""
return int(self.query("*STB?"))
def clear_status(self) -> None:
"""Clear status byte and all event registers."""
self.write("*CLS")
def operation_complete(self) -> bool:
"""Block until operations complete, returns True."""
return self.query("*OPC?") == "1"
def options(self) -> str:
"""Query installed options/input units."""
return self.query("*OPT?")
# -- Wiring Mode --
def set_wiring_mode(self, mode: str) -> None:
"""Set wiring mode: '1P2W', '1P3W', '3P3W', '3V3A', '3P4W'."""
valid = {"1P2W", "1P3W", "3P3W", "3V3A", "3P4W"}
if mode not in valid:
raise ValueError(f"Invalid wiring mode '{mode}'. Must be one of {valid}")
self.write(f":MODE {mode}")
def get_wiring_mode(self) -> str:
"""Query current wiring mode."""
return self.query(":MODE?")
# -- Voltage Settings --
def set_voltage_auto(self, channel: int, on: bool = True) -> None:
"""Enable/disable voltage auto-ranging for a channel (1-6)."""
self.write(f":VOLTage{channel}:AUTO {'ON' if on else 'OFF'}")
def set_voltage_range(self, channel: int, range_v: int) -> None:
"""Set voltage range for a channel. Values: 6,15,30,60,150,300,600,1000."""
self.write(f":VOLTage{channel}:RANGe {range_v}")
def get_voltage_range(self, channel: int) -> str:
"""Query voltage range for a channel."""
return self.query(f":VOLTage{channel}:RANGe?")
# -- Current Settings --
def set_current_auto(self, channel: int, on: bool = True) -> None:
"""Enable/disable current auto-ranging for a channel (1-6)."""
self.write(f":CURRent{channel}:AUTO {'ON' if on else 'OFF'}")
def set_current_range(self, channel: int, range_a: float) -> None:
"""Set current range for a channel."""
self.write(f":CURRent{channel}:RANGe {range_a}")
def get_current_range(self, channel: int) -> str:
"""Query current range for a channel."""
return self.query(f":CURRent{channel}:RANGe?")
# -- Input Coupling --
def set_coupling(self, channel: int, mode: str) -> None:
"""Set input coupling: 'AC', 'DC', or 'ACDC'."""
self.write(f":COUPling{channel} {mode}")
def get_coupling(self, channel: int) -> str:
"""Query input coupling mode."""
return self.query(f":COUPling{channel}?")
# -- Frequency --
def set_frequency_source(self, freq_ch: str, source: str) -> None:
"""Set frequency source. freq_ch='A','B','C'; source='U1','I1', etc."""
self.write(f":FREQuency{freq_ch}:SOURce {source}")
def get_frequency_source(self, freq_ch: str) -> str:
"""Query frequency source."""
return self.query(f":FREQuency{freq_ch}:SOURce?")
# -- Low-Pass Filter --
def set_lpf(self, channel: int, freq: int) -> None:
"""Set LPF cutoff: 0=off, 500, 5000, 300000 Hz."""
self.write(f":LPF{channel} {freq}")
# -- Averaging --
def set_averaging(self, mode: str, coefficient: int | None = None) -> None:
"""Set averaging mode: 'TIM','LIN','EXP','OFF'. Coefficient: 8,16,32,64."""
self.write(f":AVEraging:MODE {mode}")
if coefficient is not None:
self.write(f":AVEraging:COEFficient {coefficient}")
def get_averaging(self) -> str:
"""Query averaging settings."""
return self.query(":AVEraging?")
# -- Response Speed --
def set_response_speed(self, speed: str) -> None:
"""Set response speed: 'FAST', 'MID', 'SLOW'."""
self.write(f":RESPonse {speed}")
def get_response_speed(self) -> str:
"""Query response speed."""
return self.query(":RESPonse?")
# -- Hold / Trigger --
def set_hold(self, on: bool) -> None:
"""Enable/disable display hold mode."""
self.write(f":HOLD {'ON' if on else 'OFF'}")
def trigger(self) -> None:
"""Trigger a single measurement (same as GET/*TRG)."""
self.write("*TRG")
def wait_complete(self) -> None:
"""Wait until current sampling is completed."""
self.write("*WAI")
# -- Efficiency Calculation --
def set_efficiency(
self,
formula: int,
numerator: str,
denominator: str,
) -> None:
"""Configure an efficiency formula (1-3).
Args:
formula: 1, 2, or 3
numerator: Power items for output, e.g. "P2" or "P2,P4"
denominator: Power items for input, e.g. "P1" or "P1,P3"
"""
if formula not in (1, 2, 3):
raise ValueError("formula must be 1, 2, or 3")
self.write(f":CALCulate{formula}:NUMerator {numerator}")
self.write(f":CALCulate{formula}:DENominator {denominator}")
def get_efficiency_config(self, formula: int) -> str:
"""Query efficiency formula configuration."""
return self.query(f":CALCulate{formula}?")
# -- Scaling (PT/CT/SC) --
def set_scaling_control(
self, channel: int, pt: bool, ct: bool, sc: bool
) -> None:
"""Enable/disable PT, CT, SC scaling for a channel."""
pt_s = "ON" if pt else "OFF"
ct_s = "ON" if ct else "OFF"
sc_s = "ON" if sc else "OFF"
self.write(f":SCALe{channel}:CONTrol {pt_s},{ct_s},{sc_s}")
def set_pt_ratio(self, channel: int, ratio: float) -> None:
"""Set PT (potential transformer) ratio. 0.0001-10000."""
self.write(f":SCALe{channel}:PT {ratio}")
def set_ct_ratio(self, channel: int, ratio: float) -> None:
"""Set CT (current transformer) ratio. 0.0001-10000."""
self.write(f":SCALe{channel}:CT {ratio}")
# -- Integration --
def integration_reset(self) -> None:
"""Reset all integration values."""
self.write(":INTEGrate:RESEt")
def integration_start(self, channel: int | None = None) -> None:
"""Start integration. None = all channels."""
cmd = ":INTEGrate:STARt"
if channel is not None:
cmd += f" {channel}"
self.write(cmd)
def integration_stop(self, channel: int | None = None) -> None:
"""Stop integration. None = all channels."""
cmd = ":INTEGrate:STOP"
if channel is not None:
cmd += f" {channel}"
self.write(cmd)
def integration_status(self) -> str:
"""Query which channels are currently integrating. '0' = none."""
return self.query(":INTEGrate?")
# -- Display --
def get_display(self) -> str:
"""Query current display screen."""
return self.query(":DISPlay?")
# -- Key Lock --
def set_keylock(self, on: bool) -> None:
"""Lock/unlock front panel keys."""
self.write(f":KEYLock {'ON' if on else 'OFF'}")
# -- Clock --
def get_clock(self) -> str:
"""Query system clock."""
return self.query(":CLOCK?")
def set_clock(self, year: int, month: int, day: int, hour: int, minute: int, second: int) -> None:
"""Set system clock."""
self.write(f":CLOCK {year},{month},{day},{hour},{minute},{second}")
# -- Measurement Query (THE KEY METHOD) --
def measure(self, *items: str) -> MeasurementResult:
"""Query measurement data.
Args:
*items: Measurement item codes. If none given, uses default items
set by :MEASure:ITEM commands.
Examples: "U1", "I1", "P1", "EFF1", "WP1", etc.
Returns:
MeasurementResult with item names mapped to float values.
Example:
result = meter.measure("U1", "I1", "P1", "U2", "I2", "P2", "EFF1")
print(result.values["P1"]) # Input power
print(result.values["EFF1"]) # Efficiency %
"""
if items:
item_str = ",".join(items)
response = self.query(f":MEASure? {item_str}")
keys = list(items)
else:
response = self.query(":MEASure?")
keys = None
raw_values = [v.strip() for v in response.split(",")]
float_values = [float(v) for v in raw_values]
if keys is None:
keys = [f"item_{i}" for i in range(len(float_values))]
values = dict(zip(keys, float_values))
return MeasurementResult(values=values)
def set_measure_items(
self,
normal: list[int] | None = None,
efficiency: int | None = None,
frequency: int | None = None,
integrate: list[int] | None = None,
) -> None:
"""Configure default items for :MEASure? (default mode).
Args:
normal: 8 bitmask values for U,I,P,S,Q,PF,DEG,PK per channel.
efficiency: Bitmask for EFF1-EFF3 (0-7).
frequency: Bitmask for F1-F3 (0-7).
integrate: 10 bitmask values for integration items.
"""
self.write(":MEASure:ITEM:ALLClear")
if normal is not None:
vals = ",".join(str(v) for v in normal)
self.write(f":MEASure:ITEM:NORMal {vals}")
if efficiency is not None:
self.write(f":MEASure:ITEM:EFFiciency {efficiency}")
if frequency is not None:
self.write(f":MEASure:ITEM:FREQuency {frequency}")
if integrate is not None:
vals = ",".join(str(v) for v in integrate)
self.write(f":MEASure:ITEM:INTEGrate {vals}")
# -- Sampling Count (RTC) --
def set_sampling_count(self, count: int) -> None:
"""Set RTC sampling count for event generation. 0=disabled, 8=1sec."""
self.write(f":RTC:COUNt {count}")
# -- Convenience: Trigger and Read --
def trigger_and_read(self, *items: str) -> MeasurementResult:
"""Trigger a single measurement, wait for completion, then read.
Useful in HOLD mode for synchronized single-shot readings.
"""
self.trigger()
self.wait_complete()
return self.measure(*items)