Files
inventree-stock-tool/src/stocktool/web/app.py
grabowski ed9a3307ef fix: Add authenticated image proxy for part images
Fixes 'Unauthorized' errors when loading part images in web app.

Problem:
- InvenTree media files require authentication
- Direct image URLs return 401 Unauthorized
- Browser can't send API token with image requests

Solution:
- Added /api/proxy/image endpoint in Flask app
- Proxy fetches images with API token authentication
- Returns image with correct Content-Type header
- Frontend uses proxy URL instead of direct InvenTree URL

Usage:
- Images now load via: /api/proxy/image?url=/media/part_images/...
- Proxy adds Authorization header automatically
- Works for both image and thumbnail URLs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-29 11:36:44 +07:00

482 lines
14 KiB
Python

#!/usr/bin/env python3
"""
InvenTree Stock Tool - Web Application
A Flask-based web interface for barcode scanning and inventory management.
"""
import os
import sys
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
from flask_cors import CORS
import yaml
import requests
from typing import Dict, List, Tuple, Optional
from datetime import datetime
from dataclasses import asdict
# Import core functionality from main app
sys.path.insert(0, str(Path(__file__).parent.parent))
from stock_tool_gui_v2 import (
find_part, get_locations, get_part_info, get_part_parameters,
get_part_locations, get_location_details, find_stock_item,
get_stock_level, parse_scan, lookup_barcode,
PendingPart, ImportResult, PartImportWorker, MODE_NAMES, BARCODE_COMMANDS
)
# Initialize Flask app
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")
# Global state
config = {}
pending_parts: Dict[str, PendingPart] = {}
import_worker: Optional[PartImportWorker] = None
def load_config():
"""Load configuration from YAML file."""
config_file = Path.home() / '.config' / 'scan_and_import.yaml'
if not config_file.exists():
raise FileNotFoundError(f"Config file {config_file} not found")
data = yaml.safe_load(config_file.read_text())
host = data.get('host')
token = data.get('token')
if not host or not token:
raise ValueError("Both 'host' and 'token' must be set in config file")
return {
'host': host.rstrip('/'),
'token': token,
'headers': {
'Authorization': f'Token {token}',
'Content-Type': 'application/json'
}
}
def on_import_complete(result: ImportResult):
"""Handle import completion callback."""
pending_part = pending_parts.get(result.part_code)
if not pending_part:
return
if result.success:
# Emit success via WebSocket
socketio.emit('import_complete', {
'part_code': result.part_code,
'part_id': result.part_id,
'success': True,
'pending_part': asdict(pending_part)
})
# Remove from pending
del pending_parts[result.part_code]
else:
# Handle failure with retry
pending_part.retry_count += 1
if pending_part.retry_count >= 3:
socketio.emit('import_failed', {
'part_code': result.part_code,
'error': result.error,
'retry_count': pending_part.retry_count
})
del pending_parts[result.part_code]
else:
# Retry
import_worker.queue_import(result.part_code)
socketio.emit('import_retry', {
'part_code': result.part_code,
'error': result.error,
'retry_count': pending_part.retry_count
})
# Routes
@app.route('/')
def index():
"""Render main application page."""
return render_template('index.html')
@app.route('/api/config')
def get_config():
"""Get application configuration (without sensitive data)."""
return jsonify({
'host': config['host'],
'modes': MODE_NAMES,
'barcode_commands': BARCODE_COMMANDS
})
@app.route('/api/locations')
def api_get_locations():
"""Get all available locations."""
try:
locations = get_locations(config['host'], config['token'])
return jsonify({
'success': True,
'locations': [{'id': loc_id, 'name': name} for loc_id, name in locations]
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/part/search', methods=['POST'])
def search_part():
"""Search for a part by code."""
data = request.json
part_code = data.get('part_code')
if not part_code:
return jsonify({'success': False, 'error': 'part_code required'}), 400
try:
part_id = find_part(config['host'], config['token'], part_code)
if part_id:
# Get part details
part_info = get_part_info(config['host'], config['token'], part_id)
parameters = get_part_parameters(config['host'], config['token'], part_id)
return jsonify({
'success': True,
'found': True,
'part_id': part_id,
'part_info': part_info,
'parameters': parameters
})
else:
return jsonify({
'success': True,
'found': False
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/part/import', methods=['POST'])
def queue_part_import():
"""Queue a part for background import."""
data = request.json
part_code = data.get('part_code')
quantity = data.get('quantity')
location_id = data.get('location_id')
mode = data.get('mode', 'import')
if not part_code:
return jsonify({'success': False, 'error': 'part_code required'}), 400
# Check if already queued
if part_code in pending_parts:
return jsonify({'success': False, 'error': 'Part already queued for import'}), 400
# Create pending part
pending_part = PendingPart(
part_code=part_code,
quantity=quantity,
location_id=location_id or 0,
mode=mode,
timestamp=datetime.now()
)
pending_parts[part_code] = pending_part
# Queue for import
import_worker.queue_import(part_code)
return jsonify({
'success': True,
'pending_part': asdict(pending_part)
})
@app.route('/api/stock/add', methods=['POST'])
def add_stock():
"""Add stock for a part."""
data = request.json
part_id = data.get('part_id')
location_id = data.get('location_id')
quantity = data.get('quantity')
if not all([part_id, location_id, quantity]):
return jsonify({'success': False, 'error': 'part_id, location_id, and quantity required'}), 400
try:
# Check if stock item already exists
existing = find_stock_item(config['host'], config['token'], part_id, location_id)
current_stock = get_stock_level(config['host'], config['token'], part_id, location_id)
if existing:
# Update existing
sid = existing.get('pk', existing.get('id'))
new_stock = current_stock + quantity
r = requests.patch(
f"{config['host']}/api/stock/{sid}/",
headers=config['headers'],
json={'quantity': new_stock}
)
r.raise_for_status()
return jsonify({
'success': True,
'action': 'updated',
'stock_item_id': sid,
'previous_quantity': current_stock,
'new_quantity': new_stock,
'added': quantity
})
else:
# Create new
r = requests.post(
f"{config['host']}/api/stock/",
headers=config['headers'],
json={'part': part_id, 'location': location_id, 'quantity': quantity}
)
r.raise_for_status()
response_data = r.json()
if isinstance(response_data, list):
stock_item = response_data[0] if response_data else {}
else:
stock_item = response_data
sid = stock_item.get('pk', stock_item.get('id'))
return jsonify({
'success': True,
'action': 'created',
'stock_item_id': sid,
'quantity': quantity
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/stock/update', methods=['POST'])
def update_stock():
"""Update stock quantity for a part."""
data = request.json
part_id = data.get('part_id')
location_id = data.get('location_id')
quantity = data.get('quantity')
if not all([part_id, location_id, quantity is not None]):
return jsonify({'success': False, 'error': 'part_id, location_id, and quantity required'}), 400
try:
existing = find_stock_item(config['host'], config['token'], part_id, location_id)
current_stock = get_stock_level(config['host'], config['token'], part_id, location_id)
if existing:
sid = existing['pk']
r = requests.patch(
f"{config['host']}/api/stock/{sid}/",
headers=config['headers'],
json={'quantity': quantity}
)
r.raise_for_status()
return jsonify({
'success': True,
'action': 'updated',
'stock_item_id': sid,
'previous_quantity': current_stock,
'new_quantity': quantity
})
else:
# Create new
r = requests.post(
f"{config['host']}/api/stock/",
headers=config['headers'],
json={'part': part_id, 'location': location_id, 'quantity': quantity}
)
r.raise_for_status()
response_data = r.json()
if isinstance(response_data, list):
stock_item = response_data[0] if response_data else {}
else:
stock_item = response_data
sid = stock_item.get('pk', stock_item.get('id'))
return jsonify({
'success': True,
'action': 'created',
'stock_item_id': sid,
'quantity': quantity
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/stock/check', methods=['POST'])
def check_stock():
"""Check stock level for a part at location."""
data = request.json
part_id = data.get('part_id')
location_id = data.get('location_id')
if not all([part_id, location_id]):
return jsonify({'success': False, 'error': 'part_id and location_id required'}), 400
try:
quantity = get_stock_level(config['host'], config['token'], part_id, location_id)
return jsonify({
'success': True,
'quantity': quantity
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/part/locate', methods=['POST'])
def locate_part():
"""Find all locations where a part is stored."""
data = request.json
part_id = data.get('part_id')
if not part_id:
return jsonify({'success': False, 'error': 'part_id required'}), 400
try:
stock_items = get_part_locations(config['host'], config['token'], part_id)
locations_info = []
total_stock = 0
for item in stock_items:
loc_id = item.get('location')
quantity = item.get('quantity', 0)
total_stock += quantity
if loc_id:
try:
location = get_location_details(config['host'], config['token'], loc_id)
loc_name = location.get('name', f'Location {loc_id}')
loc_path = location.get('pathstring', '')
locations_info.append({
'location_id': loc_id,
'location_name': loc_name,
'location_path': loc_path,
'quantity': quantity
})
except Exception:
locations_info.append({
'location_id': loc_id,
'location_name': f'Location {loc_id}',
'location_path': '',
'quantity': quantity
})
return jsonify({
'success': True,
'locations': locations_info,
'total_stock': total_stock
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/pending')
def get_pending():
"""Get all pending part imports."""
return jsonify({
'success': True,
'pending': [asdict(p) for p in pending_parts.values()]
})
@app.route('/api/proxy/image')
def proxy_image():
"""Proxy image requests with authentication."""
image_url = request.args.get('url')
if not image_url:
return jsonify({'error': 'url parameter required'}), 400
# If it's a relative URL, make it absolute
if image_url.startswith('/'):
image_url = config['host'] + image_url
try:
# Fetch image with authentication
response = requests.get(image_url, headers={'Authorization': f"Token {config['token']}"})
response.raise_for_status()
# Return image with correct content type
content_type = response.headers.get('Content-Type', 'image/jpeg')
return response.content, 200, {'Content-Type': content_type}
except Exception as e:
return jsonify({'error': str(e)}), 500
# WebSocket events
@socketio.on('connect')
def handle_connect():
"""Handle client connection."""
emit('connected', {'message': 'Connected to InvenTree Stock Tool'})
@socketio.on('disconnect')
def handle_disconnect():
"""Handle client disconnection."""
print('Client disconnected')
@socketio.on('parse_barcode')
def handle_parse_barcode(data):
"""Parse a scanned barcode."""
raw_barcode = data.get('barcode', '')
part_code, quantity = parse_scan(raw_barcode)
emit('barcode_parsed', {
'raw': raw_barcode,
'part_code': part_code,
'quantity': quantity
})
def main():
"""Main entry point for web application."""
global config, import_worker
# Load configuration
try:
config = load_config()
print(f"✅ Connected to InvenTree: {config['host']}")
except Exception as e:
print(f"✖ Configuration error: {e}")
sys.exit(1)
# Initialize import worker
import_worker = PartImportWorker(config['host'], config['token'], on_import_complete)
# Start Flask server
print("🚀 Starting InvenTree Stock Tool Web Server...")
print("📱 Open your browser to: http://localhost:5000")
print("Press Ctrl+C to stop")
try:
socketio.run(app, host='0.0.0.0', port=5000, debug=True, allow_unsafe_werkzeug=True)
except KeyboardInterrupt:
print("\n👋 Shutting down...")
import_worker.stop()
if __name__ == '__main__':
main()