#!/usr/bin/env python3 """ Photon Feeder Mk2 Serial Debug Monitor (Textual TUI) Usage: python feeder_monitor.py [COM_PORT] [BAUD_RATE] python feeder_monitor.py COM3 python feeder_monitor.py /dev/ttyUSB0 115200 python feeder_monitor.py (auto-detect) Controls: q - Quit c - Clear statistics p - Pause/Resume display space - Toggle raw packet log """ from __future__ import annotations import sys import time from collections import deque from dataclasses import dataclass, field try: import serial import serial.tools.list_ports except ImportError: print("Error: pyserial not installed. Run: pip install pyserial") sys.exit(1) try: from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal, Vertical from textual.reactive import reactive from textual.widgets import Footer, Header, Label, RichLog, Sparkline, Static from textual import work from rich.text import Text except ImportError: print("Error: textual not installed. Run: pip install textual") sys.exit(1) # ============================================================================ # Constants # ============================================================================ PWM_MAX = 2400 BAUD_DEFAULT = 115200 SPARKLINE_LEN = 200 STATE_INFO = { 0: ("IDLE", "dim"), 1: ("PEEL_FWD", "#e5c07b"), 2: ("PEEL_BACK", "#e5c07b"), 3: ("UNPEEL", "#e5c07b"), 4: ("DRIVING", "#98c379"), 5: ("SLACK_REM", "#e5c07b"), 6: ("BACKLASH", "#e5c07b"), 7: ("SETTLING", "#56b6c2"), 8: ("COMPLETE", "dim"), } STATE_ABBREV = ["IDL", "PEF", "PEB", "UNP", "DRV", "SLK", "BAC", "SET", "CMP"] # ============================================================================ # Data Model # ============================================================================ @dataclass class FeederData: position: int = 0 target: int = 0 error: int = 0 pid_output: int = 0 state: int = 0 is_initialized: bool = False feed_in_progress: bool = False beefy_tape: bool = False address: str = "FF" feed_ok_count: int = 0 feed_fail_count: int = 0 brake_time_tenths: int = 30 feed_retries: int = 0 crc_drops: int = 0 msg_handled: int = 0 uart_errors: int = 0 trap_size: int = 0 trap_hex: str = "" trap_crc: str = "" # Optional fields (may be absent) sw1_pressed: bool = False sw2_pressed: bool = False drive_value: int = 0 drive_mode: int = 0 has_buttons: bool = False has_drive: bool = False has_mode: bool = False class Statistics: def __init__(self): self.clear() def clear(self): self.packet_count = 0 self.parse_errors = 0 self.start_time = time.time() self.last_packet_time = 0.0 self.error_history: deque[float] = deque(maxlen=SPARKLINE_LEN) self.pid_history: deque[float] = deque(maxlen=SPARKLINE_LEN) def update(self, data: FeederData): self.packet_count += 1 self.last_packet_time = time.time() self.error_history.append(float(data.error)) self.pid_history.append(float(data.pid_output)) @property def packet_rate(self) -> float: elapsed = time.time() - self.start_time return self.packet_count / elapsed if elapsed > 0 else 0 @property def connection_age(self) -> float: if self.last_packet_time == 0: return 999.0 return time.time() - self.last_packet_time # ============================================================================ # Packet Parser # ============================================================================ def parse_packet(line: str) -> FeederData | None: try: if not line.startswith("$") or "*" not in line: return None body = line[1:line.index("*")] data = FeederData() parts = body.split(",") for part in parts: try: if part.startswith("P:"): vals = part[2:].split(":") if len(vals) >= 3: data.position = int(vals[0]) data.target = int(vals[1]) data.error = int(vals[2]) elif part.startswith("I:"): data.pid_output = int(part[2:]) elif part.startswith("S:"): data.state = int(part[2:]) elif part.startswith("F:"): flags = part[2:] if len(flags) >= 2: data.is_initialized = flags[0] == "1" data.feed_in_progress = flags[1] == "1" if len(flags) >= 3: data.beefy_tape = flags[2] == "1" elif part.startswith("C:"): vals = part[2:].split(":") if len(vals) >= 2: data.feed_ok_count = int(vals[0]) data.feed_fail_count = int(vals[1]) if len(vals) >= 3: data.brake_time_tenths = int(vals[2]) if len(vals) >= 4: data.feed_retries = int(vals[3]) elif part.startswith("A:"): data.address = part[2:] elif part.startswith("R:"): vals = part[2:].split(":") if len(vals) >= 3: data.crc_drops = int(vals[0]) data.msg_handled = int(vals[1]) data.uart_errors = int(vals[2]) elif part.startswith("T:"): vals = part[2:].split(":") if len(vals) >= 1: data.trap_size = int(vals[0]) if len(vals) >= 2: data.trap_hex = vals[1] if len(vals) >= 3: data.trap_crc = vals[2] elif part.startswith("B:"): btns = part[2:] if len(btns) >= 2: data.sw1_pressed = btns[0] == "1" data.sw2_pressed = btns[1] == "1" data.has_buttons = True elif part.startswith("G:"): pass # GPIO raw state, not displayed elif part.startswith("D:"): data.drive_value = int(part[2:]) data.has_drive = True elif part.startswith("M:"): data.drive_mode = int(part[2:]) data.has_mode = True except (ValueError, IndexError): continue return data except Exception: return None # ============================================================================ # Auto-detect COM port # ============================================================================ def auto_detect_port() -> str | None: ports = serial.tools.list_ports.comports() keywords = ["CH340", "CH341", "USB-SERIAL", "FTDI", "CP210", "SILICON", "STM"] for port in ports: desc = (port.description or "").upper() for kw in keywords: if kw in desc: return port.device if len(ports) == 1: return ports[0].device return None # ============================================================================ # Custom Widgets # ============================================================================ class PWMBar(Static): """Horizontal bar showing motor PWM output and direction.""" value: reactive[int] = reactive(0) def render(self) -> Text: width = max(self.size.width - 12, 10) pct = min(abs(self.value) / PWM_MAX, 1.0) * 100 filled = int(abs(self.value) / PWM_MAX * width) direction = "FWD" if self.value >= 0 else "REV" color = "green" if self.value >= 0 else "red" bar = "\u2588" * filled + "\u2591" * (width - filled) return Text.assemble( (f"{direction} ", color), (bar[:filled], color), (bar[filled:], "dim"), f" {pct:5.1f}%", ) class ConnectionIndicator(Static): """Shows connection status with colored dot.""" status: reactive[str] = reactive("disconnected") def render(self) -> Text: if self.status == "connected": return Text.assemble(("\u25cf ", "green"), ("CONNECTED", "green")) elif self.status == "stale": return Text.assemble(("\u25cf ", "yellow"), ("STALE", "yellow")) elif self.status == "no_data": return Text.assemble(("\u25cf ", "red"), ("NO DATA", "red")) else: return Text.assemble(("\u25cf ", "red"), ("DISCONNECTED", "red")) class FlagIndicator(Static): """Shows a labeled boolean flag.""" active: reactive[bool] = reactive(False) label_text: str = "" def __init__(self, label_text: str, **kwargs): super().__init__(**kwargs) self.label_text = label_text def render(self) -> Text: dot = "\u25cf" if self.active else "\u25cb" color = "green" if self.active else "dim" return Text.assemble((f"{dot} ", color), (self.label_text, color)) class StateDisplay(Static): """Shows the current feed state with color coding and indicator row.""" state: reactive[int] = reactive(0) def render(self) -> Text: name, color = STATE_INFO.get(self.state, ("UNKNOWN", "red")) parts: list = [(f" \u25b6 {name}\n", f"bold {color}")] # Indicator row for i, abbrev in enumerate(STATE_ABBREV): if i == self.state: parts.append((f"[{abbrev}]", f"bold {STATE_INFO.get(i, ('', 'white'))[1]}")) else: parts.append((f" {abbrev} ", "dim")) parts.append((" ", "")) return Text.assemble(*parts) # ============================================================================ # Main App # ============================================================================ class FeederMonitorApp(App): TITLE = "Photon Feeder Mk2 Monitor" BINDINGS = [ Binding("q", "quit", "Quit"), Binding("c", "clear_stats", "Clear Stats"), Binding("p", "toggle_pause", "Pause"), Binding("space", "toggle_raw", "Raw Log"), ] CSS = """ Screen { background: $surface; } #status-bar { height: 1; background: $primary-background; padding: 0 1; } #status-bar Label { width: auto; margin: 0 2 0 0; } .panel { border: solid $primary; padding: 0 1; height: auto; } .panel-title { text-style: bold; color: $accent; margin: 0 0 0 0; } #top-row { height: auto; max-height: 12; } #position-panel { width: 1fr; } #motor-panel { width: 1fr; } #mid-row { height: auto; max-height: 8; } #state-panel { width: 2fr; } #flags-panel { width: 1fr; } #rs485-panel { width: 1fr; } #stats-row { height: 3; } #stats-panel { width: 1fr; } .stat-label { width: auto; margin: 0 3 0 0; } #raw-panel { height: 1fr; min-height: 4; } #raw-panel.hidden { display: none; } #raw-log { height: 1fr; } #error-sparkline { height: 3; } #pid-sparkline { height: 3; } #pwm-bar { height: 1; } .val-label { width: auto; margin: 0 2 0 0; } .flag-row { height: 1; } #pause-label { color: $warning; text-style: bold; } """ def __init__(self, port: str, baudrate: int = BAUD_DEFAULT): super().__init__() self.port = port self.baudrate = baudrate self.stats = Statistics() self.latest_data = FeederData() self.paused = False self._raw_lines: deque[str] = deque(maxlen=200) self._raw_pending: list[str] = [] self._rs485_pending: list[str] = [] self._last_msg_handled = 0 self._last_trap_hex = "" self._serial: serial.Serial | None = None def compose(self) -> ComposeResult: yield Header() # Status bar with Horizontal(id="status-bar"): yield Label(f"{self.port} @ {self.baudrate}", id="port-label") yield ConnectionIndicator(id="conn-indicator") yield Label("0.0 pkt/s", id="rate-label") yield Label("", id="pause-label") # Top row: Position + Motor with Horizontal(id="top-row"): with Vertical(id="position-panel", classes="panel"): yield Label("POSITION", classes="panel-title") with Horizontal(): yield Label("Pos: 0", id="pos-val", classes="val-label") yield Label("Tgt: 0", id="tgt-val", classes="val-label") yield Label("Err: 0", id="err-val", classes="val-label") yield Sparkline([], id="error-sparkline") with Vertical(id="motor-panel", classes="panel"): yield Label("MOTOR", classes="panel-title") yield Label("Output: 0", id="motor-val") yield PWMBar(id="pwm-bar") yield Sparkline([], id="pid-sparkline") # Mid row: State + Flags + RS485 with Horizontal(id="mid-row"): with Vertical(id="state-panel", classes="panel"): yield Label("FEED STATE", classes="panel-title") yield StateDisplay(id="state-display") with Vertical(id="flags-panel", classes="panel"): yield Label("FLAGS", classes="panel-title") yield FlagIndicator("Initialized", id="flag-init") yield FlagIndicator("Feeding", id="flag-feed") with Vertical(id="rs485-panel", classes="panel"): yield Label("RS485", classes="panel-title") yield Label("Addr: --", id="rs485-addr") yield Label("CRC Drops: 0", id="rs485-crc") yield Label("Handled: 0", id="rs485-hdl") yield Label("UART Err: 0", id="rs485-err") # Stats row with Horizontal(id="stats-row"): with Horizontal(id="stats-panel", classes="panel"): yield Label("Feeds: 0", id="stat-feeds", classes="stat-label") yield Label("Timeouts: 0", id="stat-timeouts", classes="stat-label") yield Label("Retries: 0", id="stat-retries", classes="stat-label") yield Label("Brake: 3ms", id="stat-brake", classes="stat-label") yield Label("Packets: 0", id="stat-pkts", classes="stat-label") # RS485 message log with Vertical(id="raw-panel", classes="panel"): yield Label("RS485 MESSAGES", classes="panel-title") yield RichLog(id="raw-log", max_lines=200, markup=True) yield Footer() def on_mount(self) -> None: self.start_serial_reader() self.set_interval(0.1, self.update_display) # ---- Serial Worker ---- @work(thread=True, exclusive=True) def start_serial_reader(self) -> None: while True: try: self._serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=0.1, ) self.call_from_thread(self._set_connection, "connected") buffer = "" while True: try: data = self._serial.read(max(1, self._serial.in_waiting)) if data: buffer += data.decode("ascii", errors="replace") while "\n" in buffer: line, buffer = buffer.split("\n", 1) line = line.strip() if line: self.call_from_thread(self._handle_line, line) except serial.SerialException: break except serial.SerialException: self.call_from_thread(self._set_connection, "disconnected") # Reconnect delay if self._serial: try: self._serial.close() except Exception: pass self._serial = None time.sleep(2) def _set_connection(self, status: str) -> None: self.query_one("#conn-indicator", ConnectionIndicator).status = status def _handle_line(self, line: str) -> None: self._raw_lines.append(line) parsed = parse_packet(line) if parsed: prev = self.latest_data self.latest_data = parsed self.stats.update(parsed) # Detect RS485 events if parsed.msg_handled != self._last_msg_handled: diff = parsed.msg_handled - self._last_msg_handled self._last_msg_handled = parsed.msg_handled self._rs485_pending.append( f"[green]RX #{parsed.msg_handled}[/green] handled (+{diff})" ) if parsed.uart_errors > 0 and parsed.uart_errors != getattr(self, '_last_uart_err', 0): self._rs485_pending.append( f"[red]UART ERROR[/red] count: {parsed.uart_errors}" ) self._last_uart_err = parsed.uart_errors if parsed.trap_hex and parsed.trap_hex != self._last_trap_hex: self._last_trap_hex = parsed.trap_hex # Decode trap hex into spaced bytes hex_str = parsed.trap_hex spaced = " ".join(hex_str[i:i+2] for i in range(0, len(hex_str), 2)) self._rs485_pending.append( f"[cyan]TRAP[/cyan] size={parsed.trap_size} [{spaced}] crc={parsed.trap_crc}" ) if parsed.crc_drops != getattr(self, '_last_crc_drops', 0): self._rs485_pending.append( f"[yellow]CRC DROP[/yellow] total: {parsed.crc_drops}" ) self._last_crc_drops = parsed.crc_drops else: self.stats.parse_errors += 1 # ---- Display Update ---- def update_display(self) -> None: # Connection status age = self.stats.connection_age if age < 1: status = "connected" elif age < 5: status = "stale" elif age < 999: status = "no_data" else: status = "disconnected" self.query_one("#conn-indicator", ConnectionIndicator).status = status self.query_one("#rate-label", Label).update(f"{self.stats.packet_rate:.1f} pkt/s") self.query_one("#pause-label", Label).update("PAUSED" if self.paused else "") if self.paused: # Still flush raw log self._flush_raw_log() return d = self.latest_data s = self.stats # Position err_color = "green" if abs(d.error) <= 5 else ("yellow" if abs(d.error) <= 50 else "red") self.query_one("#pos-val", Label).update(f"Pos: {d.position}") self.query_one("#tgt-val", Label).update(f"Tgt: {d.target}") err_label = self.query_one("#err-val", Label) err_label.update(f"Err: {d.error}") err_label.styles.color = err_color # Sparklines if s.error_history: self.query_one("#error-sparkline", Sparkline).data = list(s.error_history) if s.pid_history: self.query_one("#pid-sparkline", Sparkline).data = list(s.pid_history) # Motor self.query_one("#motor-val", Label).update(f"Output: {d.pid_output}") self.query_one("#pwm-bar", PWMBar).value = d.pid_output # State self.query_one("#state-display", StateDisplay).state = d.state # Flags self.query_one("#flag-init", FlagIndicator).active = d.is_initialized self.query_one("#flag-feed", FlagIndicator).active = d.feed_in_progress # RS485 self.query_one("#rs485-addr", Label).update(f"Addr: 0x{d.address}") self.query_one("#rs485-crc", Label).update(f"CRC Drops: {d.crc_drops}") self.query_one("#rs485-hdl", Label).update(f"Handled: {d.msg_handled}") self.query_one("#rs485-err", Label).update(f"UART Err: {d.uart_errors}") # Stats self.query_one("#stat-feeds", Label).update(f"Feeds: {d.feed_ok_count}") self.query_one("#stat-timeouts", Label).update(f"Timeouts: {d.feed_fail_count}") self.query_one("#stat-retries", Label).update(f"Retries: {d.feed_retries}") self.query_one("#stat-brake", Label).update(f"Brake: {d.brake_time_tenths/10:.1f}ms") self.query_one("#stat-pkts", Label).update(f"Packets: {s.packet_count}") # Raw log self._flush_raw_log() def _flush_raw_log(self) -> None: if not self._rs485_pending: return raw_log = self.query_one("#raw-log", RichLog) for msg in self._rs485_pending: raw_log.write(msg) self._rs485_pending.clear() # ---- Actions ---- def action_clear_stats(self) -> None: self.stats.clear() def action_toggle_pause(self) -> None: self.paused = not self.paused def action_toggle_raw(self) -> None: panel = self.query_one("#raw-panel") panel.toggle_class("hidden") # ============================================================================ # Main # ============================================================================ def main(): port = None baudrate = BAUD_DEFAULT if len(sys.argv) >= 2: port = sys.argv[1] if len(sys.argv) >= 3: try: baudrate = int(sys.argv[2]) except ValueError: print(f"Invalid baud rate: {sys.argv[2]}") sys.exit(1) if port is None: ports = serial.tools.list_ports.comports() if not ports: print("No serial ports found.") sys.exit(1) elif len(ports) == 1: port = ports[0].device print(f"Using: {port} ({ports[0].description})") else: print("Available serial ports:") for i, p in enumerate(ports): print(f" {i + 1}. {p.device}: {p.description}") print() try: choice = input(f"Select port (1-{len(ports)}) or enter name: ").strip() if choice.isdigit(): idx = int(choice) - 1 if 0 <= idx < len(ports): port = ports[idx].device else: print("Invalid selection.") sys.exit(1) else: port = choice except (KeyboardInterrupt, EOFError): print("\nCancelled.") sys.exit(0) app = FeederMonitorApp(port, baudrate) app.run() if __name__ == "__main__": main()