A comprehensive barcode scanning application for InvenTree inventory management. Features: - Multi-mode operation (Add/Update/Check/Locate stock) - Smart duplicate prevention when adding stock - Barcode scanning with automatic part code cleaning - Real-time server connection monitoring - Part information display with images - Debug mode for troubleshooting Fixes applied: - Fixed encoding issues with non-ASCII characters in barcodes - Fixed API response handling for list and dict formats - Implemented duplicate prevention using PATCH to update existing stock - Added comprehensive error handling and logging Includes test suite for verification of all fixes.
1149 lines
42 KiB
Python
1149 lines
42 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InvenTree Stock Management Tool
|
||
|
||
A comprehensive barcode scanning application for InvenTree inventory management.
|
||
Features include stock addition, updates, checking, and part location finding.
|
||
|
||
Requirements:
|
||
- sv-ttk: pip install sv-ttk
|
||
- pillow: pip install pillow
|
||
|
||
Configuration:
|
||
Config file: ~/.config/scan_and_import.yaml
|
||
Required fields: host, token
|
||
"""
|
||
|
||
import tkinter as tk
|
||
from tkinter import ttk, simpledialog, messagebox
|
||
import sv_ttk
|
||
import re
|
||
import subprocess
|
||
import requests
|
||
import yaml
|
||
import sys
|
||
from pathlib import Path
|
||
from io import BytesIO
|
||
from PIL import Image, ImageTk
|
||
from typing import Dict, List, Tuple, Optional, Any
|
||
|
||
# ============================================================================
|
||
# CONFIGURATION & CONSTANTS
|
||
# ============================================================================
|
||
|
||
# Location of the YAML configuration file read by load_config().
CONFIG_FILE = Path.home() / '.config' / 'scan_and_import.yaml'
# Matches the 0x1D and 0x1E control characters; parse_scan() splits raw
# scans into fields on these.
SEP_RE = re.compile(r'[\x1D\x1E]')

# Barcode command mappings
# Key: action name. Value: the scan strings that trigger that action.
# Scans are upper-cased before being compared against these lists.
BARCODE_COMMANDS = {
    # Mode switching
    'import': ['MODE:ADD', 'MODE:IMPORT', 'ADD_STOCK', 'IMPORT'],
    'update': ['MODE:UPDATE', 'UPDATE_STOCK', 'UPDATE'],
    'get': ['MODE:CHECK', 'MODE:GET', 'CHECK_STOCK', 'CHECK'],
    'locate': ['MODE:LOCATE', 'LOCATE_PART', 'LOCATE', 'FIND_PART'],

    # Debug control
    'debug_on': ['DEBUG:ON', 'DEBUG_ON'],
    'debug_off': ['DEBUG:OFF', 'DEBUG_OFF'],

    # Location management
    'change_location': ['CHANGE_LOCATION', 'NEW_LOCATION', 'SET_LOCATION', 'LOCATION']
}

# User-friendly mode names
# Key: internal mode identifier (same keys as the mode-switching entries
# above). Value: the label shown to the user.
MODE_NAMES = {
    'import': 'Add Stock',
    'update': 'Update Stock',
    'get': 'Check Stock',
    'locate': 'Locate Part'
}
|
||
|
||
# ============================================================================
|
||
# API FUNCTIONS
|
||
# ============================================================================
|
||
|
||
def load_config() -> Tuple[str, str]:
    """
    Load configuration from YAML file.

    Reads CONFIG_FILE, which must be a YAML mapping containing non-empty
    'host' and 'token' entries. Any problem is reported in an error dialog
    before the process exits.

    Returns:
        Tuple[str, str]: (host_url, api_token); trailing slashes are removed
        from the host URL

    Raises:
        SystemExit: If config file is missing or invalid
    """
    if not CONFIG_FILE.exists():
        messagebox.showerror("Config Error", f"Config file {CONFIG_FILE} not found.")
        sys.exit(1)

    try:
        data = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8'))
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
        messagebox.showerror("Config Error", f"Could not read {CONFIG_FILE}: {exc}")
        sys.exit(1)

    # An empty file parses to None and a top-level list/scalar has no .get();
    # treat both as "nothing configured" so the check below reports it.
    if not isinstance(data, dict):
        data = {}

    host = data.get('host')
    token = data.get('token')

    if not host or not token:
        messagebox.showerror("Config Error", f"Both 'host' and 'token' must be set in {CONFIG_FILE}")
        sys.exit(1)

    # str() guards against YAML scalars parsed as non-strings (e.g. an all-digit token)
    return str(host).rstrip('/'), str(token)
|
||
|
||
|
||
def get_locations(host: str, token: str, timeout: float = 10.0) -> List[Tuple[int, str]]:
    """
    Fetch all locations from InvenTree API.

    Args:
        host: InvenTree server URL
        token: API authentication token
        timeout: Seconds to wait for the server before giving up

    Returns:
        List of (location_id, location_name) tuples

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/stock/location/"
    headers = {'Authorization': f'Token {token}'}
    # requests waits indefinitely unless a timeout is given
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()

    data = resp.json()
    # Accept either a bare list or a dict wrapping the list under 'results'
    locations = data.get('results', data) if isinstance(data, dict) else data
    return [(loc.get('id', loc.get('pk')), loc.get('name', '')) for loc in locations]
