Files
HPCS6500-py/hpcs6500.py
grabowski 3bf4a7ad36 Add single-shot measurement mode matching vendor protocol
Stop command triggers a one-shot reading instead of start+poll+stop.
spectrometer.py now uses take_single_reading() for this flow.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 09:23:38 +07:00

854 lines
29 KiB
Python

"""
HPCS 6500 Spectrophotometer / Integrating Sphere — Direct Reader
Communicates with the HPCS 6500 over USB serial (STM32 VCP) to take
measurements and extract photometric, colorimetric, radiometric,
electrical, and spectral data.
Usage:
uv run hpcs6500.py # auto-detect port, single reading
uv run hpcs6500.py --port COM42 # specify port
uv run hpcs6500.py --continuous # continuous readings
uv run hpcs6500.py --csv out.csv # save to CSV
uv run hpcs6500.py --spectrum # also print full spectrum
"""
import argparse
import csv
import math
import struct
import sys
import time
from datetime import datetime

import serial
import serial.tools.list_ports
# Protocol constants
# Every request is the SYNC byte followed by a one-byte command id and
# optional arguments; the driver expects each reply to start with the same
# SYNC + command id pair.
SYNC = 0x8C
CMD_IDENTIFY = 0x00  # reply carries the ASCII device name
CMD_INTEGRATION_TIME = 0x01  # argument: uint32 LE integration time in microseconds
CMD_POLL_STATE = 0x03  # 9-byte reply: byte[2] = data-available flag, byte[5] = state
CMD_STATUS = 0x05  # not referenced by the code visible in this file
CMD_START = 0x0E  # argument 0x01 = start, 0x02 = stop
CMD_READ_MEASUREMENT = 0x13  # 4-byte header + 3904-byte measurement block
CMD_RESET = 0x25
CMD_CONFIG = 0x2A  # not referenced by the code visible in this file
CMD_SET_DC = 0x73  # sub-command 0x00 = current limit, 0x01 = voltage; then float32 LE
CMD_READ_ELECTRICAL = 0x77  # 4-byte header + 1584-byte electrical block
CMD_SET_AC = 0x78  # sub-command 0x00 = voltage, 0x01 = frequency; then float32 LE
CMD_READ_PSU = 0x79  # 20-byte reply with the stored PSU settings
CMD_PSU_OUTPUT = 0x72  # argument 0x00 = output on, 0x01 = output off
CMD_SET_MODE = 0x7A  # argument 0x00 = AC, 0x01 = DC
# State polling byte[5] values
# (poll_state() returns this byte to the caller; the code visible in this
# file never compares it against these names.)
STATE_IDLE = 0x04
STATE_MEASURING = 0x01
# Spectrum parameters
# The spectrum is SPECTRUM_COUNT float32 LE samples starting at byte
# SPECTRUM_OFFSET of the measurement block, evenly spaced from
# SPECTRUM_START_NM to SPECTRUM_END_NM inclusive.
SPECTRUM_START_NM = 380
SPECTRUM_END_NM = 1050
SPECTRUM_OFFSET = 432
SPECTRUM_COUNT = 350
# Wavelength increment between neighbouring samples: 670 / 349 ~= 1.92 nm.
SPECTRUM_STEP = (SPECTRUM_END_NM - SPECTRUM_START_NM) / (SPECTRUM_COUNT - 1)
# Measurement block field offsets (all float32 LE)
# Each value is 4 bytes wide, so the table is described as runs of
# consecutive fields; gaps between runs are bytes this module does not decode.
def _float_run(first_offset, names):
    """Map each name to consecutive 4-byte offsets starting at first_offset."""
    return {name: first_offset + 4 * idx for idx, name in enumerate(names)}


