#!/usr/bin/env python3
"""
Photon Feeder Mk2 Serial Debug Monitor
Console-based GUI for monitoring feeder diagnostics via USART1

Usage:
    python feeder_monitor.py [COM_PORT] [BAUD_RATE]
    python feeder_monitor.py COM3
    python feeder_monitor.py /dev/ttyUSB0 115200

Controls:
    q - Quit
    c - Clear screen
    p - Pause/Resume display
    r - Reset statistics
"""

import os
import re
import sys
import threading
import time
from collections import deque

try:
    import serial
    import serial.tools.list_ports
except ImportError:
    # Do not exit at import time: the missing dependency is reported by
    # main() / connect() instead, so the module itself stays importable.
    serial = None

PYSERIAL_MISSING_MSG = "Error: pyserial not installed. Run: pip install pyserial"

# Number of columns between the left and right box borders.
BOX_INNER_WIDTH = 70

# Range used to scale the PID sparkline and the PWM bar.
PWM_FULL_SCALE = 2400

# CSI "erase to end of line"; appended to every row so that a row which was
# wider on a previous frame does not leave stale characters behind.
CLEAR_EOL = '\033[K'

# State number to name mapping used by the "S:" packet field.
STATE_NAMES = {
    0: "IDLE",
    1: "PEEL_FWD",
    2: "PEEL_BACK",
    3: "UNPEEL",
    4: "DRIVING",
    5: "SLACK_REM",
    6: "BACKLASH",
    7: "SETTLING",
    8: "COMPLETE",
    9: "TIMEOUT",
}

# States shown in the indicator strip, in display order.
STATE_INDICATORS = (
    "IDLE", "PEEL_FWD", "PEEL_BACK", "UNPEEL",
    "DRIVING", "SLACK_REM", "BACKLASH", "SETTLING",
)

SPARK_CHARS = '▁▂▃▄▅▆▇█'

# Matches the CSI escape sequences this script emits (colors, cursor control).
_ANSI_RE = re.compile(r'\x1b\[[0-9;?]*[A-Za-z]')


# ANSI color codes
class Colors:
    RESET = '\033[0m'
    BOLD = '\033[1m'
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    DIM = '\033[2m'


# Enable ANSI on Windows
if os.name == 'nt':
    os.system('')


def clear_screen():
    """Clear the terminal using the platform's clear command."""
    os.system('cls' if os.name == 'nt' else 'clear')


def move_cursor(row, col):
    """Move the terminal cursor to the 1-based (row, col) position."""
    print(f'\033[{row};{col}H', end='')


def hide_cursor():
    """Hide the terminal cursor."""
    print('\033[?25l', end='')


def show_cursor():
    """Show the terminal cursor."""
    print('\033[?25h', end='')


def _visible_len(text):
    """Return the length of *text* with ANSI escape sequences removed."""
    return len(_ANSI_RE.sub('', text))


def _box_rule(left, right, bold=False):
    """Return a horizontal box border with the given corner characters."""
    prefix = f"{Colors.BOLD}{Colors.CYAN}" if bold else Colors.CYAN
    return f"{prefix}{left}{'═' * BOX_INNER_WIDTH}{right}{Colors.RESET}"


def _box_row(content):
    """Return *content* framed by box borders, right-padded to the box width.

    Padding is computed from the visible length, so color codes inside
    *content* do not shift the right-hand border.
    """
    padding = ' ' * max(0, BOX_INNER_WIDTH - _visible_len(content))
    return (f"{Colors.CYAN}║{Colors.RESET}{content}{padding}"
            f"{Colors.CYAN}║{Colors.RESET}")