|
||
|
||
|
||
def find_or_import_part(host: str, token: str, part_code: str, timeout: float = 10.0) -> int:
    """
    Find part by code or import if not found.

    Searches the part API for ``part_code``. When nothing matches, runs the
    external ``inventree-part-import`` command and searches once more.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_code: Part code to search for
        timeout: Seconds to wait for each HTTP request before giving up

    Returns:
        Part ID of the first search result

    Raises:
        RuntimeError: If part cannot be found or imported
        requests.exceptions.RequestException: If a search request fails
    """
    url = f"{host}/api/part/"
    headers = {'Authorization': f'Token {token}'}

    def search() -> List[Dict[str, Any]]:
        """Run the search once and normalise the response to a list."""
        r = requests.get(url, headers=headers, params={'search': part_code}, timeout=timeout)
        r.raise_for_status()
        raw = r.json()
        return raw.get('results', []) if isinstance(raw, dict) else raw

    # First, try to find existing part
    results = search()

    if not results:
        # Part not found, try to import. The command is passed as an argument
        # list (no shell), so the scanned text is never shell-interpreted.
        try:
            subprocess.run(['inventree-part-import', part_code], check=True)
        except (OSError, subprocess.CalledProcessError) as exc:
            # Importer missing or exited non-zero: surface as the documented error
            raise RuntimeError(f"Unable to import part {part_code}: {exc}") from exc

        # Search again after import
        results = search()
        if not results:
            raise RuntimeError(f"Unable to import part {part_code}")

    return results[0].get('pk', results[0].get('id'))
|
||
|
||
|
||
def get_part_info(host: str, token: str, part_id: int, timeout: float = 10.0) -> Dict[str, Any]:
    """
    Get part information from InvenTree API.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to get info for
        timeout: Seconds to wait for the server before giving up

    Returns:
        Part information dictionary (the decoded JSON response)

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/part/{part_id}/"
    headers = {'Authorization': f'Token {token}'}
    # requests waits indefinitely unless a timeout is given
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
|
||
|
||
|
||
def get_part_parameters(host: str, token: str, part_id: int, timeout: float = 10.0) -> List[Dict[str, Any]]:
    """
    Get part parameters from InvenTree API.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to get parameters for
        timeout: Seconds to wait for the server before giving up

    Returns:
        List of parameter dictionaries

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/part/parameter/"
    headers = {'Authorization': f'Token {token}'}
    # requests waits indefinitely unless a timeout is given
    resp = requests.get(url, headers=headers, params={'part': part_id}, timeout=timeout)
    resp.raise_for_status()

    data = resp.json()
    # Unwrap a {'results': [...]} envelope; otherwise return the payload as-is
    if isinstance(data, dict) and 'results' in data:
        return data['results']
    return data