FIELDS = {
    # Photometric and chromaticity values, bytes 36..83.
    **_float_run(36, (
        "Phi_lm", "eta_lm_W", "CCT_K", "Duv",
        "x", "y", "u", "v", "u_prime", "v_prime",
        "SDCM", "Ra",
    )),
    # Individual colour rendering indices R1..R15, bytes 84..143.
    **_float_run(84, tuple(f"R{n}" for n in range(1, 16))),
    # Radiometric values, bytes 144..175.
    **_float_run(144, (
        "Phi_e_mW", "Phi_euv_mW", "Phi_eb_mW", "Phi_ey_mW",
        "Phi_er_mW", "Phi_efr_mW", "Phi_eir_mW", "Phi_e_total",
    )),
    # Tristimulus values and TLCI, bytes 224..239.
    **_float_run(224, ("CIE_X", "CIE_Y", "CIE_Z", "TLCI")),
    # Sensor diagnostics, bytes 244..255.
    **_float_run(244, ("PeakSignal", "DarkSignal", "Compensate")),
}
# Electrical block field offsets
# Byte offsets into the 1584-byte electrical block; each value is decoded
# as a float32 LE by _parse_electrical().
ELECTRICAL_FIELDS = {
"Voltage_V": 8,
"Current_A": 12,
"Power_W": 16,
"Freq_Hz": 20,
"PF": 24,
}
# Harmonics offsets within the electrical block
# Each harmonic table is 50 x 4 = 200 bytes and is immediately followed by
# its THD value (544 + 200 = 744, 800 + 200 = 1000).
HARMONICS_V_OFFSET = 544 # 50 x float32 voltage harmonics (%)
HARMONICS_V_COUNT = 50
UTHD_OFFSET = 744 # float32 UThd (%)
HARMONICS_I_OFFSET = 800 # 50 x float32 current harmonics (%)
HARMONICS_I_COUNT = 50
ATHD_OFFSET = 1000 # float32 AThd (%)
# Waveform offsets (int16 LE samples, 128 per waveform)
# 128 samples x 2 bytes = 256 bytes per waveform, so the current waveform
# starts right after the voltage waveform (30 + 256 = 286).
WAVEFORM_V_OFFSET = 30
WAVEFORM_I_OFFSET = 286
WAVEFORM_SAMPLES = 128
def parse_pcap_messages(filepath):
    """Parse a USBPcap pcap file and extract bulk transfer payloads.

    Both byte orders and both timestamp resolutions (microsecond and
    nanosecond) of the classic pcap container are accepted.

    Args:
        filepath: Path of the capture file.

    Returns:
        A list of dicts, one per non-empty bulk transfer, with keys
        ``"ts"`` (seconds since the first returned message), ``"dir"``
        (``"TX"`` for host-to-device, ``"RX"`` for device-to-host) and
        ``"data"`` (the payload bytes).

    Raises:
        ValueError: If the file is too short to hold a pcap magic number
            or the magic number is not a classic pcap one (for example a
            pcapng capture).
    """
    with open(filepath, "rb") as f:
        data = f.read()

    if len(data) < 4:
        raise ValueError(f"{filepath}: too short to be a pcap file")

    # Magic number as seen when read little-endian -> (byte order of the
    # record headers, timestamp sub-second units per second).
    known_magics = {
        0xA1B2C3D4: ("<", 1_000_000),
        0xD4C3B2A1: (">", 1_000_000),
        0xA1B23C4D: ("<", 1_000_000_000),
        0x4D3CB2A1: (">", 1_000_000_000),
    }
    magic = struct.unpack_from("<I", data, 0)[0]
    if magic not in known_magics:
        raise ValueError(f"{filepath}: unrecognised pcap magic 0x{magic:08X}")
    endian, ticks_per_second = known_magics[magic]
    record_header = struct.Struct(f"{endian}IIII")

    offset = 24  # skip the pcap global header
    messages = []
    first_ts = None
    while offset + record_header.size <= len(data):
        ts_sec, ts_frac, incl_len, _orig_len = record_header.unpack_from(data, offset)
        offset += record_header.size
        if offset + incl_len > len(data):
            break  # truncated final record
        pkt_data = data[offset:offset + incl_len]
        offset += incl_len

        # The fixed USBPcap packet header is 27 bytes and always little-endian:
        # headerLen at 0, endpoint at 21, transfer type at 22.
        if len(pkt_data) < 27:
            continue
        hdr_len = struct.unpack_from("<H", pkt_data, 0)[0]
        if hdr_len < 27:
            continue  # malformed header length; payload boundary unknown
        endpoint = pkt_data[21]
        transfer = pkt_data[22]
        payload = pkt_data[hdr_len:]
        if not payload or transfer != 3:  # 3 = bulk transfer
            continue

        ts = ts_sec + ts_frac / ticks_per_second
        if first_ts is None:
            first_ts = ts
        messages.append(
            {
                "ts": ts - first_ts,
                # Bit 7 of the endpoint address set means device-to-host (IN).
                "dir": "RX" if endpoint & 0x80 else "TX",
                "data": payload,
            }
        )
    return messages
