From ca4532fa39b5c93104fc17cab2d14a6c42788d0c Mon Sep 17 00:00:00 2001 From: grabowski Date: Mon, 18 Aug 2025 11:28:55 +0700 Subject: [PATCH] inventory management add/del --- main.py | 438 ++++++++++++++++++++++++++++++++++++++- templates/base.html | 13 ++ templates/inventory.html | 281 +++++++++++++++++++++++++ templates/results.html | 229 +++++++++++++++++++- 4 files changed, 948 insertions(+), 13 deletions(-) create mode 100644 templates/inventory.html diff --git a/main.py b/main.py index 0db1351..ce1268e 100644 --- a/main.py +++ b/main.py @@ -4,13 +4,14 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates import httpx import asyncio -from typing import Optional, List, Union +from typing import Optional, List, Union, Dict, Any import json import os from dotenv import load_dotenv import discogs_client as discogs from enum import Enum from pydantic import BaseModel +from datetime import datetime # Load environment variables load_dotenv() @@ -32,13 +33,37 @@ class MediaItem(BaseModel): title: str artist: Optional[str] = None author: Optional[str] = None - year: str + year: Union[str, int, None] = None isbn: Optional[str] = None label: Optional[str] = None format: Optional[str] = None cover_url: Optional[str] = None openlibrary_url: Optional[str] = None discogs_url: Optional[str] = None + + class Config: + extra = "ignore" + str_strip_whitespace = True + + def __init__(self, **data): + # Convert year to string if it's not None + if data.get('year') is not None: + data['year'] = str(data['year']) + super().__init__(**data) + +# InvenTree API Models +class StorageLocation(BaseModel): + pk: Optional[int] = None + name: str + description: Optional[str] = None + parent: Optional[int] = None + pathstring: Optional[str] = None + +class CreateItemRequest(BaseModel): + media_item: MediaItem + storage_location_id: int + quantity: float = 1.0 + notes: Optional[str] = None app = FastAPI( title="Media Inventory App", @@ -54,6 +79,46 @@ templates = Jinja2Templates(directory="templates") OPENLIBRARY_SEARCH_URL = "https://openlibrary.org/search.json" DISCOGS_SEARCH_URL = "https://api.discogs.com/database/search" +# InvenTree API Functions +def get_inventree_config() -> Optional[Dict[str, Any]]: + """Get InvenTree API configuration""" + api_url = os.getenv("INVENTREE_API_URL") + api_token = os.getenv("INVENTREE_API_TOKEN") + + if not api_url or not api_token: + return None + + # Ensure URL ends with / + if not api_url.endswith('/'): + api_url += '/' + + return { + "api_url": api_url, + "api_token": api_token, + "timeout": int(os.getenv("INVENTREE_API_TIMEOUT", "30")), + "headers": { + "Authorization": f"Token {api_token}", + "Content-Type": "application/json" + } + } + +async def test_inventree_connection() -> bool: + """Test InvenTree API connection""" + config = get_inventree_config() + if not config: + return False + + try: + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + response = await client.get( + f"{config['api_url']}user/me/", + headers=config["headers"] + ) + return response.status_code == 200 + except Exception as e: + print(f"InvenTree API connection test failed: {e}") + return False + @app.get("/", response_class=HTMLResponse) async def home(request: Request): """Serve the main search form""" @@ -246,6 +311,375 @@ async def search_discogs(query: str, media_type: str, artist: Optional[str] = No "discogs_url": "https://www.discogs.com/settings/developers" }] +# InvenTree Inventory Management Endpoints +@app.get("/inventory", response_class=HTMLResponse) +async def inventory_home(request: Request): + """Serve the inventory management page""" + config = get_inventree_config() + if not config: + return templates.TemplateResponse("inventory.html", { + "request": request, + "error": "InvenTree API not configured. Please check your .env file." + }) + + # Test connection and get locations + if not await test_inventree_connection(): + return templates.TemplateResponse("inventory.html", { + "request": request, + "error": "Failed to connect to InvenTree API. Please check your configuration." + }) + + try: + # Get storage locations for the dropdown + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + response = await client.get( + f"{config['api_url']}stock/location/", + headers=config["headers"] + ) + + if response.status_code == 200: + locations_data = response.json() + + # Handle both paginated and direct list responses + if isinstance(locations_data, dict) and "results" in locations_data: + locations = locations_data.get("results", []) + elif isinstance(locations_data, list): + locations = locations_data + else: + locations = [] + + location_data = [] + for loc in locations: + if isinstance(loc, dict): + location_data.append({ + "pk": loc.get("pk"), + "name": loc.get("name", "Unknown"), + "pathstring": loc.get("pathstring", loc.get("name", "Unknown")) + }) + + return templates.TemplateResponse("inventory.html", { + "request": request, + "locations": location_data + }) + else: + return templates.TemplateResponse("inventory.html", { + "request": request, + "error": f"Failed to fetch storage locations: HTTP {response.status_code}" + }) + + except Exception as e: + return templates.TemplateResponse("inventory.html", { + "request": request, + "error": f"Error connecting to InvenTree: {str(e)}" + }) + +@app.get("/api/storage-locations") +async def get_storage_locations(): + """Get all storage locations from InvenTree""" + config = get_inventree_config() + if not config: + return {"error": "InvenTree API not configured"} + + try: + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + response = await client.get( + f"{config['api_url']}stock/location/", + headers=config["headers"] + ) + + if response.status_code == 200: + data = response.json() + + # Handle both paginated and direct list responses + if isinstance(data, dict) and "results" in data: + locations = data.get("results", []) + elif isinstance(data, list): + locations = data + else: + locations = [] + + return [ + { + "pk": loc.get("pk"), + "name": loc.get("name"), + "description": loc.get("description"), + "pathstring": loc.get("pathstring") + } + for loc in locations if isinstance(loc, dict) + ] + else: + return {"error": f"HTTP {response.status_code}: {response.text}"} + + except Exception as e: + return {"error": str(e)} + +@app.post("/api/storage-locations") +async def create_storage_location(location_data: dict = Body(...)): + """Create a new storage location in InvenTree""" + config = get_inventree_config() + if not config: + return {"error": "InvenTree API not configured"} + + try: + # Ensure description is not None - provide empty string if not provided + if "description" not in location_data or location_data["description"] is None: + location_data["description"] = "" + + # Clean up the data to ensure it's properly formatted + clean_data = { + "name": location_data.get("name", ""), + "description": location_data.get("description", "") + } + + # Add parent if provided + if "parent" in location_data and location_data["parent"]: + clean_data["parent"] = location_data["parent"] + + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + response = await client.post( + f"{config['api_url']}stock/location/", + headers=config["headers"], + json=clean_data + ) + + if response.status_code == 201: + location = response.json() + return {"success": True, "location": {"pk": location.get("pk"), "name": location.get("name")}} + else: + return {"error": f"HTTP {response.status_code}: {response.text}"} + + except Exception as e: + return {"error": f"Error creating location: {str(e)}"} + +@app.delete("/api/storage-locations/{location_id}") +async def delete_storage_location(location_id: int): + """Delete a storage location if it's empty (has no stock items)""" + config = get_inventree_config() + if not config: + return {"error": "InvenTree API not configured"} + + try: + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + # First, check if the location has any stock items + stock_response = await client.get( + f"{config['api_url']}stock/", + headers=config["headers"], + params={"location": location_id} + ) + + if stock_response.status_code == 200: + stock_data = stock_response.json() + + # Handle both paginated and direct list responses for stock items + if isinstance(stock_data, dict) and "results" in stock_data: + stock_items = stock_data.get("results", []) + elif isinstance(stock_data, list): + stock_items = stock_data + else: + stock_items = [] + + # Check if location has any stock items + if len(stock_items) > 0: + return { + "success": False, + "error": "Cannot delete location with stock items", + "message": f"Location contains {len(stock_items)} stock item(s). Move or remove items first." + } + else: + return {"error": f"Failed to check stock items: HTTP {stock_response.status_code}"} + + # If location is empty, proceed with deletion + delete_response = await client.delete( + f"{config['api_url']}stock/location/{location_id}/", + headers=config["headers"] + ) + + if delete_response.status_code == 204: + return {"success": True, "message": "Storage location deleted successfully"} + else: + return {"error": f"Failed to delete location: HTTP {delete_response.status_code}: {delete_response.text}"} + + except Exception as e: + return {"error": f"Error deleting location: {str(e)}"} + +@app.post("/api/create-inventory-item") +async def create_inventory_item(create_request: CreateItemRequest): + """Create a new inventory item from a media search result""" + config = get_inventree_config() + if not config: + return {"success": False, "error": "InvenTree API not configured"} + + try: + print(f"Creating inventory item for: {create_request.media_item.title}") + + # First, get or create the part category for media items + media_category_id = await get_or_create_media_category(config) + print(f"Media category ID: {media_category_id}") + + # Create the part with required fields + part_data = { + "name": create_request.media_item.title, + "description": create_part_description(create_request.media_item) or "Media item", + "active": True, + "purchaseable": False, + "salable": False, + "trackable": False + } + + # Add optional fields + keywords = create_part_keywords(create_request.media_item) + if keywords: + part_data["keywords"] = keywords + + if media_category_id: + part_data["category"] = media_category_id + + if create_request.media_item.openlibrary_url or create_request.media_item.discogs_url: + part_data["link"] = create_request.media_item.openlibrary_url or create_request.media_item.discogs_url + + print(f"Creating part with data: {part_data}") + + # Create the part using REST API + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + part_response = await client.post( + f"{config['api_url']}part/", + headers=config["headers"], + json=part_data + ) + + if part_response.status_code != 201: + error_text = part_response.text + print(f"Part creation failed: HTTP {part_response.status_code} - {error_text}") + return { + "success": False, + "error": f"Part creation failed: HTTP {part_response.status_code}", + "message": f"Failed to create part: {error_text}" + } + + part = part_response.json() + part_id = part.get("pk") + print(f"Part created successfully: {part_id}") + + # Create the stock item + stock_data = { + "part": part_id, + "location": create_request.storage_location_id, + "quantity": create_request.quantity, + "notes": create_request.notes or f"Added from media search on {datetime.now().strftime('%Y-%m-%d')}" + } + + print(f"Creating stock item with data: {stock_data}") + + stock_response = await client.post( + f"{config['api_url']}stock/", + headers=config["headers"], + json=stock_data + ) + + if stock_response.status_code != 201: + error_text = stock_response.text + print(f"Stock item creation failed: HTTP {stock_response.status_code} - {error_text}") + return { + "success": False, + "error": f"Stock item creation failed: HTTP {stock_response.status_code}", + "message": f"Failed to create stock item: {error_text}" + } + + stock_item = stock_response.json() + stock_id = stock_item.get("pk") + print(f"Stock item created successfully: {stock_id}") + + return { + "success": True, + "part_id": part_id, + "stock_id": stock_id, + "message": f"Successfully added '{create_request.media_item.title}' to inventory" + } + + except Exception as e: + print(f"Unexpected error in create_inventory_item: {e}") + return { + "success": False, + "error": str(e), + "message": f"Failed to add item to inventory: {str(e)}" + } + +async def get_or_create_media_category(config: Dict[str, Any]) -> Optional[int]: + """Get or create the 'Media' category for organizing media items""" + try: + async with httpx.AsyncClient(timeout=config["timeout"]) as client: + # Try to find existing media category + response = await client.get( + f"{config['api_url']}part/category/", + headers=config["headers"], + params={"name": "Media"} + ) + + if response.status_code == 200: + data = response.json() + categories = data.get("results", []) + if categories: + return categories[0].get("pk") + + # Create media category if it doesn't exist + category_data = { + "name": "Media", + "description": "Books, vinyl records, CDs, and cassettes" + } + + create_response = await client.post( + f"{config['api_url']}part/category/", + headers=config["headers"], + json=category_data + ) + + if create_response.status_code == 201: + category = create_response.json() + return category.get("pk") + + except Exception as e: + print(f"Error with media category: {e}") + + return None + +def create_part_description(media_item: MediaItem) -> str: + """Create a description for the part based on media item data""" + desc_parts = [] + + if media_item.artist: + desc_parts.append(f"Artist: {media_item.artist}") + elif media_item.author: + desc_parts.append(f"Author: {media_item.author}") + + if media_item.year and media_item.year != "Unknown": + desc_parts.append(f"Year: {media_item.year}") + + if media_item.format: + desc_parts.append(f"Format: {media_item.format}") + + if media_item.label: + desc_parts.append(f"Label: {media_item.label}") + + if media_item.isbn: + desc_parts.append(f"ISBN: {media_item.isbn}") + + return " | ".join(desc_parts) if desc_parts else "Media item" + +def create_part_keywords(media_item: MediaItem) -> str: + """Create keywords for the part based on media item data""" + keywords = [] + + if media_item.artist: + keywords.append(media_item.artist) + if media_item.author: + keywords.append(media_item.author) + if media_item.format: + keywords.append(media_item.format) + if media_item.label: + keywords.append(media_item.label) + + return ", ".join(keywords) + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/templates/base.html b/templates/base.html index cbdc54d..488e039 100644 --- a/templates/base.html +++ b/templates/base.html @@ -11,6 +11,19 @@ diff --git a/templates/inventory.html b/templates/inventory.html new file mode 100644 index 0000000..79149cd --- /dev/null +++ b/templates/inventory.html @@ -0,0 +1,281 @@ +{% extends "base.html" %} + +{% block title %}Inventory Management - Media Inventory App{% endblock %} + +{% block content %} +
+
+