|
||
|
||
|
||
def get_part_locations(host: str, token: str, part_id: int, timeout: float = 10.0) -> List[Dict[str, Any]]:
    """
    Get all stock locations for a specific part.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to find locations for
        timeout: Seconds to wait for the server before giving up

    Returns:
        List of stock item dictionaries

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/stock/"
    headers = {'Authorization': f'Token {token}'}
    # requests waits indefinitely unless a timeout is given
    resp = requests.get(url, headers=headers, params={'part': part_id}, timeout=timeout)
    resp.raise_for_status()

    data = resp.json()
    # Unwrap a {'results': [...]} envelope; otherwise return the payload as-is
    if isinstance(data, dict) and 'results' in data:
        return data['results']
    return data
|
||
|
||
|
||
def get_location_details(host: str, token: str, location_id: int, timeout: float = 10.0) -> Dict[str, Any]:
    """
    Get details for a specific location.

    Args:
        host: InvenTree server URL
        token: API authentication token
        location_id: Location ID to get details for
        timeout: Seconds to wait for the server before giving up

    Returns:
        Location information dictionary (the decoded JSON response)

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/stock/location/{location_id}/"
    headers = {'Authorization': f'Token {token}'}
    # requests waits indefinitely unless a timeout is given
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
|
||
|
||
|
||
def find_stock_item(host: str, token: str, part_id: int, loc_id: int, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """
    Find stock item for part at specific location.

    Only the first matching stock item is returned, even when the query
    matches several.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID
        loc_id: Location ID. NOTE: requests omits query parameters whose
            value is None, so passing None searches every location.
        timeout: Seconds to wait for the server before giving up

    Returns:
        Stock item dictionary or None if not found

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status
    """
    url = f"{host}/api/stock/"
    headers = {'Authorization': f'Token {token}'}
    resp = requests.get(
        url,
        headers=headers,
        params={'part': part_id, 'location': loc_id},
        timeout=timeout,  # requests waits indefinitely unless a timeout is given
    )
    resp.raise_for_status()

    data = resp.json()
    # A dict without 'results' counts as "no items" rather than being indexed
    # with [0], which would raise KeyError.
    results = data.get('results', []) if isinstance(data, dict) else data
    return results[0] if results else None
|
||
|
||
|
||
def get_stock_level(host: str, token: str, part_id: int, loc_id: int) -> float:
    """
    Report how much of a part is held at a location.

    The value comes from the single stock item returned by
    find_stock_item(); when no item exists the level is 0.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID
        loc_id: Location ID

    Returns:
        Quantity recorded on the stock item, or 0 when there is none
    """
    stock_item = find_stock_item(host, token, part_id, loc_id)
    if not stock_item:
        return 0
    return stock_item.get('quantity', 0)
|
||
|
||
|
||
def lookup_barcode(host: str, token: str, barcode: str, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """
    Lookup a barcode using the InvenTree barcode API.

    This is a best-effort call: any request failure (connection error,
    timeout, HTTP error status) is reported as "not found".

    Args:
        host: InvenTree server URL
        token: API authentication token
        barcode: Barcode string to lookup
        timeout: Seconds to wait for the server before giving up

    Returns:
        Barcode lookup result or None if not found
    """
    url = f"{host}/api/barcode/"
    headers = {'Authorization': f'Token {token}', 'Content-Type': 'application/json'}
    payload = {"barcode": barcode}

    try:
        # requests waits indefinitely unless a timeout is given
        resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException:
        return None
|
||
|
||
|
||
def parse_scan(raw: str) -> Tuple[Optional[str], Optional[int]]:
    """
    Parse scanned barcode to extract part code and quantity.

    Two formats are understood:

    * Brace format ``{KEY:value,KEY:value}``, where ``PM`` holds the part
      code and ``QTY`` the quantity (keys are case-insensitive).
    * Separator format: fields split on SEP_RE, where a field starting with
      ``30P`` or ``1P`` holds the part code and ``Q<digits>`` the quantity
      (prefixes are case-insensitive).

    Args:
        raw: Raw barcode string

    Returns:
        Tuple of (part_code, quantity); either element is None when the
        scan did not contain it
    """
    def clean_part_code(code: str) -> str:
        """Clean part code by removing non-ASCII and invalid characters."""
        # Keep only ASCII printable characters (excluding control chars)
        # This removes characters like ¡ and other encoding artifacts
        cleaned = ''.join(char for char in code if 32 <= ord(char) <= 126)
        return cleaned.strip()

    def to_int(text: str) -> Optional[int]:
        """Return int(text), or None when text is not a valid integer."""
        try:
            return int(text)
        except ValueError:
            return None

    part = None
    qty = None

    # Handle JSON-like format
    stripped = raw.strip()
    if stripped.startswith('{') and '}' in stripped:
        # Cut at the closing brace rather than assuming it is the last
        # character, so trailing scanner residue cannot corrupt the last value.
        content = stripped[1:stripped.rfind('}')]

        for kv in content.split(','):
            if ':' not in kv:
                continue
            k, v = kv.split(':', 1)
            key = k.strip().upper()
            val = v.strip()

            if key == 'PM':
                part = clean_part_code(val)
            elif key == 'QTY':
                parsed = to_int(val)
                if parsed is not None:
                    qty = parsed
        return part, qty

    # Handle separator-based format
    for field in SEP_RE.split(raw):
        if not field:
            continue
        lowered = field.lower()
        if lowered.startswith('30p'):
            part = clean_part_code(field[3:])
        elif lowered.startswith('1p'):
            part = clean_part_code(field[2:])
        elif lowered.startswith('q') and field[1:].isdigit():
            # isdigit() also accepts characters such as '²' that int()
            # rejects, so the conversion is still guarded.
            parsed = to_int(field[1:])
            if parsed is not None:
                qty = parsed

    return part, qty
|
||
|
||
# ============================================================================
|
||
# MAIN APPLICATION CLASS
|
||
# ============================================================================
|
||
|
||
class StockApp(tk.Tk):
|
||
"""
|
||
Main application class for InvenTree Stock Tool.
|
||
|
||
Provides a GUI interface for barcode-based inventory management including:
|
||
- Stock addition and updates
|
||
- Stock level checking
|
||
- Part location finding
|
||
- Server connection monitoring
|
||
"""
|
||
|
||
def __init__(self):
    """Create the main window, load settings, build the UI and start monitoring."""
    super().__init__()
    self.title("InvenTree Stock Tool")
    self.geometry("750x700")
    sv_ttk.set_theme("dark")

    # Server settings and the headers sent with authenticated requests
    host, token = load_config()
    self.host = host
    self.token = token
    self.headers = {
        'Authorization': f'Token {token}',
        'Content-Type': 'application/json',
    }

    # Operating mode, debug flag, selected location and cached data
    self.mode = tk.StringVar(value='import')
    self.debug_mode = tk.BooleanVar(value=False)
    self.current_loc = None
    self.locations = []
    self.part_image = None

    # Connection state and the on/off phase of the blinking indicator
    self.server_connected = tk.BooleanVar(value=False)
    self.alive_state = tk.BooleanVar(value=False)

    # Widgets must exist before monitoring starts, because the monitor
    # updates the status labels.
    self._build_ui()
    self._start_connection_monitoring()

    # Put the cursor in the location field shortly after startup
    self.after(100, lambda: self.loc_entry.focus_set())
|
||
|
||
# ========================================================================
|
||
# CONNECTION MONITORING
|
||
# ========================================================================
|
||
|
||
def _start_connection_monitoring(self):
|
||
"""Start monitoring server connection and alive indicator."""
|
||
self._check_server_connection()
|
||
self._animate_alive_indicator()
|
||
self.after(10000, self._periodic_connection_check)
|
||
|
||
def _check_server_connection(self):
    """Probe the API root and record whether the server answered with HTTP 200.

    On success the location list is fetched if it has not been loaded yet.
    Any exception marks the server as disconnected and is written to the
    debug log.
    """
    try:
        response = requests.get(f"{self.host}/api/", headers=self.headers, timeout=5)
        reachable = response.status_code == 200
        self.server_connected.set(reachable)
        self._update_connection_status(reachable)
        if reachable and not self.locations:
            self._fetch_locations()
    except Exception as e:
        self.server_connected.set(False)
        self._update_connection_status(False)
        self.log_msg(f"Connection check failed: {e}", debug=True)
|
||
|
||
def _update_connection_status(self, connected: bool):
|
||
"""Update the connection status display."""
|
||
if connected:
|
||
self.server_status.config(text="Connected", foreground="green")
|
||
else:
|
||
self.server_status.config(text="Disconnected", foreground="red")
|
||
|
||
def _animate_alive_indicator(self):
|
||
"""Animate the alive indicator dot."""
|
||
current_state = self.alive_state.get()
|
||
new_state = not current_state
|
||
self.alive_state.set(new_state)
|
||
|
||
if new_state:
|
||
color = "green" if self.server_connected.get() else "orange"
|
||
else:
|
||
color = "gray"
|
||
|
||
self.alive_indicator.config(foreground=color)
|
||
self.after(1000, self._animate_alive_indicator)
|
||
|
||
def _periodic_connection_check(self):
|
||
"""Periodically check server connection."""
|
||
self._check_server_connection()
|
||
self.after(10000, self._periodic_connection_check)
|
||
|
||
# ========================================================================
|
||
# UI CONSTRUCTION
|
||
# ========================================================================
|
||
|
||
def _build_ui(self):
|
||
"""Build the complete user interface."""
|
||
self._create_status_bar()
|
||
self._create_location_section()
|
||
self._create_mode_section()
|
||
self._create_scan_section()
|
||
self._create_info_section()
|
||
self._create_log_section()
|
||
|
||
# Bind mode change to status update
|
||
self.mode.trace('w', self._update_status)
|
||
|
||
def _create_status_bar(self):
    """Build the top bar: status message on the left, server state on the right."""
    small_font = ("TkDefaultFont", 9)

    bar = ttk.Frame(self)
    bar.pack(fill=tk.X, padx=10, pady=(10, 5))

    # Status message; takes all horizontal space not used by the server state
    status_options = {
        'text': "Ready - Select location and mode",
        'relief': "sunken",
        'anchor': tk.W,
        'font': small_font + ("bold",),
    }
    self.status_label = ttk.Label(bar, **status_options)
    self.status_label.pack(side=tk.LEFT, fill=tk.X, expand=True)

    # Container for the server state widgets
    conn = ttk.Frame(bar)
    conn.pack(side=tk.RIGHT, padx=(10, 0))

    # Widgets are packed from the right edge inwards, so on screen they
    # read left to right as: "Server:", state text, blinking dot.
    self.alive_indicator = ttk.Label(conn, text="●", font=("TkDefaultFont", 12), foreground="gray")
    self.alive_indicator.pack(side=tk.RIGHT, padx=(5, 0))

    self.server_status = ttk.Label(conn, text="Disconnected", font=small_font, foreground="red")
    self.server_status.pack(side=tk.RIGHT, padx=(5, 0))

    caption = ttk.Label(conn, text="Server:", font=small_font)
    caption.pack(side=tk.RIGHT)
|
||
|
||
def _create_location_section(self):
    """Build the "Location" panel: a scan field and a current-location label."""
    panel = ttk.LabelFrame(self, text="Location", padding=10)
    panel.pack(fill=tk.X, padx=10, pady=(5, 10))

    prompt = ttk.Label(panel, text="Scan Location:")
    prompt.grid(row=0, column=0, sticky=tk.W, padx=(0, 10))

    # Pressing Return in the field hands its contents to process_location_scan()
    entry = ttk.Entry(panel)
    entry.grid(row=0, column=1, sticky=tk.EW)
    entry.bind("<Return>", lambda e: self.process_location_scan())
    self.loc_entry = entry
    panel.columnconfigure(1, weight=1)  # let the entry column stretch

    current = ttk.Label(panel, text="Current Location: None", anchor=tk.W)
    current.grid(row=1, column=0, columnspan=2, sticky=tk.EW, pady=(5, 0))
    self.loc_label = current
|
||
|
||
def _create_mode_section(self):
    """Build the "Mode" panel: one radio button per mode plus a debug checkbox."""
    panel = ttk.LabelFrame(self, text="Mode", padding=10)
    panel.pack(fill=tk.X, padx=10, pady=(0, 10))

    # (button label, value stored in self.mode), in display order
    choices = (
        ("Add Stock", 'import'),
        ("Update Stock", 'update'),
        ("Check Stock", 'get'),
        ("Locate Part", 'locate'),
    )

    for column, (label, mode_key) in enumerate(choices):
        button = ttk.Radiobutton(
            panel,
            text=label,
            variable=self.mode,
            value=mode_key,
            command=self._on_mode_change,
        )
        button.grid(row=0, column=column, sticky=tk.W, padx=(0, 15))

    # The debug toggle sits in the first column after the radio buttons
    debug_toggle = ttk.Checkbutton(panel, text="Debug Mode", variable=self.debug_mode)
    debug_toggle.grid(row=0, column=len(choices), sticky=tk.W, padx=(20, 0))
|
||
|
||
def _create_scan_section(self):
    """Build the "Part Scanning" panel containing the part scan field."""
    panel = ttk.LabelFrame(self, text="Part Scanning", padding=10)
    panel.pack(fill=tk.X, padx=10, pady=(0, 10))

    prompt = ttk.Label(panel, text="Scan Part:")
    prompt.grid(row=0, column=0, sticky=tk.W, padx=(0, 10))

    # Pressing Return in the field triggers process_scan()
    entry = ttk.Entry(panel, font=("TkDefaultFont", 11))
    entry.grid(row=0, column=1, sticky=tk.EW)
    entry.bind("<Return>", lambda e: self.process_scan())
    self.scan_entry = entry
    panel.columnconfigure(1, weight=1)  # let the entry column stretch
|
||
|
||
def _create_info_section(self):
    """Build the "Part Information" panel.

    The left side holds a two-column Property/Value table with a vertical
    scrollbar; the right side holds the stock figure above the part image.
    """
    panel = ttk.LabelFrame(self, text="Part Information", padding=10)
    panel.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))

    # Left side - information table
    table_area = ttk.Frame(panel)
    table_area.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 10))

    # column name -> (initial width, minimum width)
    column_widths = {'Property': (120, 80), 'Value': (300, 200)}
    tree = ttk.Treeview(table_area, columns=tuple(column_widths), show='headings', height=8)
    for column in column_widths:
        tree.heading(column, text=column)
    for column, (width, minwidth) in column_widths.items():
        tree.column(column, width=width, minwidth=minwidth)
    self.info_tree = tree

    # Scrollbar for table
    table_scroll = ttk.Scrollbar(table_area, orient=tk.VERTICAL, command=tree.yview)
    tree.configure(yscrollcommand=table_scroll.set)

    tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    table_scroll.pack(side=tk.RIGHT, fill=tk.Y)

    # Right side - stock figure and image
    side_area = ttk.Frame(panel)
    side_area.pack(side=tk.RIGHT)

    self.stock_label = ttk.Label(side_area, text="", font=("TkDefaultFont", 16, "bold"))
    self.stock_label.pack(pady=(0, 5))

    self.image_label = ttk.Label(side_area)
    self.image_label.pack()
