469 lines
20 KiB
Python
469 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Photon Feeder Mk2 Serial Debug Monitor
|
|
Console-based GUI for monitoring feeder diagnostics via USART1
|
|
|
|
Usage: python feeder_monitor.py [COM_PORT] [BAUD_RATE]
|
|
python feeder_monitor.py COM3
|
|
python feeder_monitor.py /dev/ttyUSB0 115200
|
|
|
|
Controls:
|
|
q - Quit
|
|
c - Clear screen
|
|
p - Pause/Resume display
|
|
r - Reset statistics
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
|
|
try:
|
|
import serial
|
|
import serial.tools.list_ports
|
|
except ImportError:
|
|
print("Error: pyserial not installed. Run: pip install pyserial")
|
|
sys.exit(1)
|
|
|
|
# ANSI color codes
|
|
class Colors:
    """ANSI escape sequences used to style the monitor's terminal output."""

    # Text attributes
    RESET = '\033[0m'
    BOLD = '\033[1m'
    DIM = '\033[2m'

    # Foreground colors (codes 91-97)
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
|
|
|
|
# Enable ANSI on Windows: run an empty shell command once at import time.
# Other platforms skip this step.
if os.name == 'nt':
    os.system('')
|
|
|
|
def clear_screen():
    """Clear the terminal by running the platform's shell command.

    Runs ``cls`` when ``os.name`` is ``'nt'`` and ``clear`` otherwise.
    """
    if os.name == 'nt':
        command = 'cls'
    else:
        command = 'clear'
    os.system(command)
|
|
|
|
def move_cursor(row, col):
    """Print the escape sequence that moves the cursor to (row, col).

    Nothing else is written: no trailing newline is emitted.
    """
    sequence = '\033[{};{}H'.format(row, col)
    print(sequence, end='')
|
|
|
|
def hide_cursor():
    """Print the escape sequence that hides the terminal cursor."""
    sequence = '\033[?25l'
    print(sequence, end='')
|
|
|
|
def show_cursor():
    """Print the escape sequence that makes the terminal cursor visible."""
    sequence = '\033[?25h'
    print(sequence, end='')
