Files
fastapi-inventory/main.py
2025-08-18 16:55:45 +07:00

813 lines
30 KiB
Python

from fastapi import FastAPI, Request, Form, Body
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import httpx
import asyncio
from typing import Optional, List, Union, Dict, Any
import json
import os
from dotenv import load_dotenv
import discogs_client as discogs
from enum import Enum
from pydantic import BaseModel
from datetime import datetime
# Load environment variables
load_dotenv()
# Define media types enum for API documentation
class MediaType(str, Enum):
book = "book"
vinyl = "vinyl"
cd = "cd"
cassette = "cassette"
# Pydantic models for API documentation
class SearchRequest(BaseModel):
media_type: MediaType
query: str
artist: Optional[str] = None
class MediaItem(BaseModel):
title: str
artist: Optional[str] = None
author: Optional[str] = None
year: Union[str, int, None] = None
isbn: Optional[str] = None
label: Optional[str] = None
format: Optional[str] = None
cover_url: Optional[str] = None
openlibrary_url: Optional[str] = None
discogs_url: Optional[str] = None
class Config:
extra = "ignore"
str_strip_whitespace = True
def __init__(self, **data):
# Convert year to string if it's not None
if data.get('year') is not None:
data['year'] = str(data['year'])
super().__init__(**data)
# InvenTree API Models
class StorageLocation(BaseModel):
pk: Optional[int] = None
name: str
description: Optional[str] = None
parent: Optional[int] = None
pathstring: Optional[str] = None
class CreateItemRequest(BaseModel):
media_item: MediaItem
storage_location_id: int
quantity: float = 1.0
notes: Optional[str] = None
app = FastAPI(
title="Media Inventory App",
description="Search for books, vinyl records, CDs, and cassettes using OpenLibrary and Discogs APIs",
version="1.0.0"
)
# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# API endpoints for external services
OPENLIBRARY_SEARCH_URL = "https://openlibrary.org/search.json"
DISCOGS_SEARCH_URL = "https://api.discogs.com/database/search"
# InvenTree API Functions
def get_inventree_config() -> Optional[Dict[str, Any]]:
"""Get InvenTree API configuration"""
api_url = os.getenv("INVENTREE_API_URL")
api_token = os.getenv("INVENTREE_API_TOKEN")
if not api_url or not api_token:
return None
# Ensure URL ends with /
if not api_url.endswith('/'):
api_url += '/'
return {
"api_url": api_url,
"api_token": api_token,
"timeout": int(os.getenv("INVENTREE_API_TIMEOUT", "30")),
"headers": {
"Authorization": f"Token {api_token}",
"Content-Type": "application/json"
}
}
async def test_inventree_connection() -> bool:
"""Test InvenTree API connection"""
config = get_inventree_config()
if not config:
return False
try:
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
response = await client.get(
f"{config['api_url']}user/me/",
headers=config["headers"]
)
return response.status_code == 200
except Exception as e:
print(f"InvenTree API connection test failed: {e}")
return False
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Serve the main search form"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/search", response_model=List[MediaItem])
async def api_search_media(search_request: SearchRequest):
"""
Search for media items via JSON API
**Media Types:**
- `book`: Search books using OpenLibrary API
- `vinyl`: Search vinyl records using Discogs API
- `cd`: Search CDs using Discogs API
- `cassette`: Search cassettes using Discogs API
**Parameters:**
- `media_type`: Type of media to search for (required)
- `query`: Search query - title, album name, or keywords (required)
- `artist`: Artist name for music searches (optional, helps narrow results)
**Returns:**
List of media items with details like title, artist/author, year, and links to source
"""
if not search_request.query.strip():
return []
if search_request.media_type == MediaType.book:
results = await search_openlibrary(search_request.query)
else: # vinyl, cd, cassette
results = await search_discogs(search_request.query, search_request.media_type.value, search_request.artist)
return results
@app.post("/search/form")
async def form_search_media(
request: Request,
media_type: str = Form(...),
query: str = Form(...),
artist: Optional[str] = Form(None)
):
"""Search for media via web form submission - returns HTML page"""
if not query.strip():
return templates.TemplateResponse("index.html", {
"request": request,
"error": "Please enter a search query"
})
try:
if media_type == "book":
results = await search_openlibrary(query)
else: # vinyl, cd, cassette
results = await search_discogs(query, media_type, artist)
return templates.TemplateResponse("results.html", {
"request": request,
"results": results,
"media_type": media_type,
"query": query
})
except Exception as e:
return templates.TemplateResponse("index.html", {
"request": request,
"error": f"Search failed: {str(e)}"
})
async def search_openlibrary(query: str):
"""Search OpenLibrary for books"""
async with httpx.AsyncClient() as client:
params = {
"q": query,
"limit": 10,
"fields": "key,title,author_name,first_publish_year,isbn,cover_i"
}
response = await client.get(OPENLIBRARY_SEARCH_URL, params=params)
response.raise_for_status()
data = response.json()
books = []
for doc in data.get("docs", []):
book = {
"title": doc.get("title", "Unknown Title"),
"author": ", ".join(doc.get("author_name", ["Unknown Author"])),
"year": doc.get("first_publish_year", "Unknown"),
"isbn": doc.get("isbn", [None])[0] if doc.get("isbn") else None,
"cover_url": f"https://covers.openlibrary.org/b/id/{doc.get('cover_i')}-M.jpg" if doc.get('cover_i') else None,
"openlibrary_url": f"https://openlibrary.org{doc.get('key')}" if doc.get('key') else None
}
books.append(book)
return books
async def search_discogs(query: str, media_type: str, artist: Optional[str] = None):
"""Search Discogs for vinyl, CDs, and cassettes using discogs_client"""
# Get API credentials from environment
user_token = os.getenv("DISCOGS_USER_TOKEN")
user_agent = os.getenv("DISCOGS_USER_AGENT", "MediaInventoryApp/1.0")
if not user_token:
# Return a helpful message if no API key is configured
return [{
"title": "Discogs API Key Required",
"artist": "Configuration Needed",
"year": "N/A",
"label": "Please add your Discogs API key to the .env file",
"format": "See README.md for setup instructions",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
try:
# Initialize Discogs client
d = discogs.Client(user_agent, user_token=user_token)
# Map our media types to Discogs format
format_map = {
"vinyl": "Vinyl",
"cd": "CD",
"cassette": "Cassette"
}
# Build search query
search_query = query
if artist:
search_query = f"{artist} {query}"
# Search for releases
search_results = d.search(
search_query,
type='release',
format=format_map.get(media_type, "Vinyl")
)
releases = []
# Limit to first 10 results
for i, result in enumerate(search_results):
if i >= 10:
break
try:
# Extract artist name from title or use separate artist field
title = result.title
artist_name = "Unknown Artist"
if " - " in title:
parts = title.split(" - ", 1)
artist_name = parts[0]
title = parts[1] if len(parts) > 1 else title
# Try to get additional details
try:
year = result.year if hasattr(result, 'year') else "Unknown"
labels = [label.name for label in result.labels] if hasattr(result, 'labels') and result.labels else ["Unknown Label"]
formats = [f.get('name', 'Unknown') for f in result.formats] if hasattr(result, 'formats') and result.formats else ["Unknown Format"]
except:
year = "Unknown"
labels = ["Unknown Label"]
formats = ["Unknown Format"]
release = {
"title": title,
"artist": artist_name,
"year": str(year),
"label": ", ".join(labels),
"format": ", ".join(formats),
"cover_url": result.thumb if hasattr(result, 'thumb') else None,
"discogs_url": result.url if hasattr(result, 'url') else None
}
releases.append(release)
except Exception as e:
# Skip problematic results but continue processing
continue
return releases
except Exception as e:
# Return error information
return [{
"title": "Search Error",
"artist": "Discogs API",
"year": "N/A",
"label": f"Error: {str(e)}",
"format": "Please check your API configuration",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
# InvenTree Inventory Management Endpoints
@app.get("/inventory", response_class=HTMLResponse)
async def inventory_home(request: Request):
"""Serve the inventory management page"""
config = get_inventree_config()
if not config:
return templates.TemplateResponse("inventory.html", {
"request": request,
"error": "InvenTree API not configured. Please check your .env file."
})
# Test connection and get locations
if not await test_inventree_connection():
return templates.TemplateResponse("inventory.html", {
"request": request,
"error": "Failed to connect to InvenTree API. Please check your configuration."
})
try:
# Get storage locations for the dropdown
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
response = await client.get(
f"{config['api_url']}stock/location/",
headers=config["headers"]
)
if response.status_code == 200:
locations_data = response.json()
# Handle both paginated and direct list responses
if isinstance(locations_data, dict) and "results" in locations_data:
locations = locations_data.get("results", [])
elif isinstance(locations_data, list):
locations = locations_data
else:
locations = []
location_data = []
for loc in locations:
if isinstance(loc, dict):
location_data.append({
"pk": loc.get("pk"),
"name": loc.get("name", "Unknown"),
"pathstring": loc.get("pathstring", loc.get("name", "Unknown"))
})
return templates.TemplateResponse("inventory.html", {
"request": request,
"locations": location_data
})
else:
return templates.TemplateResponse("inventory.html", {
"request": request,
"error": f"Failed to fetch storage locations: HTTP {response.status_code}"
})
except Exception as e:
return templates.TemplateResponse("inventory.html", {
"request": request,
"error": f"Error connecting to InvenTree: {str(e)}"
})
@app.get("/api/storage-locations")
async def get_storage_locations():
"""Get all storage locations from InvenTree"""
config = get_inventree_config()
if not config:
return {"error": "InvenTree API not configured"}
try:
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
response = await client.get(
f"{config['api_url']}stock/location/",
headers=config["headers"]
)
if response.status_code == 200:
data = response.json()
# Handle both paginated and direct list responses
if isinstance(data, dict) and "results" in data:
locations = data.get("results", [])
elif isinstance(data, list):
locations = data
else:
locations = []
return [
{
"pk": loc.get("pk"),
"name": loc.get("name"),
"description": loc.get("description"),
"pathstring": loc.get("pathstring")
}
for loc in locations if isinstance(loc, dict)
]
else:
return {"error": f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {"error": str(e)}
@app.post("/api/storage-locations")
async def create_storage_location(location_data: dict = Body(...)):
"""Create a new storage location in InvenTree"""
config = get_inventree_config()
if not config:
return {"error": "InvenTree API not configured"}
try:
# Ensure description is not None - provide empty string if not provided
if "description" not in location_data or location_data["description"] is None:
location_data["description"] = ""
# Clean up the data to ensure it's properly formatted
clean_data = {
"name": location_data.get("name", ""),
"description": location_data.get("description", "")
}
# Add parent if provided
if "parent" in location_data and location_data["parent"]:
clean_data["parent"] = location_data["parent"]
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
response = await client.post(
f"{config['api_url']}stock/location/",
headers=config["headers"],
json=clean_data
)
if response.status_code == 201:
location = response.json()
return {"success": True, "location": {"pk": location.get("pk"), "name": location.get("name")}}
else:
return {"error": f"HTTP {response.status_code}: {response.text}"}
except Exception as e:
return {"error": f"Error creating location: {str(e)}"}
@app.delete("/api/storage-locations/{location_id}")
async def delete_storage_location(location_id: int):
"""Delete a storage location if it's empty (has no stock items)"""
config = get_inventree_config()
if not config:
return {"error": "InvenTree API not configured"}
try:
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
# First, check if the location has any stock items
stock_response = await client.get(
f"{config['api_url']}stock/",
headers=config["headers"],
params={"location": location_id}
)
if stock_response.status_code == 200:
stock_data = stock_response.json()
# Handle both paginated and direct list responses for stock items
if isinstance(stock_data, dict) and "results" in stock_data:
stock_items = stock_data.get("results", [])
elif isinstance(stock_data, list):
stock_items = stock_data
else:
stock_items = []
# Check if location has any stock items
if len(stock_items) > 0:
return {
"success": False,
"error": "Cannot delete location with stock items",
"message": f"Location contains {len(stock_items)} stock item(s). Move or remove items first."
}
else:
return {"error": f"Failed to check stock items: HTTP {stock_response.status_code}"}
# If location is empty, proceed with deletion
delete_response = await client.delete(
f"{config['api_url']}stock/location/{location_id}/",
headers=config["headers"]
)
if delete_response.status_code == 204:
return {"success": True, "message": "Storage location deleted successfully"}
else:
return {"error": f"Failed to delete location: HTTP {delete_response.status_code}: {delete_response.text}"}
except Exception as e:
return {"error": f"Error deleting location: {str(e)}"}
@app.post("/api/create-inventory-item")
async def create_inventory_item(create_request: CreateItemRequest):
"""Create a new inventory item from a media search result"""
config = get_inventree_config()
if not config:
return {"success": False, "error": "InvenTree API not configured"}
try:
print(f"Creating inventory item for: {create_request.media_item.title}")
# First, get or create the part category for media items
media_category_id = await get_or_create_media_category(config)
print(f"Media category ID: {media_category_id}")
# Create the part with required fields
part_data = {
"name": create_request.media_item.title,
"description": create_part_description(create_request.media_item) or "Media item",
"active": True,
"purchaseable": False,
"salable": False,
"trackable": False
}
# Add optional fields
keywords = create_part_keywords(create_request.media_item)
if keywords:
part_data["keywords"] = keywords
if media_category_id:
part_data["category"] = media_category_id
if create_request.media_item.openlibrary_url or create_request.media_item.discogs_url:
part_data["link"] = create_request.media_item.openlibrary_url or create_request.media_item.discogs_url
print(f"Creating part with data: {part_data}")
# Create the part using REST API
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
part_response = await client.post(
f"{config['api_url']}part/",
headers=config["headers"],
json=part_data
)
if part_response.status_code != 201:
error_text = part_response.text
print(f"Part creation failed: HTTP {part_response.status_code} - {error_text}")
return {
"success": False,
"error": f"Part creation failed: HTTP {part_response.status_code}",
"message": f"Failed to create part: {error_text}"
}
part = part_response.json()
part_id = part.get("pk")
print(f"Part created successfully: {part_id}")
# Add custom parameters for books (ISBN and Year)
await add_part_parameters(client, config, part_id, create_request.media_item)
# Create the stock item
stock_data = {
"part": part_id,
"location": create_request.storage_location_id,
"quantity": create_request.quantity,
"notes": create_request.notes or f"Added from media search on {datetime.now().strftime('%Y-%m-%d')}"
}
print(f"Creating stock item with data: {stock_data}")
stock_response = await client.post(
f"{config['api_url']}stock/",
headers=config["headers"],
json=stock_data
)
if stock_response.status_code != 201:
error_text = stock_response.text
print(f"Stock item creation failed: HTTP {stock_response.status_code} - {error_text}")
return {
"success": False,
"error": f"Stock item creation failed: HTTP {stock_response.status_code}",
"message": f"Failed to create stock item: {error_text}"
}
stock_item = stock_response.json()
stock_id = stock_item.get("pk")
print(f"Stock item created successfully: {stock_id}")
return {
"success": True,
"part_id": part_id,
"stock_id": stock_id,
"message": f"Successfully added '{create_request.media_item.title}' to inventory"
}
except Exception as e:
print(f"Unexpected error in create_inventory_item: {e}")
return {
"success": False,
"error": str(e),
"message": f"Failed to add item to inventory: {str(e)}"
}
async def get_or_create_media_category(config: Dict[str, Any]) -> Optional[int]:
"""Get or create the 'Media' category for organizing media items"""
try:
async with httpx.AsyncClient(timeout=config["timeout"]) as client:
# Try to find existing media category
response = await client.get(
f"{config['api_url']}part/category/",
headers=config["headers"],
params={"name": "Media"}
)
if response.status_code == 200:
data = response.json()
categories = data.get("results", [])
if categories:
return categories[0].get("pk")
# Create media category if it doesn't exist
category_data = {
"name": "Media",
"description": "Books, vinyl records, CDs, and cassettes"
}
create_response = await client.post(
f"{config['api_url']}part/category/",
headers=config["headers"],
json=category_data
)
if create_response.status_code == 201:
category = create_response.json()
return category.get("pk")
except Exception as e:
print(f"Error with media category: {e}")
return None
def create_part_description(media_item: MediaItem) -> str:
"""Create a description for the part based on media item data"""
desc_parts = []
if media_item.artist:
desc_parts.append(f"Artist: {media_item.artist}")
elif media_item.author:
desc_parts.append(f"Author: {media_item.author}")
if media_item.year and media_item.year != "Unknown":
desc_parts.append(f"Year: {media_item.year}")
if media_item.format:
desc_parts.append(f"Format: {media_item.format}")
if media_item.label:
desc_parts.append(f"Label: {media_item.label}")
if media_item.isbn:
desc_parts.append(f"ISBN: {media_item.isbn}")
return " | ".join(desc_parts) if desc_parts else "Media item"
def create_part_keywords(media_item: MediaItem) -> str:
"""Create keywords for the part based on media item data"""
keywords = []
if media_item.artist:
keywords.append(media_item.artist)
if media_item.author:
keywords.append(media_item.author)
if media_item.format:
keywords.append(media_item.format)
if media_item.label:
keywords.append(media_item.label)
return ", ".join(keywords)
async def add_part_parameters(client: httpx.AsyncClient, config: Dict[str, Any], part_id: int, media_item: MediaItem):
"""Add custom parameters to a part based on media item data"""
try:
parameters_to_add = []
# Add ISBN for books
if media_item.isbn and media_item.isbn.strip():
parameters_to_add.append({
"name": "ISBN",
"value": media_item.isbn.strip()
})
# Add Year for all media types
if media_item.year and str(media_item.year).strip() and str(media_item.year) != "Unknown":
parameters_to_add.append({
"name": "Year",
"value": str(media_item.year).strip()
})
# Add Format for music items
if media_item.format and media_item.format.strip():
parameters_to_add.append({
"name": "Format",
"value": media_item.format.strip()
})
# Add Label for music items
if media_item.label and media_item.label.strip():
parameters_to_add.append({
"name": "Label",
"value": media_item.label.strip()
})
# Add Artist for music items
if media_item.artist and media_item.artist.strip():
parameters_to_add.append({
"name": "Artist",
"value": media_item.artist.strip()
})
# Add Author for books
if media_item.author and media_item.author.strip():
parameters_to_add.append({
"name": "Author",
"value": media_item.author.strip()
})
# Create each parameter
for param in parameters_to_add:
param_data = {
"part": part_id,
"template": None, # We'll create custom parameters without templates
"data": param["value"]
}
# First, try to find or create a parameter template
template_id = await get_or_create_parameter_template(client, config, param["name"])
if template_id:
param_data["template"] = template_id
print(f"Adding parameter {param['name']}: {param['value']} to part {part_id}")
param_response = await client.post(
f"{config['api_url']}part/parameter/",
headers=config["headers"],
json=param_data
)
if param_response.status_code == 201:
print(f"Successfully added parameter {param['name']}")
else:
print(f"Failed to add parameter {param['name']}: HTTP {param_response.status_code} - {param_response.text}")
except Exception as e:
print(f"Error adding part parameters: {e}")
async def get_or_create_parameter_template(client: httpx.AsyncClient, config: Dict[str, Any], param_name: str) -> Optional[int]:
"""Get or create a parameter template for the given parameter name"""
try:
# Try to find existing parameter template
response = await client.get(
f"{config['api_url']}part/parameter/template/",
headers=config["headers"],
params={"name": param_name}
)
if response.status_code == 200:
data = response.json()
# Handle both paginated and direct list responses
if isinstance(data, dict) and "results" in data:
templates = data.get("results", [])
elif isinstance(data, list):
templates = data
else:
templates = []
if templates:
return templates[0].get("pk")
# Create parameter template if it doesn't exist
template_data = {
"name": param_name,
"units": ""
}
create_response = await client.post(
f"{config['api_url']}part/parameter/template/",
headers=config["headers"],
json=template_data
)
if create_response.status_code == 201:
template = create_response.json()
print(f"Created parameter template: {param_name}")
return template.get("pk")
else:
print(f"Failed to create parameter template {param_name}: HTTP {create_response.status_code}")
except Exception as e:
print(f"Error with parameter template {param_name}: {e}")
return None
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)