Files
inventree-stock-tool/src/stocktool/stock_tool_gui_v2.py
T
grabowski bbc4b1e763 fix: Robust barcode scan handling for ANSI MH10.8.2 and InvenTree 1.x
- Capture scanner control keystrokes (Ctrl+]/^/\/_ → GS/RS/FS/US) in the
  scan input so ANSI MH10.8.2 field separators survive the HTML input
  filter, eliminating the Q-quantity-vs-next-DI ambiguity.
- Fall back to a DI-aware lazy regex when separators are stripped
  (e.g. pasted scans), so Q digits stop at the next data identifier
  instead of greedily eating into 11Z/12Z/etc.
- Make pending-part dicts JSON-serializable by isoformat-ing the
  timestamp; without this the worker's import_complete socket emit
  threw and the entry was never removed from the queue, causing
  every re-scan to 400 with "already queued" forever.
- Make /api/part/import idempotent: a re-scan of an already-queued
  part updates qty/location and returns 200 with already_queued=true
  instead of 400.
- Surface search/queue errors in the client log instead of silently
  swallowing them, and stop treating a 500 from /api/part/search as
  "not found" (which was causing re-queue loops).
- Log full tracebacks for /api/part/search failures and split the
  get_part_info / get_part_parameters error paths so failures can be
  attributed.
- Migrate get_part_parameters to the InvenTree 1.x endpoint
  /api/parameter/?model_type=part.part&model_id=<id>. The old
  /api/part/parameter/?part=<id> returns 404 on this instance, and
  even on the new endpoint the ?part= filter is silently ignored
  (would have returned every parameter in the database).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 14:25:35 +07:00

1467 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InvenTree Stock Management Tool
A comprehensive barcode scanning application for InvenTree inventory management.
Features include stock addition, updates, checking, and part location finding.
Requirements:
- sv-ttk: pip install sv-ttk
- pillow: pip install pillow
Configuration:
Config file: ~/.config/scan_and_import.yaml
Required fields: host, token
"""
import tkinter as tk
from tkinter import ttk, simpledialog, messagebox
import sv_ttk
import re
import subprocess
import requests
import yaml
import sys
import threading
import queue
from pathlib import Path
from io import BytesIO
from PIL import Image, ImageTk
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from datetime import datetime
# ============================================================================
# CONFIGURATION & CONSTANTS
# ============================================================================
# YAML configuration file; load_config() requires 'host' and 'token' in it.
CONFIG_FILE = Path.home().joinpath('.config', 'scan_and_import.yaml')

# Splits a raw scan on the GS (0x1D) and RS (0x1E) control characters.
SEP_RE = re.compile(r'[\x1D\x1E]')

# Barcode command mappings: action key -> scanned strings that trigger it.
# Lookups compare the upper-cased scan against these lists.
BARCODE_COMMANDS = {
    # Mode switching
    'import': ['MODE:ADD', 'MODE:IMPORT', 'ADD_STOCK', 'IMPORT'],
    'update': ['MODE:UPDATE', 'UPDATE_STOCK', 'UPDATE'],
    'get': ['MODE:CHECK', 'MODE:GET', 'CHECK_STOCK', 'CHECK'],
    'locate': ['MODE:LOCATE', 'LOCATE_PART', 'LOCATE', 'FIND_PART'],
    # Debug control
    'debug_on': ['DEBUG:ON', 'DEBUG_ON'],
    'debug_off': ['DEBUG:OFF', 'DEBUG_OFF'],
    # Location management
    'change_location': [
        'CHANGE_LOCATION',
        'NEW_LOCATION',
        'SET_LOCATION',
        'LOCATION',
    ],
}

# User-friendly mode names shown in the status bar, log and pending list.
MODE_NAMES = {
    'import': 'Add Stock',
    'update': 'Update Stock',
    'get': 'Check Stock',
    'locate': 'Locate Part',
}
# ============================================================================
# IMPORT QUEUE DATA STRUCTURES
# ============================================================================
@dataclass
class PendingPart:
    """A scanned part that is waiting for its background import to finish."""
    part_code: str            # part code taken from the scan
    quantity: Optional[int]   # scanned quantity, or None when the scan had none
    location_id: int
    mode: str
    timestamp: datetime
    retry_count: int = 0      # number of failed import attempts so far

    def __str__(self):
        # A missing (or zero) quantity is rendered as 'N/A'.
        shown_qty = self.quantity or 'N/A'
        return f"{self.part_code} (Qty: {shown_qty})"
@dataclass
class ImportResult:
    """Outcome of one part import attempt, handed to the worker callback."""
    part_code: str                  # part code the import was run for
    success: bool                   # True when the part was imported and found
    part_id: Optional[int] = None   # InvenTree part ID, set on success
    error: Optional[str] = None     # failure description, set on failure
class PartImportWorker:
    """
    Background worker that imports parts one at a time.

    Part codes are pushed onto an internal queue and processed by a single
    daemon thread. Each finished attempt is reported to ``callback`` as an
    ImportResult; the callback is invoked on the worker thread.
    """

    # Seconds to wait for the InvenTree API during the post-import lookup.
    # Without a limit a stalled connection would block the only worker
    # thread and every queued import behind it.
    HTTP_TIMEOUT = 10

    def __init__(self, host: str, token: str, callback):
        """
        Initialize the import worker and start its thread.

        Args:
            host: InvenTree server URL
            token: API authentication token
            callback: Function to call with ImportResult when import completes
        """
        self.host = host
        self.token = token
        self.callback = callback
        self.import_queue = queue.Queue()
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()

    def queue_import(self, part_code: str):
        """Queue a part for import."""
        self.import_queue.put(part_code)

    def _worker_loop(self):
        """Main worker loop - processes the import queue until stopped."""
        while self.running:
            try:
                # Short timeout so the `running` flag is re-checked regularly.
                part_code = self.import_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self.callback(self._import_part(part_code))
            except Exception as e:
                print(f"Worker error: {e}")
            finally:
                # Always balance the get(), even when the callback raised.
                self.import_queue.task_done()

    def _import_part(self, part_code: str) -> ImportResult:
        """
        Import a single part.

        Runs the external ``inventree-part-import`` tool and then searches
        the InvenTree API for the part to obtain its ID.

        Args:
            part_code: Part code to import

        Returns:
            ImportResult with success/failure information
        """
        try:
            # Run the import tool; a non-zero exit raises CalledProcessError.
            subprocess.run(
                ['inventree-part-import', part_code],
                check=True,
                capture_output=True,
                text=True,
                timeout=30
            )
            # Search for the part to get its ID
            url = f"{self.host}/api/part/"
            headers = {'Authorization': f'Token {self.token}'}
            r = requests.get(
                url,
                headers=headers,
                params={'search': part_code},
                timeout=self.HTTP_TIMEOUT,
            )
            r.raise_for_status()
            raw = r.json()
            # The response may be a paginated dict or a bare list.
            results = raw.get('results', []) if isinstance(raw, dict) else raw
            if results:
                part_id = results[0].get('pk', results[0].get('id'))
                return ImportResult(part_code=part_code, success=True, part_id=part_id)
            return ImportResult(
                part_code=part_code,
                success=False,
                error="Part not found after import"
            )
        except subprocess.TimeoutExpired:
            return ImportResult(
                part_code=part_code,
                success=False,
                error="Import timeout (30s)"
            )
        except subprocess.CalledProcessError as e:
            return ImportResult(
                part_code=part_code,
                success=False,
                error=f"Import failed: {e.stderr}"
            )
        except Exception as e:
            # Includes HTTP errors/timeouts and a missing import tool.
            return ImportResult(
                part_code=part_code,
                success=False,
                error=str(e)
            )

    def stop(self):
        """Stop the worker thread, waiting up to 2 seconds for it to exit."""
        self.running = False
        if self.worker_thread.is_alive():
            self.worker_thread.join(timeout=2)
# ============================================================================
# API FUNCTIONS
# ============================================================================
def load_config() -> Tuple[str, str]:
    """
    Load configuration from YAML file.

    Shows an error dialog and exits when the file is missing, is not valid
    YAML, or does not provide both required fields. An empty file or a
    non-mapping document is treated as having no fields set.

    Returns:
        Tuple[str, str]: (host_url, api_token); the host has any trailing
        '/' removed.

    Raises:
        SystemExit: If config file is missing or invalid
    """
    if not CONFIG_FILE.exists():
        messagebox.showerror("Config Error", f"Config file {CONFIG_FILE} not found.")
        sys.exit(1)
    try:
        data = yaml.safe_load(CONFIG_FILE.read_text())
    except yaml.YAMLError as exc:
        messagebox.showerror("Config Error", f"Config file {CONFIG_FILE} is not valid YAML: {exc}")
        sys.exit(1)
    # safe_load returns None for an empty file and may return a list or
    # scalar; only a mapping can carry the required keys.
    if not isinstance(data, dict):
        data = {}
    host = data.get('host')
    token = data.get('token')
    if not host or not token:
        messagebox.showerror("Config Error", f"Both 'host' and 'token' must be set in {CONFIG_FILE}")
        sys.exit(1)
    return host.rstrip('/'), token
def get_locations(host: str, token: str) -> List[Tuple[int, str]]:
    """
    Fetch the stock locations known to InvenTree.

    Args:
        host: InvenTree server URL
        token: API authentication token

    Returns:
        List of (location_id, location_name) tuples

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    response = requests.get(
        f"{host}/api/stock/location/",
        headers={'Authorization': f'Token {token}'},
    )
    response.raise_for_status()
    payload = response.json()
    # A dict payload carries the rows under 'results'; otherwise the
    # payload itself is used as the row collection.
    if isinstance(payload, dict):
        entries = payload.get('results', payload)
    else:
        entries = payload
    pairs = []
    for entry in entries:
        pairs.append((entry.get('id', entry.get('pk')), entry.get('name', '')))
    return pairs