|
|
|
|
|
|
class FeederMonitor:
    """Console monitor for Photon Feeder Mk2 debug packets on a serial port.

    A background thread (``read_thread``) reads newline-terminated packets
    from the port and updates the instance's fields while holding
    ``self.lock``; ``run`` redraws the dashboard from those fields about ten
    times per second and handles the single-key commands.
    """

    # Magnitude that display() treats as 100% when scaling the PID output
    # sparkline and the PWM bar.
    PWM_MAX = 2400

    # Maximum number of unterminated characters the reader keeps while
    # waiting for a newline; older input beyond this is discarded.
    MAX_BUFFER_CHARS = 4096

    # Names for the state numbers carried in the ``S:`` packet field.
    STATE_NAMES = {
        0: "IDLE", 1: "PEEL_FWD", 2: "PEEL_BACK", 3: "UNPEEL",
        4: "DRIVING", 5: "SLACK_REM", 6: "BACKLASH", 7: "SETTLING",
        8: "COMPLETE", 9: "TIMEOUT",
    }

    def __init__(self, port, baudrate=115200):
        """Store the connection parameters and reset all monitored values.

        Args:
            port: Serial port name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
        """
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.running = False
        self.paused = False

        # Current data (last values parsed from the packet stream)
        self.position = 0
        self.target = 0
        self.error = 0
        self.pid_output = 0
        self.state = "UNKNOWN"
        self.is_initialized = False
        self.feed_in_progress = False
        self.beefy_tape = False
        self.address = 0xFF  # display() shows 0xFF as "UNSET"
        self.sw1_pressed = False
        self.sw2_pressed = False
        self.drive_value = 0
        self.drive_mode = 0

        # Statistics
        self.packet_count = 0
        self.error_count = 0
        self.start_time = time.time()
        self.last_packet_time = 0
        self.max_error = 0
        self.min_error = 0

        # History for sparkline
        self.error_history = deque(maxlen=50)
        self.pid_history = deque(maxlen=50)

        # Lock for thread safety (shared by the reader thread and display)
        self.lock = threading.Lock()

    def connect(self):
        """Open the serial port (8 data bits, no parity, 1 stop bit).

        Returns:
            True on success; False after printing the error when
            ``serial.SerialException`` is raised.
        """
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
            )
            return True
        except serial.SerialException as e:
            print(f"{Colors.RED}Error opening port {self.port}: {e}{Colors.RESET}")
            return False

    def disconnect(self):
        """Close the serial port if it is currently open."""
        if self.serial and self.serial.is_open:
            self.serial.close()

    def parse_packet(self, line):
        """Parse one debug packet and update the monitored fields.

        Packet form (comma-separated fields; unrecognised fields are
        ignored)::

            $P:pos:tgt:err,I:pid,S:state,F:flags,A:addr,B:btns,D:drv,M:mode*

        The method does not take ``self.lock`` itself; ``read_thread`` calls
        it with the lock held.

        Args:
            line: One stripped line of text received from the port.

        Returns:
            True when the line was a packet and every recognised field
            converted. False when the line is not framed by ``$`` ... ``*``
            (not counted as an error), or when a field failed to convert;
            in that case ``error_count`` is incremented and fields parsed
            before the bad one keep their new values.
        """
        if not line.startswith('$') or '*' not in line:
            return False

        # Keep only the text between the leading '$' and the first '*'.
        payload = line[1:line.index('*')]

        try:
            for part in payload.split(','):
                if part.startswith('P:'):
                    values = part[2:].split(':')
                    if len(values) >= 3:
                        self.position = int(values[0])
                        self.target = int(values[1])
                        self.error = int(values[2])
                        self.error_history.append(self.error)
                        # Track the signed extremes independently so that
                        # "Max Err" is never lower than "Min Err".
                        if self.error > self.max_error:
                            self.max_error = self.error
                        if self.error < self.min_error:
                            self.min_error = self.error

                elif part.startswith('I:'):
                    self.pid_output = int(part[2:])
                    self.pid_history.append(self.pid_output)

                elif part.startswith('S:'):
                    state_num = int(part[2:])
                    self.state = self.STATE_NAMES.get(state_num, f"UNK({state_num})")

                elif part.startswith('F:'):
                    # Three '0'/'1' characters: initialized, feeding, beefy.
                    flags = part[2:]
                    if len(flags) >= 3:
                        self.is_initialized = flags[0] == '1'
                        self.feed_in_progress = flags[1] == '1'
                        self.beefy_tape = flags[2] == '1'

                elif part.startswith('A:'):
                    self.address = int(part[2:], 16)

                elif part.startswith('B:'):
                    # Two '0'/'1' characters: SW1, SW2.
                    btns = part[2:]
                    if len(btns) >= 2:
                        self.sw1_pressed = btns[0] == '1'
                        self.sw2_pressed = btns[1] == '1'

                elif part.startswith('D:'):
                    self.drive_value = int(part[2:])

                elif part.startswith('M:'):
                    self.drive_mode = int(part[2:])
        except (ValueError, IndexError):
            self.error_count += 1
            return False

        self.packet_count += 1
        self.last_packet_time = time.time()
        return True

    def read_thread(self):
        """Background loop: read serial data and parse each complete line.

        Runs until ``self.running`` becomes false. Bytes are decoded as
        UTF-8 (undecodable bytes dropped) and accumulated until a newline
        arrives; every non-empty line is passed to ``parse_packet`` with
        ``self.lock`` held.
        """
        buffer = ""
        while self.running:
            try:
                if self.serial and self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    buffer += data.decode('utf-8', errors='ignore')

                    while '\n' in buffer:
                        line, buffer = buffer.split('\n', 1)
                        line = line.strip()
                        if line:
                            with self.lock:
                                self.parse_packet(line)

                    # A stream that never sends a newline must not grow the
                    # buffer forever; keep only the most recent tail.
                    if len(buffer) > self.MAX_BUFFER_CHARS:
                        buffer = buffer[-self.MAX_BUFFER_CHARS:]
                else:
                    time.sleep(0.01)
            except Exception:
                # Best effort: keep the monitor alive on any port error
                # (including the port being closed during shutdown) and
                # retry after a short back-off.
                time.sleep(0.1)

    def make_sparkline(self, data, width=40, min_val=None, max_val=None):
        """Render ``data`` as a block-character sparkline of exactly ``width`` cells.

        Args:
            data: Indexable sequence of numbers (oldest first).
            width: Number of character cells in the result.
            min_val: Value mapped to the lowest glyph; defaults to ``min(data)``.
            max_val: Value mapped to the highest glyph; defaults to ``max(data)``.

        Returns:
            A string padded with spaces to ``width``. Values outside
            ``[min_val, max_val]`` are clamped to the lowest/highest glyph.
            When there are more samples than cells, the newest samples are
            the ones shown.
        """
        if not data:
            return ' ' * width

        chars = '▁▂▃▄▅▆▇█'

        if min_val is None:
            min_val = min(data)
        if max_val is None:
            max_val = max(data)

        if max_val == min_val:
            # Flat series: mid-height glyphs, padded like every other result.
            return (chars[4] * min(len(data), width)).ljust(width)

        span = max_val - min_val
        top = len(chars) - 1
        step = max(1, len(data) // width)
        shown = min(len(data), width * step)
        # Sample the trailing window so the newest value is always included.
        first = len(data) - shown + step - 1

        cells = []
        for i in range(first, len(data), step):
            normalized = (data[i] - min_val) / span
            # Clamp on both sides: a negative index would otherwise wrap
            # around and draw a tall glyph for a very low value.
            idx = max(0, min(int(normalized * top), top))
            cells.append(chars[idx])

        return ''.join(cells).ljust(width)

    def make_bar(self, value, max_value, width=20, char='█'):
        """Render ``abs(value) / max_value`` as a bar of ``width`` cells.

        Returns:
            ``char`` repeated in proportion to the ratio (capped at 1.0) and
            padded with spaces to ``width``; all spaces when ``max_value``
            is 0.
        """
        if max_value == 0:
            return ' ' * width
        ratio = min(abs(value) / max_value, 1.0)
        filled = int(ratio * width)
        return (char * filled).ljust(width)

    def display(self):
        """Draw one frame of the dashboard starting at the top-left corner.

        All monitored fields and statistics are read while holding
        ``self.lock`` so a frame never mixes values from two packets.
        """
        move_cursor(1, 1)

        divider = f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}"

        with self.lock:
            now = time.time()
            uptime = now - self.start_time
            pkt_rate = self.packet_count / uptime if uptime > 0 else 0
            last_pkt_ago = now - self.last_packet_time if self.last_packet_time > 0 else 999

            # Header
            print(f"{Colors.BOLD}{Colors.CYAN}╔══════════════════════════════════════════════════════════════════════╗{Colors.RESET}")
            print(f"{Colors.BOLD}{Colors.CYAN}║{Colors.WHITE} PHOTON FEEDER Mk2 DEBUG MONITOR {Colors.CYAN}║{Colors.RESET}")
            print(f"{Colors.BOLD}{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")

            # Connection status: green < 1 s since last packet, yellow < 5 s, else red
            conn_color = Colors.GREEN if last_pkt_ago < 1 else (Colors.YELLOW if last_pkt_ago < 5 else Colors.RED)
            conn_status = "CONNECTED" if last_pkt_ago < 5 else "NO DATA"
            print(f"{Colors.CYAN}║{Colors.RESET} Port: {Colors.YELLOW}{self.port:10}{Colors.RESET} @ {self.baudrate} baud │ Status: {conn_color}{conn_status:10}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # Position section
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}POSITION{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
            print(f"{Colors.CYAN}║{Colors.RESET} Current: {Colors.GREEN}{self.position:>10}{Colors.RESET} Target: {Colors.YELLOW}{self.target:>10}{Colors.RESET} Error: {Colors.RED if abs(self.error) > 10 else Colors.GREEN}{self.error:>8}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            # Error sparkline (auto-scaled to the history's own min/max)
            err_spark = self.make_sparkline(list(self.error_history), 50)
            print(f"{Colors.CYAN}║{Colors.RESET} Error: [{Colors.DIM}{err_spark}{Colors.RESET}] {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # PID section
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}PID CONTROL{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
            pid_color = Colors.GREEN if self.pid_output >= 0 else Colors.RED
            pid_dir = "FWD" if self.pid_output >= 0 else "REV"
            print(f"{Colors.CYAN}║{Colors.RESET} Output: {pid_color}{self.pid_output:>6}{Colors.RESET} ({pid_dir}) Drive Value: {Colors.YELLOW}{self.drive_value:>3}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            # PID sparkline (fixed scale of -PWM_MAX..+PWM_MAX)
            pid_spark = self.make_sparkline(list(self.pid_history), 50, -self.PWM_MAX, self.PWM_MAX)
            print(f"{Colors.CYAN}║{Colors.RESET} Output: [{Colors.DIM}{pid_spark}{Colors.RESET}] {Colors.CYAN}║{Colors.RESET}")

            # PWM bar
            pwm_bar = self.make_bar(self.pid_output, self.PWM_MAX, 30)
            pwm_pct = abs(self.pid_output) / self.PWM_MAX * 100
            print(f"{Colors.CYAN}║{Colors.RESET} PWM: [{pid_color}{pwm_bar}{Colors.RESET}] {pwm_pct:5.1f}% {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # State section
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}FEED STATE{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            state_color = Colors.GREEN if self.state == "IDLE" else (Colors.YELLOW if self.state == "DRIVING" else Colors.CYAN)
            print(f"{Colors.CYAN}║{Colors.RESET} State: {state_color}{self.state:15}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            # State indicators: one three-letter tag per state, the active
            # one highlighted.
            # NOTE: PEEL_FWD and PEEL_BACK both abbreviate to "PEE".
            states = ["IDLE", "PEEL_FWD", "PEEL_BACK", "UNPEEL", "DRIVING", "SLACK_REM", "BACKLASH", "SETTLING"]
            state_line = " "
            for s in states:
                tag_color = Colors.GREEN if s == self.state else Colors.DIM
                state_line += f"{tag_color}[{s[:3]}]{Colors.RESET} "
            print(f"{Colors.CYAN}║{Colors.RESET}{state_line} {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # Flags section
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}STATUS FLAGS{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            init_icon = f"{Colors.GREEN}●{Colors.RESET}" if self.is_initialized else f"{Colors.RED}○{Colors.RESET}"
            feed_icon = f"{Colors.YELLOW}●{Colors.RESET}" if self.feed_in_progress else f"{Colors.DIM}○{Colors.RESET}"
            beefy_icon = f"{Colors.MAGENTA}●{Colors.RESET}" if self.beefy_tape else f"{Colors.DIM}○{Colors.RESET}"

            mode_str = f"{Colors.YELLOW}PEEL{Colors.RESET}" if self.drive_mode else f"{Colors.BLUE}TAPE{Colors.RESET}"
            print(f"{Colors.CYAN}║{Colors.RESET} {init_icon} Initialized {feed_icon} Feeding {beefy_icon} Beefy Tape Mode: {mode_str} {Colors.CYAN}║{Colors.RESET}")

            # Address and buttons
            addr_str = f"0x{self.address:02X}" if self.address != 0xFF else "UNSET"
            sw1_icon = f"{Colors.GREEN}[SW1]{Colors.RESET}" if self.sw1_pressed else f"{Colors.DIM}[SW1]{Colors.RESET}"
            sw2_icon = f"{Colors.GREEN}[SW2]{Colors.RESET}" if self.sw2_pressed else f"{Colors.DIM}[SW2]{Colors.RESET}"

            print(f"{Colors.CYAN}║{Colors.RESET} Address: {Colors.YELLOW}{addr_str:6}{Colors.RESET} Buttons: {sw1_icon} {sw2_icon} {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # Statistics
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}STATISTICS{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
            print(f"{Colors.CYAN}║{Colors.RESET} Packets: {Colors.GREEN}{self.packet_count:>8}{Colors.RESET} Errors: {Colors.RED}{self.error_count:>5}{Colors.RESET} Rate: {Colors.CYAN}{pkt_rate:>5.1f}{Colors.RESET}/s {Colors.CYAN}║{Colors.RESET}")
            print(f"{Colors.CYAN}║{Colors.RESET} Max Err: {Colors.RED}{self.max_error:>8}{Colors.RESET} Min Err: {Colors.RED}{self.min_error:>8}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")

            print(divider)

            # Controls. Pad the plain text first and colour it afterwards so
            # the escape bytes do not count toward the 20-column field.
            pause_text = "PAUSED" if self.paused else ""
            print(f"{Colors.CYAN}║{Colors.RESET} {Colors.DIM}[Q]uit [C]lear [P]ause [R]eset Stats{Colors.RESET} {Colors.YELLOW}{pause_text:20}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
            print(f"{Colors.CYAN}╚══════════════════════════════════════════════════════════════════════╝{Colors.RESET}")

        sys.stdout.flush()

    def reset_stats(self):
        """Zero the counters and extremes, restart uptime, clear both histories."""
        with self.lock:
            self.packet_count = 0
            self.error_count = 0
            self.start_time = time.time()
            self.max_error = 0
            self.min_error = 0
            self.error_history.clear()
            self.pid_history.clear()

    def run(self):
        """Connect, start the reader thread and run the UI loop until quit.

        Keys: ``q`` quit, ``c`` clear screen, ``p`` pause/resume redraws,
        ``r`` reset statistics. Returns immediately if the port cannot be
        opened. On exit (including Ctrl-C or a terminal setup failure) the
        cursor and terminal mode are restored, the reader thread is given
        up to one second to stop, and the port is closed.
        """
        if not self.connect():
            return

        self.running = True

        # Start read thread
        reader = threading.Thread(target=self.read_thread, daemon=True)
        reader.start()

        # Saved POSIX terminal attributes; stays None on Windows or if the
        # terminal could not be switched to cbreak mode.
        old_settings = None

        try:
            # Setup terminal
            clear_screen()
            hide_cursor()

            # Input handling (non-blocking on Windows requires msvcrt)
            if os.name == 'nt':
                import msvcrt

                def get_key():
                    if msvcrt.kbhit():
                        return msvcrt.getch().decode('utf-8', errors='ignore').lower()
                    return None
            else:
                import select
                import tty
                import termios

                old_settings = termios.tcgetattr(sys.stdin)
                tty.setcbreak(sys.stdin.fileno())

                def get_key():
                    if select.select([sys.stdin], [], [], 0)[0]:
                        return sys.stdin.read(1).lower()
                    return None

            while self.running:
                # Handle input
                key = get_key()
                command_handled = False
                if key == 'q':
                    self.running = False
                elif key == 'c':
                    clear_screen()
                    command_handled = True
                elif key == 'p':
                    self.paused = not self.paused
                    command_handled = True
                elif key == 'r':
                    self.reset_stats()
                    command_handled = True

                # Update display. While paused, still draw one frame after a
                # command so its effect (including the PAUSED marker) shows.
                if not self.paused or command_handled:
                    self.display()

                time.sleep(0.1)

        except KeyboardInterrupt:
            pass
        finally:
            self.running = False
            show_cursor()
            if old_settings is not None:
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
            # Let the reader leave its loop before the port is closed.
            reader.join(timeout=1.0)
            self.disconnect()
            print(f"\n{Colors.CYAN}Disconnected from {self.port}{Colors.RESET}")
|
|
|
|
|
|
def list_ports():
    """Print a numbered list of the serial ports pyserial reports.

    Returns:
        The value returned by ``serial.tools.list_ports.comports()``, or
        ``None`` (after printing a notice) when it is empty.
    """
    available = serial.tools.list_ports.comports()
    if not available:
        print(f"{Colors.YELLOW}No serial ports found{Colors.RESET}")
        return None

    print(f"\n{Colors.CYAN}Available serial ports:{Colors.RESET}")
    # Numbering is 1-based to match the selection prompt in main().
    for number, info in enumerate(available, start=1):
        print(f" {number}. {Colors.GREEN}{info.device}{Colors.RESET} - {info.description}")

    print()
    return available
|
|
|
|
|
|
def main():
    """Command-line entry point: pick a port and baud rate, then run the monitor.

    ``sys.argv[1]`` is the port and ``sys.argv[2]`` the baud rate (default
    115200). Without a port argument the available ports are listed and the
    user is prompted for a list number or a port name. Exits with status 1
    on an invalid baud rate, an out-of-range selection, or when no ports
    are found; exits with status 0 when the prompt is cancelled.
    """
    print(f"{Colors.BOLD}{Colors.CYAN}Photon Feeder Mk2 Debug Monitor{Colors.RESET}")
    print(f"{Colors.DIM}{'='*40}{Colors.RESET}\n")

    # Parse arguments
    args = sys.argv[1:]
    port = args[0] if args else None
    baudrate = 115200

    if len(args) >= 2:
        try:
            baudrate = int(args[1])
        except ValueError:
            print(f"{Colors.RED}Invalid baud rate: {args[1]}{Colors.RESET}")
            sys.exit(1)

    # If no port specified, list and prompt
    if port is None:
        ports = list_ports()
        if not ports:
            sys.exit(1)

        try:
            choice = input(f"Select port (1-{len(ports)}) or enter name: ").strip()
            if not choice.isdigit():
                # Anything that is not a number is taken as a port name.
                port = choice
            else:
                idx = int(choice) - 1
                if not 0 <= idx < len(ports):
                    print(f"{Colors.RED}Invalid selection{Colors.RESET}")
                    sys.exit(1)
                port = ports[idx].device
        except (KeyboardInterrupt, EOFError):
            print(f"\n{Colors.YELLOW}Cancelled{Colors.RESET}")
            sys.exit(0)

    # Run monitor
    FeederMonitor(port, baudrate).run()
|
|
|
|
|
|
# Start the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()
|