class FeederMonitor:
    """Read feeder debug packets from a serial port and render a dashboard.

    A background thread (read_thread) parses incoming lines and updates the
    instance attributes under ``self.lock``; the main thread (run) handles
    keyboard input and redraws the dashboard.
    """

    def __init__(self, port, baudrate=115200):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.running = False
        self.paused = False

        # Current data
        self.position = 0
        self.target = 0
        self.error = 0
        self.pid_output = 0
        self.state = "UNKNOWN"
        self.is_initialized = False
        self.feed_in_progress = False
        self.beefy_tape = False
        self.address = 0xFF
        self.sw1_pressed = False
        self.sw2_pressed = False
        self.drive_value = 0
        self.drive_mode = 0

        # Statistics
        self.packet_count = 0
        self.error_count = 0
        self.start_time = time.time()
        self.last_packet_time = 0
        self.max_error = 0  # signed error with the largest magnitude seen
        self.min_error = 0  # most negative error seen

        # History for sparkline
        self.error_history = deque(maxlen=50)
        self.pid_history = deque(maxlen=50)

        # Lock for thread safety
        self.lock = threading.Lock()

    def connect(self):
        """Open the serial port (8N1, 0.1 s read timeout).

        Returns:
            True on success; False (after printing an error) when pyserial
            is missing or the port cannot be opened.
        """
        if serial is None:
            print(PYSERIAL_MISSING_MSG)
            return False
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
            )
            return True
        except serial.SerialException as e:
            print(f"{Colors.RED}Error opening port {self.port}: {e}{Colors.RESET}")
            return False

    def disconnect(self):
        """Close the serial port if it is open."""
        if self.serial and self.serial.is_open:
            self.serial.close()

    def parse_packet(self, line):
        """Parse debug packet: $P:pos:tgt:err,I:pid,S:state,F:flags,A:addr,B:btns,D:drv*

        An optional ``M:mode`` field is also accepted. Fields are staged and
        applied only if the whole packet parses, so a malformed packet never
        leaves the monitor showing a mix of old and new values.

        Returns:
            True if the packet was parsed and applied. False if the line is
            not a packet (no ``$``/``*`` framing) or if a field is malformed;
            only the latter increments ``error_count``.
        """
        if not line.startswith('$') or '*' not in line:
            return False

        # Remove $ and * markers
        payload = line[1:line.index('*')]

        updates = {}
        try:
            for part in payload.split(','):
                tag, value = part[:2], part[2:]
                if tag == 'P:':
                    fields = value.split(':')
                    if len(fields) >= 3:
                        updates['position'] = int(fields[0])
                        updates['target'] = int(fields[1])
                        updates['error'] = int(fields[2])
                elif tag == 'I:':
                    updates['pid_output'] = int(value)
                elif tag == 'S:':
                    state_num = int(value)
                    updates['state'] = STATE_NAMES.get(
                        state_num, f"UNK({state_num})")
                elif tag == 'F:':
                    if len(value) >= 3:
                        updates['is_initialized'] = value[0] == '1'
                        updates['feed_in_progress'] = value[1] == '1'
                        updates['beefy_tape'] = value[2] == '1'
                elif tag == 'A:':
                    updates['address'] = int(value, 16)
                elif tag == 'B:':
                    if len(value) >= 2:
                        updates['sw1_pressed'] = value[0] == '1'
                        updates['sw2_pressed'] = value[1] == '1'
                elif tag == 'D:':
                    updates['drive_value'] = int(value)
                elif tag == 'M:':
                    updates['drive_mode'] = int(value)
        except ValueError:
            self.error_count += 1
            return False

        # Whole packet parsed: commit it.
        for name, value in updates.items():
            setattr(self, name, value)

        if 'error' in updates:
            err = updates['error']
            self.error_history.append(err)
            if abs(err) > abs(self.max_error):
                self.max_error = err
            if err < self.min_error:
                self.min_error = err
        if 'pid_output' in updates:
            self.pid_history.append(updates['pid_output'])

        self.packet_count += 1
        self.last_packet_time = time.time()
        return True

    def read_thread(self):
        """Background thread for reading serial data"""
        buffer = ""
        while self.running:
            try:
                if self.serial and self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    buffer += data.decode('utf-8', errors='ignore')

                    while '\n' in buffer:
                        line, buffer = buffer.split('\n', 1)
                        line = line.strip()
                        if line:
                            with self.lock:
                                self.parse_packet(line)
                else:
                    time.sleep(0.01)
            except Exception:
                # Best effort: back off and retry on any read failure so the
                # dashboard keeps running (it then shows "NO DATA").
                time.sleep(0.1)

    def make_sparkline(self, data, width=40, min_val=None, max_val=None):
        """Create a text sparkline from data

        Args:
            data: numeric samples, oldest first.
            width: number of characters to return.
            min_val: value mapped to the lowest bar (default: min of data).
            max_val: value mapped to the highest bar (default: max of data).

        Returns:
            A string of exactly ``width`` characters. When there are more
            samples than columns, evenly spaced samples counted back from the
            newest one are shown. Values outside [min_val, max_val] are
            clamped to the lowest/highest bar.
        """
        if width <= 0:
            return ''
        data = list(data)
        if not data:
            return ' ' * width

        if min_val is None:
            min_val = min(data)
        if max_val is None:
            max_val = max(data)

        if max_val == min_val:
            return (SPARK_CHARS[4] * min(len(data), width)).ljust(width)

        step = max(1, len(data) // width)
        # Walk backwards from the newest sample, then restore time order.
        picked = data[::-1][::step][:width][::-1]

        span = max_val - min_val
        top = len(SPARK_CHARS) - 1
        result = []
        for val in picked:
            # Clamp so an out-of-range value cannot produce a negative index
            # (which would wrap around to the tallest bar).
            normalized = min(max((val - min_val) / span, 0.0), 1.0)
            result.append(SPARK_CHARS[int(normalized * top)])
        return ''.join(result).ljust(width)

    def make_bar(self, value, max_value, width=20, char='█'):
        """Create a progress bar

        Returns ``width`` characters: ``char`` repeated in proportion to
        ``abs(value) / max_value`` (capped at full), padded with spaces.
        A ``max_value`` of 0 yields an empty bar.
        """
        if max_value == 0:
            return ' ' * width
        ratio = min(abs(value) / max_value, 1.0)
        filled = int(ratio * width)
        return (char * filled).ljust(width)

    def _render_frame(self):
        """Build the dashboard as a list of rows; caller must hold self.lock."""
        now = time.time()
        uptime = now - self.start_time
        pkt_rate = self.packet_count / uptime if uptime > 0 else 0
        last_pkt_ago = now - self.last_packet_time if self.last_packet_time > 0 else 999

        C = Colors
        rows = []
        add = rows.append
        separator = _box_rule('╠', '╣')

        # Header
        title = " PHOTON FEEDER Mk2 DEBUG MONITOR "
        title_pad = ' ' * max(0, BOX_INNER_WIDTH - len(title))
        add(_box_rule('╔', '╗', bold=True))
        add(f"{C.BOLD}{C.CYAN}║{C.WHITE}{title}{title_pad}{C.CYAN}║{C.RESET}")
        add(_box_rule('╠', '╣', bold=True))

        # Connection status
        if last_pkt_ago < 1:
            conn_color = C.GREEN
        elif last_pkt_ago < 5:
            conn_color = C.YELLOW
        else:
            conn_color = C.RED
        conn_status = "CONNECTED" if last_pkt_ago < 5 else "NO DATA"
        add(_box_row(
            f" Port: {C.YELLOW}{self.port:10}{C.RESET} @ {self.baudrate} baud"
            f" │ Status: {conn_color}{conn_status:10}{C.RESET} "))
        add(separator)

        # Position section
        err_color = C.RED if abs(self.error) > 10 else C.GREEN
        add(_box_row(f" {C.BOLD}POSITION{C.RESET} "))
        add(_box_row(
            f" Current: {C.GREEN}{self.position:>10}{C.RESET}"
            f" Target: {C.YELLOW}{self.target:>10}{C.RESET}"
            f" Error: {err_color}{self.error:>8}{C.RESET} "))

        # Error sparkline
        err_spark = self.make_sparkline(list(self.error_history), 50)
        add(_box_row(f" Error: [{C.DIM}{err_spark}{C.RESET}] "))
        add(separator)

        # PID section
        pid_color = C.GREEN if self.pid_output >= 0 else C.RED
        pid_dir = "FWD" if self.pid_output >= 0 else "REV"
        add(_box_row(f" {C.BOLD}PID CONTROL{C.RESET} "))
        add(_box_row(
            f" Output: {pid_color}{self.pid_output:>6}{C.RESET} ({pid_dir})"
            f" Drive Value: {C.YELLOW}{self.drive_value:>3}{C.RESET} "))

        # PID sparkline
        pid_spark = self.make_sparkline(
            list(self.pid_history), 50, -PWM_FULL_SCALE, PWM_FULL_SCALE)
        add(_box_row(f" Output: [{C.DIM}{pid_spark}{C.RESET}] "))

        # PWM bar
        pwm_bar = self.make_bar(self.pid_output, PWM_FULL_SCALE, 30)
        pwm_pct = abs(self.pid_output) / PWM_FULL_SCALE * 100
        add(_box_row(
            f" PWM: [{pid_color}{pwm_bar}{C.RESET}] {pwm_pct:5.1f}% "))
        add(separator)

        # State section
        if self.state == "IDLE":
            state_color = C.GREEN
        elif self.state == "DRIVING":
            state_color = C.YELLOW
        else:
            state_color = C.CYAN
        add(_box_row(f" {C.BOLD}FEED STATE{C.RESET} "))
        add(_box_row(f" State: {state_color}{self.state:15}{C.RESET} "))

        # State indicators
        state_line = " "
        for s in STATE_INDICATORS:
            color = C.GREEN if s == self.state else C.DIM
            state_line += f"{color}[{s[:3]}]{C.RESET} "
        add(_box_row(f"{state_line} "))
        add(separator)

        # Flags section
        init_icon = f"{C.GREEN}●{C.RESET}" if self.is_initialized else f"{C.RED}○{C.RESET}"
        feed_icon = f"{C.YELLOW}●{C.RESET}" if self.feed_in_progress else f"{C.DIM}○{C.RESET}"
        beefy_icon = f"{C.MAGENTA}●{C.RESET}" if self.beefy_tape else f"{C.DIM}○{C.RESET}"
        mode_str = f"{C.YELLOW}PEEL{C.RESET}" if self.drive_mode else f"{C.BLUE}TAPE{C.RESET}"
        add(_box_row(f" {C.BOLD}STATUS FLAGS{C.RESET} "))
        add(_box_row(
            f" {init_icon} Initialized {feed_icon} Feeding"
            f" {beefy_icon} Beefy Tape Mode: {mode_str} "))

        # Address and buttons
        addr_str = f"0x{self.address:02X}" if self.address != 0xFF else "UNSET"
        sw1_icon = f"{C.GREEN}[SW1]{C.RESET}" if self.sw1_pressed else f"{C.DIM}[SW1]{C.RESET}"
        sw2_icon = f"{C.GREEN}[SW2]{C.RESET}" if self.sw2_pressed else f"{C.DIM}[SW2]{C.RESET}"
        add(_box_row(
            f" Address: {C.YELLOW}{addr_str:6}{C.RESET}"
            f" Buttons: {sw1_icon} {sw2_icon} "))
        add(separator)

        # Statistics
        add(_box_row(f" {C.BOLD}STATISTICS{C.RESET} "))
        add(_box_row(
            f" Packets: {C.GREEN}{self.packet_count:>8}{C.RESET}"
            f" Errors: {C.RED}{self.error_count:>5}{C.RESET}"
            f" Rate: {C.CYAN}{pkt_rate:>5.1f}{C.RESET}/s "))
        add(_box_row(
            f" Max Err: {C.RED}{self.max_error:>8}{C.RESET}"
            f" Min Err: {C.RED}{self.min_error:>8}{C.RESET} "))
        add(separator)

        # Controls. No fixed-width format spec on pause_status: it contains
        # color codes, so _box_row pads by visible width instead.
        pause_status = f"{C.YELLOW}PAUSED{C.RESET}" if self.paused else ""
        add(_box_row(
            f" {C.DIM}[Q]uit [C]lear [P]ause [R]eset Stats{C.RESET}"
            f" {pause_status} "))
        add(_box_rule('╚', '╝'))
        return rows

    def display(self):
        """Display the monitor interface"""
        # Build the frame under the lock, but write it after releasing the
        # lock so terminal I/O never stalls the reader thread.
        with self.lock:
            rows = self._render_frame()

        move_cursor(1, 1)
        sys.stdout.write(''.join(f"{row}{CLEAR_EOL}\n" for row in rows))
        sys.stdout.flush()

    def reset_stats(self):
        """Zero the packet/error counters and extremes and clear histories."""
        with self.lock:
            self.packet_count = 0
            self.error_count = 0
            self.start_time = time.time()
            self.max_error = 0
            self.min_error = 0
            self.error_history.clear()
            self.pid_history.clear()

    def _setup_keyboard(self):
        """Prepare non-blocking single-key input.

        Returns:
            (get_key, restore): ``get_key()`` returns a lowercase character
            or None when no key is pending; ``restore`` undoes any terminal
            mode change, or is None when nothing needs restoring.
        """
        # Input handling (non-blocking on Windows requires msvcrt)
        if os.name == 'nt':
            import msvcrt

            def get_key():
                if msvcrt.kbhit():
                    return msvcrt.getch().decode('utf-8', errors='ignore').lower()
                return None

            return get_key, None

        if not sys.stdin.isatty():
            # stdin is redirected: termios would fail, so run without key
            # handling (Ctrl+C still stops the monitor).
            return (lambda: None), None

        import select
        import termios
        import tty

        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        tty.setcbreak(fd)

        def get_key():
            if select.select([sys.stdin], [], [], 0)[0]:
                return sys.stdin.read(1).lower()
            return None

        def restore():
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)

        return get_key, restore

    def run(self):
        """Connect, then run the input/redraw loop until 'q' or Ctrl+C.

        Terminal state (cursor visibility, cbreak mode) and the serial port
        are always restored/closed on the way out.
        """
        if not self.connect():
            return

        self.running = True

        # Start read thread
        reader = threading.Thread(target=self.read_thread, daemon=True)
        reader.start()

        restore_terminal = None
        try:
            # Setup terminal (inside the try so a failure is still cleaned up)
            get_key, restore_terminal = self._setup_keyboard()
            clear_screen()
            hide_cursor()

            while self.running:
                # Handle input
                key = get_key()
                if key == 'q':
                    self.running = False
                elif key == 'c':
                    clear_screen()
                elif key == 'p':
                    self.paused = not self.paused
                elif key == 'r':
                    self.reset_stats()

                # Update display. While paused, still draw one frame after
                # a clear/pause/reset key so the PAUSED indicator and the
                # result of the key press are visible.
                if not self.paused or key in ('c', 'p', 'r'):
                    self.display()

                time.sleep(0.1)
        except KeyboardInterrupt:
            pass
        finally:
            self.running = False
            show_cursor()
            if restore_terminal is not None:
                restore_terminal()
            # Let the reader leave its loop before the port is closed.
            reader.join(timeout=1.0)
            self.disconnect()
            print(f"\n{Colors.CYAN}Disconnected from {self.port}{Colors.RESET}")


