feat: Add async background part import queue

Implements non-blocking part import to prevent UI freezing when scanning unknown parts.

Features:
- Background import worker thread processes unknown parts
- New "Pending Imports" UI section shows import progress
- User can continue scanning other parts while imports run
- Automatic retry on failure (up to 3 attempts)
- Parts automatically processed when import completes

Changes:
- Added PendingPart and ImportResult data structures
- Added PartImportWorker background thread class
- Replaced blocking find_or_import_part() with async find_part()
- Added pending parts queue UI with status display
- Added _on_import_complete() callback handler
- Added _update_pending_parts_ui() for real-time updates
- Added proper cleanup on window close

Benefits:
- No more 30+ second UI freezes during part imports
- Can scan multiple unknown parts in quick succession
- Visual feedback showing import status for each part
- Automatic error handling and retry logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-29 11:07:50 +07:00
parent 9c742af585
commit 0fdc319774
3 changed files with 398 additions and 41 deletions

View File

@@ -22,10 +22,14 @@ import subprocess
import requests
import yaml
import sys
import threading
import queue
from pathlib import Path
from io import BytesIO
from PIL import Image, ImageTk
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from datetime import datetime
# ============================================================================
# CONFIGURATION & CONSTANTS
@@ -58,6 +62,134 @@ MODE_NAMES = {
'locate': 'Locate Part'
}
# ============================================================================
# IMPORT QUEUE DATA STRUCTURES
# ============================================================================
@dataclass
class PendingPart:
"""Represents a part pending import."""
part_code: str
quantity: Optional[int]
location_id: int
mode: str
timestamp: datetime
retry_count: int = 0
def __str__(self):
return f"{self.part_code} (Qty: {self.quantity or 'N/A'})"
@dataclass
class ImportResult:
"""Result of a part import operation."""
part_code: str
success: bool
part_id: Optional[int] = None
error: Optional[str] = None
class PartImportWorker:
"""Background worker for importing parts."""
def __init__(self, host: str, token: str, callback):
"""
Initialize the import worker.
Args:
host: InvenTree server URL
token: API authentication token
callback: Function to call with ImportResult when import completes
"""
self.host = host
self.token = token
self.callback = callback
self.import_queue = queue.Queue()
self.running = True
self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self.worker_thread.start()
def queue_import(self, part_code: str):
"""Queue a part for import."""
self.import_queue.put(part_code)
def _worker_loop(self):
"""Main worker loop - processes import queue."""
while self.running:
try:
part_code = self.import_queue.get(timeout=1)
result = self._import_part(part_code)
# Call callback on main thread
self.callback(result)
self.import_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Worker error: {e}")
def _import_part(self, part_code: str) -> ImportResult:
"""
Import a single part.
Args:
part_code: Part code to import
Returns:
ImportResult with success/failure information
"""
try:
# Run the import tool
result = subprocess.run(
['inventree-part-import', part_code],
check=True,
capture_output=True,
text=True,
timeout=30
)
# Search for the part to get its ID
url = f"{self.host}/api/part/"
headers = {'Authorization': f'Token {self.token}'}
r = requests.get(url, headers=headers, params={'search': part_code})
r.raise_for_status()
raw = r.json()
results = raw.get('results', []) if isinstance(raw, dict) else raw
if results:
part_id = results[0].get('pk', results[0].get('id'))
return ImportResult(part_code=part_code, success=True, part_id=part_id)
else:
return ImportResult(
part_code=part_code,
success=False,
error="Part not found after import"
)
except subprocess.TimeoutExpired:
return ImportResult(
part_code=part_code,
success=False,
error="Import timeout (30s)"
)
except subprocess.CalledProcessError as e:
return ImportResult(
part_code=part_code,
success=False,
error=f"Import failed: {e.stderr}"
)
except Exception as e:
return ImportResult(
part_code=part_code,
success=False,
error=str(e)
)
def stop(self):
"""Stop the worker thread."""
self.running = False
if self.worker_thread.is_alive():
self.worker_thread.join(timeout=2)
# ============================================================================
# API FUNCTIONS
# ============================================================================
@@ -108,46 +240,32 @@ def get_locations(host: str, token: str) -> List[Tuple[int, str]]:
return [(loc.get('id', loc.get('pk')), loc.get('name', '')) for loc in locations]
def find_or_import_part(host: str, token: str, part_code: str) -> int:
def find_part(host: str, token: str, part_code: str) -> Optional[int]:
"""
Find part by code or import if not found.
Find part by code (non-blocking).
Args:
host: InvenTree server URL
token: API authentication token
part_code: Part code to search for
Returns:
Part ID
Raises:
RuntimeError: If part cannot be found or imported
Part ID if found, None if not found
"""
url = f"{host}/api/part/"
headers = {'Authorization': f'Token {token}'}
# First, try to find existing part
r = requests.get(url, headers=headers, params={'search': part_code})
r.raise_for_status()
raw = r.json()
results = raw.get('results', []) if isinstance(raw, dict) else raw
if results:
return results[0].get('pk', results[0].get('id'))
# Part not found, try to import
subprocess.run(['inventree-part-import', part_code], check=True)
# Search again after import
r = requests.get(url, headers=headers, params={'search': part_code})
r.raise_for_status()
raw = r.json()
results = raw.get('results', []) if isinstance(raw, dict) else raw
if not results:
raise RuntimeError(f"Unable to import part {part_code}")
return results[0].get('pk', results[0].get('id'))
try:
r = requests.get(url, headers=headers, params={'search': part_code})
r.raise_for_status()
raw = r.json()
results = raw.get('results', []) if isinstance(raw, dict) else raw
if results:
return results[0].get('pk', results[0].get('id'))
return None
except Exception:
return None
def get_part_info(host: str, token: str, part_id: int) -> Dict[str, Any]:
@@ -389,12 +507,19 @@ class StockApp(tk.Tk):
# Connection monitoring
self.server_connected = tk.BooleanVar(value=False)
self.alive_state = tk.BooleanVar(value=False)
# Part import queue
self.pending_parts: Dict[str, PendingPart] = {} # part_code -> PendingPart
self.import_worker = PartImportWorker(self.host, self.token, self._on_import_complete)
# Initialize UI and start monitoring
self._build_ui()
self._start_connection_monitoring()
self.after(100, lambda: self.loc_entry.focus_set())
# Register cleanup on window close
self.protocol("WM_DELETE_WINDOW", self._on_closing)
# ========================================================================
# CONNECTION MONITORING
# ========================================================================
@@ -458,9 +583,10 @@ class StockApp(tk.Tk):
self._create_location_section()
self._create_mode_section()
self._create_scan_section()
self._create_pending_parts_section()
self._create_info_section()
self._create_log_section()
# Bind mode change to status update
self.mode.trace('w', self._update_status)
@@ -558,6 +684,37 @@ class StockApp(tk.Tk):
self.scan_entry.bind("<Return>", lambda e: self.process_scan())
scan_frm.columnconfigure(1, weight=1)
def _create_pending_parts_section(self):
"""Create pending parts import queue section."""
pending_frm = ttk.LabelFrame(self, text="Pending Imports (0)", padding=10)
pending_frm.pack(fill=tk.X, padx=10, pady=(0, 10))
# Create frame for pending parts list
list_frame = ttk.Frame(pending_frm)
list_frame.pack(fill=tk.BOTH, expand=True)
# Create Treeview for pending parts
columns = ('Part Code', 'Qty', 'Mode', 'Status')
self.pending_tree = ttk.Treeview(list_frame, columns=columns, show='headings', height=3)
self.pending_tree.heading('Part Code', text='Part Code')
self.pending_tree.heading('Qty', text='Qty')
self.pending_tree.heading('Mode', text='Mode')
self.pending_tree.heading('Status', text='Status')
self.pending_tree.column('Part Code', width=180)
self.pending_tree.column('Qty', width=60)
self.pending_tree.column('Mode', width=100)
self.pending_tree.column('Status', width=150)
# Scrollbar for pending parts
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.pending_tree.yview)
self.pending_tree.configure(yscrollcommand=scrollbar.set)
self.pending_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Store reference to frame for updating title
self.pending_frame = pending_frm
def _create_info_section(self):
"""Create part information display section."""
info_frm = ttk.LabelFrame(self, text="Part Information", padding=10)
@@ -760,21 +917,29 @@ class StockApp(tk.Tk):
"""Process a scanned part."""
try:
self.log_msg(f"🔎 Looking up part: {part_code}", debug=True)
pid = find_or_import_part(self.host, self.token, part_code)
self.log_msg(f"✅ Found part ID: {pid}", debug=True)
self._show_part_info(pid)
self._execute_stock_operation(pid, part_code, quantity)
# Check if part already exists
pid = find_part(self.host, self.token, part_code)
if pid is not None:
# Part found - process normally
self.log_msg(f"✅ Found part ID: {pid}", debug=True)
self._show_part_info(pid)
self._execute_stock_operation(pid, part_code, quantity)
else:
# Part not found - queue for import
self.log_msg(f"⏳ Part '{part_code}' not found - queuing for import...")
self._queue_part_for_import(part_code, quantity)
except Exception as e:
self.log_msg(f"✖ Part lookup/import failed: {e}")
self.log_msg(f"✖ Part lookup failed: {e}")
import traceback
self.log_msg(f"Traceback: {traceback.format_exc()}", debug=True)
def _execute_stock_operation(self, part_id: int, part_code: str, quantity: Optional[int]):
"""Execute the appropriate stock operation based on current mode."""
mode = self.mode.get()
if mode == 'import':
self._add_stock(part_id, part_code, quantity)
elif mode == 'update':
@@ -784,6 +949,102 @@ class StockApp(tk.Tk):
else: # check mode
self._check_stock(part_id, part_code)
# ========================================================================
# PENDING PARTS MANAGEMENT
# ========================================================================
def _queue_part_for_import(self, part_code: str, quantity: Optional[int]):
"""Queue a part for background import."""
if part_code in self.pending_parts:
self.log_msg(f"⚠ Part '{part_code}' already queued for import")
return
# Create pending part entry
pending_part = PendingPart(
part_code=part_code,
quantity=quantity,
location_id=self.current_loc if self.current_loc else 0,
mode=self.mode.get(),
timestamp=datetime.now()
)
self.pending_parts[part_code] = pending_part
# Queue for import
self.import_worker.queue_import(part_code)
# Update UI
self._update_pending_parts_ui()
self.log_msg(f"📋 Added '{part_code}' to import queue - continue scanning other parts")
def _on_import_complete(self, result: ImportResult):
"""
Handle import completion callback (runs on worker thread).
Schedule UI update on main thread.
"""
self.after(0, lambda: self._process_import_result(result))
def _process_import_result(self, result: ImportResult):
"""Process import result on main thread."""
pending_part = self.pending_parts.get(result.part_code)
if not pending_part:
return
if result.success:
self.log_msg(f"✅ Import complete for '{result.part_code}' (ID: {result.part_id})")
# Execute the pending operation
try:
self._show_part_info(result.part_id)
self._execute_stock_operation(result.part_id, result.part_code, pending_part.quantity)
self.log_msg(f"✔ Processed pending operation for '{result.part_code}'")
except Exception as e:
self.log_msg(f"✖ Error processing '{result.part_code}': {e}")
# Remove from pending
del self.pending_parts[result.part_code]
else:
# Import failed
pending_part.retry_count += 1
if pending_part.retry_count >= 3:
self.log_msg(f"✖ Import failed for '{result.part_code}' after 3 attempts: {result.error}")
del self.pending_parts[result.part_code]
else:
self.log_msg(f"⚠ Import failed for '{result.part_code}': {result.error} (retry {pending_part.retry_count}/3)")
# Retry
self.import_worker.queue_import(result.part_code)
# Update UI
self._update_pending_parts_ui()
def _update_pending_parts_ui(self):
"""Update the pending parts UI display."""
# Clear existing items
for item in self.pending_tree.get_children():
self.pending_tree.delete(item)
# Update frame title with count
count = len(self.pending_parts)
self.pending_frame.config(text=f"Pending Imports ({count})")
# Add pending parts to tree
for part_code, pending_part in self.pending_parts.items():
qty_str = str(pending_part.quantity) if pending_part.quantity else "N/A"
mode_str = MODE_NAMES.get(pending_part.mode, pending_part.mode)
status = f"Importing... (attempt {pending_part.retry_count + 1})"
self.pending_tree.insert('', 'end', values=(
part_code,
qty_str,
mode_str,
status
))
def _on_closing(self):
"""Handle application closing."""
self.log_msg("Shutting down import worker...")
self.import_worker.stop()
self.destroy()
# ========================================================================
# STOCK OPERATIONS
# ========================================================================