def find_part(host: str, token: str, part_code: str) -> Optional[int]:
    """
    Look up a part by code using the part search endpoint.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_code: Part code to search for

    Returns:
        ID of the first search hit, or None when there is no hit.
        Any error during the request is also reported as None, so callers
        cannot tell "not found" apart from a failed lookup.
    """
    url = f"{host}/api/part/"
    headers = {'Authorization': f'Token {token}'}
    try:
        response = requests.get(url, headers=headers, params={'search': part_code})
        response.raise_for_status()
        payload = response.json()
        if isinstance(payload, dict):
            matches = payload.get('results', [])
        else:
            matches = payload
        if not matches:
            return None
        first = matches[0]
        return first.get('pk', first.get('id'))
    except Exception:
        return None
def get_part_info(host: str, token: str, part_id: int) -> Dict[str, Any]:
    """
    Fetch the detail record of one part.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to get info for

    Returns:
        Part information dictionary (the decoded JSON response)

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    response = requests.get(
        f"{host}/api/part/{part_id}/",
        headers={'Authorization': f'Token {token}'},
    )
    response.raise_for_status()
    return response.json()
def get_part_parameters(host: str, token: str, part_id: int) -> List[Dict[str, Any]]:
    """
    Fetch the parameters attached to a part.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to get parameters for

    Returns:
        List of parameter dictionaries

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    # InvenTree 1.x consolidated parameters under /api/parameter/ keyed by
    # generic (model_type, model_id). The old /api/part/parameter/?part=<id>
    # endpoint no longer exists, and just `?part=<id>` on the new endpoint
    # is silently ignored (returns every parameter in the database).
    query = {'model_type': 'part.part', 'model_id': part_id}
    response = requests.get(
        f"{host}/api/parameter/",
        headers={'Authorization': f'Token {token}'},
        params=query,
    )
    response.raise_for_status()
    payload = response.json()
    # Unwrap a paginated response; anything else is returned unchanged.
    if isinstance(payload, dict) and 'results' in payload:
        return payload.get('results', payload)
    return payload
def get_part_locations(host: str, token: str, part_id: int) -> List[Dict[str, Any]]:
    """
    Fetch every stock item recorded for a part.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID to find locations for

    Returns:
        List of stock item dictionaries

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    response = requests.get(
        f"{host}/api/stock/",
        headers={'Authorization': f'Token {token}'},
        params={'part': part_id},
    )
    response.raise_for_status()
    payload = response.json()
    # Unwrap a paginated response; anything else is returned unchanged.
    if isinstance(payload, dict) and 'results' in payload:
        return payload.get('results', payload)
    return payload
def get_location_details(host: str, token: str, location_id: int) -> Dict[str, Any]:
    """
    Fetch the detail record of one stock location.

    Args:
        host: InvenTree server URL
        token: API authentication token
        location_id: Location ID to get details for

    Returns:
        Location information dictionary (the decoded JSON response)

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    response = requests.get(
        f"{host}/api/stock/location/{location_id}/",
        headers={'Authorization': f'Token {token}'},
    )
    response.raise_for_status()
    return response.json()
