Files
fastapi-inventory/main.py
grabowski 664f58cefe Consolidate search endpoints for better API organization
- Removed duplicate /api/search endpoint
- Kept /search as the main JSON API endpoint with proper documentation
- Added /search/form for web form submissions
- Updated HTML form to use /search/form endpoint
- Clear separation between API and web interface
- Media type enums still visible in /docs for API usage
- Maintains all functionality while reducing endpoint confusion
2025-08-11 16:50:57 +07:00

252 lines
8.6 KiB
Python

from fastapi import FastAPI, Request, Form, Body
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import httpx
import asyncio
from typing import Optional, List, Union
import json
import os
from dotenv import load_dotenv
import discogs_client as discogs
from enum import Enum
from pydantic import BaseModel
# Load environment variables
load_dotenv()
# Define media types enum for API documentation
class MediaType(str, Enum):
book = "book"
vinyl = "vinyl"
cd = "cd"
cassette = "cassette"
# Pydantic models for API documentation
class SearchRequest(BaseModel):
media_type: MediaType
query: str
artist: Optional[str] = None
class MediaItem(BaseModel):
title: str
artist: Optional[str] = None
author: Optional[str] = None
year: str
isbn: Optional[str] = None
label: Optional[str] = None
format: Optional[str] = None
cover_url: Optional[str] = None
openlibrary_url: Optional[str] = None
discogs_url: Optional[str] = None
app = FastAPI(
title="Media Inventory App",
description="Search for books, vinyl records, CDs, and cassettes using OpenLibrary and Discogs APIs",
version="1.0.0"
)
# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# API endpoints for external services
OPENLIBRARY_SEARCH_URL = "https://openlibrary.org/search.json"
DISCOGS_SEARCH_URL = "https://api.discogs.com/database/search"
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Serve the main search form"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/search", response_model=List[MediaItem])
async def api_search_media(search_request: SearchRequest):
"""
Search for media items via JSON API
**Media Types:**
- `book`: Search books using OpenLibrary API
- `vinyl`: Search vinyl records using Discogs API
- `cd`: Search CDs using Discogs API
- `cassette`: Search cassettes using Discogs API
**Parameters:**
- `media_type`: Type of media to search for (required)
- `query`: Search query - title, album name, or keywords (required)
- `artist`: Artist name for music searches (optional, helps narrow results)
**Returns:**
List of media items with details like title, artist/author, year, and links to source
"""
if not search_request.query.strip():
return []
if search_request.media_type == MediaType.book:
results = await search_openlibrary(search_request.query)
else: # vinyl, cd, cassette
results = await search_discogs(search_request.query, search_request.media_type.value, search_request.artist)
return results
@app.post("/search/form")
async def form_search_media(
request: Request,
media_type: str = Form(...),
query: str = Form(...),
artist: Optional[str] = Form(None)
):
"""Search for media via web form submission - returns HTML page"""
if not query.strip():
return templates.TemplateResponse("index.html", {
"request": request,
"error": "Please enter a search query"
})
try:
if media_type == "book":
results = await search_openlibrary(query)
else: # vinyl, cd, cassette
results = await search_discogs(query, media_type, artist)
return templates.TemplateResponse("results.html", {
"request": request,
"results": results,
"media_type": media_type,
"query": query
})
except Exception as e:
return templates.TemplateResponse("index.html", {
"request": request,
"error": f"Search failed: {str(e)}"
})
async def search_openlibrary(query: str):
"""Search OpenLibrary for books"""
async with httpx.AsyncClient() as client:
params = {
"q": query,
"limit": 10,
"fields": "key,title,author_name,first_publish_year,isbn,cover_i"
}
response = await client.get(OPENLIBRARY_SEARCH_URL, params=params)
response.raise_for_status()
data = response.json()
books = []
for doc in data.get("docs", []):
book = {
"title": doc.get("title", "Unknown Title"),
"author": ", ".join(doc.get("author_name", ["Unknown Author"])),
"year": doc.get("first_publish_year", "Unknown"),
"isbn": doc.get("isbn", [None])[0] if doc.get("isbn") else None,
"cover_url": f"https://covers.openlibrary.org/b/id/{doc.get('cover_i')}-M.jpg" if doc.get('cover_i') else None,
"openlibrary_url": f"https://openlibrary.org{doc.get('key')}" if doc.get('key') else None
}
books.append(book)
return books
async def search_discogs(query: str, media_type: str, artist: Optional[str] = None):
"""Search Discogs for vinyl, CDs, and cassettes using discogs_client"""
# Get API credentials from environment
user_token = os.getenv("DISCOGS_USER_TOKEN")
user_agent = os.getenv("DISCOGS_USER_AGENT", "MediaInventoryApp/1.0")
if not user_token:
# Return a helpful message if no API key is configured
return [{
"title": "Discogs API Key Required",
"artist": "Configuration Needed",
"year": "N/A",
"label": "Please add your Discogs API key to the .env file",
"format": "See README.md for setup instructions",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
try:
# Initialize Discogs client
d = discogs.Client(user_agent, user_token=user_token)
# Map our media types to Discogs format
format_map = {
"vinyl": "Vinyl",
"cd": "CD",
"cassette": "Cassette"
}
# Build search query
search_query = query
if artist:
search_query = f"{artist} {query}"
# Search for releases
search_results = d.search(
search_query,
type='release',
format=format_map.get(media_type, "Vinyl")
)
releases = []
# Limit to first 10 results
for i, result in enumerate(search_results):
if i >= 10:
break
try:
# Extract artist name from title or use separate artist field
title = result.title
artist_name = "Unknown Artist"
if " - " in title:
parts = title.split(" - ", 1)
artist_name = parts[0]
title = parts[1] if len(parts) > 1 else title
# Try to get additional details
try:
year = result.year if hasattr(result, 'year') else "Unknown"
labels = [label.name for label in result.labels] if hasattr(result, 'labels') and result.labels else ["Unknown Label"]
formats = [f.get('name', 'Unknown') for f in result.formats] if hasattr(result, 'formats') and result.formats else ["Unknown Format"]
except:
year = "Unknown"
labels = ["Unknown Label"]
formats = ["Unknown Format"]
release = {
"title": title,
"artist": artist_name,
"year": str(year),
"label": ", ".join(labels),
"format": ", ".join(formats),
"cover_url": result.thumb if hasattr(result, 'thumb') else None,
"discogs_url": result.url if hasattr(result, 'url') else None
}
releases.append(release)
except Exception as e:
# Skip problematic results but continue processing
continue
return releases
except Exception as e:
# Return error information
return [{
"title": "Search Error",
"artist": "Discogs API",
"year": "N/A",
"label": f"Error: {str(e)}",
"format": "Please check your API configuration",
"cover_url": None,
"discogs_url": "https://www.discogs.com/settings/developers"
}]
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)