"""CLI tool for querying the HIOKI 3193-10 Power Analyzer via GPIB.""" from __future__ import annotations import argparse import csv import sys import time from hioki3193.driver import Hioki3193 def find_instrument(address: str | None) -> str: """Find the GPIB instrument address or use the provided one.""" if address: return address import pyvisa rm = pyvisa.ResourceManager() resources = rm.list_resources() rm.close() # Look for HIOKI 3193 by USB descriptor first, then fall back to GPIB hioki = [r for r in resources if "3193" in r or "03EB" in r] if hioki: print(f"Found HIOKI 3193: {hioki[0]}") return hioki[0] gpib = [r for r in resources if "GPIB" in r] if not gpib: print("No HIOKI 3193 or GPIB instruments found. Available resources:") for r in resources: print(f" {r}") sys.exit(1) if len(gpib) == 1: print(f"Found GPIB instrument: {gpib[0]}") return gpib[0] print("Multiple GPIB instruments found:") for i, r in enumerate(gpib): print(f" [{i}] {r}") choice = input("Select instrument number: ") return gpib[int(choice)] def cmd_identify(meter: Hioki3193, _args: argparse.Namespace) -> None: """Print instrument identity and options.""" print(f"Identity: {meter.idn()}") print(f"Options: {meter.options()}") print(f"Clock: {meter.get_clock()}") print(f"Mode: {meter.get_wiring_mode()}") print(f"Speed: {meter.get_response_speed()}") def cmd_measure(meter: Hioki3193, args: argparse.Namespace) -> None: """Take a single measurement.""" items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"] result = meter.measure(*items) for name, value in result.values.items(): print(f" {name:>8s} = {value:+.6E}") SETTLE_CYCLES = 3 # Number of cycles to skip after a range change def _read_ranges( meter: Hioki3193, voltage_channels: list[int], current_channels: list[int], ) -> list[str]: """Read current voltage and current ranges for the given channels.""" return ( [meter.get_voltage_range(ch) for ch in voltage_channels] + [meter.get_current_range(ch) for ch in current_channels] ) def cmd_monitor(meter: Hioki3193, args: argparse.Namespace) -> None: """Continuously monitor measurements at an interval.""" items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"] interval = args.interval # Detect which channels to log ranges for voltage_channels = sorted({int(i[1:]) for i in items if i.startswith("U") and i[1:].isdigit()}) current_channels = sorted({int(i[1:]) for i in items if i.startswith("I") and i[1:].isdigit()}) # CSV output setup writer = None outfile = None range_cols = [f"U{ch}_range" for ch in voltage_channels] + [f"I{ch}_range" for ch in current_channels] if args.output: outfile = open(args.output, "w", newline="") writer = csv.writer(outfile) writer.writerow(["timestamp"] + list(items) + range_cols) print(f"Logging to {args.output}") # Print header header = " ".join(f"{item:>12s}" for item in items) print(f"{'Time':>10s} {header}") print("-" * (12 + 14 * len(items))) # Range settle tracking prev_ranges = _read_ranges(meter, voltage_channels, current_channels) settle_remaining = 0 try: count = 0 while args.count == 0 or count < args.count: result = meter.measure(*items) ranges = _read_ranges(meter, voltage_channels, current_channels) # Detect range change if ranges != prev_ranges: settle_remaining = SETTLE_CYCLES range_desc = ", ".join( f"{a}->{b}" for a, b in zip(prev_ranges, ranges) if a != b ) print(f" ** Range changed ({range_desc}), settling {SETTLE_CYCLES} cycles...") prev_ranges = ranges ts = time.strftime("%H:%M:%S") vals = " ".join(f"{v:>+12.4E}" for v in result.values.values()) if settle_remaining > 0: print(f"{ts:>10s} {vals} [settling {settle_remaining}]") settle_remaining -= 1 time.sleep(interval) continue print(f"{ts:>10s} {vals}") if writer: writer.writerow( [time.strftime("%Y-%m-%d %H:%M:%S")] + [f"{v:.6E}" for v in result.values.values()] + ranges ) outfile.flush() count += 1 if args.count == 0 or count < args.count: time.sleep(interval) except KeyboardInterrupt: print("\nMonitoring stopped.") finally: if outfile: outfile.close() print(f"Data saved to {args.output}") def cmd_efficiency_setup(meter: Hioki3193, args: argparse.Namespace) -> None: """Configure efficiency calculation for MPPT testing.""" num = args.numerator or "P2" den = args.denominator or "P1" formula = args.formula meter.set_efficiency(formula, num, den) print(f"EFF{formula} = ({num}) / ({den}) x 100%") print(f"Config: {meter.get_efficiency_config(formula)}") def cmd_setup_mppt(meter: Hioki3193, args: argparse.Namespace) -> None: """Quick setup for solar MPPT efficiency testing. Ch5 = Solar panel input (V-auto, I-auto, DC, SLOW) Ch6 = MPPT converter output (V-auto, I-auto, DC, SLOW) EFF1 = P6/P5 x 100% """ print("Configuring for MPPT efficiency testing...") meter.set_wiring_mode("1P2W") print(" Wiring: 1P2W") # Ch5: Solar input - auto-range voltage & current, DC meter.set_coupling(5, "DC") meter.set_voltage_auto(5, True) meter.set_current_auto(5, True) print(" Ch5 (Solar input): V-auto, I-auto, DC") # Ch6: MPPT output - auto-range voltage & current, DC meter.set_coupling(6, "DC") meter.set_voltage_auto(6, True) meter.set_current_auto(6, True) print(" Ch6 (MPPT output): V-auto, I-auto, DC") # Response speed meter.set_response_speed("SLOW") print(" Response speed: SLOW") # Efficiency: output / input meter.set_efficiency(1, "P6", "P5") print(" EFF1 = P6/P5 x 100% (output/input)") # Display: Ch5 on left (1-8), Ch6 on right (9-16) display_items = "U5,I5,P5,EFF1,OFF,OFF,OFF,OFF,U6,I6,P6,OFF,OFF,OFF,OFF,OFF" meter.write(f":DISPlay:SELect16 {display_items}") print(" Display: Ch5 (left) | Ch6 (right) + EFF1") print(" Done. Use 'measure' or 'monitor' to read data.") print(" Example: hioki measure U5 I5 P5 U6 I6 P6 EFF1") def cmd_display_select(meter: Hioki3193, args: argparse.Namespace) -> None: """Switch the instrument display to the selection screen.""" count = args.count display_items = args.items # Preset: MPPT channels - Ch5 left, Ch6 right if args.mppt: display_items = ["U5", "I5", "P5", "EFF1", "OFF", "OFF", "OFF", "OFF", "U6", "I6", "P6", "OFF", "OFF", "OFF", "OFF", "OFF"] count = 16 if display_items: # Pad with OFF or truncate to match count if len(display_items) < count: display_items = display_items + ["OFF"] * (count - len(display_items)) elif len(display_items) > count: display_items = display_items[:count] item_str = ",".join(display_items) meter.write(f":DISPlay:SELect{count} {item_str}") print(f"Display items set to {count}-item selection screen:") else: meter.write(f":DISPlay:SELect{count}") print(f"Display switched to {count}-item selection screen:") # Query what's currently shown current_items = meter.query(f":DISPlay:SELect{count}?") items = [i.strip() for i in current_items.split(",")] for i, item in enumerate(items, 1): if item != "OFF": print(f" [{i:2d}] {item}") def cmd_send(meter: Hioki3193, args: argparse.Namespace) -> None: """Send a raw SCPI command.""" command = " ".join(args.raw_command) if "?" in command: response = meter.query(command) print(f"Response: {response}") else: meter.write(command) print("OK") def cmd_live(meter: Hioki3193, args: argparse.Namespace) -> None: """Live monitor with real-time graph.""" import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from collections import deque items = args.items if args.items else ["U5", "I5", "P5", "U6", "I6", "P6", "EFF1"] max_points = args.history interval_ms = int(args.interval * 1000) # Separate items into groups for subplots voltage_items = [i for i in items if i.startswith("U")] current_items = [i for i in items if i.startswith("I")] power_items = [i for i in items if i.startswith("P")] eff_items = [i for i in items if i.startswith("EFF")] other_items = [i for i in items if not any(i.startswith(p) for p in ("U", "I", "P", "EFF"))] groups: list[tuple[str, list[str], str]] = [] if voltage_items: groups.append(("Voltage", voltage_items, "V")) if current_items: groups.append(("Current", current_items, "A")) if power_items: groups.append(("Power", power_items, "W")) if eff_items: groups.append(("Efficiency", eff_items, "%")) if other_items: groups.append(("Other", other_items, "")) n_plots = len(groups) if n_plots == 0: print("No items to plot.") return # Data storage timestamps: deque[float] = deque(maxlen=max_points) data: dict[str, deque[float]] = {item: deque(maxlen=max_points) for item in items} t0 = time.time() # Detect which channels to log ranges for voltage_channels = sorted({int(i[1:]) for i in items if i.startswith("U") and i[1:].isdigit()}) current_channels = sorted({int(i[1:]) for i in items if i.startswith("I") and i[1:].isdigit()}) range_cols = [f"U{ch}_range" for ch in voltage_channels] + [f"I{ch}_range" for ch in current_channels] # CSV output setup writer = None outfile = None if args.output: outfile = open(args.output, "w", newline="") writer = csv.writer(outfile) writer.writerow(["timestamp"] + list(items) + range_cols) print(f"Logging to {args.output}") # Create figure fig, axes = plt.subplots(n_plots, 1, figsize=(12, 3 * n_plots), squeeze=False) fig.suptitle("HIOKI 3193-10 Live Monitor", fontsize=14, fontweight="bold") axes = axes.flatten() lines: dict[str, object] = {} for ax, (title, group_items, unit) in zip(axes, groups): ax.set_ylabel(f"{title} ({unit})" if unit else title) ax.set_xlabel("Time (s)") ax.grid(True, alpha=0.3) ax.set_title(title, fontsize=11) for item in group_items: line, = ax.plot([], [], label=item, linewidth=1.5) lines[item] = line ax.legend(loc="upper left", fontsize=9) fig.tight_layout() # Error threshold - values above this are instrument error codes ERROR_THRESHOLD = 1e90 # Range settle tracking (lists for mutability in closure) settle_state = {"prev": _read_ranges(meter, voltage_channels, current_channels), "remaining": 0} def update(_frame): try: result = meter.measure(*items) except Exception as e: print(f"Read error: {e}") return list(lines.values()) ranges = _read_ranges(meter, voltage_channels, current_channels) # Detect range change if ranges != settle_state["prev"]: settle_state["remaining"] = SETTLE_CYCLES range_desc = ", ".join( f"{a}->{b}" for a, b in zip(settle_state["prev"], ranges) if a != b ) print(f" ** Range changed ({range_desc}), settling {SETTLE_CYCLES} cycles...") settle_state["prev"] = ranges now = time.time() - t0 ts = time.strftime("%H:%M:%S") vals = " ".join(f"{v:>+12.4E}" for v in result.values.values()) if settle_state["remaining"] > 0: print(f"{ts} {vals} [settling {settle_state['remaining']}]") settle_state["remaining"] -= 1 return list(lines.values()) timestamps.append(now) print(f"{ts} {vals}") # CSV logging if writer: writer.writerow( [time.strftime("%Y-%m-%d %H:%M:%S")] + [f"{v:.6E}" for v in result.values.values()] + ranges ) outfile.flush() for item in items: val = result.values.get(item, 0.0) # Replace error codes with NaN so they don't wreck the scale if abs(val) > ERROR_THRESHOLD: val = float("nan") data[item].append(val) t_list = list(timestamps) for ax, (title, group_items, unit) in zip(axes, groups): for item in group_items: lines[item].set_data(t_list, list(data[item])) ax.relim() ax.autoscale_view() return list(lines.values()) anim = FuncAnimation(fig, update, interval=interval_ms, blit=False, cache_frame_data=False) try: plt.show() except KeyboardInterrupt: pass finally: if outfile: outfile.close() print(f"Data saved to {args.output}") def cmd_integration(meter: Hioki3193, args: argparse.Namespace) -> None: """Control integration (start/stop/reset/status).""" action = args.action if action == "start": meter.integration_reset() meter.integration_start() print("Integration started (all channels).") elif action == "stop": meter.integration_stop() print("Integration stopped.") result = meter.measure("WP1", "WP2", "IH1", "IH2", "TIME") print("Results:") for name, value in result.values.items(): print(f" {name:>8s} = {value:+.6E}") elif action == "reset": meter.integration_reset() print("Integration values reset.") elif action == "status": status = meter.integration_status() if status == "0": print("No channels integrating.") else: print(f"Integrating channels: {status}") def main() -> None: parser = argparse.ArgumentParser( description="HIOKI 3193-10 Power Analyzer GPIB Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ examples: %(prog)s identify %(prog)s measure U5 I5 P5 U6 I6 P6 EFF1 %(prog)s monitor --interval 1.0 --output data.csv U5 I5 P5 U6 I6 P6 EFF1 %(prog)s monitor --interval 0.5 --count 100 %(prog)s live --interval 1.0 U5 I5 P5 U6 I6 P6 EFF1 %(prog)s live --interval 0.5 -o data.csv P5 P6 EFF1 %(prog)s setup-mppt %(prog)s efficiency --numerator P2 --denominator P1 %(prog)s integration start %(prog)s integration stop %(prog)s send :VOLTage1:RANGe? %(prog)s send *RST """, ) parser.add_argument( "-a", "--address", help="VISA resource address (e.g., GPIB0::1::INSTR). Auto-detects if omitted.", ) parser.add_argument( "--timeout", type=int, default=5000, help="Communication timeout in ms (default: 5000)", ) sub = parser.add_subparsers(dest="command", required=True) # identify sub.add_parser("identify", help="Show instrument identity and status") # measure p_meas = sub.add_parser("measure", help="Take a single measurement") p_meas.add_argument("items", nargs="*", help="Item codes (default: U1 I1 P1 U2 I2 P2 EFF1)") # monitor p_mon = sub.add_parser("monitor", help="Continuously monitor measurements") p_mon.add_argument("items", nargs="*", help="Item codes (default: U1 I1 P1 U2 I2 P2 EFF1)") p_mon.add_argument("-i", "--interval", type=float, default=1.0, help="Seconds between readings (default: 1.0)") p_mon.add_argument("-n", "--count", type=int, default=0, help="Number of readings (0=infinite)") p_mon.add_argument("-o", "--output", help="CSV output file path") # live (monitor with graph) p_live = sub.add_parser("live", help="Live monitor with real-time graph") p_live.add_argument("items", nargs="*", help="Item codes (default: U5 I5 P5 U6 I6 P6 EFF1)") p_live.add_argument("-i", "--interval", type=float, default=1.0, help="Seconds between readings (default: 1.0)") p_live.add_argument("-o", "--output", help="CSV output file path") p_live.add_argument("--history", type=int, default=300, help="Max data points to display (default: 300)") # efficiency p_eff = sub.add_parser("efficiency", help="Configure efficiency formula") p_eff.add_argument("-f", "--formula", type=int, default=1, choices=[1, 2, 3], help="Formula number (default: 1)") p_eff.add_argument("-n", "--numerator", default="P2", help="Numerator items (default: P2)") p_eff.add_argument("-d", "--denominator", default="P1", help="Denominator items (default: P1)") # display-select p_disp = sub.add_parser("display-select", help="Switch display to selection screen") p_disp.add_argument("items", nargs="*", help="Display item codes (must match count exactly, padded with OFF)") p_disp.add_argument("-c", "--count", type=int, default=16, choices=[4, 8, 16], help="Number of items on screen (default: 16)") p_disp.add_argument("--mppt", action="store_true", help="Preset: show Ch5+Ch6 items (U5,I5,P5,...,U6,I6,P6,...)") # setup-mppt sub.add_parser("setup-mppt", help="Quick MPPT efficiency test setup (Ch5=solar, Ch6=output)") # integration p_int = sub.add_parser("integration", help="Integration control") p_int.add_argument("action", choices=["start", "stop", "reset", "status"]) # send p_send = sub.add_parser("send", help="Send raw SCPI/GPIB command") p_send.add_argument("raw_command", nargs="+", help="Command string (queries auto-detected by '?')") args = parser.parse_args() address = find_instrument(args.address) dispatch = { "identify": cmd_identify, "measure": cmd_measure, "monitor": cmd_monitor, "live": cmd_live, "display-select": cmd_display_select, "efficiency": cmd_efficiency_setup, "setup-mppt": cmd_setup_mppt, "integration": cmd_integration, "send": cmd_send, } with Hioki3193(address, timeout_ms=args.timeout) as meter: dispatch[args.command](meter, args) if __name__ == "__main__": main()