def list_ports():
    """List available serial ports

    Returns:
        The list of detected ports, or None (after printing a message) when
        there are none or pyserial is unavailable.
    """
    if serial is None:
        print(PYSERIAL_MISSING_MSG)
        return None

    ports = serial.tools.list_ports.comports()
    if not ports:
        print(f"{Colors.YELLOW}No serial ports found{Colors.RESET}")
        return None

    print(f"\n{Colors.CYAN}Available serial ports:{Colors.RESET}")
    for i, port in enumerate(ports):
        print(f" {i+1}. {Colors.GREEN}{port.device}{Colors.RESET} - {port.description}")
    print()
    return ports


def main():
    """Parse command-line arguments, pick a port and run the monitor."""
    if serial is None:
        print(PYSERIAL_MISSING_MSG)
        sys.exit(1)

    print(f"{Colors.BOLD}{Colors.CYAN}Photon Feeder Mk2 Debug Monitor{Colors.RESET}")
    print(f"{Colors.DIM}{'='*40}{Colors.RESET}\n")

    # Parse arguments
    port = None
    baudrate = 115200

    if len(sys.argv) >= 2:
        port = sys.argv[1]
    if len(sys.argv) >= 3:
        try:
            baudrate = int(sys.argv[2])
        except ValueError:
            print(f"{Colors.RED}Invalid baud rate: {sys.argv[2]}{Colors.RESET}")
            sys.exit(1)

    # If no port specified, list and prompt
    if port is None:
        ports = list_ports()
        if not ports:
            sys.exit(1)
        try:
            choice = input(f"Select port (1-{len(ports)}) or enter name: ").strip()
        except (KeyboardInterrupt, EOFError):
            print(f"\n{Colors.YELLOW}Cancelled{Colors.RESET}")
            sys.exit(0)

        # isdecimal() (not isdigit()) so every accepted string is int()-safe.
        if choice.isdecimal():
            idx = int(choice) - 1
            if 0 <= idx < len(ports):
                port = ports[idx].device
            else:
                print(f"{Colors.RED}Invalid selection{Colors.RESET}")
                sys.exit(1)
        elif choice:
            port = choice
        else:
            # An empty answer is not a usable port name.
            print(f"{Colors.RED}Invalid selection{Colors.RESET}")
            sys.exit(1)

    # Run monitor
    monitor = FeederMonitor(port, baudrate)
    monitor.run()


if __name__ == "__main__":
    main()