Files
feeder_mk2/code/tools/feeder_monitor.py
2026-02-27 17:31:42 +07:00

469 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Photon Feeder Mk2 Serial Debug Monitor
Console-based GUI for monitoring feeder diagnostics via USART1
Usage: python feeder_monitor.py [COM_PORT] [BAUD_RATE]
python feeder_monitor.py COM3
python feeder_monitor.py /dev/ttyUSB0 115200
Controls:
q - Quit
c - Clear screen
p - Pause/Resume display
r - Reset statistics
"""
import sys
import os
import time
import threading
from collections import deque
try:
import serial
import serial.tools.list_ports
except ImportError:
print("Error: pyserial not installed. Run: pip install pyserial")
sys.exit(1)
# ANSI color codes
class Colors:
RESET = '\033[0m'
BOLD = '\033[1m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
CYAN = '\033[96m'
WHITE = '\033[97m'
DIM = '\033[2m'
# Enable ANSI on Windows
if os.name == 'nt':
os.system('')
def clear_screen():
os.system('cls' if os.name == 'nt' else 'clear')
def move_cursor(row, col):
print(f'\033[{row};{col}H', end='')
def hide_cursor():
print('\033[?25l', end='')
def show_cursor():
print('\033[?25h', end='')
class FeederMonitor:
def __init__(self, port, baudrate=115200):
self.port = port
self.baudrate = baudrate
self.serial = None
self.running = False
self.paused = False
# Current data
self.position = 0
self.target = 0
self.error = 0
self.pid_output = 0
self.state = "UNKNOWN"
self.is_initialized = False
self.feed_in_progress = False
self.beefy_tape = False
self.address = 0xFF
self.sw1_pressed = False
self.sw2_pressed = False
self.drive_value = 0
self.drive_mode = 0
# Statistics
self.packet_count = 0
self.error_count = 0
self.start_time = time.time()
self.last_packet_time = 0
self.max_error = 0
self.min_error = 0
# History for sparkline
self.error_history = deque(maxlen=50)
self.pid_history = deque(maxlen=50)
# Lock for thread safety
self.lock = threading.Lock()
def connect(self):
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE
)
return True
except serial.SerialException as e:
print(f"{Colors.RED}Error opening port {self.port}: {e}{Colors.RESET}")
return False
def disconnect(self):
if self.serial and self.serial.is_open:
self.serial.close()
def parse_packet(self, line):
"""Parse debug packet: $P:pos:tgt:err,I:pid,S:state,F:flags,A:addr,B:btns,D:drv*"""
try:
if not line.startswith('$') or not '*' in line:
return False
# Remove $ and * markers
line = line[1:line.index('*')]
# State number to name mapping
state_names = {
0: "IDLE", 1: "PEEL_FWD", 2: "PEEL_BACK", 3: "UNPEEL",
4: "DRIVING", 5: "SLACK_REM", 6: "BACKLASH", 7: "SETTLING",
8: "COMPLETE", 9: "TIMEOUT"
}
parts = line.split(',')
for part in parts:
if part.startswith('P:'):
values = part[2:].split(':')
if len(values) >= 3:
self.position = int(values[0])
self.target = int(values[1])
self.error = int(values[2])
self.error_history.append(self.error)
if abs(self.error) > abs(self.max_error):
self.max_error = self.error
if self.error < self.min_error:
self.min_error = self.error
elif part.startswith('I:'):
self.pid_output = int(part[2:])
self.pid_history.append(self.pid_output)
elif part.startswith('S:'):
state_num = int(part[2:])
self.state = state_names.get(state_num, f"UNK({state_num})")
elif part.startswith('F:'):
flags = part[2:]
if len(flags) >= 3:
self.is_initialized = flags[0] == '1'
self.feed_in_progress = flags[1] == '1'
self.beefy_tape = flags[2] == '1'
elif part.startswith('A:'):
self.address = int(part[2:], 16)
elif part.startswith('B:'):
btns = part[2:]
if len(btns) >= 2:
self.sw1_pressed = btns[0] == '1'
self.sw2_pressed = btns[1] == '1'
elif part.startswith('D:'):
self.drive_value = int(part[2:])
elif part.startswith('M:'):
self.drive_mode = int(part[2:])
self.packet_count += 1
self.last_packet_time = time.time()
return True
except (ValueError, IndexError) as e:
self.error_count += 1
return False
def read_thread(self):
"""Background thread for reading serial data"""
buffer = ""
while self.running:
try:
if self.serial and self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
buffer += data.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line:
with self.lock:
self.parse_packet(line)
else:
time.sleep(0.01)
except Exception as e:
time.sleep(0.1)
def make_sparkline(self, data, width=40, min_val=None, max_val=None):
"""Create a text sparkline from data"""
if not data:
return ' ' * width
chars = '▁▂▃▄▅▆▇█'
if min_val is None:
min_val = min(data)
if max_val is None:
max_val = max(data)
if max_val == min_val:
return chars[4] * min(len(data), width)
result = []
step = max(1, len(data) // width)
for i in range(0, min(len(data), width * step), step):
val = data[min(i, len(data)-1)]
normalized = (val - min_val) / (max_val - min_val)
idx = min(int(normalized * (len(chars) - 1)), len(chars) - 1)
result.append(chars[idx])
return ''.join(result).ljust(width)
def make_bar(self, value, max_value, width=20, char=''):
"""Create a progress bar"""
if max_value == 0:
return ' ' * width
ratio = min(abs(value) / max_value, 1.0)
filled = int(ratio * width)
return (char * filled).ljust(width)
def display(self):
"""Display the monitor interface"""
move_cursor(1, 1)
uptime = time.time() - self.start_time
pkt_rate = self.packet_count / uptime if uptime > 0 else 0
last_pkt_ago = time.time() - self.last_packet_time if self.last_packet_time > 0 else 999
with self.lock:
# Header
print(f"{Colors.BOLD}{Colors.CYAN}╔══════════════════════════════════════════════════════════════════════╗{Colors.RESET}")
print(f"{Colors.BOLD}{Colors.CYAN}{Colors.WHITE} PHOTON FEEDER Mk2 DEBUG MONITOR {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.BOLD}{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# Connection status
conn_color = Colors.GREEN if last_pkt_ago < 1 else (Colors.YELLOW if last_pkt_ago < 5 else Colors.RED)
conn_status = "CONNECTED" if last_pkt_ago < 5 else "NO DATA"
print(f"{Colors.CYAN}{Colors.RESET} Port: {Colors.YELLOW}{self.port:10}{Colors.RESET} @ {self.baudrate} baud │ Status: {conn_color}{conn_status:10}{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# Position section
print(f"{Colors.CYAN}{Colors.RESET} {Colors.BOLD}POSITION{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.RESET} Current: {Colors.GREEN}{self.position:>10}{Colors.RESET} Target: {Colors.YELLOW}{self.target:>10}{Colors.RESET} Error: {Colors.RED if abs(self.error) > 10 else Colors.GREEN}{self.error:>8}{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
# Error sparkline
err_spark = self.make_sparkline(list(self.error_history), 50)
print(f"{Colors.CYAN}{Colors.RESET} Error: [{Colors.DIM}{err_spark}{Colors.RESET}] {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# PID section
print(f"{Colors.CYAN}{Colors.RESET} {Colors.BOLD}PID CONTROL{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
pid_color = Colors.GREEN if self.pid_output >= 0 else Colors.RED
pid_dir = "FWD" if self.pid_output >= 0 else "REV"
print(f"{Colors.CYAN}{Colors.RESET} Output: {pid_color}{self.pid_output:>6}{Colors.RESET} ({pid_dir}) Drive Value: {Colors.YELLOW}{self.drive_value:>3}{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
# PID sparkline
pid_spark = self.make_sparkline(list(self.pid_history), 50, -2400, 2400)
print(f"{Colors.CYAN}{Colors.RESET} Output: [{Colors.DIM}{pid_spark}{Colors.RESET}] {Colors.CYAN}{Colors.RESET}")
# PWM bar
pwm_bar = self.make_bar(self.pid_output, 2400, 30)
pwm_pct = abs(self.pid_output) / 2400 * 100
print(f"{Colors.CYAN}{Colors.RESET} PWM: [{pid_color}{pwm_bar}{Colors.RESET}] {pwm_pct:5.1f}% {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# State section
print(f"{Colors.CYAN}{Colors.RESET} {Colors.BOLD}FEED STATE{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
state_color = Colors.GREEN if self.state == "IDLE" else (Colors.YELLOW if self.state == "DRIVING" else Colors.CYAN)
print(f"{Colors.CYAN}{Colors.RESET} State: {state_color}{self.state:15}{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
# State indicators
states = ["IDLE", "PEEL_FWD", "PEEL_BACK", "UNPEEL", "DRIVING", "SLACK_REM", "BACKLASH", "SETTLING"]
state_line = " "
for s in states:
if s == self.state:
state_line += f"{Colors.GREEN}[{s[:3]}]{Colors.RESET} "
else:
state_line += f"{Colors.DIM}[{s[:3]}]{Colors.RESET} "
print(f"{Colors.CYAN}{Colors.RESET}{state_line} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# Flags section
print(f"{Colors.CYAN}{Colors.RESET} {Colors.BOLD}STATUS FLAGS{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
init_icon = f"{Colors.GREEN}{Colors.RESET}" if self.is_initialized else f"{Colors.RED}{Colors.RESET}"
feed_icon = f"{Colors.YELLOW}{Colors.RESET}" if self.feed_in_progress else f"{Colors.DIM}{Colors.RESET}"
beefy_icon = f"{Colors.MAGENTA}{Colors.RESET}" if self.beefy_tape else f"{Colors.DIM}{Colors.RESET}"
mode_str = f"{Colors.YELLOW}PEEL{Colors.RESET}" if self.drive_mode else f"{Colors.BLUE}TAPE{Colors.RESET}"
print(f"{Colors.CYAN}{Colors.RESET} {init_icon} Initialized {feed_icon} Feeding {beefy_icon} Beefy Tape Mode: {mode_str} {Colors.CYAN}{Colors.RESET}")
# Address and buttons
addr_str = f"0x{self.address:02X}" if self.address != 0xFF else "UNSET"
sw1_icon = f"{Colors.GREEN}[SW1]{Colors.RESET}" if self.sw1_pressed else f"{Colors.DIM}[SW1]{Colors.RESET}"
sw2_icon = f"{Colors.GREEN}[SW2]{Colors.RESET}" if self.sw2_pressed else f"{Colors.DIM}[SW2]{Colors.RESET}"
print(f"{Colors.CYAN}{Colors.RESET} Address: {Colors.YELLOW}{addr_str:6}{Colors.RESET} Buttons: {sw1_icon} {sw2_icon} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# Statistics
print(f"{Colors.CYAN}{Colors.RESET} {Colors.BOLD}STATISTICS{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.RESET} Packets: {Colors.GREEN}{self.packet_count:>8}{Colors.RESET} Errors: {Colors.RED}{self.error_count:>5}{Colors.RESET} Rate: {Colors.CYAN}{pkt_rate:>5.1f}{Colors.RESET}/s {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.RESET} Max Err: {Colors.RED}{self.max_error:>8}{Colors.RESET} Min Err: {Colors.RED}{self.min_error:>8}{Colors.RESET} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
# Controls
pause_status = f"{Colors.YELLOW}PAUSED{Colors.RESET}" if self.paused else ""
print(f"{Colors.CYAN}{Colors.RESET} {Colors.DIM}[Q]uit [C]lear [P]ause [R]eset Stats{Colors.RESET} {pause_status:20} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}╚══════════════════════════════════════════════════════════════════════╝{Colors.RESET}")
sys.stdout.flush()
def reset_stats(self):
with self.lock:
self.packet_count = 0
self.error_count = 0
self.start_time = time.time()
self.max_error = 0
self.min_error = 0
self.error_history.clear()
self.pid_history.clear()
def run(self):
if not self.connect():
return
self.running = True
# Start read thread
read_thread = threading.Thread(target=self.read_thread, daemon=True)
read_thread.start()
# Setup terminal
clear_screen()
hide_cursor()
# Input handling (non-blocking on Windows requires msvcrt)
if os.name == 'nt':
import msvcrt
def get_key():
if msvcrt.kbhit():
return msvcrt.getch().decode('utf-8', errors='ignore').lower()
return None
else:
import select
import tty
import termios
old_settings = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin.fileno())
def get_key():
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1).lower()
return None
try:
while self.running:
# Handle input
key = get_key()
if key == 'q':
self.running = False
elif key == 'c':
clear_screen()
elif key == 'p':
self.paused = not self.paused
elif key == 'r':
self.reset_stats()
# Update display
if not self.paused:
self.display()
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.running = False
show_cursor()
if os.name != 'nt':
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
self.disconnect()
print(f"\n{Colors.CYAN}Disconnected from {self.port}{Colors.RESET}")
def list_ports():
"""List available serial ports"""
ports = serial.tools.list_ports.comports()
if not ports:
print(f"{Colors.YELLOW}No serial ports found{Colors.RESET}")
return None
print(f"\n{Colors.CYAN}Available serial ports:{Colors.RESET}")
for i, port in enumerate(ports):
print(f" {i+1}. {Colors.GREEN}{port.device}{Colors.RESET} - {port.description}")
print()
return ports
def main():
print(f"{Colors.BOLD}{Colors.CYAN}Photon Feeder Mk2 Debug Monitor{Colors.RESET}")
print(f"{Colors.DIM}{'='*40}{Colors.RESET}\n")
# Parse arguments
port = None
baudrate = 115200
if len(sys.argv) >= 2:
port = sys.argv[1]
if len(sys.argv) >= 3:
try:
baudrate = int(sys.argv[2])
except ValueError:
print(f"{Colors.RED}Invalid baud rate: {sys.argv[2]}{Colors.RESET}")
sys.exit(1)
# If no port specified, list and prompt
if port is None:
ports = list_ports()
if ports:
try:
choice = input(f"Select port (1-{len(ports)}) or enter name: ").strip()
if choice.isdigit():
idx = int(choice) - 1
if 0 <= idx < len(ports):
port = ports[idx].device
else:
print(f"{Colors.RED}Invalid selection{Colors.RESET}")
sys.exit(1)
else:
port = choice
except (KeyboardInterrupt, EOFError):
print(f"\n{Colors.YELLOW}Cancelled{Colors.RESET}")
sys.exit(0)
else:
sys.exit(1)
# Run monitor
monitor = FeederMonitor(port, baudrate)
monitor.run()
if __name__ == "__main__":
main()