The device needs the test config commands to enter single-test mode. Without them it behaves as continuous start+stop. Supports auto_psu flag so the device can handle PSU on/off automatically. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
886 lines
30 KiB
Python
886 lines
30 KiB
Python
"""
|
|
HPCS 6500 Spectrophotometer / Integrating Sphere — Direct Reader
|
|
|
|
Communicates with the HPCS 6500 over USB serial (STM32 VCP) to take
|
|
measurements and extract photometric, colorimetric, radiometric,
|
|
electrical, and spectral data.
|
|
|
|
Usage:
|
|
uv run hpcs6500.py # auto-detect port, single reading
|
|
uv run hpcs6500.py --port COM42 # specify port
|
|
uv run hpcs6500.py --continuous # continuous readings
|
|
uv run hpcs6500.py --csv out.csv # save to CSV
|
|
uv run hpcs6500.py --spectrum # also print full spectrum
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import struct
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
|
|
# Protocol constants
|
|
SYNC = 0x8C
|
|
CMD_IDENTIFY = 0x00
|
|
CMD_INTEGRATION_TIME = 0x01
|
|
CMD_POLL_STATE = 0x03
|
|
CMD_STATUS = 0x05
|
|
CMD_START = 0x0E
|
|
CMD_READ_MEASUREMENT = 0x13
|
|
CMD_RESET = 0x25
|
|
CMD_CONFIG = 0x2A
|
|
CMD_SET_DC = 0x73
|
|
CMD_READ_ELECTRICAL = 0x77
|
|
CMD_SET_AC = 0x78
|
|
CMD_READ_PSU = 0x79
|
|
CMD_PSU_OUTPUT = 0x72
|
|
CMD_SET_MODE = 0x7A
|
|
CMD_TEST_CONFIG = 0x2B
|
|
|
|
# Single-test config payloads captured from vendor software.
|
|
# Sub 00: test parameters, sub 01: test config with auto-PSU flag at byte 37.
|
|
_TEST_CONFIG_SUB0 = bytes.fromhex(
|
|
"00003c00000001008813037c011a04000000484200000208"
|
|
"000000000001060000000000803f0000803f0000803f0000"
|
|
"803f0000803f00000000003f"
|
|
)
|
|
_TEST_CONFIG_SUB1_BASE = bytes.fromhex(
|
|
"017c010c037c010c037c010c030000803f000000000000000000000000"
|
|
"00000000000000000001000000400000803f"
|
|
)
|
|
|
|
# State polling byte[5] values
|
|
STATE_IDLE = 0x04
|
|
STATE_MEASURING = 0x01
|
|
|
|
# Spectrum parameters
|
|
SPECTRUM_START_NM = 380
|
|
SPECTRUM_END_NM = 1050
|
|
SPECTRUM_OFFSET = 432
|
|
SPECTRUM_COUNT = 350
|
|
SPECTRUM_STEP = (SPECTRUM_END_NM - SPECTRUM_START_NM) / (SPECTRUM_COUNT - 1)
|
|
|
|
# Measurement block field offsets (all float32 LE)
|
|
FIELDS = {
|
|
"Phi_lm": 36,
|
|
"eta_lm_W": 40,
|
|
"CCT_K": 44,
|
|
"Duv": 48,
|
|
"x": 52,
|
|
"y": 56,
|
|
"u": 60,
|
|
"v": 64,
|
|
"u_prime": 68,
|
|
"v_prime": 72,
|
|
"SDCM": 76,
|
|
"Ra": 80,
|
|
"R1": 84,
|
|
"R2": 88,
|
|
"R3": 92,
|
|
"R4": 96,
|
|
"R5": 100,
|
|
"R6": 104,
|
|
"R7": 108,
|
|
"R8": 112,
|
|
"R9": 116,
|
|
"R10": 120,
|
|
"R11": 124,
|
|
"R12": 128,
|
|
"R13": 132,
|
|
"R14": 136,
|
|
"R15": 140,
|
|
"Phi_e_mW": 144,
|
|
"Phi_euv_mW": 148,
|
|
"Phi_eb_mW": 152,
|
|
"Phi_ey_mW": 156,
|
|
"Phi_er_mW": 160,
|
|
"Phi_efr_mW": 164,
|
|
"Phi_eir_mW": 168,
|
|
"Phi_e_total": 172,
|
|
"CIE_X": 224,
|
|
"CIE_Y": 228,
|
|
"CIE_Z": 232,
|
|
"TLCI": 236,
|
|
"PeakSignal": 244,
|
|
"DarkSignal": 248,
|
|
"Compensate": 252,
|
|
}
|
|
|
|
# Electrical block field offsets
|
|
ELECTRICAL_FIELDS = {
|
|
"Voltage_V": 8,
|
|
"Current_A": 12,
|
|
"Power_W": 16,
|
|
"Freq_Hz": 20,
|
|
"PF": 24,
|
|
}
|
|
|
|
# Harmonics offsets within the electrical block
|
|
HARMONICS_V_OFFSET = 544 # 50 x float32 voltage harmonics (%)
|
|
HARMONICS_V_COUNT = 50
|
|
UTHD_OFFSET = 744 # float32 UThd (%)
|
|
HARMONICS_I_OFFSET = 800 # 50 x float32 current harmonics (%)
|
|
HARMONICS_I_COUNT = 50
|
|
ATHD_OFFSET = 1000 # float32 AThd (%)
|
|
|
|
# Waveform offsets (int16 LE samples, 128 per waveform)
|
|
WAVEFORM_V_OFFSET = 30
|
|
WAVEFORM_I_OFFSET = 286
|
|
WAVEFORM_SAMPLES = 128
|
|
|
|
|
|
def parse_pcap_messages(filepath):
|
|
"""Parse a USBPcap pcap file and extract bulk transfer payloads."""
|
|
with open(filepath, "rb") as f:
|
|
data = f.read()
|
|
magic = struct.unpack_from("<I", data, 0)[0]
|
|
endian = "<" if magic == 0xA1B2C3D4 else ">"
|
|
offset = 24
|
|
messages = []
|
|
first_ts = None
|
|
while offset + 16 <= len(data):
|
|
ts_sec, ts_usec, incl_len, orig_len = struct.unpack_from(
|
|
f"{endian}IIII", data, offset
|
|
)
|
|
offset += 16
|
|
if offset + incl_len > len(data):
|
|
break
|
|
pkt_data = data[offset : offset + incl_len]
|
|
offset += incl_len
|
|
if len(pkt_data) < 27:
|
|
continue
|
|
hdr_len = struct.unpack_from("<H", pkt_data, 0)[0]
|
|
endpoint = pkt_data[21]
|
|
transfer = pkt_data[22]
|
|
payload = pkt_data[hdr_len:]
|
|
if len(payload) == 0 or transfer != 3:
|
|
continue
|
|
ts = ts_sec + ts_usec / 1_000_000
|
|
if first_ts is None:
|
|
first_ts = ts
|
|
ep_dir = "IN" if (endpoint & 0x80) else "OUT"
|
|
messages.append(
|
|
{
|
|
"ts": ts - first_ts,
|
|
"dir": "TX" if ep_dir == "OUT" else "RX",
|
|
"data": payload,
|
|
}
|
|
)
|
|
return messages
|
|
|
|
|
|
class HPCS6500:
|
|
"""Driver for the HPCS 6500 spectrophotometer."""
|
|
|
|
def __init__(self, port, timeout=2.0):
|
|
self.ser = serial.Serial(
|
|
port=port,
|
|
baudrate=115200,
|
|
timeout=timeout,
|
|
write_timeout=timeout,
|
|
)
|
|
self.port = port
|
|
|
|
def close(self):
|
|
self.ser.close()
|
|
|
|
def _send(self, *cmd_bytes):
|
|
"""Send a command to the device."""
|
|
data = bytes([SYNC] + list(cmd_bytes))
|
|
self.ser.write(data)
|
|
|
|
def _recv(self, expected_len, timeout=3.0):
|
|
"""Read expected_len bytes from the device."""
|
|
data = b""
|
|
deadline = time.monotonic() + timeout
|
|
while len(data) < expected_len:
|
|
remaining = expected_len - len(data)
|
|
chunk = self.ser.read(remaining)
|
|
if chunk:
|
|
data += chunk
|
|
elif time.monotonic() > deadline:
|
|
break
|
|
return data
|
|
|
|
def _recv_response(self, cmd_id, timeout=3.0):
|
|
"""Read a response, expecting it to start with 8C <cmd_id>."""
|
|
# Read the 2-byte header first
|
|
hdr = self._recv(2, timeout)
|
|
if len(hdr) < 2:
|
|
return None
|
|
if hdr[0] != SYNC or hdr[1] != cmd_id:
|
|
# Try to resync — read until we find 8C <cmd_id>
|
|
buf = hdr
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
b = self.ser.read(1)
|
|
if not b:
|
|
continue
|
|
buf += b
|
|
idx = buf.find(bytes([SYNC, cmd_id]))
|
|
if idx >= 0:
|
|
hdr = buf[idx:idx+2]
|
|
break
|
|
else:
|
|
return None
|
|
return hdr
|
|
|
|
def identify(self):
|
|
"""Send identify command, return device name string."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_IDENTIFY)
|
|
resp = self._recv(16, timeout=2.0)
|
|
if len(resp) < 16:
|
|
return None
|
|
if resp[0] != SYNC or resp[1] != CMD_IDENTIFY:
|
|
return None
|
|
name = resp[2:10].decode("ascii", errors="replace").rstrip("\x00")
|
|
return name
|
|
|
|
def read_measurement_block(self):
|
|
"""Send 8C 13 and read the 3904-byte measurement block."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_READ_MEASUREMENT)
|
|
# First, read the 4-byte header: 8C 13 0F 40
|
|
hdr = self._recv(4, timeout=3.0)
|
|
if len(hdr) < 4 or hdr[0] != SYNC or hdr[1] != CMD_READ_MEASUREMENT:
|
|
return None
|
|
# Then read the 3904-byte payload
|
|
payload = self._recv(3904, timeout=5.0)
|
|
if len(payload) < 3904:
|
|
print(f"Warning: measurement block short ({len(payload)}/3904 bytes)")
|
|
return None
|
|
return payload
|
|
|
|
def read_electrical_block(self):
|
|
"""Send 8C 77 and read the 1584-byte electrical block."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_READ_ELECTRICAL)
|
|
# 4-byte header: 8C 77 06 30
|
|
hdr = self._recv(4, timeout=3.0)
|
|
if len(hdr) < 4 or hdr[0] != SYNC or hdr[1] != CMD_READ_ELECTRICAL:
|
|
return None
|
|
payload = self._recv(1584, timeout=5.0)
|
|
if len(payload) < 1584:
|
|
print(f"Warning: electrical block short ({len(payload)}/1584 bytes)")
|
|
return None
|
|
return payload
|
|
|
|
def poll_state(self):
|
|
"""Send 8C 03 and return (data_available, state)."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_POLL_STATE)
|
|
resp = self._recv(9, timeout=2.0)
|
|
if len(resp) < 9 or resp[0] != SYNC or resp[1] != CMD_POLL_STATE:
|
|
return None, None
|
|
data_available = resp[2] == 1
|
|
state = resp[5]
|
|
return data_available, state
|
|
|
|
def send_test_config(self, auto_psu=False):
|
|
"""Send single-test configuration (8C 2B sub 00 + sub 01).
|
|
If auto_psu=True, device handles PSU on/off automatically."""
|
|
self.ser.reset_input_buffer()
|
|
self.ser.write(bytes([SYNC, CMD_TEST_CONFIG]) + _TEST_CONFIG_SUB0)
|
|
resp = self._recv(2, timeout=2.0)
|
|
if resp != bytes([SYNC, CMD_TEST_CONFIG]):
|
|
return False
|
|
|
|
sub1 = bytearray(_TEST_CONFIG_SUB1_BASE)
|
|
if auto_psu:
|
|
sub1[37] = 0x01
|
|
self.ser.reset_input_buffer()
|
|
self.ser.write(bytes([SYNC, CMD_TEST_CONFIG]) + bytes(sub1))
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_TEST_CONFIG])
|
|
|
|
def start_measurement(self):
|
|
"""Send 8C 0E 01 to start measurement."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_START, 0x01)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_START])
|
|
|
|
def stop_measurement(self):
|
|
"""Send 8C 0E 02 to stop measurement."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_START, 0x02)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_START])
|
|
|
|
def reset(self):
|
|
"""Send 8C 25 to reset."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_RESET)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_RESET])
|
|
|
|
# --- Power supply control ---
|
|
|
|
def psu_on(self):
|
|
"""Turn PSU output on."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_PSU_OUTPUT, 0x00)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_PSU_OUTPUT])
|
|
|
|
def psu_off(self):
|
|
"""Turn PSU output off."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_PSU_OUTPUT, 0x01)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_PSU_OUTPUT])
|
|
|
|
def set_mode(self, mode):
|
|
"""Set output mode: 'ac' or 'dc'."""
|
|
mode_byte = 0x00 if mode.lower() == "ac" else 0x01
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_SET_MODE, mode_byte)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_SET_MODE])
|
|
|
|
def set_ac_voltage(self, volts):
|
|
"""Set AC output voltage (100-240 V)."""
|
|
self.ser.reset_input_buffer()
|
|
data = bytes([SYNC, CMD_SET_AC, 0x00]) + struct.pack("<f", volts)
|
|
self.ser.write(data)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_SET_AC])
|
|
|
|
def set_ac_frequency(self, hz):
|
|
"""Set AC output frequency (50 or 60 Hz)."""
|
|
self.ser.reset_input_buffer()
|
|
data = bytes([SYNC, CMD_SET_AC, 0x01]) + struct.pack("<f", hz)
|
|
self.ser.write(data)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_SET_AC])
|
|
|
|
def set_dc_voltage(self, volts):
|
|
"""Set DC output voltage (1-60 V)."""
|
|
self.ser.reset_input_buffer()
|
|
data = bytes([SYNC, CMD_SET_DC, 0x01]) + struct.pack("<f", volts)
|
|
self.ser.write(data)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_SET_DC])
|
|
|
|
def set_dc_current(self, amps):
|
|
"""Set DC current limit (0-5 A)."""
|
|
self.ser.reset_input_buffer()
|
|
data = bytes([SYNC, CMD_SET_DC, 0x00]) + struct.pack("<f", amps)
|
|
self.ser.write(data)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_SET_DC])
|
|
|
|
def set_integration_time(self, ms):
|
|
"""Set integration time in milliseconds. Use 0 for auto."""
|
|
us = int(ms * 1000)
|
|
self.ser.reset_input_buffer()
|
|
data = bytes([SYNC, CMD_INTEGRATION_TIME]) + struct.pack("<I", us)
|
|
self.ser.write(data)
|
|
resp = self._recv(2, timeout=2.0)
|
|
return resp == bytes([SYNC, CMD_INTEGRATION_TIME])
|
|
|
|
def read_psu_settings(self):
|
|
"""Read current power supply settings."""
|
|
self.ser.reset_input_buffer()
|
|
self._send(CMD_READ_PSU)
|
|
resp = self._recv(20, timeout=2.0)
|
|
if len(resp) < 20 or resp[0] != SYNC or resp[1] != CMD_READ_PSU:
|
|
return None
|
|
payload = resp[2:]
|
|
return {
|
|
"ac_voltage": struct.unpack_from("<f", payload, 0)[0],
|
|
"ac_frequency": struct.unpack_from("<f", payload, 4)[0],
|
|
"dc_voltage": struct.unpack_from("<f", payload, 8)[0],
|
|
"dc_current": struct.unpack_from("<f", payload, 12)[0],
|
|
"mode": "DC" if payload[16] == 1 else "AC",
|
|
}
|
|
|
|
def wait_for_data(self, timeout=30.0):
|
|
"""Poll until data is available or timeout."""
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
data_avail, state = self.poll_state()
|
|
if data_avail:
|
|
return True
|
|
time.sleep(0.1)
|
|
return False
|
|
|
|
def take_reading(self, psu=True):
|
|
"""Take a single measurement reading. Returns dict or None.
|
|
If psu=True, turns PSU output on before measuring and off after."""
|
|
if psu:
|
|
self.psu_on()
|
|
time.sleep(0.2)
|
|
|
|
# Start measurement
|
|
if not self.start_measurement():
|
|
print("Failed to start measurement")
|
|
if psu:
|
|
self.psu_off()
|
|
return None
|
|
|
|
# Wait for data
|
|
if not self.wait_for_data(timeout=30.0):
|
|
print("Timeout waiting for measurement data")
|
|
self.stop_measurement()
|
|
if psu:
|
|
self.psu_off()
|
|
return None
|
|
|
|
# Read measurement block
|
|
meas = self.read_measurement_block()
|
|
elec = self.read_electrical_block()
|
|
|
|
if meas is None:
|
|
print("Failed to read measurement block")
|
|
self.stop_measurement()
|
|
if psu:
|
|
self.psu_off()
|
|
return None
|
|
|
|
result = self._parse_measurement(meas)
|
|
|
|
if elec is not None:
|
|
result.update(self._parse_electrical(elec))
|
|
|
|
# Stop measurement
|
|
self.stop_measurement()
|
|
|
|
if psu:
|
|
self.psu_off()
|
|
|
|
return result
|
|
|
|
def take_single_reading(self, auto_psu=False):
|
|
"""Trigger a single measurement matching the vendor's single-test flow.
|
|
Sends test config, then stop to trigger a one-shot reading, waits for
|
|
data, reads it, then resets.
|
|
If auto_psu=True, the device handles PSU on/off automatically."""
|
|
self.send_test_config(auto_psu=auto_psu)
|
|
self.stop_measurement()
|
|
|
|
if not self.wait_for_data(timeout=30.0):
|
|
print("Timeout waiting for measurement data")
|
|
return None
|
|
|
|
meas = self.read_measurement_block()
|
|
elec = self.read_electrical_block()
|
|
|
|
if meas is None:
|
|
print("Failed to read measurement block")
|
|
return None
|
|
|
|
result = self._parse_measurement(meas)
|
|
if elec is not None:
|
|
result.update(self._parse_electrical(elec))
|
|
|
|
self.reset()
|
|
return result
|
|
|
|
def read_current(self):
|
|
"""Read current data without starting/stopping measurement.
|
|
Use this when the vendor software is controlling the instrument."""
|
|
meas = self.read_measurement_block()
|
|
elec = self.read_electrical_block()
|
|
|
|
if meas is None:
|
|
return None
|
|
|
|
result = self._parse_measurement(meas)
|
|
if elec is not None:
|
|
result.update(self._parse_electrical(elec))
|
|
return result
|
|
|
|
def _parse_measurement(self, data):
|
|
"""Parse the 3904-byte measurement block into a dict."""
|
|
result = {}
|
|
|
|
# Check header
|
|
header = data[:10].decode("ascii", errors="replace").rstrip("\x00")
|
|
result["device"] = header
|
|
|
|
# Extract all named fields
|
|
for name, offset in FIELDS.items():
|
|
if offset + 4 <= len(data):
|
|
result[name] = struct.unpack_from("<f", data, offset)[0]
|
|
|
|
# Extract date/time strings
|
|
date_off = 272
|
|
time_off = 283
|
|
if date_off + 10 <= len(data):
|
|
date_str = data[date_off:date_off+10].decode("ascii", errors="replace").rstrip("\x00")
|
|
result["test_date"] = date_str
|
|
if time_off + 8 <= len(data):
|
|
time_str = data[time_off:time_off+8].decode("ascii", errors="replace").rstrip("\x00")
|
|
result["test_time"] = time_str
|
|
|
|
# Extract spectrum
|
|
spectrum = []
|
|
for i in range(SPECTRUM_COUNT):
|
|
off = SPECTRUM_OFFSET + i * 4
|
|
if off + 4 <= len(data):
|
|
val = struct.unpack_from("<f", data, off)[0]
|
|
spectrum.append(val)
|
|
result["spectrum"] = spectrum
|
|
result["spectrum_nm"] = [
|
|
SPECTRUM_START_NM + i * SPECTRUM_STEP for i in range(len(spectrum))
|
|
]
|
|
|
|
return result
|
|
|
|
def _parse_electrical(self, data):
|
|
"""Parse the 1584-byte electrical block into a dict."""
|
|
result = {}
|
|
for name, offset in ELECTRICAL_FIELDS.items():
|
|
if offset + 4 <= len(data):
|
|
result[name] = struct.unpack_from("<f", data, offset)[0]
|
|
|
|
# Check if harmonics data is present (non-zero at harmonics offset)
|
|
if len(data) >= ATHD_OFFSET + 4:
|
|
h1_v = struct.unpack_from("<f", data, HARMONICS_V_OFFSET)[0]
|
|
if abs(h1_v - 100.0) < 1.0: # H1 should be ~100%
|
|
# Voltage harmonics
|
|
v_harmonics = []
|
|
for i in range(HARMONICS_V_COUNT):
|
|
off = HARMONICS_V_OFFSET + i * 4
|
|
v_harmonics.append(struct.unpack_from("<f", data, off)[0])
|
|
result["V_harmonics"] = v_harmonics
|
|
result["UThd"] = struct.unpack_from("<f", data, UTHD_OFFSET)[0]
|
|
|
|
# Current harmonics
|
|
i_harmonics = []
|
|
for i in range(HARMONICS_I_COUNT):
|
|
off = HARMONICS_I_OFFSET + i * 4
|
|
i_harmonics.append(struct.unpack_from("<f", data, off)[0])
|
|
result["I_harmonics"] = i_harmonics
|
|
result["AThd"] = struct.unpack_from("<f", data, ATHD_OFFSET)[0]
|
|
|
|
# Waveforms (int16 LE)
|
|
v_waveform = []
|
|
for i in range(WAVEFORM_SAMPLES):
|
|
off = WAVEFORM_V_OFFSET + i * 2
|
|
v_waveform.append(struct.unpack_from("<h", data, off)[0])
|
|
result["V_waveform"] = v_waveform
|
|
|
|
i_waveform = []
|
|
for i in range(WAVEFORM_SAMPLES):
|
|
off = WAVEFORM_I_OFFSET + i * 2
|
|
i_waveform.append(struct.unpack_from("<h", data, off)[0])
|
|
result["I_waveform"] = i_waveform
|
|
|
|
return result
|
|
|
|
|
|
def find_hpcs_port():
|
|
"""Auto-detect the HPCS 6500 COM port."""
|
|
for port in serial.tools.list_ports.comports():
|
|
if port.vid == 0x0483 and port.pid == 0x5741:
|
|
return port.device
|
|
return None
|
|
|
|
|
|
def format_reading(r, show_spectrum=False):
|
|
"""Format a reading dict for display."""
|
|
lines = []
|
|
lines.append(f" Device: {r.get('device', '?')}")
|
|
lines.append(f" Date: {r.get('test_date', '?')} Time: {r.get('test_time', '?')}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Photometric ---")
|
|
lines.append(f" Phi (lm): {r.get('Phi_lm', 0):10.2f}")
|
|
lines.append(f" eta (lm/W): {r.get('eta_lm_W', 0):10.2f}")
|
|
lines.append(f" CCT (K): {r.get('CCT_K', 0):10.0f}")
|
|
lines.append(f" Duv: {r.get('Duv', 0):10.5f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Chromaticity ---")
|
|
lines.append(f" x: {r.get('x', 0):.4f} y: {r.get('y', 0):.4f}")
|
|
lines.append(f" u: {r.get('u', 0):.4f} v: {r.get('v', 0):.4f}")
|
|
lines.append(f" u': {r.get('u_prime', 0):.4f} v': {r.get('v_prime', 0):.4f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- CRI ---")
|
|
ra = r.get('Ra', 0)
|
|
lines.append(f" Ra: {ra:.1f} SDCM: {r.get('SDCM', 0):.2f}")
|
|
ri = []
|
|
for i in range(1, 16):
|
|
ri.append(f"R{i}={r.get(f'R{i}', 0):.0f}")
|
|
lines.append(f" {', '.join(ri[:8])}")
|
|
lines.append(f" {', '.join(ri[8:])}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- CIE 1931 ---")
|
|
lines.append(f" X: {r.get('CIE_X', 0):.3f} Y: {r.get('CIE_Y', 0):.3f} Z: {r.get('CIE_Z', 0):.3f}")
|
|
lines.append(f" TLCI-2012: {r.get('TLCI', 0):.0f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Radiometric ---")
|
|
lines.append(f" Phi_e (mW): {r.get('Phi_e_mW', 0):10.3f}")
|
|
lines.append(f" Phi_eb (mW): {r.get('Phi_eb_mW', 0):10.3f}")
|
|
lines.append(f" Phi_ey (mW): {r.get('Phi_ey_mW', 0):10.3f}")
|
|
lines.append(f" Phi_er (mW): {r.get('Phi_er_mW', 0):10.3f}")
|
|
lines.append(f" Phi_efr (mW): {r.get('Phi_efr_mW', 0):10.3f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Electrical ---")
|
|
lines.append(f" Voltage (V): {r.get('Voltage_V', 0):10.2f}")
|
|
lines.append(f" Current (A): {r.get('Current_A', 0):10.4f}")
|
|
lines.append(f" Power (W): {r.get('Power_W', 0):10.3f}")
|
|
lines.append(f" Frequency (Hz):{r.get('Freq_Hz', 0):10.2f}")
|
|
lines.append(f" Power Factor: {r.get('PF', 0):10.4f}")
|
|
|
|
if "UThd" in r:
|
|
lines.append(f" UThd (%): {r['UThd']:10.4f}")
|
|
lines.append(f" AThd (%): {r.get('AThd', 0):10.4f}")
|
|
lines.append("")
|
|
|
|
if show_spectrum and "V_harmonics" in r:
|
|
lines.append(" --- Voltage Harmonics (%) ---")
|
|
vh = r["V_harmonics"]
|
|
for i, val in enumerate(vh):
|
|
if val > 0.001 or i == 0:
|
|
lines.append(f" H{i+1:2d}: {val:8.4f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Current Harmonics (%) ---")
|
|
ih = r["I_harmonics"]
|
|
for i, val in enumerate(ih):
|
|
if val > 0.001 or i == 0:
|
|
lines.append(f" H{i+1:2d}: {val:8.4f}")
|
|
lines.append("")
|
|
|
|
lines.append(" --- Sensor ---")
|
|
lines.append(f" Peak Signal: {r.get('PeakSignal', 0):10.0f}")
|
|
lines.append(f" Dark Signal: {r.get('DarkSignal', 0):10.0f}")
|
|
lines.append(f" Compensate: {r.get('Compensate', 0):10.0f}")
|
|
|
|
if show_spectrum and "spectrum" in r:
|
|
lines.append("")
|
|
lines.append(" --- Spectrum (380-1050nm) ---")
|
|
spectrum = r["spectrum"]
|
|
nm = r["spectrum_nm"]
|
|
peak_val = max(spectrum) if spectrum else 1
|
|
for i, (wl, val) in enumerate(zip(nm, spectrum)):
|
|
if i % 5 == 0: # Print every 5th point (~10nm spacing)
|
|
norm = val / peak_val if peak_val > 0 else 0
|
|
bar = "#" * int(norm * 40)
|
|
lines.append(f" {wl:7.1f}nm: {val:8.4f} [{bar}]")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_quick(r, reading_num=None):
|
|
"""Format a reading for quick test mode: lumen + CCT only."""
|
|
lm = r.get("Phi_lm", 0)
|
|
cct = r.get("CCT_K", 0)
|
|
prefix = f"#{reading_num} " if reading_num else ""
|
|
return f" {prefix}{lm:.1f} lm {cct:.0f} K"
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="HPCS 6500 Spectrophotometer Reader")
|
|
parser.add_argument("--port", help="COM port (auto-detect if not specified)")
|
|
parser.add_argument("--continuous", action="store_true", help="Continuous reading mode")
|
|
parser.add_argument("--csv", metavar="FILE", help="Save readings to CSV file")
|
|
parser.add_argument("--spectrum", action="store_true", help="Show full spectrum data")
|
|
parser.add_argument("--passive", action="store_true",
|
|
help="Passive mode: read data without controlling instrument")
|
|
parser.add_argument("--quick", action="store_true",
|
|
help="Quick test mode: only report lumen and CCT")
|
|
parser.add_argument("--harmonics", action="store_true",
|
|
help="Show harmonics data (voltage/current THD and per-harmonic)")
|
|
parser.add_argument("--parse", metavar="PCAP", help="Parse a pcap capture file instead")
|
|
|
|
# Power supply control
|
|
psu = parser.add_argument_group("power supply")
|
|
psu.add_argument("--mode", choices=["ac", "dc"], help="Set output mode")
|
|
psu.add_argument("--voltage", type=float, help="Set output voltage (V)")
|
|
psu.add_argument("--frequency", type=float, help="Set AC frequency (Hz)")
|
|
psu.add_argument("--current", type=float, help="Set DC current limit (A)")
|
|
psu.add_argument("--integration", type=float, help="Set integration time (ms), 0=auto")
|
|
psu.add_argument("--psu-on", action="store_true", help="Turn PSU output on")
|
|
psu.add_argument("--psu-off", action="store_true", help="Turn PSU output off")
|
|
psu.add_argument("--no-psu", action="store_true",
|
|
help="Don't auto-control PSU during measurements")
|
|
psu.add_argument("--psu-status", action="store_true", help="Read power supply settings")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# --harmonics implies --spectrum for the detailed display
|
|
if args.harmonics:
|
|
args.spectrum = True
|
|
|
|
# Parse mode: analyze an existing capture
|
|
if args.parse:
|
|
messages = parse_pcap_messages(args.parse)
|
|
blocks = [m for m in messages if m["dir"] == "RX" and len(m["data"]) == 3904
|
|
and m["data"][:8] == b"HPCS6500"]
|
|
config_blocks = [m for m in messages if m["dir"] == "RX" and len(m["data"]) == 1584]
|
|
|
|
dev = HPCS6500.__new__(HPCS6500)
|
|
for i, block in enumerate(blocks):
|
|
result = dev._parse_measurement(block["data"])
|
|
if i < len(config_blocks):
|
|
result.update(dev._parse_electrical(config_blocks[i]["data"]))
|
|
if args.quick:
|
|
print(format_quick(result, i + 1))
|
|
else:
|
|
print(f"\n{'='*60}")
|
|
print(f"Reading {i+1} (t={block['ts']:.2f}s)")
|
|
print(f"{'='*60}")
|
|
print(format_reading(result, show_spectrum=args.spectrum))
|
|
return
|
|
|
|
# Find port
|
|
port = args.port or find_hpcs_port()
|
|
if not port:
|
|
print("ERROR: HPCS 6500 not found. Connect the device or specify --port.")
|
|
print("\nAvailable ports:")
|
|
for p in serial.tools.list_ports.comports():
|
|
print(f" {p.device}: {p.description} [{p.hwid}]")
|
|
sys.exit(1)
|
|
|
|
print(f"HPCS 6500 Spectrophotometer Reader")
|
|
print(f"Port: {port}")
|
|
print("=" * 60)
|
|
|
|
dev = HPCS6500(port)
|
|
|
|
# Identify
|
|
name = dev.identify()
|
|
if name:
|
|
print(f"Device: {name}")
|
|
else:
|
|
print("Warning: no identify response (device may be busy)")
|
|
|
|
# Power supply control
|
|
has_psu_cmd = any([args.mode, args.voltage is not None, args.frequency is not None,
|
|
args.current is not None, args.integration is not None,
|
|
args.psu_status, args.psu_on, args.psu_off])
|
|
|
|
if args.psu_status:
|
|
settings = dev.read_psu_settings()
|
|
if settings:
|
|
print(f"\nPower Supply Settings:")
|
|
print(f" Mode: {settings['mode']}")
|
|
print(f" AC Voltage: {settings['ac_voltage']:.0f} V")
|
|
print(f" AC Frequency: {settings['ac_frequency']:.0f} Hz")
|
|
print(f" DC Voltage: {settings['dc_voltage']:.1f} V")
|
|
print(f" DC Current: {settings['dc_current']:.1f} A")
|
|
else:
|
|
print("Failed to read PSU settings")
|
|
if not args.continuous and args.voltage is None and args.mode is None:
|
|
dev.close()
|
|
return
|
|
|
|
if args.mode:
|
|
ok = dev.set_mode(args.mode)
|
|
print(f"Set mode to {args.mode.upper()}: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.voltage is not None:
|
|
mode = args.mode or "ac"
|
|
if mode == "ac":
|
|
ok = dev.set_ac_voltage(args.voltage)
|
|
else:
|
|
ok = dev.set_dc_voltage(args.voltage)
|
|
print(f"Set {mode.upper()} voltage to {args.voltage} V: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.frequency is not None:
|
|
ok = dev.set_ac_frequency(args.frequency)
|
|
print(f"Set AC frequency to {args.frequency} Hz: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.current is not None:
|
|
ok = dev.set_dc_current(args.current)
|
|
print(f"Set DC current limit to {args.current} A: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.integration is not None:
|
|
ok = dev.set_integration_time(args.integration)
|
|
print(f"Set integration time to {args.integration} ms: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.psu_on:
|
|
ok = dev.psu_on()
|
|
print(f"PSU output ON: {'OK' if ok else 'FAILED'}")
|
|
|
|
if args.psu_off:
|
|
ok = dev.psu_off()
|
|
print(f"PSU output OFF: {'OK' if ok else 'FAILED'}")
|
|
|
|
# If only PSU commands were given with no measurement request, exit
|
|
if has_psu_cmd and not args.continuous and not args.passive:
|
|
dev.close()
|
|
return
|
|
|
|
csv_writer = None
|
|
csv_file = None
|
|
if args.csv:
|
|
csv_file = open(args.csv, "w", newline="")
|
|
fieldnames = ["timestamp"] + list(FIELDS.keys()) + list(ELECTRICAL_FIELDS.keys())
|
|
csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction="ignore")
|
|
csv_writer.writeheader()
|
|
print(f"Saving to: {args.csv}")
|
|
|
|
use_psu = not args.no_psu and not args.passive
|
|
|
|
try:
|
|
if use_psu:
|
|
dev.psu_on()
|
|
time.sleep(0.2)
|
|
|
|
reading_num = 0
|
|
while True:
|
|
reading_num += 1
|
|
|
|
if not args.quick:
|
|
print(f"\n{'='*60}")
|
|
print(f"Reading {reading_num} ({datetime.now().strftime('%H:%M:%S')})")
|
|
print(f"{'='*60}")
|
|
|
|
if args.passive:
|
|
result = dev.read_current()
|
|
else:
|
|
result = dev.take_reading(psu=False)
|
|
|
|
if result is None:
|
|
print("Failed to get reading")
|
|
if not args.continuous:
|
|
break
|
|
time.sleep(1)
|
|
continue
|
|
|
|
if args.quick:
|
|
print(format_quick(result, reading_num))
|
|
else:
|
|
print(format_reading(result, show_spectrum=args.spectrum))
|
|
|
|
if csv_writer:
|
|
row = {"timestamp": datetime.now().isoformat()}
|
|
row.update({k: v for k, v in result.items()
|
|
if isinstance(v, (int, float))})
|
|
csv_writer.writerow(row)
|
|
csv_file.flush()
|
|
|
|
if not args.continuous:
|
|
break
|
|
|
|
time.sleep(0.5)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|
|
finally:
|
|
if use_psu:
|
|
dev.psu_off()
|
|
dev.reset()
|
|
dev.close()
|
|
if csv_file:
|
|
csv_file.close()
|
|
print(f"Data saved to: {args.csv}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|