Files
HPCS6500-py/hpcs6500.py
grabowski 0f09c70215 first commit
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 13:57:58 +07:00

830 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HPCS 6500 Spectrophotometer / Integrating Sphere — Direct Reader
Communicates with the HPCS 6500 over USB serial (STM32 VCP) to take
measurements and extract photometric, colorimetric, radiometric,
electrical, and spectral data.
Usage:
uv run hpcs6500.py # auto-detect port, single reading
uv run hpcs6500.py --port COM42 # specify port
uv run hpcs6500.py --continuous # continuous readings
uv run hpcs6500.py --csv out.csv # save to CSV
uv run hpcs6500.py --spectrum # also print full spectrum
"""
import argparse
import csv
import math
import struct
import sys
import time
from datetime import datetime

import serial
import serial.tools.list_ports
# ---------------------------------------------------------------------------
# Wire protocol
# ---------------------------------------------------------------------------
# First byte of every command frame this module sends, and the byte every
# response is checked against at index 0.
SYNC = 0x8C

# Command identifiers (second byte of a frame), listed in numeric order.
# CMD_STATUS and CMD_CONFIG are defined but not used by the code in this file.
CMD_IDENTIFY = 0x00
CMD_INTEGRATION_TIME = 0x01
CMD_POLL_STATE = 0x03
CMD_STATUS = 0x05
CMD_START = 0x0E
CMD_READ_MEASUREMENT = 0x13
CMD_RESET = 0x25
CMD_CONFIG = 0x2A
CMD_PSU_OUTPUT = 0x72
CMD_SET_DC = 0x73
CMD_READ_ELECTRICAL = 0x77
CMD_SET_AC = 0x78
CMD_READ_PSU = 0x79
CMD_SET_MODE = 0x7A

# Values reported in byte[5] of the poll-state response.
STATE_MEASURING = 0x01
STATE_IDLE = 0x04

# Spectrum layout inside the measurement block: SPECTRUM_COUNT float32
# samples starting at byte SPECTRUM_OFFSET, mapped linearly onto the
# wavelength range SPECTRUM_START_NM..SPECTRUM_END_NM (both ends inclusive,
# hence the "- 1" when deriving the step).
SPECTRUM_OFFSET = 432
SPECTRUM_COUNT = 350
SPECTRUM_START_NM = 380
SPECTRUM_END_NM = 1050
SPECTRUM_STEP = (SPECTRUM_END_NM - SPECTRUM_START_NM) / (SPECTRUM_COUNT - 1)
# ---------------------------------------------------------------------------
# Measurement block: name -> byte offset of a little-endian float32.
# Insertion order matters: it is reused as the CSV column order.
# ---------------------------------------------------------------------------
FIELDS = {
    # Photometric
    "Phi_lm": 36,
    "eta_lm_W": 40,
    "CCT_K": 44,
    "Duv": 48,
    # Chromaticity coordinates
    "x": 52,
    "y": 56,
    "u": 60,
    "v": 64,
    "u_prime": 68,
    "v_prime": 72,
    "SDCM": 76,
    # Colour rendering: Ra followed by R1..R15
    "Ra": 80,
    "R1": 84,
    "R2": 88,
    "R3": 92,
    "R4": 96,
    "R5": 100,
    "R6": 104,
    "R7": 108,
    "R8": 112,
    "R9": 116,
    "R10": 120,
    "R11": 124,
    "R12": 128,
    "R13": 132,
    "R14": 136,
    "R15": 140,
    # Radiometric
    "Phi_e_mW": 144,
    "Phi_euv_mW": 148,
    "Phi_eb_mW": 152,
    "Phi_ey_mW": 156,
    "Phi_er_mW": 160,
    "Phi_efr_mW": 164,
    "Phi_eir_mW": 168,
    "Phi_e_total": 172,
    # Tristimulus values and TLCI
    "CIE_X": 224,
    "CIE_Y": 228,
    "CIE_Z": 232,
    "TLCI": 236,
    # Sensor diagnostics
    "PeakSignal": 244,
    "DarkSignal": 248,
    "Compensate": 252,
}

# Electrical block: name -> byte offset of a little-endian float32.
ELECTRICAL_FIELDS = {
    "Voltage_V": 8,
    "Current_A": 12,
    "Power_W": 16,
    "Freq_Hz": 20,
    "PF": 24,
}

# Harmonics inside the electrical block. Each table holds *_COUNT float32
# percentages; the matching THD float32 sits right after its table
# (544 + 50 * 4 == 744 and 800 + 50 * 4 == 1000).
HARMONICS_V_COUNT = 50
HARMONICS_V_OFFSET = 544
UTHD_OFFSET = 744  # float32 UThd (%)
HARMONICS_I_COUNT = 50
HARMONICS_I_OFFSET = 800
ATHD_OFFSET = 1000  # float32 AThd (%)

# Waveforms inside the electrical block: WAVEFORM_SAMPLES little-endian int16
# samples each; the current waveform starts where the voltage waveform ends
# (30 + 128 * 2 == 286).
WAVEFORM_SAMPLES = 128
WAVEFORM_V_OFFSET = 30
WAVEFORM_I_OFFSET = 286
def parse_pcap_messages(filepath):
    """Parse a USBPcap pcap file and extract bulk transfer payloads.

    Args:
        filepath: Path of a classic (non-pcapng) pcap capture.

    Returns:
        A list of dicts, one per non-empty bulk transfer, in file order:
        ``{"ts": seconds since the first kept message,
           "dir": "TX" for OUT endpoints / "RX" for IN endpoints,
           "data": payload bytes following the USBPcap header}``.

    Raises:
        ValueError: if the file is too short to hold a pcap global header or
            does not start with a known pcap magic number.
    """
    with open(filepath, "rb") as f:
        data = f.read()

    global_header_len = 24
    if len(data) < global_header_len:
        raise ValueError(
            f"{filepath}: not a pcap file (only {len(data)} bytes, "
            f"need at least {global_header_len})"
        )

    # The magic, read as little-endian, tells us both the byte order of the
    # record headers and whether the sub-second field counts micro- or
    # nanoseconds.
    known_formats = {
        0xA1B2C3D4: ("<", 1_000_000),
        0xD4C3B2A1: (">", 1_000_000),
        0xA1B23C4D: ("<", 1_000_000_000),
        0x4D3CB2A1: (">", 1_000_000_000),
    }
    magic = struct.unpack_from("<I", data, 0)[0]
    if magic not in known_formats:
        raise ValueError(f"{filepath}: unrecognised pcap magic 0x{magic:08X}")
    endian, ticks_per_second = known_formats[magic]

    record_header = struct.Struct(f"{endian}IIII")
    offset = global_header_len
    messages = []
    first_ts = None

    while offset + record_header.size <= len(data):
        ts_sec, ts_frac, incl_len, _orig_len = record_header.unpack_from(data, offset)
        offset += record_header.size
        if offset + incl_len > len(data):
            break  # truncated final record
        pkt_data = data[offset : offset + incl_len]
        offset += incl_len

        # Too short to contain the USBPcap header fields read below.
        if len(pkt_data) < 27:
            continue

        # USBPcap header fields are little-endian regardless of the pcap
        # container's byte order.
        hdr_len = struct.unpack_from("<H", pkt_data, 0)[0]
        endpoint = pkt_data[21]
        transfer = pkt_data[22]
        payload = pkt_data[hdr_len:]

        # Keep only bulk transfers (type 3) that actually carry data.
        if not payload or transfer != 3:
            continue

        ts = ts_sec + ts_frac / ticks_per_second
        if first_ts is None:
            first_ts = ts

        messages.append(
            {
                "ts": ts - first_ts,
                # Bit 7 of the endpoint address set means device-to-host.
                "dir": "RX" if endpoint & 0x80 else "TX",
                "data": payload,
            }
        )
    return messages
class HPCS6500:
    """Driver for the HPCS 6500 spectrophotometer.

    Commands are written as the sync byte followed by a command id and
    optional argument bytes. Simple commands are considered successful when
    the device answers with the two bytes ``SYNC, <cmd id>``; the two bulk
    reads return a 4-byte header followed by a fixed-size block.

    The instance can be used as a context manager, which closes the serial
    port on exit.
    """

    # Payload sizes of the two bulk blocks (the 4-byte header is not included).
    MEASUREMENT_BLOCK_LEN = 3904
    ELECTRICAL_BLOCK_LEN = 1584

    def __init__(self, port, timeout=2.0):
        """Open ``port`` at 115200 baud with ``timeout`` seconds for reads and writes."""
        self.ser = serial.Serial(
            port=port,
            baudrate=115200,
            timeout=timeout,
            write_timeout=timeout,
        )
        self.port = port

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow the caller's exception

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    # --- Low-level framing ---

    def _send(self, *cmd_bytes):
        """Send a command to the device: the sync byte followed by ``cmd_bytes``."""
        self.ser.write(bytes([SYNC] + list(cmd_bytes)))

    def _recv(self, expected_len, timeout=3.0):
        """Read up to ``expected_len`` bytes from the device.

        Returns whatever was collected; the result is shorter than
        ``expected_len`` when a read comes back empty after ``timeout``
        seconds have elapsed.
        """
        data = b""
        deadline = time.monotonic() + timeout
        while len(data) < expected_len:
            chunk = self.ser.read(expected_len - len(data))
            if chunk:
                data += chunk
            elif time.monotonic() > deadline:
                # Only give up on an empty read, so data that is still
                # trickling in is never cut off mid-stream.
                break
        return data

    def _recv_response(self, cmd_id, timeout=3.0):
        """Read a response header, expecting ``8C <cmd_id>``.

        If the first two bytes do not match, keeps reading one byte at a time
        for up to ``timeout`` seconds looking for the expected pair.

        Returns the 2-byte header, or None if it was not seen in time.
        """
        wanted = bytes([SYNC, cmd_id])
        hdr = self._recv(2, timeout)
        if len(hdr) < 2:
            return None
        if hdr == wanted:
            return hdr

        # Try to resync: scan the growing buffer for the expected header.
        buf = hdr
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            b = self.ser.read(1)
            if not b:
                continue
            buf += b
            idx = buf.find(wanted)
            if idx >= 0:
                return buf[idx:idx + 2]
        return None

    def _command(self, cmd_id, payload=b""):
        """Send ``8C <cmd_id> <payload>`` and return True if ``8C <cmd_id>`` came back.

        Stale input is discarded first so the 2-byte acknowledgement read
        belongs to this command.
        """
        self.ser.reset_input_buffer()
        self.ser.write(bytes([SYNC, cmd_id]) + payload)
        return self._recv(2, timeout=2.0) == bytes([SYNC, cmd_id])

    def _read_block(self, cmd_id, length, label):
        """Request a bulk block and return its ``length``-byte payload, or None.

        None is returned when the 4-byte header is missing or does not start
        with ``8C <cmd_id>``, or when the payload arrives short (a warning
        naming ``label`` is printed in that case).
        """
        self.ser.reset_input_buffer()
        self._send(cmd_id)
        hdr = self._recv(4, timeout=3.0)
        if len(hdr) < 4 or hdr[0] != SYNC or hdr[1] != cmd_id:
            return None
        payload = self._recv(length, timeout=5.0)
        if len(payload) < length:
            print(f"Warning: {label} block short ({len(payload)}/{length} bytes)")
            return None
        return payload

    # --- Queries ---

    def identify(self):
        """Send identify command, return device name string.

        Returns None if the 16-byte reply is incomplete or has the wrong header.
        """
        self.ser.reset_input_buffer()
        self._send(CMD_IDENTIFY)
        resp = self._recv(16, timeout=2.0)
        if len(resp) < 16:
            return None
        if resp[0] != SYNC or resp[1] != CMD_IDENTIFY:
            return None
        # The name is taken from bytes 2..9 with trailing NULs stripped.
        return resp[2:10].decode("ascii", errors="replace").rstrip("\x00")

    def read_measurement_block(self):
        """Send 8C 13 and read the 3904-byte measurement block (None on failure)."""
        return self._read_block(
            CMD_READ_MEASUREMENT, self.MEASUREMENT_BLOCK_LEN, "measurement"
        )

    def read_electrical_block(self):
        """Send 8C 77 and read the 1584-byte electrical block (None on failure)."""
        return self._read_block(
            CMD_READ_ELECTRICAL, self.ELECTRICAL_BLOCK_LEN, "electrical"
        )

    def poll_state(self):
        """Send 8C 03 and return ``(data_available, state)``.

        ``data_available`` is True when response byte 2 equals 1; ``state`` is
        response byte 5. Returns ``(None, None)`` on a bad or short reply.
        """
        self.ser.reset_input_buffer()
        self._send(CMD_POLL_STATE)
        resp = self._recv(9, timeout=2.0)
        if len(resp) < 9 or resp[0] != SYNC or resp[1] != CMD_POLL_STATE:
            return None, None
        return resp[2] == 1, resp[5]

    # --- Measurement control ---

    def start_measurement(self):
        """Send 8C 0E 01 to start measurement. Returns True if acknowledged."""
        return self._command(CMD_START, b"\x01")

    def stop_measurement(self):
        """Send 8C 0E 02 to stop measurement. Returns True if acknowledged."""
        return self._command(CMD_START, b"\x02")

    def reset(self):
        """Send 8C 25 to reset. Returns True if acknowledged."""
        return self._command(CMD_RESET)

    # --- Power supply control ---

    def psu_on(self):
        """Turn PSU output on (argument byte 0x00). Returns True if acknowledged."""
        return self._command(CMD_PSU_OUTPUT, b"\x00")

    def psu_off(self):
        """Turn PSU output off (argument byte 0x01). Returns True if acknowledged."""
        return self._command(CMD_PSU_OUTPUT, b"\x01")

    def set_mode(self, mode):
        """Set output mode: 'ac' or 'dc'.

        The comparison is case-insensitive; any value other than "ac" selects
        DC. Returns True if acknowledged.
        """
        mode_byte = 0x00 if mode.lower() == "ac" else 0x01
        return self._command(CMD_SET_MODE, bytes([mode_byte]))

    def set_ac_voltage(self, volts):
        """Set AC output voltage (100-240 V). Returns True if acknowledged."""
        return self._command(CMD_SET_AC, b"\x00" + struct.pack("<f", volts))

    def set_ac_frequency(self, hz):
        """Set AC output frequency (50 or 60 Hz). Returns True if acknowledged."""
        return self._command(CMD_SET_AC, b"\x01" + struct.pack("<f", hz))

    def set_dc_voltage(self, volts):
        """Set DC output voltage (1-60 V). Returns True if acknowledged."""
        return self._command(CMD_SET_DC, b"\x01" + struct.pack("<f", volts))

    def set_dc_current(self, amps):
        """Set DC current limit (0-5 A). Returns True if acknowledged."""
        return self._command(CMD_SET_DC, b"\x00" + struct.pack("<f", amps))

    def set_integration_time(self, ms):
        """Set integration time in milliseconds. Use 0 for auto.

        The value is sent as an unsigned 32-bit count of microseconds.
        Returns True if acknowledged.
        """
        us = int(ms * 1000)
        return self._command(CMD_INTEGRATION_TIME, struct.pack("<I", us))

    def read_psu_settings(self):
        """Read current power supply settings.

        Returns a dict with ``ac_voltage``, ``ac_frequency``, ``dc_voltage``,
        ``dc_current`` (floats) and ``mode`` ("AC" or "DC"), or None if the
        20-byte reply is short or has the wrong header.
        """
        self.ser.reset_input_buffer()
        self._send(CMD_READ_PSU)
        resp = self._recv(20, timeout=2.0)
        if len(resp) < 20 or resp[0] != SYNC or resp[1] != CMD_READ_PSU:
            return None
        payload = resp[2:]
        ac_voltage, ac_frequency, dc_voltage, dc_current = struct.unpack_from(
            "<4f", payload, 0
        )
        return {
            "ac_voltage": ac_voltage,
            "ac_frequency": ac_frequency,
            "dc_voltage": dc_voltage,
            "dc_current": dc_current,
            "mode": "DC" if payload[16] == 1 else "AC",
        }

    # --- High-level readings ---

    def wait_for_data(self, timeout=30.0):
        """Poll every 0.1 s until data is available; False if ``timeout`` elapses."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            data_avail, _ = self.poll_state()
            if data_avail:
                return True
            time.sleep(0.1)
        return False

    def take_reading(self, psu=True):
        """Take a single measurement reading. Returns dict or None.

        If psu=True, turns PSU output on before measuring and off after.

        The measurement is stopped (if it was started) and the PSU switched
        back off (if this call switched it on) on every exit path, including
        when an exception propagates out of a serial or parsing call.
        """
        if psu:
            self.psu_on()
            time.sleep(0.2)

        started = False
        try:
            if not self.start_measurement():
                print("Failed to start measurement")
                return None
            started = True

            if not self.wait_for_data(timeout=30.0):
                print("Timeout waiting for measurement data")
                return None

            meas = self.read_measurement_block()
            elec = self.read_electrical_block()
            if meas is None:
                print("Failed to read measurement block")
                return None

            result = self._parse_measurement(meas)
            if elec is not None:
                result.update(self._parse_electrical(elec))
            return result
        finally:
            # Nested so the PSU is still switched off if the stop command
            # itself raises.
            try:
                if started:
                    self.stop_measurement()
            finally:
                if psu:
                    self.psu_off()

    def read_current(self):
        """Read current data without starting/stopping measurement.

        Use this when the vendor software is controlling the instrument.
        Returns the parsed dict, or None if the measurement block could not
        be read.
        """
        meas = self.read_measurement_block()
        elec = self.read_electrical_block()
        if meas is None:
            return None
        result = self._parse_measurement(meas)
        if elec is not None:
            result.update(self._parse_electrical(elec))
        return result

    # --- Block parsing ---

    @staticmethod
    def _unpack_floats(data, offset, count):
        """Return ``count`` little-endian float32 values starting at ``offset`` as a list."""
        if count <= 0:
            return []
        return list(struct.unpack_from(f"<{count}f", data, offset))

    def _parse_measurement(self, data):
        """Parse the 3904-byte measurement block into a dict.

        Fields whose bytes lie beyond the end of ``data`` are omitted, and
        the spectrum is truncated to the samples that are actually present.
        """
        size = len(data)
        result = {
            # First 10 bytes, shown as ASCII with trailing NULs stripped.
            "device": data[:10].decode("ascii", errors="replace").rstrip("\x00"),
        }

        # Named float32 fields.
        for name, offset in FIELDS.items():
            if offset + 4 <= size:
                result[name] = struct.unpack_from("<f", data, offset)[0]

        # Date (10 bytes at 272) and time (8 bytes at 283) ASCII strings.
        date_off = 272
        time_off = 283
        if date_off + 10 <= size:
            result["test_date"] = (
                data[date_off:date_off + 10].decode("ascii", errors="replace").rstrip("\x00")
            )
        if time_off + 8 <= size:
            result["test_time"] = (
                data[time_off:time_off + 8].decode("ascii", errors="replace").rstrip("\x00")
            )

        # Spectrum: as many whole float32 samples as fit, up to SPECTRUM_COUNT.
        available = max(0, (size - SPECTRUM_OFFSET) // 4)
        spectrum = self._unpack_floats(
            data, SPECTRUM_OFFSET, min(SPECTRUM_COUNT, available)
        )
        result["spectrum"] = spectrum
        result["spectrum_nm"] = [
            SPECTRUM_START_NM + i * SPECTRUM_STEP for i in range(len(spectrum))
        ]
        return result

    def _parse_electrical(self, data):
        """Parse the 1584-byte electrical block into a dict.

        Always attempts the scalar fields. Harmonics and THD are added only
        when the block is long enough and the first voltage harmonic is
        within 1.0 of 100 %. Each waveform is added only when its samples
        fit inside ``data``.
        """
        size = len(data)
        result = {}
        for name, offset in ELECTRICAL_FIELDS.items():
            if offset + 4 <= size:
                result[name] = struct.unpack_from("<f", data, offset)[0]

        # Harmonics are treated as present when H1 of the voltage is ~100 %.
        if size >= ATHD_OFFSET + 4:
            h1_v = struct.unpack_from("<f", data, HARMONICS_V_OFFSET)[0]
            if abs(h1_v - 100.0) < 1.0:
                result["V_harmonics"] = self._unpack_floats(
                    data, HARMONICS_V_OFFSET, HARMONICS_V_COUNT
                )
                result["UThd"] = struct.unpack_from("<f", data, UTHD_OFFSET)[0]
                result["I_harmonics"] = self._unpack_floats(
                    data, HARMONICS_I_OFFSET, HARMONICS_I_COUNT
                )
                result["AThd"] = struct.unpack_from("<f", data, ATHD_OFFSET)[0]

        # Waveforms (int16 LE). Guarded like every other field so a short
        # block yields a partial dict instead of a struct.error.
        waveform_fmt = f"<{WAVEFORM_SAMPLES}h"
        waveform_len = WAVEFORM_SAMPLES * 2
        for key, offset in (
            ("V_waveform", WAVEFORM_V_OFFSET),
            ("I_waveform", WAVEFORM_I_OFFSET),
        ):
            if offset + waveform_len <= size:
                result[key] = list(struct.unpack_from(waveform_fmt, data, offset))
        return result
def find_hpcs_port():
    """Auto-detect the HPCS 6500 COM port.

    Returns the device name of the first listed serial port whose USB
    vendor/product id pair is 0x0483/0x5741, or None if no port matches.
    """
    wanted_ids = (0x0483, 0x5741)
    matches = (
        info.device
        for info in serial.tools.list_ports.comports()
        if (info.vid, info.pid) == wanted_ids
    )
    return next(matches, None)
def format_reading(r, show_spectrum=False):
    """Format a reading dict for display.

    Args:
        r: Reading dict. Missing numeric fields are shown as 0 and missing
            text fields as "?".
        show_spectrum: When True, also list the per-harmonic tables (if
            "V_harmonics" is present) and every fifth spectrum sample with a
            bar scaled to the largest finite sample (if "spectrum" is present).

    Returns:
        The report as a single newline-joined string.
    """
    lines = [
        f" Device: {r.get('device', '?')}",
        f" Date: {r.get('test_date', '?')} Time: {r.get('test_time', '?')}",
        "",
        " --- Photometric ---",
        f" Phi (lm): {r.get('Phi_lm', 0):10.2f}",
        f" eta (lm/W): {r.get('eta_lm_W', 0):10.2f}",
        f" CCT (K): {r.get('CCT_K', 0):10.0f}",
        f" Duv: {r.get('Duv', 0):10.5f}",
        "",
        " --- Chromaticity ---",
        f" x: {r.get('x', 0):.4f} y: {r.get('y', 0):.4f}",
        f" u: {r.get('u', 0):.4f} v: {r.get('v', 0):.4f}",
        f" u': {r.get('u_prime', 0):.4f} v': {r.get('v_prime', 0):.4f}",
        "",
        " --- CRI ---",
        f" Ra: {r.get('Ra', 0):.1f} SDCM: {r.get('SDCM', 0):.2f}",
    ]

    # R1..R15 on two rows: the first eight, then the remaining seven.
    ri = [f"R{i}={r.get(f'R{i}', 0):.0f}" for i in range(1, 16)]
    lines.append(f" {', '.join(ri[:8])}")
    lines.append(f" {', '.join(ri[8:])}")

    lines += [
        "",
        " --- CIE 1931 ---",
        f" X: {r.get('CIE_X', 0):.3f} Y: {r.get('CIE_Y', 0):.3f} Z: {r.get('CIE_Z', 0):.3f}",
        f" TLCI-2012: {r.get('TLCI', 0):.0f}",
        "",
        " --- Radiometric ---",
        f" Phi_e (mW): {r.get('Phi_e_mW', 0):10.3f}",
        f" Phi_eb (mW): {r.get('Phi_eb_mW', 0):10.3f}",
        f" Phi_ey (mW): {r.get('Phi_ey_mW', 0):10.3f}",
        f" Phi_er (mW): {r.get('Phi_er_mW', 0):10.3f}",
        f" Phi_efr (mW): {r.get('Phi_efr_mW', 0):10.3f}",
        "",
        " --- Electrical ---",
        f" Voltage (V): {r.get('Voltage_V', 0):10.2f}",
        f" Current (A): {r.get('Current_A', 0):10.4f}",
        f" Power (W): {r.get('Power_W', 0):10.3f}",
        f" Frequency (Hz):{r.get('Freq_Hz', 0):10.2f}",
        f" Power Factor: {r.get('PF', 0):10.4f}",
    ]
    if "UThd" in r:
        lines.append(f" UThd (%): {r['UThd']:10.4f}")
        lines.append(f" AThd (%): {r.get('AThd', 0):10.4f}")
    lines.append("")

    if show_spectrum and "V_harmonics" in r:
        tables = (
            (" --- Voltage Harmonics (%) ---", "V_harmonics"),
            (" --- Current Harmonics (%) ---", "I_harmonics"),
        )
        for title, key in tables:
            lines.append(title)
            # The fundamental is always listed; higher orders only when
            # they are above 0.001 %.
            lines.extend(
                f" H{idx + 1:2d}: {val:8.4f}"
                for idx, val in enumerate(r[key])
                if val > 0.001 or idx == 0
            )
            lines.append("")

    lines += [
        " --- Sensor ---",
        f" Peak Signal: {r.get('PeakSignal', 0):10.0f}",
        f" Dark Signal: {r.get('DarkSignal', 0):10.0f}",
        f" Compensate: {r.get('Compensate', 0):10.0f}",
    ]

    if show_spectrum and "spectrum" in r:
        lines.append("")
        lines.append(" --- Spectrum (380-1050nm) ---")
        spectrum = r["spectrum"]
        wavelengths = r["spectrum_nm"]
        # Scale bars against the largest *finite* sample: a NaN/inf coming
        # out of the float32 block must not poison the peak or crash int().
        peak_val = max((v for v in spectrum if math.isfinite(v)), default=1)
        for idx, (wl, val) in enumerate(zip(wavelengths, spectrum)):
            if idx % 5:  # Print every 5th point (~10nm spacing)
                continue
            if peak_val > 0 and math.isfinite(val):
                bar = "#" * int(val / peak_val * 40)
            else:
                bar = ""
            lines.append(f" {wl:7.1f}nm: {val:8.4f} [{bar}]")

    return "\n".join(lines)
def format_quick(r, reading_num=None):
    """Format a reading for quick test mode: lumen + CCT only.

    Args:
        r: Reading dict; missing "Phi_lm" / "CCT_K" are shown as 0.
        reading_num: Optional sequence number. When given (including 0) the
            line is prefixed with "#<n> "; None omits the prefix.

    Returns:
        A single line such as " #3 812.4 lm 2710 K".
    """
    lm = r.get("Phi_lm", 0)
    cct = r.get("CCT_K", 0)
    # Compare against None rather than truthiness so reading 0 keeps its tag.
    prefix = "" if reading_num is None else f"#{reading_num} "
    return f" {prefix}{lm:.1f} lm {cct:.0f} K"
def _build_arg_parser():
    """Build the command-line parser for the reader."""
    parser = argparse.ArgumentParser(description="HPCS 6500 Spectrophotometer Reader")
    parser.add_argument("--port", help="COM port (auto-detect if not specified)")
    parser.add_argument("--continuous", action="store_true", help="Continuous reading mode")
    parser.add_argument("--csv", metavar="FILE", help="Save readings to CSV file")
    parser.add_argument("--spectrum", action="store_true", help="Show full spectrum data")
    parser.add_argument("--passive", action="store_true",
                        help="Passive mode: read data without controlling instrument")
    parser.add_argument("--quick", action="store_true",
                        help="Quick test mode: only report lumen and CCT")
    parser.add_argument("--harmonics", action="store_true",
                        help="Show harmonics data (voltage/current THD and per-harmonic)")
    parser.add_argument("--parse", metavar="PCAP", help="Parse a pcap capture file instead")

    # Power supply control
    psu = parser.add_argument_group("power supply")
    psu.add_argument("--mode", choices=["ac", "dc"], help="Set output mode")
    psu.add_argument("--voltage", type=float, help="Set output voltage (V)")
    psu.add_argument("--frequency", type=float, help="Set AC frequency (Hz)")
    psu.add_argument("--current", type=float, help="Set DC current limit (A)")
    psu.add_argument("--integration", type=float, help="Set integration time (ms), 0=auto")
    psu.add_argument("--psu-on", action="store_true", help="Turn PSU output on")
    psu.add_argument("--psu-off", action="store_true", help="Turn PSU output off")
    psu.add_argument("--no-psu", action="store_true",
                     help="Don't auto-control PSU during measurements")
    psu.add_argument("--psu-status", action="store_true", help="Read power supply settings")
    return parser


def _run_pcap_mode(args):
    """Decode and print every measurement block found in the capture ``args.parse``."""
    messages = parse_pcap_messages(args.parse)
    blocks = [m for m in messages if m["dir"] == "RX" and len(m["data"]) == 3904
              and m["data"][:8] == b"HPCS6500"]
    config_blocks = [m for m in messages if m["dir"] == "RX" and len(m["data"]) == 1584]

    # Only the parsing methods are needed, so skip __init__ (which would
    # open a serial port).
    dev = HPCS6500.__new__(HPCS6500)
    for i, block in enumerate(blocks):
        result = dev._parse_measurement(block["data"])
        # Electrical blocks are paired with measurement blocks by position.
        if i < len(config_blocks):
            result.update(dev._parse_electrical(config_blocks[i]["data"]))
        if args.quick:
            print(format_quick(result, i + 1))
        else:
            print(f"\n{'='*60}")
            print(f"Reading {i+1} (t={block['ts']:.2f}s)")
            print(f"{'='*60}")
            print(format_reading(result, show_spectrum=args.spectrum))


def _apply_psu_commands(dev, args):
    """Run the power-supply options; return True if readings should follow.

    Returns False when the command line only asked for PSU actions, in which
    case the caller should finish without measuring.
    """
    has_psu_cmd = any([args.mode, args.voltage is not None, args.frequency is not None,
                       args.current is not None, args.integration is not None,
                       args.psu_status, args.psu_on, args.psu_off])

    if args.psu_status:
        settings = dev.read_psu_settings()
        if settings:
            print("\nPower Supply Settings:")
            print(f" Mode: {settings['mode']}")
            print(f" AC Voltage: {settings['ac_voltage']:.0f} V")
            print(f" AC Frequency: {settings['ac_frequency']:.0f} Hz")
            print(f" DC Voltage: {settings['dc_voltage']:.1f} V")
            print(f" DC Current: {settings['dc_current']:.1f} A")
        else:
            print("Failed to read PSU settings")
        if not args.continuous and args.voltage is None and args.mode is None:
            return False

    if args.mode:
        ok = dev.set_mode(args.mode)
        print(f"Set mode to {args.mode.upper()}: {'OK' if ok else 'FAILED'}")
    if args.voltage is not None:
        # Without an explicit --mode the voltage is applied to the AC setting.
        mode = args.mode or "ac"
        if mode == "ac":
            ok = dev.set_ac_voltage(args.voltage)
        else:
            ok = dev.set_dc_voltage(args.voltage)
        print(f"Set {mode.upper()} voltage to {args.voltage} V: {'OK' if ok else 'FAILED'}")
    if args.frequency is not None:
        ok = dev.set_ac_frequency(args.frequency)
        print(f"Set AC frequency to {args.frequency} Hz: {'OK' if ok else 'FAILED'}")
    if args.current is not None:
        ok = dev.set_dc_current(args.current)
        print(f"Set DC current limit to {args.current} A: {'OK' if ok else 'FAILED'}")
    if args.integration is not None:
        ok = dev.set_integration_time(args.integration)
        print(f"Set integration time to {args.integration} ms: {'OK' if ok else 'FAILED'}")
    if args.psu_on:
        ok = dev.psu_on()
        print(f"PSU output ON: {'OK' if ok else 'FAILED'}")
    if args.psu_off:
        ok = dev.psu_off()
        print(f"PSU output OFF: {'OK' if ok else 'FAILED'}")

    # If only PSU commands were given with no measurement request, exit
    return not (has_psu_cmd and not args.continuous and not args.passive)


def _run_readings(dev, args):
    """Take one reading (or loop with --continuous), printing and optionally logging to CSV.

    On the way out the PSU is switched off (when this function switched it
    on), the device is reset, and the CSV file is closed, even if an
    exception or Ctrl-C interrupts the loop.
    """
    csv_file = None
    csv_writer = None
    try:
        if args.csv:
            csv_file = open(args.csv, "w", newline="")
            fieldnames = ["timestamp"] + list(FIELDS.keys()) + list(ELECTRICAL_FIELDS.keys())
            csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction="ignore")
            csv_writer.writeheader()
            print(f"Saving to: {args.csv}")

        use_psu = not args.no_psu and not args.passive
        try:
            if use_psu:
                dev.psu_on()
                time.sleep(0.2)

            reading_num = 0
            while True:
                reading_num += 1
                if not args.quick:
                    print(f"\n{'='*60}")
                    print(f"Reading {reading_num} ({datetime.now().strftime('%H:%M:%S')})")
                    print(f"{'='*60}")

                # The PSU is held on for the whole loop, so individual
                # readings must not toggle it.
                if args.passive:
                    result = dev.read_current()
                else:
                    result = dev.take_reading(psu=False)

                if result is None:
                    print("Failed to get reading")
                    if not args.continuous:
                        break
                    time.sleep(1)
                    continue

                if args.quick:
                    print(format_quick(result, reading_num))
                else:
                    print(format_reading(result, show_spectrum=args.spectrum))

                if csv_writer:
                    row = {"timestamp": datetime.now().isoformat()}
                    # Only scalar numbers go to CSV; list-valued entries
                    # (spectrum, harmonics, waveforms) are skipped.
                    row.update({k: v for k, v in result.items()
                                if isinstance(v, (int, float))})
                    csv_writer.writerow(row)
                    csv_file.flush()

                if not args.continuous:
                    break
                time.sleep(0.5)
        except KeyboardInterrupt:
            print("\nStopped.")
        finally:
            # NOTE(review): reset is sent even in --passive mode, which is
            # documented as not controlling the instrument; confirm intended.
            try:
                if use_psu:
                    dev.psu_off()
            finally:
                dev.reset()
    finally:
        if csv_file:
            csv_file.close()
            print(f"Data saved to: {args.csv}")


def main():
    """Command-line entry point: parse a capture, run PSU commands, and/or take readings."""
    args = _build_arg_parser().parse_args()

    # --harmonics implies --spectrum for the detailed display
    if args.harmonics:
        args.spectrum = True

    # Parse mode: analyze an existing capture
    if args.parse:
        _run_pcap_mode(args)
        return

    # Find port
    port = args.port or find_hpcs_port()
    if not port:
        print("ERROR: HPCS 6500 not found. Connect the device or specify --port.")
        print("\nAvailable ports:")
        for p in serial.tools.list_ports.comports():
            print(f" {p.device}: {p.description} [{p.hwid}]")
        sys.exit(1)

    print("HPCS 6500 Spectrophotometer Reader")
    print(f"Port: {port}")
    print("=" * 60)

    dev = HPCS6500(port)
    try:
        # Identify
        name = dev.identify()
        if name:
            print(f"Device: {name}")
        else:
            print("Warning: no identify response (device may be busy)")

        if _apply_psu_commands(dev, args):
            _run_readings(dev, args)
    finally:
        # Release the serial port on every exit path, including errors.
        dev.close()


if __name__ == "__main__":
    main()