class HPCS6500:
    """Driver for the HPCS 6500 spectrophotometer.

    Talks to the instrument over a serial port.  Every request is the sync
    byte followed by a command id and optional arguments, and every reply
    is expected to start with the same two bytes.  Methods report protocol
    failures through their return value (``False`` / ``None``) rather than
    by raising.

    Instances can be used as context managers; the port is closed on exit.
    """

    # Payload sizes, in bytes, that follow the 4-byte block header.
    MEASUREMENT_BLOCK_SIZE = 3904
    ELECTRICAL_BLOCK_SIZE = 1584

    def __init__(self, port, timeout=2.0):
        """Open *port* at 115200 baud.

        *timeout* (seconds) is used as both the read and the write timeout
        of the underlying ``serial.Serial`` object.
        """
        self.ser = serial.Serial(
            port=port,
            baudrate=115200,
            timeout=timeout,
            write_timeout=timeout,
        )
        self.port = port

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the serial port."""
        self.ser.close()

    # --- Low-level transport ---

    def _send(self, *cmd_bytes):
        """Send a command to the device (sync byte + the given bytes)."""
        self.ser.write(bytes([SYNC, *cmd_bytes]))

    def _recv(self, expected_len, timeout=3.0):
        """Read up to *expected_len* bytes from the device.

        Returns the bytes received so far (possibly fewer than requested)
        once a read comes back empty after *timeout* seconds have elapsed.
        """
        buf = bytearray()
        deadline = time.monotonic() + timeout
        while len(buf) < expected_len:
            chunk = self.ser.read(expected_len - len(buf))
            if chunk:
                buf += chunk
            elif time.monotonic() > deadline:
                break
        return bytes(buf)

    def _recv_response(self, cmd_id, timeout=3.0):
        """Read a response header, expecting ``8C <cmd_id>``.

        If the first two bytes do not match, keeps reading one byte at a
        time for up to *timeout* seconds looking for the marker.  Returns
        the 2-byte header, or None if it never shows up.
        """
        hdr = self._recv(2, timeout)
        if len(hdr) < 2:
            return None
        marker = bytes([SYNC, cmd_id])
        if hdr == marker:
            return hdr
        # Try to resync: accumulate bytes until the marker appears.
        buf = hdr
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            byte = self.ser.read(1)
            if not byte:
                continue
            buf += byte
            idx = buf.find(marker)
            if idx >= 0:
                return buf[idx:idx + 2]
        return None

    def _command_ack(self, cmd_id, payload=b""):
        """Send ``8C <cmd_id> <payload>`` and check for the 2-byte echo.

        Stale input is discarded first so the echo is not confused with
        leftovers of an earlier exchange.  Returns True when the reply is
        exactly ``8C <cmd_id>``.
        """
        self.ser.reset_input_buffer()
        self.ser.write(bytes([SYNC, cmd_id]) + payload)
        return self._recv(2, timeout=2.0) == bytes([SYNC, cmd_id])

    def _read_block(self, cmd_id, size, label):
        """Request a data block and return its *size*-byte payload.

        The reply is a 4-byte header starting with ``8C <cmd_id>`` followed
        by the payload.  Returns None on a bad header or a short payload
        (the latter also prints a warning naming *label*).
        """
        self.ser.reset_input_buffer()
        self._send(cmd_id)
        hdr = self._recv(4, timeout=3.0)
        if len(hdr) < 4 or hdr[0] != SYNC or hdr[1] != cmd_id:
            return None
        payload = self._recv(size, timeout=5.0)
        if len(payload) < size:
            print(f"Warning: {label} block short ({len(payload)}/{size} bytes)")
            return None
        return payload

    # --- Basic commands ---

    def identify(self):
        """Send identify command, return device name string (or None)."""
        self.ser.reset_input_buffer()
        self._send(CMD_IDENTIFY)
        resp = self._recv(16, timeout=2.0)
        if len(resp) < 16:
            return None
        if resp[0] != SYNC or resp[1] != CMD_IDENTIFY:
            return None
        return resp[2:10].decode("ascii", errors="replace").rstrip("\x00")

    def read_measurement_block(self):
        """Send 8C 13 and read the 3904-byte measurement block."""
        # Expected header: 8C 13 0F 40
        return self._read_block(
            CMD_READ_MEASUREMENT, self.MEASUREMENT_BLOCK_SIZE, "measurement"
        )

    def read_electrical_block(self):
        """Send 8C 77 and read the 1584-byte electrical block."""
        # Expected header: 8C 77 06 30
        return self._read_block(
            CMD_READ_ELECTRICAL, self.ELECTRICAL_BLOCK_SIZE, "electrical"
        )

    def poll_state(self):
        """Send 8C 03 and return (data_available, state).

        Returns (None, None) if the 9-byte reply is missing or malformed.
        """
        self.ser.reset_input_buffer()
        self._send(CMD_POLL_STATE)
        resp = self._recv(9, timeout=2.0)
        if len(resp) < 9 or resp[0] != SYNC or resp[1] != CMD_POLL_STATE:
            return None, None
        return resp[2] == 1, resp[5]

    def start_measurement(self):
        """Send 8C 0E 01 to start measurement."""
        return self._command_ack(CMD_START, b"\x01")

    def stop_measurement(self):
        """Send 8C 0E 02 to stop measurement."""
        return self._command_ack(CMD_START, b"\x02")

    def reset(self):
        """Send 8C 25 to reset."""
        return self._command_ack(CMD_RESET)

    # --- Power supply control ---

    def psu_on(self):
        """Turn PSU output on."""
        return self._command_ack(CMD_PSU_OUTPUT, b"\x00")

    def psu_off(self):
        """Turn PSU output off."""
        return self._command_ack(CMD_PSU_OUTPUT, b"\x01")

    def set_mode(self, mode):
        """Set output mode: 'ac' or 'dc' (case-insensitive).

        Raises:
            ValueError: If *mode* is neither 'ac' nor 'dc'.  Rejecting
                unknown names avoids silently switching the supply to DC
                on a typo.
        """
        key = mode.lower()
        if key not in ("ac", "dc"):
            raise ValueError(f"mode must be 'ac' or 'dc', got {mode!r}")
        return self._command_ack(CMD_SET_MODE, b"\x00" if key == "ac" else b"\x01")

    def set_ac_voltage(self, volts):
        """Set AC output voltage (100-240 V)."""
        return self._command_ack(CMD_SET_AC, b"\x00" + struct.pack("<f", volts))

    def set_ac_frequency(self, hz):
        """Set AC output frequency (50 or 60 Hz)."""
        return self._command_ack(CMD_SET_AC, b"\x01" + struct.pack("<f", hz))

    def set_dc_voltage(self, volts):
        """Set DC output voltage (1-60 V)."""
        return self._command_ack(CMD_SET_DC, b"\x01" + struct.pack("<f", volts))

    def set_dc_current(self, amps):
        """Set DC current limit (0-5 A)."""
        return self._command_ack(CMD_SET_DC, b"\x00" + struct.pack("<f", amps))

    def set_integration_time(self, ms):
        """Set integration time in milliseconds. Use 0 for auto."""
        # The wire value is an unsigned 32-bit count of microseconds.
        us = int(ms * 1000)
        return self._command_ack(CMD_INTEGRATION_TIME, struct.pack("<I", us))

    def read_psu_settings(self):
        """Read current power supply settings.

        Returns a dict with ``ac_voltage``, ``ac_frequency``,
        ``dc_voltage``, ``dc_current`` (floats) and ``mode`` ("AC"/"DC"),
        or None if the 20-byte reply is missing or malformed.
        """
        self.ser.reset_input_buffer()
        self._send(CMD_READ_PSU)
        resp = self._recv(20, timeout=2.0)
        if len(resp) < 20 or resp[0] != SYNC or resp[1] != CMD_READ_PSU:
            return None
        # Four float32 LE values then one mode byte, right after the header.
        ac_v, ac_hz, dc_v, dc_a, mode_byte = struct.unpack_from("<4fB", resp, 2)
        return {
            "ac_voltage": ac_v,
            "ac_frequency": ac_hz,
            "dc_voltage": dc_v,
            "dc_current": dc_a,
            "mode": "DC" if mode_byte == 1 else "AC",
        }

    # --- Measurement flows ---

    def wait_for_data(self, timeout=30.0):
        """Poll every 0.1 s until data is available or *timeout* expires."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            data_avail, _state = self.poll_state()
            if data_avail:
                return True
            time.sleep(0.1)
        return False

    def _read_result(self):
        """Read both data blocks and merge them into one result dict.

        Both blocks are always requested.  Returns None when the
        measurement block is unavailable; a missing electrical block only
        means the electrical keys are absent.
        """
        meas = self.read_measurement_block()
        elec = self.read_electrical_block()
        if meas is None:
            return None
        result = self._parse_measurement(meas)
        if elec is not None:
            result.update(self._parse_electrical(elec))
        return result

    def take_reading(self, psu=True):
        """Take a single measurement reading. Returns dict or None.

        If psu=True, turns PSU output on before measuring and off after.
        Once the measurement has been started it is always stopped again,
        and the PSU is always switched back off, even if an exception
        (including KeyboardInterrupt) interrupts the sequence.
        """
        if psu:
            self.psu_on()
            time.sleep(0.2)
        try:
            if not self.start_measurement():
                print("Failed to start measurement")
                return None
            try:
                if not self.wait_for_data(timeout=30.0):
                    print("Timeout waiting for measurement data")
                    return None
                result = self._read_result()
                if result is None:
                    print("Failed to read measurement block")
                return result
            finally:
                self.stop_measurement()
        finally:
            if psu:
                self.psu_off()

    def take_single_reading(self):
        """Trigger a single measurement (no PSU control).

        Sends stop to trigger a one-shot reading, waits for data, reads it,
        then resets. This matches the vendor software's single-test flow.
        The reset is only sent after a successful read.
        """
        self.stop_measurement()
        if not self.wait_for_data(timeout=30.0):
            print("Timeout waiting for measurement data")
            return None
        result = self._read_result()
        if result is None:
            print("Failed to read measurement block")
            return None
        self.reset()
        return result

    def read_current(self):
        """Read current data without starting/stopping measurement.

        Use this when the vendor software is controlling the instrument.
        """
        return self._read_result()

    # --- Block decoding ---

    @staticmethod
    def _ascii_field(data, start, length):
        """Decode *length* bytes at *start* as ASCII, dropping trailing NULs."""
        raw = data[start:start + length]
        return raw.decode("ascii", errors="replace").rstrip("\x00")

    def _parse_measurement(self, data):
        """Parse the 3904-byte measurement block into a dict.

        Fields that would extend past the end of *data* are left out, so a
        truncated block yields a partial result instead of raising.
        """
        result = {"device": self._ascii_field(data, 0, 10)}

        # Named scalar fields (float32 LE).
        for name, offset in FIELDS.items():
            if offset + 4 <= len(data):
                result[name] = struct.unpack_from("<f", data, offset)[0]

        # Date/time strings.
        date_off = 272
        time_off = 283
        if date_off + 10 <= len(data):
            result["test_date"] = self._ascii_field(data, date_off, 10)
        if time_off + 8 <= len(data):
            result["test_time"] = self._ascii_field(data, time_off, 8)

        # Spectrum: decode as many whole samples as the buffer holds.
        available = max(0, (len(data) - SPECTRUM_OFFSET) // 4)
        count = min(SPECTRUM_COUNT, available)
        if count:
            spectrum = list(struct.unpack_from(f"<{count}f", data, SPECTRUM_OFFSET))
        else:
            spectrum = []
        result["spectrum"] = spectrum
        result["spectrum_nm"] = [
            SPECTRUM_START_NM + i * SPECTRUM_STEP for i in range(len(spectrum))
        ]
        return result

    def _parse_electrical(self, data):
        """Parse the 1584-byte electrical block into a dict.

        Always returns the scalar electrical fields that fit in *data*.
        Harmonics, THD values and waveforms are added only when the block
        is long enough and the first voltage harmonic is close to 100 %.
        """
        result = {}
        for name, offset in ELECTRICAL_FIELDS.items():
            if offset + 4 <= len(data):
                result[name] = struct.unpack_from("<f", data, offset)[0]

        if len(data) < ATHD_OFFSET + 4:
            return result

        # The fundamental is reported as ~100 % when harmonics data is present.
        h1_v = struct.unpack_from("<f", data, HARMONICS_V_OFFSET)[0]
        if abs(h1_v - 100.0) >= 1.0:
            return result

        result["V_harmonics"] = list(
            struct.unpack_from(f"<{HARMONICS_V_COUNT}f", data, HARMONICS_V_OFFSET)
        )
        result["UThd"] = struct.unpack_from("<f", data, UTHD_OFFSET)[0]
        result["I_harmonics"] = list(
            struct.unpack_from(f"<{HARMONICS_I_COUNT}f", data, HARMONICS_I_OFFSET)
        )
        result["AThd"] = struct.unpack_from("<f", data, ATHD_OFFSET)[0]

        # Waveforms (int16 LE).
        # NOTE(review): the original indentation of this file was lost; the
        # waveforms are assumed to be gated by the harmonics check above.
        # Confirm against the upstream source.
        result["V_waveform"] = list(
            struct.unpack_from(f"<{WAVEFORM_SAMPLES}h", data, WAVEFORM_V_OFFSET)
        )
        result["I_waveform"] = list(
            struct.unpack_from(f"<{WAVEFORM_SAMPLES}h", data, WAVEFORM_I_OFFSET)
        )
        return result
def find_hpcs_port():
    """Auto-detect the HPCS 6500 COM port.

    Returns the device name of the first serial port whose USB ids are
    VID 0x0483 / PID 0x5741, or None when no such port is present.
    """
    candidates = (
        info.device
        for info in serial.tools.list_ports.comports()
        if info.vid == 0x0483 and info.pid == 0x5741
    )
    return next(candidates, None)
def format_reading(r, show_spectrum=False):
    """Format a reading dict for display.

    Args:
        r: Result dict as produced by the driver's parse methods.  Missing
            numeric keys are shown as 0 and missing text keys as "?".
        show_spectrum: When true, also list the per-harmonic tables (if
            ``"V_harmonics"`` is present) and every 5th spectrum sample
            with a bar scaled to the largest finite sample.

    Returns:
        The report as a single newline-joined string.
    """
    get = r.get
    lines = [
        f" Device: {get('device', '?')}",
        f" Date: {get('test_date', '?')} Time: {get('test_time', '?')}",
        "",
        " --- Photometric ---",
        f" Phi (lm): {get('Phi_lm', 0):10.2f}",
        f" eta (lm/W): {get('eta_lm_W', 0):10.2f}",
        f" CCT (K): {get('CCT_K', 0):10.0f}",
        f" Duv: {get('Duv', 0):10.5f}",
        "",
        " --- Chromaticity ---",
        f" x: {get('x', 0):.4f} y: {get('y', 0):.4f}",
        f" u: {get('u', 0):.4f} v: {get('v', 0):.4f}",
        f" u': {get('u_prime', 0):.4f} v': {get('v_prime', 0):.4f}",
        "",
        " --- CRI ---",
        f" Ra: {get('Ra', 0):.1f} SDCM: {get('SDCM', 0):.2f}",
    ]

    # R1..R15 split over two rows: R1-R8, then R9-R15.
    ri = []
    for i in range(1, 16):
        value = get(f"R{i}", 0)
        ri.append(f"R{i}={value:.0f}")
    lines.append(f" {', '.join(ri[:8])}")
    lines.append(f" {', '.join(ri[8:])}")

    lines += [
        "",
        " --- CIE 1931 ---",
        f" X: {get('CIE_X', 0):.3f} Y: {get('CIE_Y', 0):.3f} Z: {get('CIE_Z', 0):.3f}",
        f" TLCI-2012: {get('TLCI', 0):.0f}",
        "",
        " --- Radiometric ---",
        f" Phi_e (mW): {get('Phi_e_mW', 0):10.3f}",
        f" Phi_eb (mW): {get('Phi_eb_mW', 0):10.3f}",
        f" Phi_ey (mW): {get('Phi_ey_mW', 0):10.3f}",
        f" Phi_er (mW): {get('Phi_er_mW', 0):10.3f}",
        f" Phi_efr (mW): {get('Phi_efr_mW', 0):10.3f}",
        "",
        " --- Electrical ---",
        f" Voltage (V): {get('Voltage_V', 0):10.2f}",
        f" Current (A): {get('Current_A', 0):10.4f}",
        f" Power (W): {get('Power_W', 0):10.3f}",
        f" Frequency (Hz):{get('Freq_Hz', 0):10.2f}",
        f" Power Factor: {get('PF', 0):10.4f}",
    ]
    if "UThd" in r:
        lines.append(f" UThd (%): {r['UThd']:10.4f}")
        lines.append(f" AThd (%): {get('AThd', 0):10.4f}")
    lines.append("")

    if show_spectrum and "V_harmonics" in r:
        tables = (
            (" --- Voltage Harmonics (%) ---", r["V_harmonics"]),
            (" --- Current Harmonics (%) ---", r["I_harmonics"]),
        )
        for title, harmonics in tables:
            lines.append(title)
            for i, val in enumerate(harmonics):
                # Always show the fundamental; hide negligible harmonics.
                if val > 0.001 or i == 0:
                    lines.append(f" H{i+1:2d}: {val:8.4f}")
            lines.append("")

    lines += [
        " --- Sensor ---",
        f" Peak Signal: {get('PeakSignal', 0):10.0f}",
        f" Dark Signal: {get('DarkSignal', 0):10.0f}",
        f" Compensate: {get('Compensate', 0):10.0f}",
    ]

    if show_spectrum and "spectrum" in r:
        lines.append("")
        lines.append(" --- Spectrum (380-1050nm) ---")
        spectrum = r["spectrum"]
        nm = r["spectrum_nm"]
        # Scale against the largest finite sample: a NaN or inf sample must
        # neither become the peak nor crash the bar-length conversion.
        finite = [v for v in spectrum if math.isfinite(v)]
        peak_val = max(finite) if finite else 1
        for i, (wl, val) in enumerate(zip(nm, spectrum)):
            if i % 5:
                continue  # print every 5th point (~10nm spacing)
            if peak_val > 0 and math.isfinite(val):
                norm = max(0.0, min(1.0, val / peak_val))
            else:
                norm = 0.0
            bar = "#" * int(norm * 40)
            lines.append(f" {wl:7.1f}nm: {val:8.4f} [{bar}]")
    return "\n".join(lines)
def format_quick(r, reading_num=None):
    """Format a reading for quick test mode: lumen + CCT only.

    Args:
        r: Result dict; missing ``Phi_lm`` / ``CCT_K`` are shown as 0.
        reading_num: Optional sequence number shown as a ``#N`` prefix.
            Any number, including 0, is shown; only None omits the prefix.
    """
    lm = r.get("Phi_lm", 0)
    cct = r.get("CCT_K", 0)
    prefix = "" if reading_num is None else f"#{reading_num} "
    return f" {prefix}{lm:.1f} lm {cct:.0f} K"
def _build_arg_parser():
    """Create the command-line parser for the reader."""
    parser = argparse.ArgumentParser(description="HPCS 6500 Spectrophotometer Reader")
    parser.add_argument("--port", help="COM port (auto-detect if not specified)")
    parser.add_argument("--continuous", action="store_true", help="Continuous reading mode")
    parser.add_argument("--csv", metavar="FILE", help="Save readings to CSV file")
    parser.add_argument("--spectrum", action="store_true", help="Show full spectrum data")
    parser.add_argument("--passive", action="store_true",
                        help="Passive mode: read data without controlling instrument")
    parser.add_argument("--quick", action="store_true",
                        help="Quick test mode: only report lumen and CCT")
    parser.add_argument("--harmonics", action="store_true",
                        help="Show harmonics data (voltage/current THD and per-harmonic)")
    parser.add_argument("--parse", metavar="PCAP", help="Parse a pcap capture file instead")
    # Power supply control
    psu = parser.add_argument_group("power supply")
    psu.add_argument("--mode", choices=["ac", "dc"], help="Set output mode")
    psu.add_argument("--voltage", type=float, help="Set output voltage (V)")
    psu.add_argument("--frequency", type=float, help="Set AC frequency (Hz)")
    psu.add_argument("--current", type=float, help="Set DC current limit (A)")
    psu.add_argument("--integration", type=float, help="Set integration time (ms), 0=auto")
    psu.add_argument("--psu-on", action="store_true", help="Turn PSU output on")
    psu.add_argument("--psu-off", action="store_true", help="Turn PSU output off")
    psu.add_argument("--no-psu", action="store_true",
                     help="Don't auto-control PSU during measurements")
    psu.add_argument("--psu-status", action="store_true", help="Read power supply settings")
    return parser


def _replay_capture(args):
    """Decode and print every measurement found in the pcap file args.parse."""
    messages = parse_pcap_messages(args.parse)
    received = [m for m in messages if m["dir"] == "RX"]
    blocks = [m for m in received
              if len(m["data"]) == 3904 and m["data"][:8] == b"HPCS6500"]
    config_blocks = [m for m in received if len(m["data"]) == 1584]
    # Only the parsing methods are needed, so skip __init__ (no port is opened).
    dev = HPCS6500.__new__(HPCS6500)
    rule = "=" * 60
    for i, block in enumerate(blocks):
        result = dev._parse_measurement(block["data"])
        # Electrical blocks are paired with measurement blocks by position.
        if i < len(config_blocks):
            result.update(dev._parse_electrical(config_blocks[i]["data"]))
        if args.quick:
            print(format_quick(result, i + 1))
        else:
            print(f"\n{rule}")
            print(f"Reading {i+1} (t={block['ts']:.2f}s)")
            print(rule)
            print(format_reading(result, show_spectrum=args.spectrum))


def _apply_psu_options(dev, args):
    """Run the power-supply related options.

    Returns True when the program should end without taking readings.
    """
    def verdict(ok):
        return "OK" if ok else "FAILED"

    change_requested = any([
        args.mode,
        args.voltage is not None,
        args.frequency is not None,
        args.current is not None,
        args.integration is not None,
        args.psu_on,
        args.psu_off,
    ])

    if args.psu_status:
        settings = dev.read_psu_settings()
        if settings:
            print("\nPower Supply Settings:")
            print(f" Mode: {settings['mode']}")
            print(f" AC Voltage: {settings['ac_voltage']:.0f} V")
            print(f" AC Frequency: {settings['ac_frequency']:.0f} Hz")
            print(f" DC Voltage: {settings['dc_voltage']:.1f} V")
            print(f" DC Current: {settings['dc_current']:.1f} A")
        else:
            print("Failed to read PSU settings")
        # A pure status query ends here.  Any other requested setting must
        # still be applied below rather than being silently dropped.
        if not args.continuous and not change_requested:
            return True

    if args.mode:
        ok = dev.set_mode(args.mode)
        print(f"Set mode to {args.mode.upper()}: {verdict(ok)}")
    if args.voltage is not None:
        # Without an explicit --mode the voltage is taken as an AC voltage.
        mode = args.mode or "ac"
        if mode == "ac":
            ok = dev.set_ac_voltage(args.voltage)
        else:
            ok = dev.set_dc_voltage(args.voltage)
        print(f"Set {mode.upper()} voltage to {args.voltage} V: {verdict(ok)}")
    if args.frequency is not None:
        ok = dev.set_ac_frequency(args.frequency)
        print(f"Set AC frequency to {args.frequency} Hz: {verdict(ok)}")
    if args.current is not None:
        ok = dev.set_dc_current(args.current)
        print(f"Set DC current limit to {args.current} A: {verdict(ok)}")
    if args.integration is not None:
        ok = dev.set_integration_time(args.integration)
        print(f"Set integration time to {args.integration} ms: {verdict(ok)}")
    if args.psu_on:
        ok = dev.psu_on()
        print(f"PSU output ON: {verdict(ok)}")
    if args.psu_off:
        ok = dev.psu_off()
        print(f"PSU output OFF: {verdict(ok)}")

    # If only PSU commands were given with no measurement request, exit
    has_psu_cmd = change_requested or args.psu_status
    return bool(has_psu_cmd and not args.continuous and not args.passive)


def _run_readings(dev, args):
    """Take one reading, or readings until interrupted with --continuous.

    Prints each reading and, with --csv, appends it to the CSV file.  The
    PSU (when auto-controlled) is switched off, the instrument is reset and
    the CSV file is closed however the loop ends.
    """
    use_psu = not args.no_psu and not args.passive
    rule = "=" * 60
    csv_file = open(args.csv, "w", newline="") if args.csv else None
    try:
        csv_writer = None
        if csv_file:
            fieldnames = ["timestamp"] + list(FIELDS.keys()) + list(ELECTRICAL_FIELDS.keys())
            csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames,
                                        extrasaction="ignore")
            csv_writer.writeheader()
            print(f"Saving to: {args.csv}")
        try:
            if use_psu:
                dev.psu_on()
                time.sleep(0.2)
            reading_num = 0
            while True:
                reading_num += 1
                if not args.quick:
                    print(f"\n{rule}")
                    print(f"Reading {reading_num} ({datetime.now().strftime('%H:%M:%S')})")
                    print(rule)
                if args.passive:
                    result = dev.read_current()
                else:
                    # The PSU is managed around the whole loop, not per reading.
                    result = dev.take_reading(psu=False)
                if result is None:
                    print("Failed to get reading")
                    if not args.continuous:
                        break
                    time.sleep(1)
                    continue
                if args.quick:
                    print(format_quick(result, reading_num))
                else:
                    print(format_reading(result, show_spectrum=args.spectrum))
                if csv_writer:
                    row = {"timestamp": datetime.now().isoformat()}
                    # Only scalar values go to CSV; lists such as the spectrum are skipped.
                    row.update({k: v for k, v in result.items()
                                if isinstance(v, (int, float))})
                    csv_writer.writerow(row)
                    csv_file.flush()
                if not args.continuous:
                    break
                time.sleep(0.5)
        except KeyboardInterrupt:
            print("\nStopped.")
        finally:
            if use_psu:
                dev.psu_off()
            # NOTE(review): the original indentation was lost; the reset is
            # assumed to be unconditional, so it is also sent in --passive
            # mode. Confirm that is intended.
            dev.reset()
    finally:
        if csv_file:
            csv_file.close()
            print(f"Data saved to: {args.csv}")


def main():
    """Command-line entry point: parse options, then replay or measure."""
    args = _build_arg_parser().parse_args()
    # --harmonics implies --spectrum for the detailed display
    if args.harmonics:
        args.spectrum = True

    # Parse mode: analyze an existing capture
    if args.parse:
        _replay_capture(args)
        return

    # Find port
    port = args.port or find_hpcs_port()
    if not port:
        print("ERROR: HPCS 6500 not found. Connect the device or specify --port.")
        print("\nAvailable ports:")
        for p in serial.tools.list_ports.comports():
            print(f" {p.device}: {p.description} [{p.hwid}]")
        sys.exit(1)

    print("HPCS 6500 Spectrophotometer Reader")
    print(f"Port: {port}")
    print("=" * 60)

    dev = HPCS6500(port)
    try:
        # Identify
        name = dev.identify()
        if name:
            print(f"Device: {name}")
        else:
            print("Warning: no identify response (device may be busy)")

        if _apply_psu_options(dev, args):
            return
        _run_readings(dev, args)
    finally:
        # Release the port on every exit path, including errors.
        dev.close()


if __name__ == "__main__":
    main()