def find_stock_item(host: str, token: str, part_id: int, loc_id: int) -> Optional[Dict[str, Any]]:
    """
    Find the stock item holding a part at a given location.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID
        loc_id: Location ID

    Returns:
        The first matching stock item dictionary, or None if there is none

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    response = requests.get(
        f"{host}/api/stock/",
        headers={'Authorization': f'Token {token}'},
        params={'part': part_id, 'location': loc_id},
    )
    response.raise_for_status()
    payload = response.json()
    if isinstance(payload, dict):
        matches = payload.get('results', payload)
    else:
        matches = payload
    if not matches:
        return None
    return matches[0]
def get_stock_level(host: str, token: str, part_id: int, loc_id: int) -> float:
    """
    Report how much of a part is stocked at a location.

    Args:
        host: InvenTree server URL
        token: API authentication token
        part_id: Part ID
        loc_id: Location ID

    Returns:
        The 'quantity' of the stock item found by find_stock_item, or 0
        when no item exists or the item has no quantity field.
    """
    stock_item = find_stock_item(host, token, part_id, loc_id)
    if not stock_item:
        return 0
    return stock_item.get('quantity', 0)
def lookup_barcode(host: str, token: str, barcode: str) -> Optional[Dict[str, Any]]:
    """
    Resolve a barcode through the InvenTree barcode API.

    Args:
        host: InvenTree server URL
        token: API authentication token
        barcode: Barcode string to lookup

    Returns:
        The decoded JSON response, or None when the request fails
        (connection problems and HTTP error statuses alike).
    """
    request_headers = {
        'Authorization': f'Token {token}',
        'Content-Type': 'application/json',
    }
    try:
        response = requests.post(
            f"{host}/api/barcode/",
            headers=request_headers,
            json={"barcode": barcode},
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException:
        return None
def parse_scan(raw: str) -> Tuple[Optional[str], Optional[int]]:
    """
    Parse scanned barcode to extract part code and quantity.

    Three layouts are tried in this order:
    - JSON-like: {PM:PART-CODE,QTY:10}
    - ANSI MH10.8.2: [)>06P<part>...Q<qty>...
    - Separator-based: fields separated by GS/RS (0x1D, 0x1E)

    Args:
        raw: Raw barcode string

    Returns:
        Tuple of (part_code, quantity); an element is None when it could
        not be extracted, so (None, None) means parsing failed.
    """
    def clean_part_code(code: str) -> str:
        """Keep only printable ASCII (codes 32-126) and trim whitespace."""
        # Drops control characters and non-ASCII encoding artifacts.
        cleaned = ''.join(char for char in code if 32 <= ord(char) <= 126)
        return cleaned.strip()

    # Handle JSON-like format
    if raw.startswith('{') and '}' in raw:
        # Take the text between the opening brace and the last closing
        # brace. Dropping just the final character would leave the '}' in
        # the last value whenever the scanner appends a suffix after it.
        content = raw[1:raw.rindex('}')]
        part = None
        qty = None
        for kv in content.split(','):
            if ':' not in kv:
                continue
            k, v = kv.split(':', 1)
            key = k.strip().upper()
            val = v.strip()
            if key == 'PM':
                part = clean_part_code(val)
            elif key == 'QTY':
                try:
                    qty = int(val)
                except ValueError:
                    # Non-numeric quantity: leave it unset.
                    pass
        return part, qty

    # Handle ANSI MH10.8.2 format: [)>06P<part>...
    if raw.startswith('[)>06'):
        part = None
        qty = None
        # If GS/RS separators survived, parse field-by-field — unambiguous.
        fields = SEP_RE.split(raw)
        if len(fields) > 1:
            for f in fields:
                if not f:
                    continue
                if f.startswith('30P'):
                    part = clean_part_code(f[3:])
                elif f.startswith('1P'):
                    # A 30P field already seen takes precedence over 1P.
                    if not part:
                        part = clean_part_code(f[2:])
                elif f.startswith('Q') and f[1:].isdigit():
                    qty = int(f[1:])
            if part or qty:
                return part, qty
        # Separators stripped — extract part after [)>06P up to next field marker.
        if len(raw) > 6 and raw[5] == 'P':
            after_p = raw[6:]
            markers_to_find = ['1P', '30P']
            end_idx = len(after_p)
            for marker in markers_to_find:
                idx = after_p.find(marker)
                if idx != -1 and idx < end_idx:
                    end_idx = idx
            part = clean_part_code(after_p[:end_idx])
        # Lazy-match Q digits, stopping at the next data identifier
        # (e.g. 11Z, 12Z, 20Z, 1T, 9D, 4L) or end-of-string.
        q_match = re.search(r'Q(\d+?)(?=\d{0,2}[A-Z]|$)', raw)
        if q_match:
            qty = int(q_match.group(1))
        if part or qty:
            return part, qty

    # Handle separator-based format
    part = None
    qty = None
    fields = SEP_RE.split(raw)
    for f in fields:
        if not f:
            continue
        if f.startswith('30P'):
            part = clean_part_code(f[3:])
        elif f.lower().startswith('1p'):
            part = clean_part_code(f[2:])
        elif f.lower().startswith('q') and f[1:].isdigit():
            qty = int(f[1:])
    return part, qty
# ============================================================================
# MAIN APPLICATION CLASS
# ============================================================================
class StockApp(tk.Tk):
"""
Main application class for InvenTree Stock Tool.
Provides a GUI interface for barcode-based inventory management including:
- Stock addition and updates
- Stock level checking
- Part location finding
- Server connection monitoring
"""
def __init__(self):
    """Create the main window, load the config, start the import worker and build the UI."""
    super().__init__()
    self.title("InvenTree Stock Tool")
    self.geometry("750x700")
    sv_ttk.set_theme("dark")

    # Server credentials; load_config() exits the process when they are missing.
    self.host, self.token = load_config()
    self.headers = {
        'Authorization': f'Token {self.token}',
        'Content-Type': 'application/json',
    }

    # Application state
    self.mode = tk.StringVar(value='import')
    self.debug_mode = tk.BooleanVar(value=False)
    self.current_loc = None
    self.locations = []
    self.part_image = None

    # Connection monitoring flags
    self.server_connected = tk.BooleanVar(value=False)
    self.alive_state = tk.BooleanVar(value=False)

    # Part import queue: part_code -> PendingPart
    self.pending_parts: Dict[str, PendingPart] = {}
    self.import_worker = PartImportWorker(self.host, self.token, self._on_import_complete)

    # Widgets must exist before monitoring starts, because the monitor
    # updates the status labels.
    self._build_ui()
    self._start_connection_monitoring()
    # Put the cursor in the location field shortly after start-up.
    self.after(100, lambda: self.loc_entry.focus_set())

    # Register cleanup on window close
    self.protocol("WM_DELETE_WINDOW", self._on_closing)
# ========================================================================
# CONNECTION MONITORING
# ========================================================================
def _start_connection_monitoring(self):
    """Run the first connection check, start the indicator animation and schedule the periodic check."""
    for start in (self._check_server_connection, self._animate_alive_indicator):
        start()
    # First periodic re-check after 10 seconds.
    self.after(10000, self._periodic_connection_check)
def _check_server_connection(self):
    """
    Probe the server's API root and update the connection state.

    A 200 response marks the server as connected and, if no locations
    have been loaded yet, triggers a location fetch. Any other status or
    any exception marks it as disconnected.
    """
    try:
        response = requests.get(f"{self.host}/api/", headers=self.headers, timeout=5)
        reachable = response.status_code == 200
        self.server_connected.set(reachable)
        self._update_connection_status(reachable)
        if reachable and not self.locations:
            self._fetch_locations()
    except Exception as e:
        self.server_connected.set(False)
        self._update_connection_status(False)
        self.log_msg(f"Connection check failed: {e}", debug=True)
def _update_connection_status(self, connected: bool):
    """Show "Connected" in green or "Disconnected" in red on the server status label."""
    if connected:
        caption, colour = "Connected", "green"
    else:
        caption, colour = "Disconnected", "red"
    self.server_status.config(text=caption, foreground=colour)
def _animate_alive_indicator(self):
    """
    Toggle the alive indicator and reschedule itself one second later.

    The indicator alternates between gray and a status colour: green
    while the server is connected, orange otherwise.
    """
    lit = not self.alive_state.get()
    self.alive_state.set(lit)
    if not lit:
        color = "gray"
    elif self.server_connected.get():
        color = "green"
    else:
        color = "orange"
    self.alive_indicator.config(foreground=color)
    self.after(1000, self._animate_alive_indicator)
def _periodic_connection_check(self):
    """Re-check the server connection and schedule the next check in 10 seconds."""
    interval_ms = 10000
    self._check_server_connection()
    self.after(interval_ms, self._periodic_connection_check)
# ========================================================================
# UI CONSTRUCTION
# ========================================================================
def _build_ui(self):
    """Build every UI section top to bottom, then keep the status bar in sync with the mode."""
    # Sections are packed in call order, so this order is the layout order.
    self._create_status_bar()
    self._create_location_section()
    self._create_mode_section()
    self._create_scan_section()
    self._create_pending_parts_section()
    self._create_info_section()
    self._create_log_section()

    # Refresh the status bar whenever the mode variable is written.
    on_mode_write = self._update_status
    self.mode.trace('w', on_mode_write)
def _create_status_bar(self):
    """Create the status bar: a status text on the left and the server indicator on the right."""
    bar = ttk.Frame(self)
    bar.pack(fill=tk.X, padx=10, pady=(10, 5))

    # Main status label, stretched across the remaining width.
    self.status_label = ttk.Label(
        bar,
        text="Ready - Select location and mode",
        relief="sunken",
        anchor=tk.W,
        font=("TkDefaultFont", 9, "bold"),
    )
    self.status_label.pack(side=tk.LEFT, fill=tk.X, expand=True)

    # Connection status group on the right edge.
    conn_box = ttk.Frame(bar)
    conn_box.pack(side=tk.RIGHT, padx=(10, 0))

    # Widgets are packed right-to-left, so the visual order is
    # "Server:", status text, alive indicator.
    self.alive_indicator = ttk.Label(
        conn_box,
        text="",
        font=("TkDefaultFont", 12),
        foreground="gray",
    )
    self.alive_indicator.pack(side=tk.RIGHT, padx=(5, 0))

    self.server_status = ttk.Label(
        conn_box,
        text="Disconnected",
        font=("TkDefaultFont", 9),
        foreground="red",
    )
    self.server_status.pack(side=tk.RIGHT, padx=(5, 0))

    caption = ttk.Label(conn_box, text="Server:", font=("TkDefaultFont", 9))
    caption.pack(side=tk.RIGHT)
def _create_location_section(self):
    """Create the location section: a scan entry and a label showing the current location."""
    section = ttk.LabelFrame(self, text="Location", padding=10)
    section.pack(fill=tk.X, padx=10, pady=(5, 10))

    prompt = ttk.Label(section, text="Scan Location:")
    prompt.grid(row=0, column=0, sticky=tk.W, padx=(0, 10))

    self.loc_entry = ttk.Entry(section)
    self.loc_entry.grid(row=0, column=1, sticky=tk.EW)
    # Pressing Return in the entry submits the scan.
    self.loc_entry.bind("<Return>", lambda e: self.process_location_scan())
    # Let the entry column absorb extra width.
    section.columnconfigure(1, weight=1)

    self.loc_label = ttk.Label(section, text="Current Location: None", anchor=tk.W)
    self.loc_label.grid(row=1, column=0, columnspan=2, sticky=tk.EW, pady=(5, 0))
def _create_mode_section(self):
    """Create the mode section: one radio button per mode plus the debug checkbox."""
    section = ttk.LabelFrame(self, text="Mode", padding=10)
    section.pack(fill=tk.X, padx=10, pady=(0, 10))

    # (button caption, value stored in self.mode)
    choices = [
        ("Add Stock", 'import'),
        ("Update Stock", 'update'),
        ("Check Stock", 'get'),
        ("Locate Part", 'locate'),
    ]
    for column, (caption, mode_key) in enumerate(choices):
        button = ttk.Radiobutton(
            section,
            text=caption,
            variable=self.mode,
            value=mode_key,
            command=self._on_mode_change,
        )
        button.grid(row=0, column=column, sticky=tk.W, padx=(0, 15))

    # Debug checkbox goes in the column right after the last radio button.
    debug_box = ttk.Checkbutton(
        section,
        text="Debug Mode",
        variable=self.debug_mode,
    )
    debug_box.grid(row=0, column=len(choices), sticky=tk.W, padx=(20, 0))
def _create_scan_section(self):
    """Create the part scanning section with its scan entry."""
    section = ttk.LabelFrame(self, text="Part Scanning", padding=10)
    section.pack(fill=tk.X, padx=10, pady=(0, 10))

    prompt = ttk.Label(section, text="Scan Part:")
    prompt.grid(row=0, column=0, sticky=tk.W, padx=(0, 10))

    self.scan_entry = ttk.Entry(section, font=("TkDefaultFont", 11))
    self.scan_entry.grid(row=0, column=1, sticky=tk.EW)
    # Pressing Return in the entry submits the scan.
    self.scan_entry.bind("<Return>", lambda e: self.process_scan())
    # Let the entry column absorb extra width.
    section.columnconfigure(1, weight=1)
def _create_pending_parts_section(self):
    """Create the pending imports section: a titled frame holding a four-column table."""
    section = ttk.LabelFrame(self, text="Pending Imports (0)", padding=10)
    section.pack(fill=tk.X, padx=10, pady=(0, 10))

    # Container for the table and its scrollbar.
    table_box = ttk.Frame(section)
    table_box.pack(fill=tk.BOTH, expand=True)

    columns = ('Part Code', 'Qty', 'Mode', 'Status')
    widths = (180, 60, 100, 150)
    self.pending_tree = ttk.Treeview(table_box, columns=columns, show='headings', height=3)
    # Each heading shows the column's own name.
    for column in columns:
        self.pending_tree.heading(column, text=column)
    for column, width in zip(columns, widths):
        self.pending_tree.column(column, width=width)

    # Vertical scrollbar linked to the table.
    bar = ttk.Scrollbar(table_box, orient=tk.VERTICAL, command=self.pending_tree.yview)
    self.pending_tree.configure(yscrollcommand=bar.set)
    self.pending_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    bar.pack(side=tk.RIGHT, fill=tk.Y)

    # Kept so the frame title can be updated with the pending count.
    self.pending_frame = section
def _create_info_section(self):
    """Create the part information section: a property table on the left, stock label and image on the right."""
    section = ttk.LabelFrame(self, text="Part Information", padding=10)
    section.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))

    # Left side - property/value table
    table_box = ttk.Frame(section)
    table_box.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 10))

    columns = ('Property', 'Value')
    self.info_tree = ttk.Treeview(table_box, columns=columns, show='headings', height=8)
    for column in columns:
        self.info_tree.heading(column, text=column)
    self.info_tree.column('Property', width=120, minwidth=80)
    self.info_tree.column('Value', width=300, minwidth=200)

    # Vertical scrollbar linked to the table.
    bar = ttk.Scrollbar(table_box, orient=tk.VERTICAL, command=self.info_tree.yview)
    self.info_tree.configure(yscrollcommand=bar.set)
    self.info_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    bar.pack(side=tk.RIGHT, fill=tk.Y)

    # Right side - stock figure above the part image
    side_box = ttk.Frame(section)
    side_box.pack(side=tk.RIGHT)

    self.stock_label = ttk.Label(
        side_box,
        text="",
        font=("TkDefaultFont", 16, "bold"),
    )
    self.stock_label.pack(pady=(0, 5))

    self.image_label = ttk.Label(side_box)
    self.image_label.pack()
def _create_log_section(self):
    """Create the activity log section: a read-only text box with a scrollbar."""
    section = ttk.LabelFrame(self, text="Activity Log", padding=10)
    section.pack(fill=tk.BOTH, expand=False, padx=10, pady=(0, 10))

    # The text widget starts disabled so the user cannot type into it.
    self.log = tk.Text(section, height=8, state="disabled", font=("Consolas", 9))
    bar = ttk.Scrollbar(section, orient=tk.VERTICAL, command=self.log.yview)
    self.log.configure(yscrollcommand=bar.set)

    self.log.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    bar.pack(side=tk.RIGHT, fill=tk.Y)
# ========================================================================
# LOCATION AND STATUS MANAGEMENT
# ========================================================================
def _fetch_locations(self):
    """
    Load the location list from the API into self.locations.

    Failures are logged; an error dialog is shown only while the server
    is flagged as connected.
    """
    try:
        self.locations = get_locations(self.host, self.token)
        self.log_msg("✅ Loaded locations from InvenTree")
    except Exception as e:
        self.log_msg(f"✖ Failed to load locations: {e}")
        if not self.server_connected.get():
            return
        messagebox.showerror("Error", f"Failed to load locations: {e}")
def _update_status(self, *args):
    """
    Refresh the status bar text from the current mode and location.

    Extra positional arguments are accepted and ignored so the method can
    serve as a variable trace callback.
    """
    mode_key = self.mode.get()
    mode_name = MODE_NAMES.get(mode_key, mode_key)
    if not self.current_loc:
        self.status_label.config(text=f"Mode: {mode_name} | Location: Not set")
        return
    # Fall back to the bare ID when the location is not in the loaded list.
    loc_name = str(self.current_loc)
    for loc_id, name in self.locations:
        if loc_id == self.current_loc:
            loc_name = name
            break
    self.status_label.config(
        text=f"Mode: {mode_name} | Location: {self.current_loc} - {loc_name}"
    )
def process_location_scan(self):
    """
    Handle a scan submitted in the location field.

    The field is cleared first. Barcode commands take priority; anything
    else is treated as a location.
    """
    scanned = self.loc_entry.get().strip()
    self.loc_entry.delete(0, tk.END)
    if not scanned:
        return
    # Only fall through to location handling when no command matched.
    if not self._handle_barcode_command(scanned):
        self._process_location_input(scanned)
def _process_location_input(self, code: str):
    """
    Resolve scanned text to a known location and make it current.

    Accepted forms, tried in order:
    - 'INV-SL<id>' location barcode (prefix matched case-insensitively)
    - a purely numeric location ID
    - a case-insensitive substring of a location name (first match wins)

    On success the current location, its label and the status bar are
    updated and focus moves to the part scan field. Otherwise a warning
    is logged.
    """
    location_id = None
    if code.upper().startswith('INV-SL'):
        # Location barcode: the ID follows the 6-character prefix.
        try:
            location_id = int(code[6:])
            self.log_msg(f"🔍 Extracted location ID {location_id} from barcode: {code}")
        except (ValueError, IndexError):
            self.log_msg(f"⚠ Invalid location barcode format: '{code}'")
            return
    elif code.isdigit():
        location_id = int(code)
    else:
        # Try to find by name match.
        location_id = next(
            (loc_id for loc_id, name in self.locations if code.lower() in name.lower()),
            None,
        )

    if location_id is None:
        self.log_msg(f"⚠ Could not parse location from: '{code}'")
        return

    # The ID only counts if it is one of the loaded locations.
    known = next(((loc_id, name) for loc_id, name in self.locations if loc_id == location_id), None)
    if known is None:
        self.log_msg(f"⚠ Location ID {location_id} not found in available locations")
        return

    loc_id, name = known
    self.current_loc = loc_id
    self.loc_label.config(text=f"Current Location: {loc_id} {name}")
    self._update_status()
    self.scan_entry.focus_set()
    self.log_msg(f"📍 Set location: {loc_id} - {name}")
def set_location(self):
    """Legacy entry point: read and clear the location field, then process its content as a location."""
    code = self.loc_entry.get().strip()
    self.loc_entry.delete(0, tk.END)
    if not code:
        return
    self._process_location_input(code)
def _on_mode_change(self):
    """Radio button callback: refresh the status bar and return focus to the scan field."""
    self._update_status()
    # Focus always goes back to the scan field after a mode change.
    scan_field = self.scan_entry
    scan_field.focus_set()
# ========================================================================
# BARCODE COMMAND HANDLING
# ========================================================================
def _handle_barcode_command(self, raw_input: str) -> bool:
    """
    Execute a scanned control command, if the input is one.

    The input is upper-cased and looked up in BARCODE_COMMANDS. Commands
    can switch the mode, toggle debug mode or start a location change.

    Args:
        raw_input: Raw barcode input string

    Returns:
        True if command was processed, False otherwise
    """
    upper_input = raw_input.upper()
    for mode, commands in BARCODE_COMMANDS.items():
        if upper_input not in commands:
            continue
        if mode in ('import', 'update', 'get', 'locate'):
            self.mode.set(mode)
            self._update_status()
            self.log_msg(f"🔄 Switched to {MODE_NAMES[mode]} mode")
            self.scan_entry.focus_set()
            return True
        if mode in ('debug_on', 'debug_off'):
            enabled = mode == 'debug_on'
            self.debug_mode.set(enabled)
            self.log_msg("🐛 Debug mode enabled" if enabled else "🔇 Debug mode disabled")
            self.scan_entry.focus_set()
            return True
        if mode == 'change_location':
            self._trigger_location_change()
            return True
    return False
def _trigger_location_change(self):
    """Clear the current location and move focus to the location field for a new scan."""
    self.log_msg("📍 Location change mode - scan new location barcode")
    # Forget the location until a new one is scanned.
    self.current_loc = None
    prompt = "Current Location: None - Scan new location"
    self.loc_label.config(text=prompt)
    self._update_status()
    self.loc_entry.focus_set()
# ========================================================================
# PART PROCESSING
# ========================================================================
def _process_part_scan(self, part_code: str, quantity: Optional[int]):
    """
    Handle a scanned part code.

    A part that already exists is displayed and the stock operation for
    the current mode is run on it. A part that cannot be found is queued
    for background import. Any exception is logged, not raised.
    """
    try:
        self.log_msg(f"🔎 Looking up part: {part_code}", debug=True)
        pid = find_part(self.host, self.token, part_code)
        if pid is None:
            # Part not found - queue for import.
            self.log_msg(f"⏳ Part '{part_code}' not found - queuing for import...")
            self._queue_part_for_import(part_code, quantity)
        else:
            # Part found - process normally.
            self.log_msg(f"✅ Found part ID: {pid}", debug=True)
            self._show_part_info(pid)
            self._execute_stock_operation(pid, part_code, quantity)
    except Exception as e:
        self.log_msg(f"✖ Part lookup failed: {e}")
        import traceback
        self.log_msg(f"Traceback: {traceback.format_exc()}", debug=True)
def _execute_stock_operation(self, part_id: int, part_code: str, quantity: Optional[int]):
    """
    Run the stock operation that matches the current mode.

    'import' adds stock (the only operation that uses the quantity),
    'update' updates stock, 'locate' locates the part, and any other
    mode checks stock.
    """
    selected = self.mode.get()
    if selected == 'import':
        self._add_stock(part_id, part_code, quantity)
        return
    if selected == 'update':
        handler = self._update_stock
    elif selected == 'locate':
        handler = self._locate_part
    else:
        # Check mode, and the fallback for any unrecognised mode.
        handler = self._check_stock
    handler(part_id, part_code)
# ========================================================================
# PENDING PARTS MANAGEMENT
# ========================================================================
def _queue_part_for_import(self, part_code: str, quantity: Optional[int]):
    """
    Register a part as pending and hand it to the background import worker.

    A part code that is already pending is not queued again. The pending
    entry records the mode and location in effect at scan time (location
    0 when none is set).
    """
    if part_code in self.pending_parts:
        self.log_msg(f"⚠ Part '{part_code}' already queued for import")
        return

    self.pending_parts[part_code] = PendingPart(
        part_code=part_code,
        quantity=quantity,
        location_id=self.current_loc or 0,
        mode=self.mode.get(),
        timestamp=datetime.now(),
    )
    # Start the import, then refresh the pending list.
    self.import_worker.queue_import(part_code)
    self._update_pending_parts_ui()
    self.log_msg(f"📋 Added '{part_code}' to import queue - continue scanning other parts")
def _on_import_complete(self, result: ImportResult):
    """
    Import completion callback (runs on worker thread).

    Does no processing itself; it schedules _process_import_result with
    the result through after(), so the UI update runs on the main thread.
    """
    def deliver():
        self._process_import_result(result)

    self.after(0, deliver)
def _process_import_result(self, result: ImportResult):
    """
    Process import result on main thread.

    On success the operation that was pending for the part is executed
    with the mode and location recorded when the part was scanned, not
    the ones selected at the moment the import finishes; the user may
    have switched mode or location while the import was running. The
    current mode and location are restored afterwards. The entry is then
    removed from the pending list, even if the operation raised.

    On failure the import is retried until three attempts have failed,
    after which the entry is dropped.

    Results for part codes that are not pending are ignored.
    """
    pending_part = self.pending_parts.get(result.part_code)
    if not pending_part:
        return
    if result.success:
        self.log_msg(f"✅ Import complete for '{result.part_code}' (ID: {result.part_id})")
        # Temporarily switch to the queued context for the deferred operation.
        saved_loc = self.current_loc
        saved_mode = self.mode.get()
        try:
            # A stored location of 0 means none was set at scan time;
            # keep the current location in that case.
            if pending_part.location_id:
                self.current_loc = pending_part.location_id
            if pending_part.mode != saved_mode:
                self.mode.set(pending_part.mode)
            self._show_part_info(result.part_id)
            self._execute_stock_operation(result.part_id, result.part_code, pending_part.quantity)
            self.log_msg(f"✔ Processed pending operation for '{result.part_code}'")
        except Exception as e:
            self.log_msg(f"✖ Error processing '{result.part_code}': {e}")
        finally:
            # Give the user back the mode and location they are working in.
            self.current_loc = saved_loc
            if self.mode.get() != saved_mode:
                self.mode.set(saved_mode)
        # Remove from pending
        del self.pending_parts[result.part_code]
    else:
        # Import failed
        pending_part.retry_count += 1
        if pending_part.retry_count >= 3:
            self.log_msg(f"✖ Import failed for '{result.part_code}' after 3 attempts: {result.error}")
            del self.pending_parts[result.part_code]
        else:
            self.log_msg(f"⚠ Import failed for '{result.part_code}': {result.error} (retry {pending_part.retry_count}/3)")
            # Retry
            self.import_worker.queue_import(result.part_code)
    # Update UI
    self._update_pending_parts_ui()
def _update_pending_parts_ui(self):
"""Update the pending parts UI display."""
# Clear existing items
for item in self.pending_tree.get_children():
self.pending_tree.delete(item)
# Update frame title with count
count = len(self.pending_parts)
self.pending_frame.config(text=f"Pending Imports ({count})")
# Add pending parts to tree
for part_code, pending_part in self.pending_parts.items():
qty_str = str(pending_part.quantity) if pending_part.quantity else "N/A"
mode_str = MODE_NAMES.get(pending_part.mode, pending_part.mode)
status = f"Importing... (attempt {pending_part.retry_count + 1})"
self.pending_tree.insert('', 'end', values=(
part_code,
qty_str,
mode_str,
status
))
def _on_closing(self):
"""Handle application closing."""
self.log_msg("Shutting down import worker...")
self.import_worker.stop()
self.destroy()
# ========================================================================
# STOCK OPERATIONS
# ========================================================================
def _add_stock(self, part_id: int, part_code: str, quantity: Optional[int]):
"""Add stock for a part."""
if quantity is None:
self.log_msg("⚠ No quantity found in scan.")
return
try:
# Check if stock item already exists at this location
existing = find_stock_item(self.host, self.token, part_id, self.current_loc)
current_stock = get_stock_level(self.host, self.token, part_id, self.current_loc)
if existing:
# Stock item exists - update the quantity by adding to it
sid = existing.get('pk', existing.get('id'))
new_stock = current_stock + quantity
# Use PATCH to update the quantity
r = requests.patch(
f"{self.host}/api/stock/{sid}/",
headers=self.headers,
json={'quantity': new_stock}
)
r.raise_for_status()
self.log_msg(f"✔ Added {quantity}× to existing '{part_code}' (StockItem #{sid})")
self.log_msg(f"📊 Stock: {current_stock}{new_stock} (+{quantity})")
else:
# No existing stock - create new stock item
r = requests.post(
f"{self.host}/api/stock/",
headers=self.headers,
json={'part': part_id, 'location': self.current_loc, 'quantity': quantity}
)
r.raise_for_status()
# Handle response - might be list or dict
response_data = r.json()
if isinstance(response_data, list):
stock_item = response_data[0] if response_data else {}
else:
stock_item = response_data
sid = stock_item.get('pk', stock_item.get('id'))
new_stock = quantity
self.log_msg(f"✔ Created new stock item for '{part_code}' → StockItem #{sid}")
self.log_msg(f"📊 Stock: 0 → {new_stock} (+{quantity})")
self.stock_label.config(text=f"Stock: {new_stock}")
except Exception as e:
self.log_msg(f"✖ Error adding stock: {e}")
import traceback
self.log_msg(f"Traceback: {traceback.format_exc()}", debug=True)
def _update_stock(self, part_id: int, part_code: str):
    """
    Set the stock level of a part at the current location.

    The user is asked for the new level. If a stock item for the part is
    found at the current location it is updated with a PATCH request;
    otherwise a new stock item is created with a POST request. Cancelling
    the dialog changes nothing. Errors are logged instead of being raised,
    and focus always returns to the scan entry.

    Args:
        part_id: ID of the part
        part_code: Part code, used in the prompt and log messages
    """
    try:
        current_stock = get_stock_level(self.host, self.token, part_id, self.current_loc)
        new_qty = simpledialog.askinteger(
            "Quantity",
            f"Enter current stock level for '{part_code}' (currently: {current_stock}):",
            parent=self,
            # A stock level cannot be negative
            minvalue=0,
        )
        if new_qty is None:
            # Dialog cancelled
            return
        existing = find_stock_item(self.host, self.token, part_id, self.current_loc)
        if existing:
            # Same pk/id fallback as the other stock operations
            sid = existing.get('pk', existing.get('id'))
            r = requests.patch(
                f"{self.host}/api/stock/{sid}/",
                headers=self.headers,
                json={'quantity': new_qty},
                # Bound the call so an unresponsive server cannot block forever
                timeout=10,
            )
            r.raise_for_status()
            change = new_qty - current_stock
            change_str = f"({'+' if change >= 0 else ''}{change})" if change != 0 else "(no change)"
            self.log_msg(f"✔ Updated '{part_code}' stock (StockItem #{sid})")
            self.log_msg(f"📊 Stock: {current_stock} → {new_qty} {change_str}")
        else:
            r = requests.post(
                f"{self.host}/api/stock/",
                headers=self.headers,
                json={'part': part_id, 'location': self.current_loc, 'quantity': new_qty},
                timeout=10,
            )
            r.raise_for_status()
            # The response may be a list or a dict
            stock_item = r.json()
            if isinstance(stock_item, list):
                stock_item = stock_item[0] if stock_item else {}
            sid = stock_item.get('pk', stock_item.get('id'))
            self.log_msg(f"✔ Created '{part_code}' stock → StockItem #{sid}")
            self.log_msg(f"📊 Stock: 0 → {new_qty} (+{new_qty})")
        self.stock_label.config(text=f"Stock: {new_qty}")
    except Exception as e:
        self.log_msg(f"✖ Error updating stock: {e}")
    finally:
        # Return focus to the scan entry on every exit path
        self.scan_entry.focus_set()
def _check_stock(self, part_id: int, part_code: str):
    """
    Log the stock level of a part at the current location.

    Args:
        part_id: ID of the part
        part_code: Part code, used in log messages
    """
    self.log_msg(f"📊 Getting stock level for part {part_id} at location {self.current_loc}", debug=True)
    try:
        level = get_stock_level(self.host, self.token, part_id, self.current_loc)
        summary = f" Current stock for '{part_code}' at location {self.current_loc}: {level}"
        self.log_msg(summary)
    except Exception as err:
        # A failed lookup is reported in the log, not raised
        self.log_msg(f"✖ Error fetching stock level: {err}")
def _locate_part(self, part_id: int, part_code: str):
    """
    Log every location holding stock of a part and list them in the table.

    Each stock item returned for the part is logged with its location and
    quantity, followed by the total. When the info table exists and at
    least one located item was found, summary rows and one row per located
    item are appended to it. Errors are logged instead of being raised.

    Args:
        part_id: ID of the part
        part_code: Part code, used in log messages
    """
    self.log_msg(f"📍 Locating all instances of '{part_code}'...")
    try:
        # Get all stock items for this part
        stock_items = get_part_locations(self.host, self.token, part_id)
        if not stock_items:
            self.log_msg(f"📭 No stock found for '{part_code}' in any location")
            return
        self.log_msg(f"📦 Found '{part_code}' in {len(stock_items)} location(s):")
        total_stock = 0
        locations_info = []
        for item in stock_items:
            loc_id = item.get('location')
            quantity = item.get('quantity', 0)
            total_stock += quantity
            if not loc_id:
                # Counted in the total, but there is no location to list
                self.log_msg(f" 📌 Unknown location - Qty: {quantity}")
                continue
            try:
                location = get_location_details(self.host, self.token, loc_id)
                # Prefer the path string; fall back to the plain name
                full_location = (
                    location.get('pathstring', '')
                    or location.get('name', f'Location {loc_id}')
                )
                locations_info.append((loc_id, full_location, quantity))
                self.log_msg(f" 📌 Location {loc_id}: {full_location} - Qty: {quantity}")
            except Exception as e:
                # Keep going with a generic label, but record why the
                # lookup failed instead of discarding the error
                self.log_msg(f"⚠ Could not load details for location {loc_id}: {e}", debug=True)
                locations_info.append((loc_id, f"Location {loc_id}", quantity))
                self.log_msg(f" 📌 Location {loc_id}: Qty: {quantity}")
        self.log_msg(f"📊 Total stock across all locations: {total_stock}")
        # Append the location summary to the part info table
        if hasattr(self, 'info_tree') and locations_info:
            self.info_tree.insert('', 'end', values=('Total Locations', len(locations_info)))
            self.info_tree.insert('', 'end', values=('Total Stock', total_stock))
            # One row per located stock item
            for _loc_id, loc_name, qty in locations_info:
                self.info_tree.insert('', 'end', values=(f'📍 {loc_name}', f'Qty: {qty}'))
    except Exception as e:
        self.log_msg(f"✖ Error locating part: {e}")
        import traceback
        self.log_msg(f"Debug traceback: {traceback.format_exc()}", debug=True)
# ========================================================================
# PART INFO DISPLAY
# ========================================================================
def _show_part_info(self, part_id: int):
"""Display part information in the table."""
self.log_msg(f"🔍 Loading part info for part ID: {part_id}", debug=True)
if not hasattr(self, 'info_tree'):
self.log_msg("✖ Error: info_tree not found! UI may not be initialized properly.")
return
try:
# Clear existing table data
for item in self.info_tree.get_children():
self.info_tree.delete(item)
# Get part info
info = get_part_info(self.host, self.token, part_id)
# Add basic part info to table
desc = info.get('description', '').strip() or 'No description.'
self.info_tree.insert('', 'end', values=('Description', desc))
if info.get('name'):
self.info_tree.insert('', 'end', values=('Name', info.get('name')))
if info.get('IPN'):
self.info_tree.insert('', 'end', values=('IPN', info.get('IPN')))
# Update stock display
if self.current_loc is not None and self.mode.get() != 'locate':
try:
current_stock = get_stock_level(self.host, self.token, part_id, self.current_loc)
self.stock_label.config(text=f"Stock: {current_stock}")
except Exception as e:
self.stock_label.config(text="Stock: ?")
self.log_msg(f"⚠ Stock level error: {e}", debug=True)
else:
self.stock_label.config(text="")
# Load and display image
self._load_part_image(info)
# Load and display parameters (only if not in locate mode)
if self.mode.get() != 'locate':
self._load_part_parameters(part_id)
except Exception as e:
self.log_msg(f"✖ Error loading part info: {e}")
def _load_part_image(self, part_info: Dict[str, Any]):
"""Load and display part image."""
self.log_msg("🖼️ Loading image...", debug=True)
img_url = part_info.get('image') or part_info.get('thumbnail')
if img_url:
if img_url.startswith('/'):
img_url = f"{self.host}{img_url}"
try:
resp = requests.get(img_url, headers={'Authorization': f'Token {self.token}'})
resp.raise_for_status()
img = Image.open(BytesIO(resp.content))
img.thumbnail((256, 256))
self.part_image = ImageTk.PhotoImage(img)
self.image_label.config(image=self.part_image)
self.log_msg("✅ Image loaded successfully", debug=True)
except Exception as e:
self.image_label.config(image='')
self.log_msg(f"⚠ Image load error: {e}", debug=True)
else:
self.image_label.config(image='')
self.log_msg(" No image URL found", debug=True)
def _load_part_parameters(self, part_id: int):
    """
    Append the parameters of a part to the info table.

    A parameter is shown only when both its template name and its data
    value are truthy. Errors are logged instead of being raised.

    Args:
        part_id: ID of the part whose parameters are loaded
    """
    self.log_msg(f"🔧 Starting parameter loading for part {part_id}", debug=True)
    try:
        params = get_part_parameters(self.host, self.token, part_id)
        self.log_msg(f"📋 Found {len(params)} parameters for part {part_id}", debug=True)

        # Template names, in the same order as the parameters
        names = []
        for entry in params:
            names.append(entry.get('template_detail', {}).get('name'))
        self.log_msg(f"Available parameters: {names}", debug=True)

        shown = 0
        for name, entry in zip(names, params):
            value = entry.get('data')
            if not (name and value):
                continue
            self.info_tree.insert('', 'end', values=(name, value))
            shown += 1
            self.log_msg(f"{name}: {value}", debug=True)

        if shown:
            self.log_msg(f"✔ Displayed {shown} parameters", debug=True)
        else:
            self.log_msg("⚠ No parameters found to display", debug=True)
    except Exception as err:
        self.log_msg(f"✖ Could not load parameters: {err}")
        import traceback
        self.log_msg(f"Debug traceback: {traceback.format_exc()}", debug=True)
# ========================================================================
# UTILITY METHODS
# ========================================================================
def log_msg(self, msg: str, debug: bool = False):
    """
    Append a line to the activity log and scroll to it.

    Args:
        msg: Text to append (a newline is added)
        debug: When True, the message is dropped unless debug mode is on
    """
    if not debug or self.debug_mode.get():
        widget = self.log
        # The widget is enabled only for the duration of the insert
        widget.config(state="normal")
        widget.insert(tk.END, msg + "\n")
        widget.see(tk.END)
        widget.config(state="disabled")
def process_scan(self):
    """
    Handle the current content of the scan entry.

    The entry is read and cleared first. Empty input is ignored. Without
    a server connection a warning dialog is shown. Barcode commands are
    handled before anything else; other input is parsed into a part code
    and quantity and passed on for part processing. Outside locate mode a
    current location is required.
    """
    self.log_msg("🔄 Starting process_scan()", debug=True)

    # Read and clear the input before doing anything else
    entry = self.scan_entry
    raw = entry.get().strip()
    entry.delete(0, tk.END)
    if not raw:
        return

    # Nothing can be processed without a server connection
    if not self.server_connected.get():
        messagebox.showwarning("No Connection", "No connection to InvenTree server. Please check your connection.")
        return
    self.log_msg(f"📄 Raw scan input: {raw}", debug=True)

    # Barcode commands take precedence over part scans
    if self._handle_barcode_command(raw):
        return

    # Locate mode works without a current location; other modes need one
    if self.current_loc is None and self.mode.get() != 'locate':
        messagebox.showwarning("No Location", "Please scan a location first.")
        return

    # Regular part scan
    part, qty = parse_scan(raw)
    self.log_msg(f"🔍 Parsed part: {part}, qty: {qty}", debug=True)
    if not part:
        self.log_msg("⚠ Could not parse part code.")
        return
    self._process_part_scan(part, qty)
    self.log_msg("🏁 Finished process_scan()", debug=True)
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
def main():
    """
    Create the application window and run its main loop.

    Exits with status 0 on KeyboardInterrupt and with status 1 on any
    other exception, printing a message in both cases.
    """
    exit_code = None
    try:
        window = StockApp()
        window.mainloop()
    except KeyboardInterrupt:
        print("\nApplication interrupted by user")
        exit_code = 0
    except Exception as err:
        print(f"Fatal error: {err}")
        exit_code = 1
    # A normal return from the main loop does not call sys.exit
    if exit_code is not None:
        sys.exit(exit_code)


if __name__ == "__main__":
    main()