inventory management add/del
This commit is contained in:
438
main.py
438
main.py
@@ -4,13 +4,14 @@ from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.templating import Jinja2Templates
|
||||
import httpx
|
||||
import asyncio
|
||||
from typing import Optional, List, Union
|
||||
from typing import Optional, List, Union, Dict, Any
|
||||
import json
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
import discogs_client as discogs
|
||||
from enum import Enum
|
||||
from pydantic import BaseModel
|
||||
from datetime import datetime
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
@@ -32,13 +33,37 @@ class MediaItem(BaseModel):
|
||||
title: str
|
||||
artist: Optional[str] = None
|
||||
author: Optional[str] = None
|
||||
year: str
|
||||
year: Union[str, int, None] = None
|
||||
isbn: Optional[str] = None
|
||||
label: Optional[str] = None
|
||||
format: Optional[str] = None
|
||||
cover_url: Optional[str] = None
|
||||
openlibrary_url: Optional[str] = None
|
||||
discogs_url: Optional[str] = None
|
||||
|
||||
class Config:
|
||||
extra = "ignore"
|
||||
str_strip_whitespace = True
|
||||
|
||||
def __init__(self, **data):
|
||||
# Convert year to string if it's not None
|
||||
if data.get('year') is not None:
|
||||
data['year'] = str(data['year'])
|
||||
super().__init__(**data)
|
||||
|
||||
# InvenTree API Models
|
||||
class StorageLocation(BaseModel):
|
||||
pk: Optional[int] = None
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
parent: Optional[int] = None
|
||||
pathstring: Optional[str] = None
|
||||
|
||||
class CreateItemRequest(BaseModel):
|
||||
media_item: MediaItem
|
||||
storage_location_id: int
|
||||
quantity: float = 1.0
|
||||
notes: Optional[str] = None
|
||||
|
||||
app = FastAPI(
|
||||
title="Media Inventory App",
|
||||
@@ -54,6 +79,46 @@ templates = Jinja2Templates(directory="templates")
|
||||
OPENLIBRARY_SEARCH_URL = "https://openlibrary.org/search.json"
|
||||
DISCOGS_SEARCH_URL = "https://api.discogs.com/database/search"
|
||||
|
||||
# InvenTree API Functions
|
||||
def get_inventree_config() -> Optional[Dict[str, Any]]:
|
||||
"""Get InvenTree API configuration"""
|
||||
api_url = os.getenv("INVENTREE_API_URL")
|
||||
api_token = os.getenv("INVENTREE_API_TOKEN")
|
||||
|
||||
if not api_url or not api_token:
|
||||
return None
|
||||
|
||||
# Ensure URL ends with /
|
||||
if not api_url.endswith('/'):
|
||||
api_url += '/'
|
||||
|
||||
return {
|
||||
"api_url": api_url,
|
||||
"api_token": api_token,
|
||||
"timeout": int(os.getenv("INVENTREE_API_TIMEOUT", "30")),
|
||||
"headers": {
|
||||
"Authorization": f"Token {api_token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
}
|
||||
|
||||
async def test_inventree_connection() -> bool:
|
||||
"""Test InvenTree API connection"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return False
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
response = await client.get(
|
||||
f"{config['api_url']}user/me/",
|
||||
headers=config["headers"]
|
||||
)
|
||||
return response.status_code == 200
|
||||
except Exception as e:
|
||||
print(f"InvenTree API connection test failed: {e}")
|
||||
return False
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def home(request: Request):
|
||||
"""Serve the main search form"""
|
||||
@@ -246,6 +311,375 @@ async def search_discogs(query: str, media_type: str, artist: Optional[str] = No
|
||||
"discogs_url": "https://www.discogs.com/settings/developers"
|
||||
}]
|
||||
|
||||
# InvenTree Inventory Management Endpoints
|
||||
@app.get("/inventory", response_class=HTMLResponse)
|
||||
async def inventory_home(request: Request):
|
||||
"""Serve the inventory management page"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return templates.TemplateResponse("inventory.html", {
|
||||
"request": request,
|
||||
"error": "InvenTree API not configured. Please check your .env file."
|
||||
})
|
||||
|
||||
# Test connection and get locations
|
||||
if not await test_inventree_connection():
|
||||
return templates.TemplateResponse("inventory.html", {
|
||||
"request": request,
|
||||
"error": "Failed to connect to InvenTree API. Please check your configuration."
|
||||
})
|
||||
|
||||
try:
|
||||
# Get storage locations for the dropdown
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
response = await client.get(
|
||||
f"{config['api_url']}stock/location/",
|
||||
headers=config["headers"]
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
locations_data = response.json()
|
||||
|
||||
# Handle both paginated and direct list responses
|
||||
if isinstance(locations_data, dict) and "results" in locations_data:
|
||||
locations = locations_data.get("results", [])
|
||||
elif isinstance(locations_data, list):
|
||||
locations = locations_data
|
||||
else:
|
||||
locations = []
|
||||
|
||||
location_data = []
|
||||
for loc in locations:
|
||||
if isinstance(loc, dict):
|
||||
location_data.append({
|
||||
"pk": loc.get("pk"),
|
||||
"name": loc.get("name", "Unknown"),
|
||||
"pathstring": loc.get("pathstring", loc.get("name", "Unknown"))
|
||||
})
|
||||
|
||||
return templates.TemplateResponse("inventory.html", {
|
||||
"request": request,
|
||||
"locations": location_data
|
||||
})
|
||||
else:
|
||||
return templates.TemplateResponse("inventory.html", {
|
||||
"request": request,
|
||||
"error": f"Failed to fetch storage locations: HTTP {response.status_code}"
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
return templates.TemplateResponse("inventory.html", {
|
||||
"request": request,
|
||||
"error": f"Error connecting to InvenTree: {str(e)}"
|
||||
})
|
||||
|
||||
@app.get("/api/storage-locations")
|
||||
async def get_storage_locations():
|
||||
"""Get all storage locations from InvenTree"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return {"error": "InvenTree API not configured"}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
response = await client.get(
|
||||
f"{config['api_url']}stock/location/",
|
||||
headers=config["headers"]
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
|
||||
# Handle both paginated and direct list responses
|
||||
if isinstance(data, dict) and "results" in data:
|
||||
locations = data.get("results", [])
|
||||
elif isinstance(data, list):
|
||||
locations = data
|
||||
else:
|
||||
locations = []
|
||||
|
||||
return [
|
||||
{
|
||||
"pk": loc.get("pk"),
|
||||
"name": loc.get("name"),
|
||||
"description": loc.get("description"),
|
||||
"pathstring": loc.get("pathstring")
|
||||
}
|
||||
for loc in locations if isinstance(loc, dict)
|
||||
]
|
||||
else:
|
||||
return {"error": f"HTTP {response.status_code}: {response.text}"}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@app.post("/api/storage-locations")
|
||||
async def create_storage_location(location_data: dict = Body(...)):
|
||||
"""Create a new storage location in InvenTree"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return {"error": "InvenTree API not configured"}
|
||||
|
||||
try:
|
||||
# Ensure description is not None - provide empty string if not provided
|
||||
if "description" not in location_data or location_data["description"] is None:
|
||||
location_data["description"] = ""
|
||||
|
||||
# Clean up the data to ensure it's properly formatted
|
||||
clean_data = {
|
||||
"name": location_data.get("name", ""),
|
||||
"description": location_data.get("description", "")
|
||||
}
|
||||
|
||||
# Add parent if provided
|
||||
if "parent" in location_data and location_data["parent"]:
|
||||
clean_data["parent"] = location_data["parent"]
|
||||
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
response = await client.post(
|
||||
f"{config['api_url']}stock/location/",
|
||||
headers=config["headers"],
|
||||
json=clean_data
|
||||
)
|
||||
|
||||
if response.status_code == 201:
|
||||
location = response.json()
|
||||
return {"success": True, "location": {"pk": location.get("pk"), "name": location.get("name")}}
|
||||
else:
|
||||
return {"error": f"HTTP {response.status_code}: {response.text}"}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": f"Error creating location: {str(e)}"}
|
||||
|
||||
@app.delete("/api/storage-locations/{location_id}")
|
||||
async def delete_storage_location(location_id: int):
|
||||
"""Delete a storage location if it's empty (has no stock items)"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return {"error": "InvenTree API not configured"}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
# First, check if the location has any stock items
|
||||
stock_response = await client.get(
|
||||
f"{config['api_url']}stock/",
|
||||
headers=config["headers"],
|
||||
params={"location": location_id}
|
||||
)
|
||||
|
||||
if stock_response.status_code == 200:
|
||||
stock_data = stock_response.json()
|
||||
|
||||
# Handle both paginated and direct list responses for stock items
|
||||
if isinstance(stock_data, dict) and "results" in stock_data:
|
||||
stock_items = stock_data.get("results", [])
|
||||
elif isinstance(stock_data, list):
|
||||
stock_items = stock_data
|
||||
else:
|
||||
stock_items = []
|
||||
|
||||
# Check if location has any stock items
|
||||
if len(stock_items) > 0:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Cannot delete location with stock items",
|
||||
"message": f"Location contains {len(stock_items)} stock item(s). Move or remove items first."
|
||||
}
|
||||
else:
|
||||
return {"error": f"Failed to check stock items: HTTP {stock_response.status_code}"}
|
||||
|
||||
# If location is empty, proceed with deletion
|
||||
delete_response = await client.delete(
|
||||
f"{config['api_url']}stock/location/{location_id}/",
|
||||
headers=config["headers"]
|
||||
)
|
||||
|
||||
if delete_response.status_code == 204:
|
||||
return {"success": True, "message": "Storage location deleted successfully"}
|
||||
else:
|
||||
return {"error": f"Failed to delete location: HTTP {delete_response.status_code}: {delete_response.text}"}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": f"Error deleting location: {str(e)}"}
|
||||
|
||||
@app.post("/api/create-inventory-item")
|
||||
async def create_inventory_item(create_request: CreateItemRequest):
|
||||
"""Create a new inventory item from a media search result"""
|
||||
config = get_inventree_config()
|
||||
if not config:
|
||||
return {"success": False, "error": "InvenTree API not configured"}
|
||||
|
||||
try:
|
||||
print(f"Creating inventory item for: {create_request.media_item.title}")
|
||||
|
||||
# First, get or create the part category for media items
|
||||
media_category_id = await get_or_create_media_category(config)
|
||||
print(f"Media category ID: {media_category_id}")
|
||||
|
||||
# Create the part with required fields
|
||||
part_data = {
|
||||
"name": create_request.media_item.title,
|
||||
"description": create_part_description(create_request.media_item) or "Media item",
|
||||
"active": True,
|
||||
"purchaseable": False,
|
||||
"salable": False,
|
||||
"trackable": False
|
||||
}
|
||||
|
||||
# Add optional fields
|
||||
keywords = create_part_keywords(create_request.media_item)
|
||||
if keywords:
|
||||
part_data["keywords"] = keywords
|
||||
|
||||
if media_category_id:
|
||||
part_data["category"] = media_category_id
|
||||
|
||||
if create_request.media_item.openlibrary_url or create_request.media_item.discogs_url:
|
||||
part_data["link"] = create_request.media_item.openlibrary_url or create_request.media_item.discogs_url
|
||||
|
||||
print(f"Creating part with data: {part_data}")
|
||||
|
||||
# Create the part using REST API
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
part_response = await client.post(
|
||||
f"{config['api_url']}part/",
|
||||
headers=config["headers"],
|
||||
json=part_data
|
||||
)
|
||||
|
||||
if part_response.status_code != 201:
|
||||
error_text = part_response.text
|
||||
print(f"Part creation failed: HTTP {part_response.status_code} - {error_text}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Part creation failed: HTTP {part_response.status_code}",
|
||||
"message": f"Failed to create part: {error_text}"
|
||||
}
|
||||
|
||||
part = part_response.json()
|
||||
part_id = part.get("pk")
|
||||
print(f"Part created successfully: {part_id}")
|
||||
|
||||
# Create the stock item
|
||||
stock_data = {
|
||||
"part": part_id,
|
||||
"location": create_request.storage_location_id,
|
||||
"quantity": create_request.quantity,
|
||||
"notes": create_request.notes or f"Added from media search on {datetime.now().strftime('%Y-%m-%d')}"
|
||||
}
|
||||
|
||||
print(f"Creating stock item with data: {stock_data}")
|
||||
|
||||
stock_response = await client.post(
|
||||
f"{config['api_url']}stock/",
|
||||
headers=config["headers"],
|
||||
json=stock_data
|
||||
)
|
||||
|
||||
if stock_response.status_code != 201:
|
||||
error_text = stock_response.text
|
||||
print(f"Stock item creation failed: HTTP {stock_response.status_code} - {error_text}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Stock item creation failed: HTTP {stock_response.status_code}",
|
||||
"message": f"Failed to create stock item: {error_text}"
|
||||
}
|
||||
|
||||
stock_item = stock_response.json()
|
||||
stock_id = stock_item.get("pk")
|
||||
print(f"Stock item created successfully: {stock_id}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"part_id": part_id,
|
||||
"stock_id": stock_id,
|
||||
"message": f"Successfully added '{create_request.media_item.title}' to inventory"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"Unexpected error in create_inventory_item: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"message": f"Failed to add item to inventory: {str(e)}"
|
||||
}
|
||||
|
||||
async def get_or_create_media_category(config: Dict[str, Any]) -> Optional[int]:
|
||||
"""Get or create the 'Media' category for organizing media items"""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
|
||||
# Try to find existing media category
|
||||
response = await client.get(
|
||||
f"{config['api_url']}part/category/",
|
||||
headers=config["headers"],
|
||||
params={"name": "Media"}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
categories = data.get("results", [])
|
||||
if categories:
|
||||
return categories[0].get("pk")
|
||||
|
||||
# Create media category if it doesn't exist
|
||||
category_data = {
|
||||
"name": "Media",
|
||||
"description": "Books, vinyl records, CDs, and cassettes"
|
||||
}
|
||||
|
||||
create_response = await client.post(
|
||||
f"{config['api_url']}part/category/",
|
||||
headers=config["headers"],
|
||||
json=category_data
|
||||
)
|
||||
|
||||
if create_response.status_code == 201:
|
||||
category = create_response.json()
|
||||
return category.get("pk")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error with media category: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def create_part_description(media_item: MediaItem) -> str:
|
||||
"""Create a description for the part based on media item data"""
|
||||
desc_parts = []
|
||||
|
||||
if media_item.artist:
|
||||
desc_parts.append(f"Artist: {media_item.artist}")
|
||||
elif media_item.author:
|
||||
desc_parts.append(f"Author: {media_item.author}")
|
||||
|
||||
if media_item.year and media_item.year != "Unknown":
|
||||
desc_parts.append(f"Year: {media_item.year}")
|
||||
|
||||
if media_item.format:
|
||||
desc_parts.append(f"Format: {media_item.format}")
|
||||
|
||||
if media_item.label:
|
||||
desc_parts.append(f"Label: {media_item.label}")
|
||||
|
||||
if media_item.isbn:
|
||||
desc_parts.append(f"ISBN: {media_item.isbn}")
|
||||
|
||||
return " | ".join(desc_parts) if desc_parts else "Media item"
|
||||
|
||||
def create_part_keywords(media_item: MediaItem) -> str:
|
||||
"""Create keywords for the part based on media item data"""
|
||||
keywords = []
|
||||
|
||||
if media_item.artist:
|
||||
keywords.append(media_item.artist)
|
||||
if media_item.author:
|
||||
keywords.append(media_item.author)
|
||||
if media_item.format:
|
||||
keywords.append(media_item.format)
|
||||
if media_item.label:
|
||||
keywords.append(media_item.label)
|
||||
|
||||
return ", ".join(keywords)
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
|
Reference in New Issue
Block a user