|
||
|
||
def _create_log_section(self):
    """Build the "Activity Log" panel: a disabled text box with a scrollbar."""
    panel = ttk.LabelFrame(self, text="Activity Log", padding=10)
    panel.pack(fill=tk.BOTH, expand=False, padx=10, pady=(0, 10))

    # Created in the "disabled" state, so the user cannot type into it
    log_box = tk.Text(panel, height=8, state="disabled", font=("Consolas", 9))
    self.log = log_box
    scroller = ttk.Scrollbar(panel, orient=tk.VERTICAL, command=log_box.yview)
    log_box.configure(yscrollcommand=scroller.set)

    log_box.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroller.pack(side=tk.RIGHT, fill=tk.Y)
|
||
|
||
# ========================================================================
|
||
# LOCATION AND STATUS MANAGEMENT
|
||
# ========================================================================
|
||
|
||
def _fetch_locations(self):
    """Load the location list from the server into ``self.locations``.

    A failure is always logged. An error dialog is shown only while the
    server is marked as connected.
    """
    try:
        self.locations = get_locations(self.host, self.token)
        self.log_msg("✅ Loaded locations from InvenTree")
    except Exception as e:
        self.log_msg(f"✖ Failed to load locations: {e}")
        if not self.server_connected.get():
            return
        messagebox.showerror("Error", f"Failed to load locations: {e}")
