Files
fastapi-inventory/main.py
grabowski d9f184d084 Fix Discogs API integration with proper authentication
- Updated to use python3-discogs-client==2.8 library
- Added environment variable configuration for API keys
- Implemented proper error handling for missing API credentials
- Added .env file for configuration management
- Enhanced search functionality with graceful fallbacks
- Updated README with Discogs API setup instructions
2025-08-11 16:15:15 +07:00

192 lines
6.7 KiB
Python

from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import httpx
import asyncio
from typing import Optional
import json
import os
from dotenv import load_dotenv
import discogs_client as discogs
# Load environment variables
load_dotenv()
app = FastAPI(title="Media Inventory App", description="Search for books, vinyl, CDs, and cassettes")
# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# API endpoints for external services
OPENLIBRARY_SEARCH_URL = "https://openlibrary.org/search.json"
DISCOGS_SEARCH_URL = "https://api.discogs.com/database/search"
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Serve the main search form"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/search")
async def search_media(
request: Request,
media_type: str = Form(...),
query: str = Form(...),
artist: Optional[str] = Form(None)
):
"""Search for media based on type and query"""
if not query.strip():
return templates.TemplateResponse("index.html", {
"request": request,
"error": "Please enter a search query"
})
try:
if media_type == "book":
results = await search_openlibrary(query)
else: # vinyl, cd, cassette
results = await search_discogs(query, media_type, artist)
return templates.TemplateResponse("results.html", {
"request": request,
"results": results,
"media_type": media_type,
"query": query
})
except Exception as e:
return templates.TemplateResponse("index.html", {
"request": request,
"error": f"Search failed: {str(e)}"
})
async def search_openlibrary(query: str):
"""Search OpenLibrary for books"""
async with httpx.AsyncClient() as client:
params = {
"q": query,
"limit": 10,
"fields": "key,title,author_name,first_publish_year,isbn,cover_i"
}
response = await client.get(OPENLIBRARY_SEARCH_URL, params=params)
response.raise_for_status()
data = response.json()
books = []
for doc in data.get("docs", []):
book = {
"title": doc.get("title", "Unknown Title"),
"author": ", ".join(doc.get("author_name", ["Unknown Author"])),
"year": doc.get("first_publish_year", "Unknown"),
"isbn": doc.get("isbn", [None])[0] if doc.get("isbn") else None,
"cover_url": f"https://covers.openlibrary.org/b/id/{doc.get('cover_i')}-M.jpg" if doc.get('cover_i') else None,
"openlibrary_url": f"https://openlibrary.org{doc.get('key')}" if doc.get('key') else None
}
books.append(book)
return books
async def search_discogs(query: str, media_type: str, artist: Optional[str] = None):
"""Search Discogs for vinyl, CDs, and cassettes using discogs_client"""
# Get API credentials from environment
user_token = os.getenv("DISCOGS_USER_TOKEN")
user_agent = os.getenv("DISCOGS_USER_AGENT", "MediaInventoryApp/1.0")
if not user_token:
# Return a helpful message if no API key is configured
return [{
"title": "Discogs API Key Required",
"artist": "Configuration Needed",
"year": "N/A",
"label": "Please add your Discogs API key to the .env file",
"format": "See README.md for setup instructions",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
try:
# Initialize Discogs client
d = discogs.Client(user_agent, user_token=user_token)
# Map our media types to Discogs format
format_map = {
"vinyl": "Vinyl",
"cd": "CD",
"cassette": "Cassette"
}
# Build search query
search_query = query
if artist:
search_query = f"{artist} {query}"
# Search for releases
search_results = d.search(
search_query,
type='release',
format=format_map.get(media_type, "Vinyl")
)
releases = []
# Limit to first 10 results
for i, result in enumerate(search_results):
if i >= 10:
break
try:
# Extract artist name from title or use separate artist field
title = result.title
artist_name = "Unknown Artist"
if " - " in title:
parts = title.split(" - ", 1)
artist_name = parts[0]
title = parts[1] if len(parts) > 1 else title
# Try to get additional details
try:
year = result.year if hasattr(result, 'year') else "Unknown"
labels = [label.name for label in result.labels] if hasattr(result, 'labels') and result.labels else ["Unknown Label"]
formats = [f.get('name', 'Unknown') for f in result.formats] if hasattr(result, 'formats') and result.formats else ["Unknown Format"]
except:
year = "Unknown"
labels = ["Unknown Label"]
formats = ["Unknown Format"]
release = {
"title": title,
"artist": artist_name,
"year": str(year),
"label": ", ".join(labels),
"format": ", ".join(formats),
"cover_url": result.thumb if hasattr(result, 'thumb') else None,
"discogs_url": result.url if hasattr(result, 'url') else None
}
releases.append(release)
except Exception as e:
# Skip problematic results but continue processing
continue
return releases
except Exception as e:
# Return error information
return [{
"title": "Search Error",
"artist": "Discogs API",
"year": "N/A",
"label": f"Error: {str(e)}",
"format": "Please check your API configuration",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)