đŸ“Ļ Inventory Management

+

Manage your InvenTree storage locations and inventory items

+ + {% if error %} + + {% else %} + + +
+
+
+
+
📍 Storage Locations
+ +
+
+ {% if locations %} +
+ + + + + + + + + + + {% for location in locations %} + + + + + + + {% endfor %} + +
IDNamePathActions
{{ location.pk }}{{ location.name }}{{ location.pathstring }} + +
+
+ {% else %} +
+

No storage locations found

+ +
+ {% endif %} +
+
+
+ +
+
+
+
â„šī¸ How to Use
+
+
+
    +
  1. Create Locations: Set up storage locations for your media
  2. +
  3. Search Media: Use the search function to find items
  4. +
  5. Add to Inventory: Click "Add to Inventory" on search results
  6. +
  7. Delete Empty Locations: Remove unused locations with the delete button
  8. +
  9. Manage: Use InvenTree web interface for advanced management
  10. +
+
+

+ Connected to:
+ InvenTree API ✅ +

+
+
+
+
+ + {% endif %} +
+
+ + + + + + + + +{% endblock %} \ No newline at end of file diff --git a/templates/results.html b/templates/results.html index 3f5ec47..b189df9 100644 --- a/templates/results.html +++ b/templates/results.html @@ -55,17 +55,25 @@ {% endif %}
- {% if media_type == 'book' and item.openlibrary_url %} - - 📚 View on OpenLibrary - - {% elif media_type != 'book' and item.discogs_url %} - - đŸŽĩ View on Discogs - - {% endif %} +
+ {% if media_type == 'book' and item.openlibrary_url %} + + 📚 View on OpenLibrary + + {% elif media_type != 'book' and item.discogs_url %} + + đŸŽĩ View on Discogs + + {% endif %} + + +
@@ -81,4 +89,203 @@ {% endif %} + + + + + {% endblock %}