|
||
|
||
def _update_status(self, *args):
    """Refresh the status bar text from the current mode and location.

    Args:
        *args: Ignored; accepted so the method can serve as a Tk variable
            trace callback.
    """
    mode_key = self.mode.get()
    mode_name = MODE_NAMES.get(mode_key, mode_key)

    if not self.current_loc:
        status = f"Mode: {mode_name} | Location: Not set"
    else:
        # Use the cached name; fall back to the bare ID when it is not in the list
        loc_name = str(self.current_loc)
        for loc_id, name in self.locations:
            if loc_id == self.current_loc:
                loc_name = name
                break
        status = f"Mode: {mode_name} | Location: {self.current_loc} - {loc_name}"

    self.status_label.config(text=status)
|
||
|
||
def process_location_scan(self):
    """Consume the location field: run it as a command, else as a location.

    The field is always cleared. Blank input is ignored.
    """
    scanned = self.loc_entry.get().strip()
    self.loc_entry.delete(0, tk.END)

    # Barcode commands take priority over location lookup
    if scanned and not self._handle_barcode_command(scanned):
        self._process_location_input(scanned)
|
||
|
||
def _process_location_input(self, code: str):
|
||
"""Process location input (formerly set_location logic)."""
|
||
location_id = None
|
||
|
||
# Check if it's a location barcode (INV-SL followed by location ID)
|
||
if code.upper().startswith('INV-SL'):
|
||
try:
|
||
location_id = int(code[6:]) # Extract ID after 'INV-SL'
|
||
self.log_msg(f"🔍 Extracted location ID {location_id} from barcode: {code}")
|
||
except (ValueError, IndexError):
|
||
self.log_msg(f"⚠ Invalid location barcode format: '{code}'")
|
||
return
|
||
# Check if it's a simple numeric location ID
|
||
elif code.isdigit():
|
||
location_id = int(code)
|
||
else:
|
||
# Try to find by name match
|
||
for loc_id, name in self.locations:
|
||
if code.lower() in name.lower():
|
||
location_id = loc_id
|
||
break
|
||
|
||
if location_id is not None:
|
||
# Find the location in our list
|
||
for loc_id, name in self.locations:
|
||
if loc_id == location_id:
|
||
self.current_loc = loc_id
|
||
self.loc_label.config(text=f"Current Location: {loc_id} – {name}")
|
||
self._update_status()
|
||
self.scan_entry.focus_set()
|
||
self.log_msg(f"📍 Set location: {loc_id} - {name}")
|
||
return
|
||
|
||
self.log_msg(f"⚠ Location ID {location_id} not found in available locations")
|
||
else:
|
||
self.log_msg(f"⚠ Could not parse location from: '{code}'")
|
||
|
||
def set_location(self):
    """Legacy entry point: read the location field and resolve it as a location.

    Unlike process_location_scan(), barcode commands are not checked here.
    The field is always cleared and blank input is ignored.
    """
    typed = self.loc_entry.get().strip()
    self.loc_entry.delete(0, tk.END)
    if not typed:
        return
    self._process_location_input(typed)
|
||
|
||
def _on_mode_change(self):
|
||
"""Handle mode change via radio buttons."""
|
||
self._update_status()
|
||
# Always focus on scan field when mode changes
|
||
self.scan_entry.focus_set()
|
||
|
||
# ========================================================================
|
||
# BARCODE COMMAND HANDLING
|
||
# ========================================================================
|
||
|
||
def _handle_barcode_command(self, raw_input: str) -> bool:
    """
    Run the action for a scanned command barcode, if the scan is one.

    The scan is upper-cased and looked up in BARCODE_COMMANDS. Recognised
    actions are: switching mode, turning debug logging on or off, and
    starting a location change.

    Args:
        raw_input: Raw barcode input string

    Returns:
        True if command was processed, False otherwise
    """
    scanned = raw_input.upper()

    for action, triggers in BARCODE_COMMANDS.items():
        if scanned not in triggers:
            continue

        if action == 'change_location':
            self._trigger_location_change()
            return True

        if action in ('debug_on', 'debug_off'):
            enabled = action == 'debug_on'
            self.debug_mode.set(enabled)
            self.log_msg("🐛 Debug mode enabled" if enabled else "🔇 Debug mode disabled")
            self.scan_entry.focus_set()
            return True

        if action in ('import', 'update', 'get', 'locate'):
            self.mode.set(action)
            self._update_status()
            self.log_msg(f"🔄 Switched to {MODE_NAMES[action]} mode")
            self.scan_entry.focus_set()
            return True

    return False
