code finished for testing, added serial monitor and debug output
This commit is contained in:
@@ -0,0 +1,463 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Photon Feeder Mk2 Serial Debug Monitor
|
||||
Console-based GUI for monitoring feeder diagnostics via USART1
|
||||
|
||||
Usage: python feeder_monitor.py [COM_PORT] [BAUD_RATE]
|
||||
python feeder_monitor.py COM3
|
||||
python feeder_monitor.py /dev/ttyUSB0 115200
|
||||
|
||||
Controls:
|
||||
q - Quit
|
||||
c - Clear screen
|
||||
p - Pause/Resume display
|
||||
r - Reset statistics
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from collections import deque
|
||||
|
||||
try:
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
except ImportError:
|
||||
print("Error: pyserial not installed. Run: pip install pyserial")
|
||||
sys.exit(1)
|
||||
|
||||
# ANSI color codes
|
||||
class Colors:
|
||||
RESET = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
RED = '\033[91m'
|
||||
GREEN = '\033[92m'
|
||||
YELLOW = '\033[93m'
|
||||
BLUE = '\033[94m'
|
||||
MAGENTA = '\033[95m'
|
||||
CYAN = '\033[96m'
|
||||
WHITE = '\033[97m'
|
||||
DIM = '\033[2m'
|
||||
|
||||
# Enable ANSI on Windows
|
||||
if os.name == 'nt':
|
||||
os.system('')
|
||||
|
||||
def clear_screen():
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
def move_cursor(row, col):
|
||||
print(f'\033[{row};{col}H', end='')
|
||||
|
||||
def hide_cursor():
|
||||
print('\033[?25l', end='')
|
||||
|
||||
def show_cursor():
|
||||
print('\033[?25h', end='')
|
||||
|
||||
|
||||
class FeederMonitor:
|
||||
def __init__(self, port, baudrate=115200):
|
||||
self.port = port
|
||||
self.baudrate = baudrate
|
||||
self.serial = None
|
||||
self.running = False
|
||||
self.paused = False
|
||||
|
||||
# Current data
|
||||
self.position = 0
|
||||
self.target = 0
|
||||
self.error = 0
|
||||
self.pid_output = 0
|
||||
self.state = "UNKNOWN"
|
||||
self.is_initialized = False
|
||||
self.feed_in_progress = False
|
||||
self.beefy_tape = False
|
||||
self.address = 0xFF
|
||||
self.sw1_pressed = False
|
||||
self.sw2_pressed = False
|
||||
self.drive_value = 0
|
||||
|
||||
# Statistics
|
||||
self.packet_count = 0
|
||||
self.error_count = 0
|
||||
self.start_time = time.time()
|
||||
self.last_packet_time = 0
|
||||
self.max_error = 0
|
||||
self.min_error = 0
|
||||
|
||||
# History for sparkline
|
||||
self.error_history = deque(maxlen=50)
|
||||
self.pid_history = deque(maxlen=50)
|
||||
|
||||
# Lock for thread safety
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def connect(self):
|
||||
try:
|
||||
self.serial = serial.Serial(
|
||||
port=self.port,
|
||||
baudrate=self.baudrate,
|
||||
timeout=0.1,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE
|
||||
)
|
||||
return True
|
||||
except serial.SerialException as e:
|
||||
print(f"{Colors.RED}Error opening port {self.port}: {e}{Colors.RESET}")
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
if self.serial and self.serial.is_open:
|
||||
self.serial.close()
|
||||
|
||||
def parse_packet(self, line):
|
||||
"""Parse debug packet: $P:pos:tgt:err,I:pid,S:state,F:flags,A:addr,B:btns,D:drv*"""
|
||||
try:
|
||||
if not line.startswith('$') or not '*' in line:
|
||||
return False
|
||||
|
||||
# Remove $ and * markers
|
||||
line = line[1:line.index('*')]
|
||||
|
||||
# State number to name mapping
|
||||
state_names = {
|
||||
0: "IDLE", 1: "PEEL_FWD", 2: "PEEL_BACK", 3: "UNPEEL",
|
||||
4: "DRIVING", 5: "SLACK_REM", 6: "BACKLASH", 7: "SETTLING",
|
||||
8: "COMPLETE", 9: "TIMEOUT"
|
||||
}
|
||||
|
||||
parts = line.split(',')
|
||||
for part in parts:
|
||||
if part.startswith('P:'):
|
||||
values = part[2:].split(':')
|
||||
if len(values) >= 3:
|
||||
self.position = int(values[0])
|
||||
self.target = int(values[1])
|
||||
self.error = int(values[2])
|
||||
self.error_history.append(self.error)
|
||||
if abs(self.error) > abs(self.max_error):
|
||||
self.max_error = self.error
|
||||
if self.error < self.min_error:
|
||||
self.min_error = self.error
|
||||
|
||||
elif part.startswith('I:'):
|
||||
self.pid_output = int(part[2:])
|
||||
self.pid_history.append(self.pid_output)
|
||||
|
||||
elif part.startswith('S:'):
|
||||
state_num = int(part[2:])
|
||||
self.state = state_names.get(state_num, f"UNK({state_num})")
|
||||
|
||||
elif part.startswith('F:'):
|
||||
flags = part[2:]
|
||||
if len(flags) >= 3:
|
||||
self.is_initialized = flags[0] == '1'
|
||||
self.feed_in_progress = flags[1] == '1'
|
||||
self.beefy_tape = flags[2] == '1'
|
||||
|
||||
elif part.startswith('A:'):
|
||||
self.address = int(part[2:], 16)
|
||||
|
||||
elif part.startswith('B:'):
|
||||
btns = part[2:]
|
||||
if len(btns) >= 2:
|
||||
self.sw1_pressed = btns[0] == '1'
|
||||
self.sw2_pressed = btns[1] == '1'
|
||||
|
||||
elif part.startswith('D:'):
|
||||
self.drive_value = int(part[2:])
|
||||
|
||||
self.packet_count += 1
|
||||
self.last_packet_time = time.time()
|
||||
return True
|
||||
|
||||
except (ValueError, IndexError) as e:
|
||||
self.error_count += 1
|
||||
return False
|
||||
|
||||
def read_thread(self):
|
||||
"""Background thread for reading serial data"""
|
||||
buffer = ""
|
||||
while self.running:
|
||||
try:
|
||||
if self.serial and self.serial.in_waiting:
|
||||
data = self.serial.read(self.serial.in_waiting)
|
||||
buffer += data.decode('utf-8', errors='ignore')
|
||||
|
||||
while '\n' in buffer:
|
||||
line, buffer = buffer.split('\n', 1)
|
||||
line = line.strip()
|
||||
if line:
|
||||
with self.lock:
|
||||
self.parse_packet(line)
|
||||
else:
|
||||
time.sleep(0.01)
|
||||
except Exception as e:
|
||||
time.sleep(0.1)
|
||||
|
||||
def make_sparkline(self, data, width=40, min_val=None, max_val=None):
|
||||
"""Create a text sparkline from data"""
|
||||
if not data:
|
||||
return ' ' * width
|
||||
|
||||
chars = '▁▂▃▄▅▆▇█'
|
||||
|
||||
if min_val is None:
|
||||
min_val = min(data)
|
||||
if max_val is None:
|
||||
max_val = max(data)
|
||||
|
||||
if max_val == min_val:
|
||||
return chars[4] * min(len(data), width)
|
||||
|
||||
result = []
|
||||
step = max(1, len(data) // width)
|
||||
for i in range(0, min(len(data), width * step), step):
|
||||
val = data[min(i, len(data)-1)]
|
||||
normalized = (val - min_val) / (max_val - min_val)
|
||||
idx = min(int(normalized * (len(chars) - 1)), len(chars) - 1)
|
||||
result.append(chars[idx])
|
||||
|
||||
return ''.join(result).ljust(width)
|
||||
|
||||
def make_bar(self, value, max_value, width=20, char='█'):
|
||||
"""Create a progress bar"""
|
||||
if max_value == 0:
|
||||
return ' ' * width
|
||||
ratio = min(abs(value) / max_value, 1.0)
|
||||
filled = int(ratio * width)
|
||||
return (char * filled).ljust(width)
|
||||
|
||||
def display(self):
|
||||
"""Display the monitor interface"""
|
||||
move_cursor(1, 1)
|
||||
|
||||
uptime = time.time() - self.start_time
|
||||
pkt_rate = self.packet_count / uptime if uptime > 0 else 0
|
||||
last_pkt_ago = time.time() - self.last_packet_time if self.last_packet_time > 0 else 999
|
||||
|
||||
with self.lock:
|
||||
# Header
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}╔══════════════════════════════════════════════════════════════════════╗{Colors.RESET}")
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}║{Colors.WHITE} PHOTON FEEDER Mk2 DEBUG MONITOR {Colors.CYAN}║{Colors.RESET}")
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# Connection status
|
||||
conn_color = Colors.GREEN if last_pkt_ago < 1 else (Colors.YELLOW if last_pkt_ago < 5 else Colors.RED)
|
||||
conn_status = "CONNECTED" if last_pkt_ago < 5 else "NO DATA"
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Port: {Colors.YELLOW}{self.port:10}{Colors.RESET} @ {self.baudrate} baud │ Status: {conn_color}{conn_status:10}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# Position section
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}POSITION{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Current: {Colors.GREEN}{self.position:>10}{Colors.RESET} Target: {Colors.YELLOW}{self.target:>10}{Colors.RESET} Error: {Colors.RED if abs(self.error) > 10 else Colors.GREEN}{self.error:>8}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
# Error sparkline
|
||||
err_spark = self.make_sparkline(list(self.error_history), 50)
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Error: [{Colors.DIM}{err_spark}{Colors.RESET}] {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# PID section
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}PID CONTROL{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
pid_color = Colors.GREEN if self.pid_output >= 0 else Colors.RED
|
||||
pid_dir = "FWD" if self.pid_output >= 0 else "REV"
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Output: {pid_color}{self.pid_output:>6}{Colors.RESET} ({pid_dir}) Drive Value: {Colors.YELLOW}{self.drive_value:>3}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
# PID sparkline
|
||||
pid_spark = self.make_sparkline(list(self.pid_history), 50, -2400, 2400)
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Output: [{Colors.DIM}{pid_spark}{Colors.RESET}] {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
# PWM bar
|
||||
pwm_bar = self.make_bar(self.pid_output, 2400, 30)
|
||||
pwm_pct = abs(self.pid_output) / 2400 * 100
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} PWM: [{pid_color}{pwm_bar}{Colors.RESET}] {pwm_pct:5.1f}% {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# State section
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}FEED STATE{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
state_color = Colors.GREEN if self.state == "IDLE" else (Colors.YELLOW if self.state == "DRIVING" else Colors.CYAN)
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} State: {state_color}{self.state:15}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
# State indicators
|
||||
states = ["IDLE", "PEEL_FWD", "PEEL_BACK", "UNPEEL", "DRIVING", "SLACK_REM", "BACKLASH", "SETTLING"]
|
||||
state_line = " "
|
||||
for s in states:
|
||||
if s == self.state:
|
||||
state_line += f"{Colors.GREEN}[{s[:3]}]{Colors.RESET} "
|
||||
else:
|
||||
state_line += f"{Colors.DIM}[{s[:3]}]{Colors.RESET} "
|
||||
print(f"{Colors.CYAN}║{Colors.RESET}{state_line} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# Flags section
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}STATUS FLAGS{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
init_icon = f"{Colors.GREEN}●{Colors.RESET}" if self.is_initialized else f"{Colors.RED}○{Colors.RESET}"
|
||||
feed_icon = f"{Colors.YELLOW}●{Colors.RESET}" if self.feed_in_progress else f"{Colors.DIM}○{Colors.RESET}"
|
||||
beefy_icon = f"{Colors.MAGENTA}●{Colors.RESET}" if self.beefy_tape else f"{Colors.DIM}○{Colors.RESET}"
|
||||
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {init_icon} Initialized {feed_icon} Feeding {beefy_icon} Beefy Tape {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
# Address and buttons
|
||||
addr_str = f"0x{self.address:02X}" if self.address != 0xFF else "UNSET"
|
||||
sw1_icon = f"{Colors.GREEN}[SW1]{Colors.RESET}" if self.sw1_pressed else f"{Colors.DIM}[SW1]{Colors.RESET}"
|
||||
sw2_icon = f"{Colors.GREEN}[SW2]{Colors.RESET}" if self.sw2_pressed else f"{Colors.DIM}[SW2]{Colors.RESET}"
|
||||
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Address: {Colors.YELLOW}{addr_str:6}{Colors.RESET} Buttons: {sw1_icon} {sw2_icon} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# Statistics
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.BOLD}STATISTICS{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Packets: {Colors.GREEN}{self.packet_count:>8}{Colors.RESET} Errors: {Colors.RED}{self.error_count:>5}{Colors.RESET} Rate: {Colors.CYAN}{pkt_rate:>5.1f}{Colors.RESET}/s {Colors.CYAN}║{Colors.RESET}")
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} Max Err: {Colors.RED}{self.max_error:>8}{Colors.RESET} Min Err: {Colors.RED}{self.min_error:>8}{Colors.RESET} {Colors.CYAN}║{Colors.RESET}")
|
||||
|
||||
print(f"{Colors.CYAN}╠══════════════════════════════════════════════════════════════════════╣{Colors.RESET}")
|
||||
|
||||
# Controls
|
||||
pause_status = f"{Colors.YELLOW}PAUSED{Colors.RESET}" if self.paused else ""
|
||||
print(f"{Colors.CYAN}║{Colors.RESET} {Colors.DIM}[Q]uit [C]lear [P]ause [R]eset Stats{Colors.RESET} {pause_status:20} {Colors.CYAN}║{Colors.RESET}")
|
||||
print(f"{Colors.CYAN}╚══════════════════════════════════════════════════════════════════════╝{Colors.RESET}")
|
||||
|
||||
sys.stdout.flush()
|
||||
|
||||
def reset_stats(self):
|
||||
with self.lock:
|
||||
self.packet_count = 0
|
||||
self.error_count = 0
|
||||
self.start_time = time.time()
|
||||
self.max_error = 0
|
||||
self.min_error = 0
|
||||
self.error_history.clear()
|
||||
self.pid_history.clear()
|
||||
|
||||
def run(self):
|
||||
if not self.connect():
|
||||
return
|
||||
|
||||
self.running = True
|
||||
|
||||
# Start read thread
|
||||
read_thread = threading.Thread(target=self.read_thread, daemon=True)
|
||||
read_thread.start()
|
||||
|
||||
# Setup terminal
|
||||
clear_screen()
|
||||
hide_cursor()
|
||||
|
||||
# Input handling (non-blocking on Windows requires msvcrt)
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
def get_key():
|
||||
if msvcrt.kbhit():
|
||||
return msvcrt.getch().decode('utf-8', errors='ignore').lower()
|
||||
return None
|
||||
else:
|
||||
import select
|
||||
import tty
|
||||
import termios
|
||||
old_settings = termios.tcgetattr(sys.stdin)
|
||||
tty.setcbreak(sys.stdin.fileno())
|
||||
def get_key():
|
||||
if select.select([sys.stdin], [], [], 0)[0]:
|
||||
return sys.stdin.read(1).lower()
|
||||
return None
|
||||
|
||||
try:
|
||||
while self.running:
|
||||
# Handle input
|
||||
key = get_key()
|
||||
if key == 'q':
|
||||
self.running = False
|
||||
elif key == 'c':
|
||||
clear_screen()
|
||||
elif key == 'p':
|
||||
self.paused = not self.paused
|
||||
elif key == 'r':
|
||||
self.reset_stats()
|
||||
|
||||
# Update display
|
||||
if not self.paused:
|
||||
self.display()
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
self.running = False
|
||||
show_cursor()
|
||||
if os.name != 'nt':
|
||||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
|
||||
self.disconnect()
|
||||
print(f"\n{Colors.CYAN}Disconnected from {self.port}{Colors.RESET}")
|
||||
|
||||
|
||||
def list_ports():
|
||||
"""List available serial ports"""
|
||||
ports = serial.tools.list_ports.comports()
|
||||
if not ports:
|
||||
print(f"{Colors.YELLOW}No serial ports found{Colors.RESET}")
|
||||
return None
|
||||
|
||||
print(f"\n{Colors.CYAN}Available serial ports:{Colors.RESET}")
|
||||
for i, port in enumerate(ports):
|
||||
print(f" {i+1}. {Colors.GREEN}{port.device}{Colors.RESET} - {port.description}")
|
||||
|
||||
print()
|
||||
return ports
|
||||
|
||||
|
||||
def main():
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}Photon Feeder Mk2 Debug Monitor{Colors.RESET}")
|
||||
print(f"{Colors.DIM}{'='*40}{Colors.RESET}\n")
|
||||
|
||||
# Parse arguments
|
||||
port = None
|
||||
baudrate = 115200
|
||||
|
||||
if len(sys.argv) >= 2:
|
||||
port = sys.argv[1]
|
||||
if len(sys.argv) >= 3:
|
||||
try:
|
||||
baudrate = int(sys.argv[2])
|
||||
except ValueError:
|
||||
print(f"{Colors.RED}Invalid baud rate: {sys.argv[2]}{Colors.RESET}")
|
||||
sys.exit(1)
|
||||
|
||||
# If no port specified, list and prompt
|
||||
if port is None:
|
||||
ports = list_ports()
|
||||
if ports:
|
||||
try:
|
||||
choice = input(f"Select port (1-{len(ports)}) or enter name: ").strip()
|
||||
if choice.isdigit():
|
||||
idx = int(choice) - 1
|
||||
if 0 <= idx < len(ports):
|
||||
port = ports[idx].device
|
||||
else:
|
||||
print(f"{Colors.RED}Invalid selection{Colors.RESET}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
port = choice
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print(f"\n{Colors.YELLOW}Cancelled{Colors.RESET}")
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
|
||||
# Run monitor
|
||||
monitor = FeederMonitor(port, baudrate)
|
||||
monitor.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1 @@
|
||||
pyserial>=3.5
|
||||
@@ -0,0 +1,29 @@
|
||||
@echo off
|
||||
title Photon Feeder Mk2 Debug Monitor
|
||||
|
||||
:: Check if Python is available
|
||||
python --version >nul 2>&1
|
||||
if errorlevel 1 (
|
||||
echo Error: Python not found in PATH
|
||||
echo Please install Python from https://python.org
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
:: Check if pyserial is installed
|
||||
python -c "import serial" >nul 2>&1
|
||||
if errorlevel 1 (
|
||||
echo Installing pyserial...
|
||||
pip install pyserial
|
||||
if errorlevel 1 (
|
||||
echo Error: Failed to install pyserial
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
)
|
||||
|
||||
:: Run the monitor
|
||||
cd /d "%~dp0"
|
||||
python feeder_monitor.py %*
|
||||
|
||||
pause
|
||||
Reference in New Issue
Block a user