Files
HPCS6500-py/usb_capture.py
grabowski 0f09c70215 first commit
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 13:57:58 +07:00

342 lines
11 KiB
Python

"""
HPCS6500 USB Traffic Capture & Parser
1. Starts USBPcap to capture USB traffic from the HPCS6500 device
2. You use the vendor software normally while this runs
3. Press Ctrl+C to stop
4. Parses the .pcap file and extracts serial data (USB bulk transfers)
"""
import subprocess
import struct
import sys
import os
import time
from datetime import datetime
USBPCAP_CMD = r"C:\Program Files\USBPcap\USBPcapCMD.exe"
CAPTURE_DIR = "captures"
DEVICE_ADDRESS = 4 # From registry: Address = 4
def find_usbpcap_devices():
"""Return all USBPcap devices (USBPcap1 through USBPcap10)."""
devices = []
for i in range(1, 11):
device = rf"\\.\USBPcap{i}"
# Check if the device path exists by trying a quick capture to NUL
try:
result = subprocess.run(
[USBPCAP_CMD, "-d", device, "-o", "-", "-A"],
capture_output=True, timeout=2,
)
devices.append(device)
print(f" {device} - available")
except subprocess.TimeoutExpired:
devices.append(device)
print(f" {device} - available (active)")
except Exception:
pass
return devices
def start_capture(device, output_file):
"""Start USBPcap capture in background."""
cmd = [
USBPCAP_CMD,
"-d", device,
"-o", output_file,
"-A", # Capture from all devices on this hub (safe, we filter later)
"--devices", str(DEVICE_ADDRESS), # Only our device
"-s", "65535", # Max snapshot length
"-b", "1048576", # 1MB buffer
]
print(f"Starting capture: {' '.join(cmd)}")
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return proc
def parse_pcap(filepath):
"""Parse a pcap file with USBPcap headers and extract serial data."""
with open(filepath, "rb") as f:
data = f.read()
if len(data) < 24:
print("Capture file too small or empty.")
return
# Parse pcap global header
magic = struct.unpack_from("<I", data, 0)[0]
if magic == 0xA1B2C3D4:
endian = "<"
elif magic == 0xD4C3B2A1:
endian = ">"
else:
print(f"Unknown pcap magic: {hex(magic)}")
return
ver_major, ver_minor = struct.unpack_from(f"{endian}HH", data, 4)
snaplen = struct.unpack_from(f"{endian}I", data, 16)[0]
link_type = struct.unpack_from(f"{endian}I", data, 20)[0]
print(f"PCAP: v{ver_major}.{ver_minor}, snaplen={snaplen}, link_type={link_type}")
# link_type 249 = USBPcap
if link_type != 249:
print(f"Warning: expected link_type 249 (USBPcap), got {link_type}")
offset = 24 # Start of first packet
packets = []
tx_data = bytearray() # Host -> Device (OUT)
rx_data = bytearray() # Device -> Host (IN)
pkt_num = 0
while offset + 16 <= len(data):
# pcap packet header: ts_sec(4), ts_usec(4), incl_len(4), orig_len(4)
ts_sec, ts_usec, incl_len, orig_len = struct.unpack_from(f"{endian}IIII", data, offset)
offset += 16
if offset + incl_len > len(data):
break
pkt_data = data[offset:offset + incl_len]
offset += incl_len
pkt_num += 1
# Parse USBPcap packet header (minimum 27 bytes)
if len(pkt_data) < 27:
continue
# USBPcap header:
# headerLen (2 bytes LE)
# irpId (8 bytes)
# status (4 bytes)
# function (2 bytes) - URB function
# info (1 byte) - direction: bit 0: 0=OUT(host->dev), 1=IN(dev->host)
# bus (2 bytes)
# device (2 bytes)
# endpoint (1 byte) - bit 7 = direction, bits 0-3 = endpoint number
# transfer (1 byte) - 0=isoch, 1=interrupt, 2=control, 3=bulk
# dataLength (4 bytes)
hdr_len = struct.unpack_from("<H", pkt_data, 0)[0]
irp_id = struct.unpack_from("<Q", pkt_data, 2)[0]
status = struct.unpack_from("<I", pkt_data, 10)[0]
function = struct.unpack_from("<H", pkt_data, 14)[0]
info = pkt_data[16]
bus = struct.unpack_from("<H", pkt_data, 17)[0]
device = struct.unpack_from("<H", pkt_data, 19)[0]
endpoint = pkt_data[21]
transfer = pkt_data[22]
data_length = struct.unpack_from("<I", pkt_data, 23)[0]
# Payload starts after the USBPcap header
payload = pkt_data[hdr_len:]
if len(payload) == 0:
continue
# We care about bulk transfers (transfer type 3) with actual data
transfer_types = {0: "ISOCH", 1: "INT", 2: "CTRL", 3: "BULK"}
transfer_name = transfer_types.get(transfer, f"UNK({transfer})")
direction = "IN" if (info & 1) else "OUT"
ep_num = endpoint & 0x0F
ep_dir = "IN" if (endpoint & 0x80) else "OUT"
ts = ts_sec + ts_usec / 1_000_000
# Log all packets with data
if transfer == 3 and len(payload) > 0: # Bulk transfers
packets.append({
"num": pkt_num,
"ts": ts,
"direction": direction,
"ep": endpoint,
"ep_dir": ep_dir,
"transfer": transfer_name,
"data": payload,
"status": status,
"device": device,
})
if direction == "IN" or ep_dir == "IN":
rx_data.extend(payload)
else:
tx_data.extend(payload)
print(f"\nParsed {pkt_num} total packets, {len(packets)} bulk transfers with data")
print(f"TX (host -> device): {len(tx_data)} bytes")
print(f"RX (device -> host): {len(rx_data)} bytes")
if not packets:
print("\nNo bulk transfer data found.")
return
# Print timeline
print(f"\n{'='*80}")
print("TRAFFIC TIMELINE")
print(f"{'='*80}")
first_ts = packets[0]["ts"] if packets else 0
for pkt in packets[:100]: # First 100 packets
rel_ts = pkt["ts"] - first_ts
d = pkt["direction"]
payload = pkt["data"]
if d == "IN" or pkt["ep_dir"] == "IN":
color = "\033[92m" # green for device -> host
label = "DEV->PC"
else:
color = "\033[93m" # yellow for host -> device
label = "PC->DEV"
reset = "\033[0m"
hex_str = payload.hex(" ")
if len(hex_str) > 90:
hex_str = hex_str[:90] + f" ... (+{len(payload) - 30}B)"
ascii_str = "".join(chr(b) if 32 <= b < 127 else "." for b in payload[:40])
print(f"{color}[{rel_ts:8.4f}] {label} EP{pkt['ep']:02X} ({len(payload):4d}B): {hex_str}{reset}")
# Show ASCII if useful
printable = sum(1 for b in payload if 32 <= b < 127)
if printable > len(payload) * 0.4:
print(f" ASCII: {ascii_str!r}")
if len(packets) > 100:
print(f" ... ({len(packets) - 100} more packets)")
# Save extracted serial data
if tx_data:
tx_file = filepath.replace(".pcap", "_TX.bin")
with open(tx_file, "wb") as f:
f.write(tx_data)
print(f"\nTX data saved to: {tx_file}")
if rx_data:
rx_file = filepath.replace(".pcap", "_RX.bin")
with open(rx_file, "wb") as f:
f.write(rx_data)
print(f"\nRX data saved to: {rx_file}")
# Quick analysis of the data
print(f"\n{'='*80}")
print("QUICK ANALYSIS")
print(f"{'='*80}")
for label, stream in [("TX (PC -> Device)", tx_data), ("RX (Device -> PC)", rx_data)]:
if not stream:
continue
print(f"\n--- {label}: {len(stream)} bytes ---")
printable = sum(1 for b in stream if 32 <= b < 127 or b in (0x0A, 0x0D))
print(f" Printable ratio: {printable/len(stream):.0%}")
# Show first 200 bytes
sample = bytes(stream[:200])
print(f" HEX: {sample.hex(' ')}")
print(f" ASCII: {''.join(chr(b) if 32 <= b < 127 else '.' for b in sample)}")
# Try to find repeating patterns
if len(stream) > 10:
# Look for common delimiters
for delim_name, delim in [("\\r\\n", b"\r\n"), ("\\n", b"\n"), ("0xFF", b"\xff"), ("0xAA", b"\xaa")]:
count = bytes(stream).count(delim)
if count > 2:
print(f" Delimiter {delim_name}: appears {count} times")
def main():
os.makedirs(CAPTURE_DIR, exist_ok=True)
if len(sys.argv) > 1 and sys.argv[1] == "--parse":
# Parse mode: just parse an existing pcap file
if len(sys.argv) < 3:
print("Usage: usb_capture.py --parse <file.pcap>")
sys.exit(1)
parse_pcap(sys.argv[2])
return
print("HPCS6500 USB Traffic Capture")
print("=" * 60)
print(f"Device address: {DEVICE_ADDRESS}")
print()
# Find all USBPcap devices
print("Looking for USBPcap filter devices...")
devices = find_usbpcap_devices()
if not devices:
print("ERROR: No USBPcap devices found.")
print(" - You may need to REBOOT after installing USBPcap")
print(" - You must run this script as ADMINISTRATOR")
sys.exit(1)
print(f"\nFound {len(devices)} USBPcap hub(s)")
print()
# Try ALL hubs — capture from each one simultaneously
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
procs = []
for i, device in enumerate(devices):
pcap_file = os.path.join(CAPTURE_DIR, f"usb_capture_{timestamp}_hub{i+1}.pcap")
print(f"Starting capture on {device} -> {pcap_file}")
proc = start_capture(device, pcap_file)
procs.append((proc, device, pcap_file))
print()
print("Capturing on ALL hubs simultaneously.")
print(">>> Now use the vendor software — take a measurement")
print(">>> Press Ctrl+C when done")
print("-" * 60)
try:
while True:
# Check if any process died
all_dead = True
for proc, device, _ in procs:
if proc.poll() is None:
all_dead = False
if all_dead:
print("\nAll capture processes have exited.")
for proc, device, _ in procs:
stderr = proc.stderr.read().decode(errors="replace").strip()
print(f" {device}: exit code {proc.returncode}" +
(f" - {stderr}" if stderr else ""))
break
time.sleep(0.5)
except KeyboardInterrupt:
print("\n\nStopping captures...")
for proc, _, _ in procs:
proc.terminate()
for proc, _, _ in procs:
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
# Check which captures got data
print()
best_file = None
best_size = 0
for proc, device, pcap_file in procs:
if os.path.exists(pcap_file):
size = os.path.getsize(pcap_file)
print(f"{device} -> {pcap_file}: {size} bytes")
if size > best_size:
best_size = size
best_file = pcap_file
else:
print(f"{device} -> no file created")
if best_file and best_size > 24:
print(f"\nBest capture: {best_file} ({best_size} bytes)")
print("\nParsing capture...\n")
parse_pcap(best_file)
else:
print("\nNo USB data captured from any hub.")
print("Make sure you're running as ADMINISTRATOR.")
if __name__ == "__main__":
main()