|
||
|
||
def _trigger_location_change(self):
|
||
"""Trigger location change mode."""
|
||
self.log_msg("📍 Location change mode - scan new location barcode")
|
||
self.current_loc = None
|
||
self.loc_label.config(text="Current Location: None - Scan new location")
|
||
self._update_status()
|
||
self.loc_entry.focus_set()
|
||
|
||
# ========================================================================
|
||
# PART PROCESSING
|
||
# ========================================================================
|
||
|
||
def _process_part_scan(self, part_code: str, quantity: Optional[int]):
    """Resolve a scanned part code, show the part, then run the mode's operation.

    Args:
        part_code: Part code extracted from the scan
        quantity: Quantity extracted from the scan, or None if absent

    Any exception is logged; the traceback goes to the debug log.
    """
    try:
        self.log_msg(f"🔎 Looking up part: {part_code}", debug=True)
        part_id = find_or_import_part(self.host, self.token, part_code)
        self.log_msg(f"✅ Found part ID: {part_id}", debug=True)

        self._show_part_info(part_id)
        self._execute_stock_operation(part_id, part_code, quantity)
    except Exception as e:
        import traceback

        self.log_msg(f"✖ Part lookup/import failed: {e}")
        self.log_msg(f"Traceback: {traceback.format_exc()}", debug=True)
|
||
|
||
def _execute_stock_operation(self, part_id: int, part_code: str, quantity: Optional[int]):
|
||
"""Execute the appropriate stock operation based on current mode."""
|
||
mode = self.mode.get()
|
||
|
||
if mode == 'import':
|
||
self._add_stock(part_id, part_code, quantity)
|
||
elif mode == 'update':
|
||
self._update_stock(part_id, part_code)
|
||
elif mode == 'locate':
|
||
self._locate_part(part_id, part_code)
|
||
else: # check mode
|
||
self._check_stock(part_id, part_code)
|
||
|
||
# ========================================================================
|
||
# STOCK OPERATIONS
|
||
# ========================================================================
|
||
|
||
def _add_stock(self, part_id: int, part_code: str, quantity: Optional[int]):
    """Add scanned stock for a part at the current location.

    If a stock item for the part already exists at the location, its
    quantity is increased with a PATCH; otherwise a new stock item is
    created with a POST. The stock label shows the resulting quantity.

    Args:
        part_id: ID of the part to add stock for
        part_code: Part code as scanned (used in log messages)
        quantity: Quantity to add; when None nothing is changed

    All failures are logged; the traceback goes to the debug log.
    """
    if quantity is None:
        self.log_msg("⚠ No quantity found in scan.")
        return

    if self.current_loc is None:
        # requests drops query parameters whose value is None, so the lookup
        # below would match the part's stock at ANY location and the PATCH
        # would change an arbitrary stock item.
        self.log_msg("⚠ No location set - scan a location first.")
        return

    request_timeout = 10  # seconds; requests never times out by default

    try:
        # Check if stock item already exists at this location. The item is
        # fetched once and its quantity read from the same response, so the
        # ID and the quantity cannot come from two different snapshots.
        existing = find_stock_item(self.host, self.token, part_id, self.current_loc)

        if existing:
            # Stock item exists - update the quantity by adding to it
            sid = existing.get('pk', existing.get('id'))
            current_stock = existing.get('quantity', 0)
            new_stock = current_stock + quantity

            # Use PATCH to update the quantity
            r = requests.patch(
                f"{self.host}/api/stock/{sid}/",
                headers=self.headers,
                json={'quantity': new_stock},
                timeout=request_timeout,
            )
            r.raise_for_status()

            self.log_msg(f"✔ Added {quantity}× to existing '{part_code}' (StockItem #{sid})")
            self.log_msg(f"📊 Stock: {current_stock} → {new_stock} (+{quantity})")

        else:
            # No existing stock - create new stock item
            r = requests.post(
                f"{self.host}/api/stock/",
                headers=self.headers,
                json={'part': part_id, 'location': self.current_loc, 'quantity': quantity},
                timeout=request_timeout,
            )
            r.raise_for_status()

            # Handle response - might be list or dict
            response_data = r.json()
            if isinstance(response_data, list):
                stock_item = response_data[0] if response_data else {}
            else:
                stock_item = response_data

            sid = stock_item.get('pk', stock_item.get('id'))
            new_stock = quantity

            self.log_msg(f"✔ Created new stock item for '{part_code}' → StockItem #{sid}")
            self.log_msg(f"📊 Stock: 0 → {new_stock} (+{quantity})")

        self.stock_label.config(text=f"Stock: {new_stock}")

    except Exception as e:
        self.log_msg(f"✖ Error adding stock: {e}")
        import traceback
        self.log_msg(f"Traceback: {traceback.format_exc()}", debug=True)
|
||
|
||
def _update_stock(self, part_id: int, part_code: str):
    """Set the stock quantity of a part at the current location.

    The user is asked for the new quantity. An existing stock item is
    updated with a PATCH; if none exists a new one is created with a POST.
    Cancelling the dialog changes nothing. Focus returns to the scan field
    afterwards.

    Args:
        part_id: ID of the part to update
        part_code: Part code as scanned (used in the prompt and log messages)
    """
    if self.current_loc is None:
        # requests drops query parameters whose value is None, so the lookup
        # below would match the part's stock at ANY location and the PATCH
        # would change an arbitrary stock item.
        self.log_msg("⚠ No location set - scan a location first.")
        return

    request_timeout = 10  # seconds; requests never times out by default

    try:
        # One lookup supplies both the quantity shown in the prompt and the
        # ID of the item that will be updated.
        existing = find_stock_item(self.host, self.token, part_id, self.current_loc)
        current_stock = existing.get('quantity', 0) if existing else 0

        new_qty = simpledialog.askinteger(
            "Quantity",
            f"Enter current stock level for '{part_code}' (currently: {current_stock}):",
            parent=self,
            minvalue=0,  # a stock level cannot be negative
        )

        if new_qty is None:
            # Dialog cancelled
            self.scan_entry.focus_set()
            return

        if existing:
            sid = existing.get('pk', existing.get('id'))
            r = requests.patch(
                f"{self.host}/api/stock/{sid}/",
                headers=self.headers,
                json={'quantity': new_qty},
                timeout=request_timeout,
            )
            r.raise_for_status()

            change = new_qty - current_stock
            change_str = f"({'+' if change >= 0 else ''}{change})" if change != 0 else "(no change)"
            self.log_msg(f"✔ Updated '{part_code}' stock (StockItem #{sid})")
            self.log_msg(f"📊 Stock: {current_stock} → {new_qty} {change_str}")

        else:
            r = requests.post(
                f"{self.host}/api/stock/",
                headers=self.headers,
                json={'part': part_id, 'location': self.current_loc, 'quantity': new_qty},
                timeout=request_timeout,
            )
            r.raise_for_status()

            # Handle response - might be list or dict
            response_data = r.json()
            if isinstance(response_data, list):
                stock_item = response_data[0] if response_data else {}
            else:
                stock_item = response_data

            sid = stock_item.get('pk', stock_item.get('id'))
            self.log_msg(f"✔ Created '{part_code}' stock → StockItem #{sid}")
            self.log_msg(f"📊 Stock: 0 → {new_qty} (+{new_qty})")

        self.stock_label.config(text=f"Stock: {new_qty}")
        self.scan_entry.focus_set()

    except Exception as e:
        self.log_msg(f"✖ Error updating stock: {e}")
        self.scan_entry.focus_set()
|
||
|
||
def _check_stock(self, part_id: int, part_code: str):
    """Log the stock level of a part at the current location.

    Args:
        part_id: ID of the part to check
        part_code: Part code as scanned (used in log messages)
    """
    location = self.current_loc
    self.log_msg(f"📊 Getting stock level for part {part_id} at location {location}", debug=True)
    try:
        level = get_stock_level(self.host, self.token, part_id, self.current_loc)
        self.log_msg(f"ℹ Current stock for '{part_code}' at location {self.current_loc}: {level}")
    except Exception as e:
        self.log_msg(f"✖ Error fetching stock level: {e}")
|
||
|
||
def _locate_part(self, part_id: int, part_code: str):
    """Locate where this part is stored and show all locations.

    Every stock item of the part is logged with its location and quantity,
    followed by the total. The same figures are appended to the part
    information table.

    Args:
        part_id: ID of the part to locate
        part_code: Part code as scanned (used in log messages)

    All failures are logged; the traceback goes to the debug log.
    """
    self.log_msg(f"📍 Locating all instances of '{part_code}'...")
    try:
        # Get all stock locations for this part
        stock_items = get_part_locations(self.host, self.token, part_id)

        if not stock_items:
            self.log_msg(f"📭 No stock found for '{part_code}' in any location")
            return

        self.log_msg(f"📦 Found '{part_code}' in {len(stock_items)} location(s):")

        total_stock = 0
        locations_info = []
        # loc_id -> display name, or None when the lookup failed. Several
        # stock items can share a location, so each one is looked up once.
        resolved = {}

        for item in stock_items:
            loc_id = item.get('location')
            quantity = item.get('quantity', 0)
            total_stock += quantity

            if not loc_id:
                self.log_msg(f" 📌 Unknown location - Qty: {quantity}")
                continue

            if loc_id not in resolved:
                try:
                    # Get location details; prefer the full path over the bare name
                    location = get_location_details(self.host, self.token, loc_id)
                    loc_name = location.get('name', f'Location {loc_id}')
                    resolved[loc_id] = location.get('pathstring', '') or loc_name
                except Exception as e:
                    resolved[loc_id] = None
                    self.log_msg(f"⚠ Could not load details for location {loc_id}: {e}", debug=True)

            full_location = resolved[loc_id]
            if full_location is not None:
                locations_info.append((loc_id, full_location, quantity))
                self.log_msg(f" 📌 Location {loc_id}: {full_location} - Qty: {quantity}")
            else:
                # Fallback if we can't get location details
                locations_info.append((loc_id, f"Location {loc_id}", quantity))
                self.log_msg(f" 📌 Location {loc_id}: Qty: {quantity}")

        self.log_msg(f"📊 Total stock across all locations: {total_stock}")

        # Update the table with location information
        if locations_info and hasattr(self, 'info_tree'):
            # Add location summary to the part info table
            self.info_tree.insert('', 'end', values=('Total Locations', len(locations_info)))
            self.info_tree.insert('', 'end', values=('Total Stock', total_stock))

            # Add each location as a separate row
            for loc_id, loc_name, qty in locations_info:
                self.info_tree.insert('', 'end', values=(f'📍 {loc_name}', f'Qty: {qty}'))

    except Exception as e:
        self.log_msg(f"✖ Error locating part: {e}")
        import traceback
        self.log_msg(f"Debug traceback: {traceback.format_exc()}", debug=True)
|
||
|
||
# ========================================================================
# PART INFO DISPLAY
# ========================================================================
def _show_part_info(self, part_id: int):
    """
    Display part information in the table.

    Clears the info table, fetches the part record and inserts its
    description, name and IPN rows, then refreshes the stock label, the
    part image and (outside locate mode) the parameter rows. Failures are
    reported through the activity log instead of being raised.

    Args:
        part_id: Part ID passed to the part-info, stock-level and parameter lookups
    """
    self.log_msg(f"🔍 Loading part info for part ID: {part_id}", debug=True)

    if not hasattr(self, 'info_tree'):
        self.log_msg("✖ Error: info_tree not found! UI may not be initialized properly.")
        return

    try:
        # Clear existing table data
        for item in self.info_tree.get_children():
            self.info_tree.delete(item)

        # Get part info
        info = get_part_info(self.host, self.token, part_id)

        # Add basic part info to table.
        # `or ''` covers the key being present with a None value, where
        # .get()'s default would not apply and .strip() would raise.
        desc = (info.get('description') or '').strip() or 'No description.'
        self.info_tree.insert('', 'end', values=('Description', desc))

        if info.get('name'):
            self.info_tree.insert('', 'end', values=('Name', info.get('name')))
        if info.get('IPN'):
            self.info_tree.insert('', 'end', values=('IPN', info.get('IPN')))

        # Update stock display (needs a current location; not shown in locate mode)
        if self.current_loc is not None and self.mode.get() != 'locate':
            try:
                current_stock = get_stock_level(self.host, self.token, part_id, self.current_loc)
                self.stock_label.config(text=f"Stock: {current_stock}")
            except Exception as e:
                # A failed stock lookup must not block the rest of the display
                self.stock_label.config(text="Stock: ?")
                self.log_msg(f"⚠ Stock level error: {e}", debug=True)
        else:
            self.stock_label.config(text="")

        # Load and display image
        self._load_part_image(info)

        # Load and display parameters (only if not in locate mode)
        if self.mode.get() != 'locate':
            self._load_part_parameters(part_id)

    except Exception as e:
        self.log_msg(f"✖ Error loading part info: {e}")
def _load_part_image(self, part_info: Dict[str, Any]):
    """
    Load and display part image.

    Uses the 'image' entry of part_info, falling back to 'thumbnail'.
    A URL starting with '/' is prefixed with the configured host. The
    downloaded image is shrunk to fit 256x256 and shown in the image
    label; on any failure, or when no URL is present, the label is cleared.

    Args:
        part_info: Part record; only its 'image' and 'thumbnail' entries are read
    """
    self.log_msg("🖼️ Loading image...", debug=True)
    img_url = part_info.get('image') or part_info.get('thumbnail')

    if img_url:
        if img_url.startswith('/'):
            img_url = f"{self.host}{img_url}"
        try:
            # Bounded wait: without a timeout a stalled server blocks this call indefinitely
            resp = requests.get(
                img_url,
                headers={'Authorization': f'Token {self.token}'},
                timeout=10,
            )
            resp.raise_for_status()
            img = Image.open(BytesIO(resp.content))
            img.thumbnail((256, 256))
            # Keep a reference on self so the PhotoImage is not garbage-collected while displayed
            self.part_image = ImageTk.PhotoImage(img)
            self.image_label.config(image=self.part_image)
            self.log_msg("✅ Image loaded successfully", debug=True)
        except Exception as e:
            self.image_label.config(image='')
            self.log_msg(f"⚠ Image load error: {e}", debug=True)
    else:
        self.image_label.config(image='')
        self.log_msg("ℹ No image URL found", debug=True)
def _load_part_parameters(self, part_id: int):
    """
    Load and display part parameters.

    Fetches the parameters for the part and appends one row per parameter
    that has both a template name and a value to the info table. Failures
    are reported through the activity log instead of being raised.

    Args:
        part_id: Part ID passed to the parameter lookup
    """
    self.log_msg(f"🔧 Starting parameter loading for part {part_id}", debug=True)
    try:
        params = get_part_parameters(self.host, self.token, part_id)
        self.log_msg(f"📋 Found {len(params)} parameters for part {part_id}", debug=True)

        # Extract (name, value) once. `or {}` covers 'template_detail' being
        # present with a None value, where .get()'s default would not apply.
        named_values = [
            ((p.get('template_detail') or {}).get('name'), p.get('data'))
            for p in params
        ]

        param_names = [name for name, _ in named_values]
        self.log_msg(f"Available parameters: {param_names}", debug=True)

        params_displayed = 0
        for template_name, pval in named_values:
            if template_name and pval:
                self.info_tree.insert('', 'end', values=(template_name, pval))
                params_displayed += 1
                self.log_msg(f"  • {template_name}: {pval}", debug=True)

        if params_displayed == 0:
            self.log_msg("⚠ No parameters found to display", debug=True)
        else:
            self.log_msg(f"✔ Displayed {params_displayed} parameters", debug=True)

    except Exception as e:
        self.log_msg(f"✖ Could not load parameters: {e}")
        import traceback
        self.log_msg(f"Debug traceback: {traceback.format_exc()}", debug=True)
# ========================================================================
# UTILITY METHODS
# ========================================================================
def log_msg(self, msg: str, debug: bool = False):
    """
    Log a message to the activity log.

    Args:
        msg: Message to log
        debug: If True, only show when debug mode is enabled
    """
    if debug and not self.debug_mode.get():
        return

    # The log widget is left disabled after every write; unlock it only for the append
    self.log.config(state="normal")
    try:
        self.log.insert(tk.END, msg + "\n")
        self.log.see(tk.END)
    finally:
        # Re-disable even if the insert or scroll fails, so the widget is never left editable
        self.log.config(state="disabled")
def process_scan(self):
    """
    Process a scanned barcode or part code.

    Reads and clears the scan entry, then stops early when the input is
    empty, the server is flagged as disconnected, the input was handled as
    a barcode command, or no location is set outside locate mode.
    Otherwise the input is parsed into a part code and quantity and handed
    to _process_part_scan().
    """
    self.log_msg("🔄 Starting process_scan()", debug=True)

    # Get and clear the scan input first
    raw = self.scan_entry.get().strip()
    self.scan_entry.delete(0, tk.END)
    if not raw:
        return

    # Check server connection before processing
    if not self.server_connected.get():
        messagebox.showwarning("No Connection", "No connection to InvenTree server. Please check your connection.")
        return

    self.log_msg(f"📄 Raw scan input: {raw}", debug=True)

    # Check for barcode commands first
    if self._handle_barcode_command(raw):
        return

    # For locate mode, we don't need a current location
    if self.current_loc is None and self.mode.get() != 'locate':
        messagebox.showwarning("No Location", "Please scan a location first.")
        return

    # Normal part processing
    part, qty = parse_scan(raw)
    self.log_msg(f"🔍 Parsed part: {part}, qty: {qty}", debug=True)

    if not part:
        self.log_msg("⚠ Could not parse part code.")
        return

    self._process_part_scan(part, qty)
    self.log_msg("🏁 Finished process_scan()", debug=True)
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
def main():
    """
    Main entry point for the application.

    Creates the StockApp and runs its mainloop(). A KeyboardInterrupt
    exits with status 0; any other exception is reported on stderr and
    exits with status 1.
    """
    try:
        app = StockApp()
        app.mainloop()
    except KeyboardInterrupt:
        print("\nApplication interrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report on stderr so the failure is not mixed into normal output
        print(f"Fatal error: {e}", file=sys.stderr)
        sys.exit(1)
# Start the application only when this file is executed as a script
if __name__ == "__main__":
    main()