1517 lines
58 KiB
Python
1517 lines
58 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import urllib.parse
|
|
import urllib.parse
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from .core import (
|
|
BotRuntime,
|
|
DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS,
|
|
MAX_DISCORD_EMBEDS,
|
|
MEDIA_ITEMS_PER_EMBED,
|
|
MediaItem,
|
|
clean_error,
|
|
)
|
|
from .discord_api import discord_delete_message, discord_multipart_request
|
|
from .storage import (
|
|
channel_settings,
|
|
jellyfin_settings,
|
|
load_state,
|
|
save_catalog_url_setting,
|
|
save_media_channel_setting,
|
|
save_state,
|
|
validate_channel_id,
|
|
)
|
|
|
|
|
|
# Prefix for TMDB poster images; tmdb_image_url() appends the poster path to it.
TMDB_IMAGE_BASE_URL = "https://image.tmdb.org/t/p/w342"
|
|
# In-process cache of poster URLs keyed by tmdb_cache_key(); a None value
# records a lookup that failed or produced no poster.
TMDB_POSTER_CACHE: dict[str, str | None] = {}
|
|
|
|
|
|
def parse_int_text(value: str) -> int | None:
    """Extract an integer from free-form text.

    Every digit character in ``value`` is kept in order and everything else
    is discarded, so ``"12 seasons"`` and ``"a1b2"`` both yield ``12``.

    Returns:
        The parsed integer, or ``None`` when ``value`` contains no digit
        characters or the collected digits are rejected by ``int``.
    """
    kept = [symbol for symbol in value if symbol.isdigit()]
    if not kept:
        return None
    try:
        number = int("".join(kept))
    except ValueError:
        # str.isdigit() accepts some characters (e.g. superscripts) that
        # int() cannot convert.
        return None
    return number
|
|
|
|
|
|
def parse_year_text(value: str) -> str | None:
    """Return the first four-digit year between 1800 and 2199 in ``value``.

    The search is unanchored, so a year embedded in a longer string (for
    example an ISO date) is found as well.  Returns ``None`` when no such
    year is present.
    """
    found = re.search(r"(18|19|20|21)\d{2}", value)
    if found is None:
        return None
    return found.group(0)
|
|
|
|
|
|
def clean_media_text(value: str, limit: int = 220) -> str | None:
    """Collapse whitespace in ``value`` and cap its length.

    Args:
        value: Text to normalise; it is passed through ``str`` first.
        limit: Maximum length of the returned text.

    Returns:
        The text with every whitespace run (including CR/LF) reduced to one
        space and the ends trimmed.  Text longer than ``limit`` is cut to
        ``limit - 1`` characters, right-stripped, and suffixed with an
        ellipsis.  ``None`` when nothing remains after normalisation.
    """
    # split() without arguments breaks on any whitespace run, which already
    # covers carriage returns and newlines.
    collapsed = " ".join(str(value).split())
    if not collapsed:
        return None
    if len(collapsed) > limit:
        return collapsed[: limit - 1].rstrip() + "…"
    return collapsed
|
|
|
|
|
|
def format_runtime(value: str) -> str | None:
    """Normalise a runtime value into display text.

    The value is cleaned with ``clean_media_text`` (48-character cap).  Text
    that is not purely digits is returned as cleaned.  Digit-only text is
    read as a number of minutes and rendered as ``"2h 5m"``, ``"2h"`` or
    ``"45m"``.

    Returns:
        The display text, or ``None`` for empty input or a non-positive
        minute count.
    """
    text = clean_media_text(value, 48)
    if not text:
        return None
    if not text.isdigit():
        return text

    total_minutes = int(text)
    if total_minutes <= 0:
        return None

    hours, minutes = divmod(total_minutes, 60)
    if hours == 0:
        return f"{minutes}m"
    if minutes == 0:
        return f"{hours}h"
    return f"{hours}h {minutes}m"
|
|
|
|
|
|
def media_item_to_jsonable(item: MediaItem) -> dict[str, Any]:
    """Convert a media item into a JSON-serialisable dict.

    Missing optional text fields are written as empty strings.  Shows also
    get ``seasons`` and ``episodes`` keys, each an integer or ``""`` when
    the count is ``None``.
    """
    payload: dict[str, Any] = {"title": item.title, "mediaType": item.media_type}
    optional_fields = (
        ("sourceId", item.source_id),
        ("tmdbId", item.tmdb_id),
        ("imdbId", item.imdb_id),
        ("year", item.year),
        ("genres", item.genres),
        ("rating", item.rating),
        ("runtime", item.runtime),
        ("summary", item.summary),
    )
    for key, value in optional_fields:
        payload[key] = value or ""

    if item.media_type == "show":
        # A count of 0 must survive, so only None maps to "".
        payload["seasons"] = "" if item.seasons is None else item.seasons
        payload["episodes"] = "" if item.episodes is None else item.episodes
    return payload
|
|
|
|
|
|
def media_item_from_data(data: Any, media_type: str) -> MediaItem:
    """Build a ``MediaItem`` from a stored or submitted library entry.

    Args:
        data: The raw entry; must be a dict.
        media_type: ``"movie"`` or ``"show"``.  ``runtime`` is only read for
            movies, and ``seasons``/``episodes`` only for shows.

    Returns:
        The media item with every text field cleaned and length-capped.

    Raises:
        ValueError: If ``data`` is not a dict or has no usable title.
    """
    if not isinstance(data, dict):
        raise ValueError(f"{media_type.title()} entries must be objects")

    def field_text(key: str) -> str:
        # An explicit JSON null must read as "missing"; str(None) would
        # otherwise leak the literal text "None" into titles, IDs and
        # summaries.
        raw = data.get(key)
        return "" if raw is None else str(raw)

    title = clean_media_text(field_text("title"), 120)
    if not title:
        raise ValueError(f"{media_type.title()} entries must include a title")

    is_show = media_type == "show"
    is_movie = media_type == "movie"

    return MediaItem(
        title=title,
        media_type=media_type,
        source_id=clean_media_text(field_text("sourceId"), 120),
        tmdb_id=clean_media_text(field_text("tmdbId"), 120),
        imdb_id=clean_media_text(field_text("imdbId"), 120),
        year=parse_year_text(field_text("year")),
        genres=clean_media_text(field_text("genres"), 120),
        rating=clean_media_text(field_text("rating"), 32),
        runtime=format_runtime(field_text("runtime")) if is_movie else None,
        summary=clean_media_text(field_text("summary")),
        seasons=parse_int_text(field_text("seasons")) if is_show else None,
        episodes=parse_int_text(field_text("episodes")) if is_show else None,
    )
|
|
|
|
|
|
def media_items_from_data(raw_items: Any, media_type: str) -> list[MediaItem]:
    """Parse a raw list of entries into de-duplicated, sorted media items.

    Entries sharing a case-folded title and year are collapsed, with the
    last occurrence winning.  The result is sorted by case-folded title,
    then year.

    Returns:
        The parsed items; an empty list when ``raw_items`` is ``None``.

    Raises:
        ValueError: If ``raw_items`` is neither ``None`` nor a list, or if
            any entry is rejected by ``media_item_from_data``.
    """
    if raw_items is None:
        return []
    if not isinstance(raw_items, list):
        raise ValueError(f"{media_type.title()} library must be a list")

    parsed_entries = (media_item_from_data(entry, media_type) for entry in raw_items)
    unique: dict[tuple[str, str], MediaItem] = {
        (parsed.title.casefold(), parsed.year or ""): parsed for parsed in parsed_entries
    }

    ordered = list(unique.values())
    ordered.sort(key=lambda entry: (entry.title.casefold(), entry.year or ""))
    return ordered
|
|
|
|
|
|
def media_library_to_jsonable(movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
    """Serialise the library into a dict with ``movies`` and ``shows`` lists."""
    movie_rows = [media_item_to_jsonable(movie) for movie in movies]
    show_rows = [media_item_to_jsonable(show) for show in shows]
    return {"movies": movie_rows, "shows": show_rows}
|
|
|
|
|
|
def load_media_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
    """Load the stored library and return ``(movies, shows)``.

    Reads the state at ``runtime.media_library_path`` and parses its
    ``movies`` and ``shows`` lists with ``media_items_from_data``.
    """
    stored = load_state(runtime.media_library_path)
    movies = media_items_from_data(stored.get("movies", []), "movie")
    shows = media_items_from_data(stored.get("shows", []), "show")
    return movies, shows
|
|
|
|
|
|
def save_media_library(runtime: BotRuntime, movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
    """Persist the library to ``runtime.media_library_path``.

    Returns:
        The payload that was written: the serialised library plus an
        ``updated_at`` UTC ISO-8601 timestamp.
    """
    snapshot = media_library_to_jsonable(movies, shows)
    written_at = datetime.now(timezone.utc)
    snapshot["updated_at"] = written_at.isoformat()
    save_state(runtime.media_library_path, snapshot)
    return snapshot
|
|
|
|
|
|
def reset_media_library(runtime: BotRuntime) -> dict[str, Any]:
    """Empty the stored library and clear the Jellyfin sync bookkeeping.

    Saves an empty library, removes the ``jellyfin_last_*`` keys from the
    settings state, stamps ``updated_at`` and saves the settings.

    Returns:
        A summary dict with the empty library, zero counts, empty change
        lists and the current ``jellyfin_settings(runtime)``.
    """
    save_media_library(runtime, [], [])

    sync_keys = (
        "jellyfin_last_sync_at",
        "jellyfin_last_sync_error",
        "jellyfin_last_published_at",
        "jellyfin_last_fingerprint",
        "jellyfin_last_published_fingerprint",
        "jellyfin_last_changes",
    )
    settings_state = load_state(runtime.settings_path)
    for sync_key in sync_keys:
        settings_state.pop(sync_key, None)
    settings_state["updated_at"] = datetime.now(timezone.utc).isoformat()
    save_state(runtime.settings_path, settings_state)

    no_changes = {"added": [], "removed": [], "addedCount": 0, "removedCount": 0}
    return {
        "library": media_library_to_jsonable([], []),
        "movieCount": 0,
        "showCount": 0,
        "changes": no_changes,
        "jellyfin": jellyfin_settings(runtime),
    }
|
|
|
|
|
|
def jellyfin_runtime(runtime_ticks: Any) -> str | None:
    """Format a Jellyfin ``RunTimeTicks`` value as ``"1h 30m"``-style text.

    The tick count is divided by 10,000,000 and then by 60, and the result
    is rounded to whole minutes.

    Returns:
        ``"Xh Ym"``, ``"Xh"`` or ``"Ym"``; ``None`` when the value is
        missing, not convertible to ``int``, non-positive, or rounds to
        zero minutes.
    """
    try:
        tick_count = int(runtime_ticks or 0)
    except (TypeError, ValueError):
        return None
    if tick_count <= 0:
        return None

    total_minutes = round(tick_count / 10_000_000 / 60)
    if total_minutes <= 0:
        return None

    hours, minutes = divmod(total_minutes, 60)
    if hours == 0:
        return f"{minutes}m"
    if minutes == 0:
        return f"{hours}h"
    return f"{hours}h {minutes}m"
|
|
|
|
|
|
def jellyfin_item_year(item: dict[str, Any]) -> str | None:
    """Return the release year of a Jellyfin item as a four-digit string.

    A truthy ``ProductionYear`` is used exclusively; otherwise the year is
    parsed out of ``PremiereDate``.  ``None`` when no year can be parsed.
    """
    production_year = item.get("ProductionYear")
    if production_year:
        year_source = str(production_year)
    else:
        year_source = str(item.get("PremiereDate", ""))
    return parse_year_text(year_source)
|
|
|
|
|
|
def jellyfin_item_summary(item: dict[str, Any]) -> str | None:
    """Return the cleaned overview text of a Jellyfin item.

    ``Overview`` is preferred, with ``ShortOverview`` as the fallback.

    Returns:
        The text cleaned by ``clean_media_text`` (default length cap), or
        ``None`` when neither field holds any text.
    """
    # The trailing `or ""` keeps an explicit null in both fields from being
    # stringified into the literal summary "None".
    overview = item.get("Overview") or item.get("ShortOverview") or ""
    return clean_media_text(str(overview))
|
|
|
|
|
|
def jellyfin_movie_from_item(item: dict[str, Any]) -> MediaItem:
    """Convert a Jellyfin movie item into a ``MediaItem``.

    Text fields are cleaned and length-capped; a missing name becomes
    ``"Untitled Movie"``.  Null values in the payload are treated as absent
    rather than being stringified to ``"None"``.
    """
    raw_provider_ids = item.get("ProviderIds")
    provider_ids = raw_provider_ids if isinstance(raw_provider_ids, dict) else {}

    def as_text(raw: Any) -> str:
        # str(None) would otherwise produce the bogus value "None", e.g. a
        # TMDB ID of "None" that later fails the poster lookup.
        return "" if raw is None else str(raw)

    # An explicit null genre list must not crash the join below.
    genre_names = item.get("Genres") or []

    return MediaItem(
        title=clean_media_text(as_text(item.get("Name")), 120) or "Untitled Movie",
        media_type="movie",
        source_id=clean_media_text(as_text(item.get("Id")), 120),
        tmdb_id=clean_media_text(as_text(provider_ids.get("Tmdb")), 120),
        imdb_id=clean_media_text(as_text(provider_ids.get("Imdb")), 120),
        year=jellyfin_item_year(item),
        genres=clean_media_text(", ".join(str(genre) for genre in genre_names if genre), 120),
        rating=clean_media_text(as_text(item.get("OfficialRating")), 32),
        runtime=jellyfin_runtime(item.get("RunTimeTicks")),
        summary=jellyfin_item_summary(item),
    )
|
|
|
|
|
|
def jellyfin_show_from_item(item: dict[str, Any]) -> MediaItem:
    """Convert a Jellyfin series item into a ``MediaItem``.

    Text fields are cleaned and length-capped; a missing name becomes
    ``"Untitled Show"``.  ``ChildCount`` is read as the season count and
    ``RecursiveItemCount`` as the episode count.  Null values in the
    payload are treated as absent rather than being stringified to
    ``"None"``.
    """
    raw_provider_ids = item.get("ProviderIds")
    provider_ids = raw_provider_ids if isinstance(raw_provider_ids, dict) else {}

    def as_text(raw: Any) -> str:
        # str(None) would otherwise produce the bogus value "None", e.g. a
        # TMDB ID of "None" that later fails the poster lookup.
        return "" if raw is None else str(raw)

    # An explicit null genre list must not crash the join below.
    genre_names = item.get("Genres") or []

    return MediaItem(
        title=clean_media_text(as_text(item.get("Name")), 120) or "Untitled Show",
        media_type="show",
        source_id=clean_media_text(as_text(item.get("Id")), 120),
        tmdb_id=clean_media_text(as_text(provider_ids.get("Tmdb")), 120),
        imdb_id=clean_media_text(as_text(provider_ids.get("Imdb")), 120),
        year=jellyfin_item_year(item),
        genres=clean_media_text(", ".join(str(genre) for genre in genre_names if genre), 120),
        rating=clean_media_text(as_text(item.get("OfficialRating")), 32),
        summary=jellyfin_item_summary(item),
        seasons=parse_int_text(as_text(item.get("ChildCount"))),
        episodes=parse_int_text(as_text(item.get("RecursiveItemCount"))),
    )
|
|
|
|
|
|
def jellyfin_request(settings: dict[str, Any], path: str, params: dict[str, Any]) -> dict[str, Any]:
    """Send a GET request to the Jellyfin API and return the decoded JSON.

    Args:
        settings: Must provide ``url`` and ``apiKey``.
        path: API path appended to the base URL (e.g. ``"/Items"``).
        params: Query parameters; entries whose value is ``None`` are
            dropped.

    Returns:
        The JSON-decoded response body.

    Raises:
        ValueError: If the URL or API key is missing.
        RuntimeError: On an HTTP error status, a URL error or a timeout.
    """
    base_url = str(settings.get("url", "")).strip().rstrip("/")
    api_key = str(settings.get("apiKey", "")).strip()
    if not (base_url and api_key):
        raise ValueError("Jellyfin URL and API key are required")

    present_params = {name: value for name, value in params.items() if value is not None}
    query = urllib.parse.urlencode(present_params)
    url = f"{base_url}{path}?{query}" if query else f"{base_url}{path}"

    headers = {
        "Accept": "application/json",
        "User-Agent": "ArchiveStatusBot/1.0",
        "X-Emby-Token": api_key,
    }
    request = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            body = response.read()
            return json.loads(body.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"Jellyfin API failed: HTTP {exc.code} {detail}") from exc
    except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
        raise RuntimeError(f"Jellyfin API failed: {clean_error(exc)}") from exc
|
|
|
|
|
|
def jellyfin_connection_settings(runtime: BotRuntime) -> dict[str, Any]:
    """Read the Jellyfin URL and API key from the settings state.

    Returns:
        A dict with ``url`` and ``apiKey``, both whitespace-stripped strings
        (empty when unset).
    """
    state = load_state(runtime.settings_path)

    def stored_text(key: str) -> str:
        return str(state.get(key, "")).strip()

    return {
        "url": stored_text("jellyfin_url"),
        "apiKey": stored_text("jellyfin_api_key"),
    }
|
|
|
|
|
|
def media_item_duration_seconds(item: MediaItem) -> int:
    """Convert an item's runtime text into seconds.

    Understands ``"<n>h"`` and ``"<n>m"`` components anywhere in the text
    (for example ``"1h 30m"`` or ``"90 min"``); text made only of digits is
    read as minutes.

    Returns:
        The duration in seconds, or ``0`` when the runtime is missing or
        unparseable.
    """
    text = str(item.runtime or "").strip().lower()
    if not text:
        return 0

    hour_part = re.search(r"(\d+)\s*h", text)
    minute_part = re.search(r"(\d+)\s*m", text)

    hours = int(hour_part.group(1)) if hour_part else 0
    if minute_part:
        minutes = int(minute_part.group(1))
    elif text.isdigit():
        minutes = int(text)
    else:
        minutes = 0
    return max(0, hours * 3600 + minutes * 60)
|
|
|
|
|
|
def jellyfin_stream_url(base_url: str, item_id: str, api_key: str) -> str:
    """Build the static stream URL for a Jellyfin video item.

    The item ID is percent-encoded as a single path segment and the API key
    is carried in the ``api_key`` query parameter.
    """
    encoded_id = urllib.parse.quote(item_id, safe="")
    query = urllib.parse.urlencode({"static": "true", "api_key": api_key})
    return f"{base_url}/Videos/{encoded_id}/stream?{query}"
|
|
|
|
|
|
def jellyfin_download_url(base_url: str, item_id: str, api_key: str) -> str:
    """Build the download URL for a Jellyfin item.

    The item ID is percent-encoded as a single path segment and the API key
    is carried in the ``api_key`` query parameter.
    """
    encoded_id = urllib.parse.quote(item_id, safe="")
    query = urllib.parse.urlencode({"api_key": api_key})
    return f"{base_url}/Items/{encoded_id}/Download?{query}"
|
|
|
|
|
|
def jellyfin_first_episode(settings: dict[str, Any], series_id: str) -> dict[str, Any] | None:
    """Fetch the first episode of a series, ordered by season then episode.

    Requests a single ``Episode`` item below ``series_id``, sorted ascending
    by ``ParentIndexNumber``, ``IndexNumber`` and ``SortName``.

    Returns:
        The first returned item that is a dict with a non-blank ``Id``, or
        ``None`` if there is none.

    Raises:
        RuntimeError: If the response's ``Items`` value is not a list.
    """
    query = {
        "ParentId": series_id,
        "Recursive": "true",
        "IncludeItemTypes": "Episode",
        "Fields": "RunTimeTicks,ParentIndexNumber,IndexNumber",
        "SortBy": "ParentIndexNumber,IndexNumber,SortName",
        "SortOrder": "Ascending",
        "Limit": 1,
    }
    response = jellyfin_request(settings, "/Items", query)
    episodes = response.get("Items", [])
    if not isinstance(episodes, list):
        raise RuntimeError("Jellyfin returned an invalid episode payload")

    usable = (
        episode
        for episode in episodes
        if isinstance(episode, dict) and str(episode.get("Id", "")).strip()
    )
    return next(usable, None)
|
|
|
|
|
|
def resolve_jellyfin_playback_source(runtime: BotRuntime, item: MediaItem) -> dict[str, Any]:
    """Resolve what to play for a library item and how to reach it.

    Movies play their own Jellyfin item.  Shows play the episode returned
    by ``jellyfin_first_episode``; the title then carries an ``SxxEyy``
    tag and the duration comes from the episode's ``RunTimeTicks``.

    Returns:
        A dict with the playback item ID, stream and download URLs, display
        title, duration in seconds, poster URL, IDs, and an ``episode``
        sub-dict (``None`` for movies).

    Raises:
        ValueError: If Jellyfin is not configured, the item has no source
            ID, or a show has no playable episode.
    """
    connection = jellyfin_connection_settings(runtime)
    server_url = str(connection.get("url", "")).strip().rstrip("/")
    token = str(connection.get("apiKey", "")).strip()
    if not (server_url and token):
        raise ValueError("Jellyfin playback is not configured")
    if not item.source_id:
        raise ValueError(f"Media item has no Jellyfin source ID: {item.title}")

    target_id = item.source_id
    display_title = item.title
    duration = media_item_duration_seconds(item)
    episode_info: dict[str, Any] | None = None

    if item.media_type == "show":
        first = jellyfin_first_episode(connection, item.source_id)
        if first is None:
            raise ValueError(f"No playable episodes found for show: {item.title}")

        target_id = clean_media_text(str(first.get("Id", "")), 120) or item.source_id
        season = int(first.get("ParentIndexNumber") or 0)
        number = int(first.get("IndexNumber") or 0)
        name = clean_media_text(str(first.get("Name", "")), 120) or "Episode"
        display_title = f"{item.title} - S{season:02d}E{number:02d} - {name}"

        raw_ticks = first.get("RunTimeTicks")
        try:
            # Ticks are divided by 10,000,000 to obtain seconds.
            duration = max(0, round(int(raw_ticks or 0) / 10_000_000))
        except (TypeError, ValueError):
            duration = 0

        episode_info = {
            "seasonNumber": season,
            "episodeNumber": number,
            "episodeTitle": name,
            "seriesId": item.source_id,
        }

    return {
        "itemId": target_id,
        "sourceId": item.source_id,
        "title": display_title,
        "mediaType": item.media_type,
        "streamUrl": jellyfin_stream_url(server_url, target_id, token),
        "downloadUrl": jellyfin_download_url(server_url, target_id, token),
        "durationSeconds": duration,
        "posterUrl": media_item_card_payload(item).get("posterUrl", ""),
        "year": item.year or "",
        "tmdbId": item.tmdb_id or "",
        "imdbId": item.imdb_id or "",
        "episode": episode_info,
    }
|
|
|
|
|
|
def fetch_jellyfin_library_ids(settings: dict[str, Any], library_names: list[str]) -> list[str]:
    """Resolve Jellyfin library names to their media-folder IDs.

    Names are compared case-insensitively against the folders returned by
    ``/Library/MediaFolders``.

    Args:
        settings: Connection settings passed through to ``jellyfin_request``.
        library_names: Library names to resolve.

    Returns:
        The IDs of every matching folder that has a non-blank ID, in
        response order; an empty list when ``library_names`` is empty.

    Raises:
        RuntimeError: If the response's ``Items`` value is not a list.
        ValueError: If any requested name has no matching folder.
    """
    if not library_names:
        return []

    requested = {name.casefold() for name in library_names}
    data = jellyfin_request(settings, "/Library/MediaFolders", {})
    folders = data.get("Items", [])
    if not isinstance(folders, list):
        raise RuntimeError("Jellyfin returned an invalid library folder payload")

    matched: list[str] = []
    found: set[str] = set()
    for folder in folders:
        if not isinstance(folder, dict):
            continue
        name = str(folder.get("Name", "")).strip()
        folder_id = str(folder.get("Id", "")).strip()
        folded = name.casefold()
        if folded not in requested:
            continue
        if name:
            found.add(folded)
        if folder_id:
            matched.append(folder_id)

    # Compare in case-folded form, exactly like the matching above.  The
    # previous case-sensitive set difference reported "movies" as missing
    # even though the folder "Movies" had just been matched.
    missing = sorted({name for name in library_names if name.casefold() not in found})
    if missing:
        raise ValueError(f"Jellyfin library not found: {', '.join(missing)}")
    return matched
|
|
|
|
|
|
def fetch_jellyfin_items(settings: dict[str, Any], item_type: str) -> list[dict[str, Any]]:
    """Fetch every Jellyfin item of ``item_type``, following pagination.

    When ``settings["ParentIds"]`` is a non-empty list, the function calls
    itself once per ID (with ``ParentId`` set and ``ParentIds`` removed)
    and concatenates the results.  Otherwise it pages through ``/Items``
    recursively, scoped to ``settings["ParentId"]`` when that is set.

    Args:
        settings: Connection settings, optionally carrying ``ParentId`` or
            ``ParentIds``.
        item_type: Value for ``IncludeItemTypes`` (e.g. ``"Movie"``).

    Returns:
        All dict entries from every fetched page.

    Raises:
        RuntimeError: If a response's ``Items`` value is not a list.
    """
    parent_ids = settings.get("ParentIds")
    if isinstance(parent_ids, list) and parent_ids:
        collected: list[dict[str, Any]] = []
        for parent_id in parent_ids:
            scoped_settings = dict(settings)
            scoped_settings["ParentId"] = str(parent_id)
            scoped_settings.pop("ParentIds", None)
            collected.extend(fetch_jellyfin_items(scoped_settings, item_type))
        return collected

    parent_id_value = str(settings.get("ParentId", "")).strip() or None
    items: list[dict[str, Any]] = []
    start_index = 0
    page_size = 200
    while True:
        data = jellyfin_request(
            settings,
            "/Items",
            {
                "Recursive": "true",
                "IncludeItemTypes": item_type,
                "Fields": "Genres,Overview,OfficialRating,RecursiveItemCount,ChildCount,PremiereDate,RunTimeTicks,ProviderIds",
                "SortBy": "SortName",
                "SortOrder": "Ascending",
                "EnableImages": "false",
                "ParentId": parent_id_value,
                "StartIndex": start_index,
                "Limit": page_size,
            },
        )
        page = data.get("Items", [])
        if not isinstance(page, list):
            raise RuntimeError("Jellyfin returned an invalid Items payload")
        items.extend(entry for entry in page if isinstance(entry, dict))

        total = int(data.get("TotalRecordCount", len(items)) or len(items))
        if not page or len(items) >= total:
            return items

        # Advance by the number of records actually returned.  Stepping by
        # the requested page size would skip records whenever the server
        # answers with a shorter page than was asked for.
        start_index += len(page)
|
|
|
|
|
|
def jellyfin_item_dedupe_key(item: dict[str, Any], item_type: str) -> tuple[str, str]:
    """Return the key used to detect duplicate Jellyfin items.

    The first non-blank provider ID among ``Tmdb``, ``Imdb`` and ``Tvdb``
    is used, giving e.g. ``("movie", "tmdb:348")``.  Without a usable
    provider ID the key is built from the case-folded title and the year.
    """
    kind = item_type.casefold()

    provider_ids = item.get("ProviderIds")
    if isinstance(provider_ids, dict):
        for provider in ("Tmdb", "Imdb", "Tvdb"):
            raw_id = provider_ids.get(provider)
            if raw_id is None:
                # A null ID must count as absent.  Stringifying it would
                # give every such item the same "<provider>:none" key and
                # collapse them into a single entry.
                continue
            value = str(raw_id).strip().casefold()
            if value:
                return kind, f"{provider.casefold()}:{value}"

    title = clean_media_text(str(item.get("Name", "")), 120) or ""
    year = jellyfin_item_year(item) or ""
    return kind, f"title:{title.casefold()}:{year}"
|
|
|
|
|
|
def dedupe_jellyfin_items(items: list[dict[str, Any]], item_type: str) -> list[dict[str, Any]]:
    """Remove duplicate Jellyfin items in two passes and sort the result.

    Pass one groups by ``jellyfin_item_dedupe_key`` and keeps the entry
    with the longest overview.  Pass two groups the survivors by
    case-folded title and year; an entry with provider IDs beats one
    without, otherwise the longer overview wins.  On ties the
    earlier entry is kept.

    Returns:
        The surviving items sorted by case-folded ``SortName`` (falling
        back to ``Name``) and then ``ProductionYear`` as text.
    """

    def overview_size(entry: dict[str, Any]) -> int:
        return len(str(entry.get("Overview", "") or entry.get("ShortOverview", "")))

    by_provider: dict[tuple[str, str], dict[str, Any]] = {}
    for candidate in items:
        provider_key = jellyfin_item_dedupe_key(candidate, item_type)
        kept = by_provider.get(provider_key)
        if kept is None or overview_size(candidate) > overview_size(kept):
            by_provider[provider_key] = candidate

    by_title: dict[tuple[str, str], dict[str, Any]] = {}
    for candidate in by_provider.values():
        title = clean_media_text(str(candidate.get("Name", "")), 120) or ""
        title_key = (title.casefold(), jellyfin_item_year(candidate) or "")
        kept = by_title.get(title_key)
        if kept is None:
            by_title[title_key] = candidate
        elif bool(candidate.get("ProviderIds")) and not bool(kept.get("ProviderIds")):
            by_title[title_key] = candidate
        elif overview_size(candidate) > overview_size(kept):
            by_title[title_key] = candidate

    def sort_key(entry: dict[str, Any]) -> tuple[str, str]:
        sort_name = str(entry.get("SortName", entry.get("Name", ""))).casefold()
        return sort_name, str(entry.get("ProductionYear", ""))

    return sorted(by_title.values(), key=sort_key)
|
|
|
|
|
|
def fetch_jellyfin_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
    """Fetch the current movie and show library from Jellyfin.

    When ``jellyfin_settings(runtime)`` lists ``libraryNames``, the fetch is
    limited to those libraries.  Fetched items are de-duplicated, converted
    to ``MediaItem`` objects and sorted by case-folded title, then year.

    Returns:
        ``(movies, shows)``.
    """
    # Same url/apiKey read from the settings state, taken from the helper.
    settings = jellyfin_connection_settings(runtime)

    library_names = jellyfin_settings(runtime).get("libraryNames", [])
    if isinstance(library_names, list) and library_names:
        wanted = [str(name) for name in library_names]
        settings["ParentIds"] = fetch_jellyfin_library_ids(settings, wanted)

    raw_movies = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Movie"), "Movie")
    raw_shows = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Series"), "Series")

    movies = [jellyfin_movie_from_item(raw) for raw in raw_movies]
    shows = [jellyfin_show_from_item(raw) for raw in raw_shows]

    def by_title_and_year(entry: MediaItem) -> tuple[str, str]:
        return entry.title.casefold(), entry.year or ""

    return sorted(movies, key=by_title_and_year), sorted(shows, key=by_title_and_year)
|
|
|
|
|
|
def media_library_fingerprint(movies: list[MediaItem], shows: list[MediaItem]) -> str:
    """Return a SHA-256 hex digest identifying the library's contents.

    The library is serialised to compact JSON with sorted keys so that equal
    content always hashes to the same value.
    """
    serialised = json.dumps(
        media_library_to_jsonable(movies, shows),
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(serialised.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def media_item_tracking_key(item: MediaItem) -> str:
    """Return the ``type:title:year`` key used to track added/removed items.

    The title is case-folded and a missing year becomes an empty segment.
    """
    folded_title = item.title.casefold()
    year = item.year or ""
    return f"{item.media_type}:{folded_title}:{year}"
|
|
|
|
|
|
def media_change_entry(item: MediaItem) -> dict[str, Any]:
    """Return the compact dict describing an item in a change report."""
    entry: dict[str, Any] = {}
    entry["title"] = item.title
    entry["year"] = item.year or ""
    entry["mediaType"] = item.media_type
    return entry
|
|
|
|
|
|
def catalog_poster_path(item: MediaItem) -> str:
    """Return the catalog poster path for an item.

    The source ID is percent-encoded as a single path segment.  Returns an
    empty string when the item has no source ID.
    """
    source_id = item.source_id
    if source_id:
        return "/catalog/poster/" + urllib.parse.quote(source_id, safe="")
    return ""
|
|
|
|
|
|
def tmdb_auth_headers() -> dict[str, str]:
    """Return the HTTP headers for TMDB requests.

    Always asks for JSON; adds a bearer ``Authorization`` header when the
    ``TMDB_API_READ_TOKEN`` environment variable is set and non-blank.
    """
    token = os.getenv("TMDB_API_READ_TOKEN", "").strip()
    accept = {"Accept": "application/json"}
    if not token:
        return accept
    return {"Authorization": f"Bearer {token}", **accept}
|
|
|
|
|
|
def tmdb_api_key() -> str:
    """Return the stripped ``TMDB_API_KEY`` environment value, or ``""``."""
    raw_key = os.getenv("TMDB_API_KEY", "")
    return raw_key.strip()
|
|
|
|
|
|
def tmdb_enabled() -> bool:
    """Report whether any TMDB credential is configured.

    True when ``TMDB_API_READ_TOKEN`` is set and non-blank, or when
    ``tmdb_api_key()`` returns a non-empty key.
    """
    if os.getenv("TMDB_API_READ_TOKEN", "").strip():
        return True
    return bool(tmdb_api_key())
|
|
|
|
|
|
def tmdb_request(path: str, params: dict[str, Any]) -> dict[str, Any]:
    """Send a GET request to the TMDB v3 API and return the decoded JSON.

    Args:
        path: API path below ``https://api.themoviedb.org/3``.
        params: Query parameters; ``None`` and empty-string values are
            dropped.  ``api_key`` is added when ``tmdb_api_key()`` is set.

    Raises:
        RuntimeError: If no TMDB credentials are configured, or on an HTTP
            error status, a URL error or a timeout.
    """
    if not tmdb_enabled():
        raise RuntimeError("TMDB credentials are not configured")

    query_params = {name: value for name, value in params.items() if value not in {None, ""}}
    key = tmdb_api_key()
    if key:
        query_params["api_key"] = key

    url = f"https://api.themoviedb.org/3{path}"
    encoded = urllib.parse.urlencode(query_params)
    if encoded:
        url = f"{url}?{encoded}"

    headers = {"User-Agent": "ArchiveStatusBot/1.0", **tmdb_auth_headers()}
    request = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            body = response.read()
            return json.loads(body.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"TMDB API failed: HTTP {exc.code} {detail}") from exc
    except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
        raise RuntimeError(f"TMDB API failed: {clean_error(exc)}") from exc
|
|
|
|
|
|
def tmdb_image_url(file_path: str | None) -> str | None:
    """Turn a TMDB poster path into a full image URL.

    Returns ``None`` when ``file_path`` is missing or empty; otherwise the
    path appended to ``TMDB_IMAGE_BASE_URL``.
    """
    if file_path:
        return f"{TMDB_IMAGE_BASE_URL}{file_path}"
    return None
|
|
|
|
|
|
def tmdb_cache_key(item: MediaItem) -> str:
    """Return the poster-cache key for an item.

    The key joins media type, source ID, TMDB ID, IMDb ID, case-folded title
    and year with ``|``; missing optional values become empty segments.
    """
    segments = [
        item.media_type,
        item.source_id or "",
        item.tmdb_id or "",
        item.imdb_id or "",
        item.title.casefold(),
        item.year or "",
    ]
    separator = "|"
    return separator.join(segments)
|
|
|
|
|
|
def tmdb_search_poster_path(item: MediaItem) -> str | None:
    """Look up an item's TMDB poster path.

    Strategy, in order:
      1. With a TMDB ID, fetch the movie/TV details and return their
         ``poster_path`` as-is (no further fallback).
      2. With an IMDb ID, use ``/find`` and return the first result that
         has a poster.
      3. Search by title (and year, when it parses as a number) and return
         the first result that has a poster.

    Returns:
        The poster path, or ``None`` when nothing with a poster is found.
    """
    is_movie = item.media_type == "movie"

    def first_poster(candidates: Any) -> str | None:
        for candidate in candidates:
            if isinstance(candidate, dict) and candidate.get("poster_path"):
                return str(candidate["poster_path"])
        return None

    if item.tmdb_id:
        detail_root = "/movie" if is_movie else "/tv"
        detail = tmdb_request(f"{detail_root}/{urllib.parse.quote(item.tmdb_id, safe='')}", {})
        return detail.get("poster_path")

    if item.imdb_id:
        found = tmdb_request(
            f"/find/{urllib.parse.quote(item.imdb_id, safe='')}",
            {"external_source": "imdb_id"},
        )
        matches = found.get("movie_results" if is_movie else "tv_results", [])
        if isinstance(matches, list):
            poster = first_poster(matches)
            if poster is not None:
                return poster

    search_params: dict[str, Any] = {"query": item.title}
    year = media_item_year_number(item)
    if is_movie:
        endpoint, year_param = "/search/movie", "year"
    else:
        endpoint, year_param = "/search/tv", "first_air_date_year"
    if year is not None:
        search_params[year_param] = year

    hits = tmdb_request(endpoint, search_params).get("results", [])
    if not isinstance(hits, list):
        return None
    return first_poster(hits)
|
|
|
|
|
|
def tmdb_fallback_poster_url(item: MediaItem) -> str | None:
    """Return a TMDB poster URL for an item, using the in-process cache.

    Returns ``None`` immediately when TMDB is not configured.  Otherwise the
    result of the lookup is stored in ``TMDB_POSTER_CACHE``; a lookup that
    raises is cached as ``None`` as well, so it is not retried.
    """
    if not tmdb_enabled():
        return None

    key = tmdb_cache_key(item)
    try:
        return TMDB_POSTER_CACHE[key]
    except KeyError:
        pass  # not cached yet; look it up below

    try:
        poster_path = tmdb_search_poster_path(item)
    except Exception:
        # Best-effort lookup: any failure is recorded as "no poster".
        resolved = None
    else:
        resolved = tmdb_image_url(poster_path)

    TMDB_POSTER_CACHE[key] = resolved
    return resolved
|
|
|
|
|
|
def fetch_jellyfin_image(runtime: BotRuntime, item_id: str) -> tuple[bytes, str] | None:
    """Download the primary image of a Jellyfin item.

    Returns:
        ``(image_bytes, content_type)``, or ``None`` when ``item_id`` is
        blank, Jellyfin is not configured, or the server answers 404.

    Raises:
        RuntimeError: On any other HTTP error status, a URL error or a
            timeout.
    """
    if not item_id.strip():
        return None

    state = load_state(runtime.settings_path)
    server_url = str(state.get("jellyfin_url", "")).strip().rstrip("/")
    token = str(state.get("jellyfin_api_key", "")).strip()
    if not (server_url and token):
        return None

    encoded_id = urllib.parse.quote(item_id, safe="")
    url = f"{server_url}/Items/{encoded_id}/Images/Primary?maxHeight=720&maxWidth=480&quality=90"
    headers = {
        "User-Agent": "ArchiveStatusBot/1.0",
        "X-Emby-Token": token,
        "Accept": "image/*",
    }
    request = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            content_type = response.headers.get_content_type() or "image/jpeg"
            return response.read(), content_type
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return None
        detail = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"Jellyfin image fetch failed: HTTP {exc.code} {detail}") from exc
    except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
        raise RuntimeError(f"Jellyfin image fetch failed: {clean_error(exc)}") from exc
|
|
|
|
|
|
def catalog_known_source_ids(runtime: BotRuntime) -> set[str]:
    """Return the stripped source IDs of every item in the stored library.

    Items without a source ID, or whose ID is only whitespace, are skipped.
    """
    movies, shows = load_media_library(runtime)
    known: set[str] = set()
    for entry in [*movies, *shows]:
        if entry.source_id and entry.source_id.strip():
            known.add(entry.source_id.strip())
    return known
|
|
|
|
|
|
def catalog_item_for_source_id(runtime: BotRuntime, item_id: str) -> MediaItem | None:
    """Find the stored library item whose stripped source ID is ``item_id``.

    Movies are searched before shows.  Returns ``None`` when nothing
    matches.
    """
    movies, shows = load_media_library(runtime)
    matches = (
        entry
        for entry in [*movies, *shows]
        if entry.source_id and entry.source_id.strip() == item_id
    )
    return next(matches, None)
|
|
|
|
|
|
def media_library_changes(
    previous_movies: list[MediaItem],
    previous_shows: list[MediaItem],
    current_movies: list[MediaItem],
    current_shows: list[MediaItem],
) -> dict[str, Any]:
    """Compare two library snapshots and report added and removed items.

    Items are identified by ``media_item_tracking_key``.  Both lists are
    ordered by media type, then case-folded title.

    Returns:
        A dict with ``added`` and ``removed`` entry lists and their counts.
    """
    before = {media_item_tracking_key(entry): entry for entry in [*previous_movies, *previous_shows]}
    after = {media_item_tracking_key(entry): entry for entry in [*current_movies, *current_shows]}

    def ordering(lookup: dict[str, MediaItem]) -> Any:
        return lambda key: (lookup[key].media_type, lookup[key].title.casefold())

    added_keys = sorted(set(after) - set(before), key=ordering(after))
    removed_keys = sorted(set(before) - set(after), key=ordering(before))

    added = [media_change_entry(after[key]) for key in added_keys]
    removed = [media_change_entry(before[key]) for key in removed_keys]
    return {
        "added": added,
        "removed": removed,
        "addedCount": len(added),
        "removedCount": len(removed),
    }
|
|
|
|
|
|
def update_jellyfin_sync_state(
    runtime: BotRuntime,
    *,
    fingerprint: str | None = None,
    error: str | None = None,
    published: bool = False,
    changes: dict[str, Any] | None = None,
) -> None:
    """Record the outcome of a Jellyfin sync attempt in the settings state.

    The sync timestamp and error are always overwritten (``error=None``
    clears a previous error).  The fingerprint and change report are only
    stored when given.  With ``published`` set, the publish timestamp is
    stored too, along with the published fingerprint when one is given.
    """
    state = load_state(runtime.settings_path)
    timestamp = datetime.now(timezone.utc).isoformat()
    has_fingerprint = fingerprint is not None

    state["jellyfin_last_sync_at"] = timestamp
    state["jellyfin_last_sync_error"] = error
    if has_fingerprint:
        state["jellyfin_last_fingerprint"] = fingerprint
    if published:
        state["jellyfin_last_published_at"] = timestamp
        if has_fingerprint:
            state["jellyfin_last_published_fingerprint"] = fingerprint
    if changes is not None:
        state["jellyfin_last_changes"] = changes

    save_state(runtime.settings_path, state)
|
|
|
|
|
|
def sync_jellyfin_library(runtime: BotRuntime, force_publish: bool = False) -> dict[str, Any]:
    """Refresh the stored library from Jellyfin and publish it when needed.

    The fetched library is always saved.  It is published to the media
    channel when its fingerprint differs from the last published one, or
    when ``force_publish`` is set, but never when ``runtime.dry_run`` is
    true.  The sync state is updated afterwards.

    Returns:
        A summary with ``changed``/``published`` flags, the change report,
        item counts, the serialised library, the publish result and the
        current Jellyfin settings.
    """
    old_movies, old_shows = load_media_library(runtime)
    movies, shows = fetch_jellyfin_library(runtime)
    changes = media_library_changes(old_movies, old_shows, movies, shows)
    fingerprint = media_library_fingerprint(movies, shows)

    stored = load_state(runtime.settings_path)
    changed = fingerprint != str(stored.get("jellyfin_last_fingerprint", ""))
    publish_changed = fingerprint != str(stored.get("jellyfin_last_published_fingerprint", ""))
    save_media_library(runtime, movies, shows)

    publish_result: dict[str, Any] = {}
    published = bool((publish_changed or force_publish) and not runtime.dry_run)
    if published:
        publish_result = publish_media_items(
            runtime=runtime,
            channel_id=channel_settings(runtime)["mediaChannelId"],
            movies_all=movies,
            shows_all=shows,
        )

    update_jellyfin_sync_state(runtime, fingerprint=fingerprint, published=published, changes=changes)
    return {
        "changed": changed,
        "published": published,
        "changes": changes,
        "movieCount": len(movies),
        "showCount": len(shows),
        "libraryNames": jellyfin_settings(runtime).get("libraryNames", []),
        "library": media_library_to_jsonable(movies, shows),
        "publishResult": publish_result,
        "jellyfin": jellyfin_settings(runtime),
    }
|
|
|
|
|
|
def maybe_run_jellyfin_sync(runtime: BotRuntime) -> dict[str, Any] | None:
    """Run a Jellyfin sync if auto-sync is enabled and the interval elapsed.

    The interval comes from ``JELLYFIN_SYNC_INTERVAL_SECONDS`` and falls
    back to ``DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS`` when the variable is
    unset, blank or not an integer.

    Returns:
        The result of ``sync_jellyfin_library``, or ``None`` when Jellyfin
        is not configured, auto-sync is off, or the last sync is more
        recent than the interval.

    Raises:
        Exception: Whatever the sync raises; the error text (first 240
            characters) is recorded in the sync state before re-raising.
    """
    settings = jellyfin_settings(runtime)
    if not settings["configured"] or not settings["autoSync"]:
        return None

    raw_interval = os.getenv("JELLYFIN_SYNC_INTERVAL_SECONDS", "").strip()
    try:
        interval = int(raw_interval) if raw_interval else int(DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS)
    except ValueError:
        # A malformed override must not make every scheduler tick fail.
        interval = int(DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS)

    state = load_state(runtime.settings_path)
    last_sync = str(state.get("jellyfin_last_sync_at", "")).strip()
    if last_sync:
        try:
            last = datetime.fromisoformat(last_sync)
        except ValueError:
            last = None  # unreadable timestamp: treat as never synced
        if last is not None:
            if last.tzinfo is None:
                # Subtracting a naive datetime from an aware one raises
                # TypeError; a naive stored value is read as UTC.
                last = last.replace(tzinfo=timezone.utc)
            if (datetime.now(timezone.utc) - last).total_seconds() < interval:
                return None

    try:
        return sync_jellyfin_library(runtime)
    except Exception as exc:
        update_jellyfin_sync_state(runtime, error=str(exc)[:240])
        raise
|
|
|
|
|
|
def media_item_heading(item: MediaItem) -> str:
    """Return the display heading: ``"Title (Year)"`` or just the title."""
    if not item.year:
        return item.title
    return f"{item.title} ({item.year})"
|
|
|
|
|
|
def media_item_genres(item: MediaItem) -> list[str]:
    """Split an item's comma-separated genre text into trimmed names.

    Blank segments are dropped; an item without genres yields ``[]``.
    """
    if not item.genres:
        return []
    names: list[str] = []
    for segment in item.genres.split(","):
        trimmed = segment.strip()
        if trimmed:
            names.append(trimmed)
    return names
|
|
|
|
|
|
def media_item_runtime_minutes(item: MediaItem) -> int | None:
    """Return a movie's runtime in minutes.

    Accepts ``"<h>h <m>m"``, ``"<h>h"`` and ``"<m>m"`` (spaces ignored, any
    letter case) as well as a bare number of minutes.

    Returns:
        The runtime in minutes, or ``None`` for non-movies, a missing
        runtime, or text in any other format.
    """
    if not item.runtime or item.media_type != "movie":
        return None

    compact = item.runtime.lower().replace(" ", "")
    parsed = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?", compact)
    if parsed is not None:
        hours_text, minutes_text = parsed.groups()
        # The pattern also matches the empty string, so at least one of the
        # two components has to be present.
        if hours_text or minutes_text:
            return (int(hours_text or 0) * 60) + int(minutes_text or 0)
    if compact.isdigit():
        return int(compact)
    return None
|
|
|
|
|
|
def media_item_year_number(item: MediaItem) -> int | None:
    """Return the item's year as an ``int``.

    ``None`` when the year is missing or is not an integer literal.
    """
    year_text = item.year
    if not year_text:
        return None
    try:
        year = int(year_text)
    except ValueError:
        return None
    return year
|
|
|
|
|
|
def media_item_search_blob(item: MediaItem) -> str:
    """Return the case-folded text that free-text searches run against.

    Title, year, genres, rating, runtime and summary are joined with single
    spaces; missing values contribute an empty segment.
    """
    optional_parts = (item.year, item.genres, item.rating, item.runtime, item.summary)
    parts = [item.title]
    parts.extend(part or "" for part in optional_parts)
    joined = " ".join(parts)
    return joined.casefold()
|
|
|
|
|
|
def recommendation_presets() -> list[dict[str, str]]:
    """Return the recommendation modes as ``{"id", "label"}`` dicts, in display order."""
    presets = (
        ("balanced", "Balanced"),
        ("quick", "Quick Watch"),
        ("deep", "Deep Dive"),
        ("recent", "Newer Picks"),
        ("comfort", "Comfort Picks"),
    )
    return [{"id": preset_id, "label": label} for preset_id, label in presets]
|
|
|
|
|
|
def recommendation_reason(item: MediaItem, mode: str, genre: str) -> str:
    """Build the short "why this pick" text for a recommendation.

    A genre note comes first when a genre other than ``"all"`` is selected.
    One mode-specific group of notes follows; when the ``quick`` or
    ``recent`` mode has nothing to say for the item, the default notes
    (genres, movie runtime, show episode count) are used instead.

    Returns:
        At most two notes joined by ``" · "``, or ``"from your library"``
        when there are none.
    """
    # NOTE(review): the source's indentation was lost; the branch nesting
    # below is reconstructed from the code's meaning. Confirm against the
    # original file.
    notes: list[str] = []
    if genre and genre.lower() != "all":
        notes.append(f"matches {genre}")

    is_movie = item.media_type == "movie"
    is_show = item.media_type == "show"

    if mode == "quick" and is_movie and item.runtime:
        notes.append(f"short runtime: {item.runtime}")
    elif mode == "deep":
        if is_movie and item.runtime:
            notes.append(f"long runtime: {item.runtime}")
        elif is_show and item.seasons:
            plural = "" if item.seasons == 1 else "s"
            notes.append(f"{item.seasons} season{plural}")
    elif mode == "recent" and item.year:
        notes.append(f"newer release: {item.year}")
    else:
        # "comfort" and the default share the genre and episode notes; only
        # the default adds the movie runtime between them.
        if item.genres:
            notes.append(item.genres)
        if mode != "comfort" and is_movie and item.runtime:
            notes.append(item.runtime)
        if is_show and item.episodes:
            notes.append(f"{item.episodes} episodes")

    return " · ".join(notes[:2]) or "from your library"
|
|
|
|
|
|
def recommendation_score(item: MediaItem, mode: str, genre: str, search: str) -> float:
    """Score *item* for ranking under the given recommendation settings.

    Contributions are added in this order: genre filter match/mismatch,
    search-text match/mismatch, a mode-specific bonus, and finally a small
    title-derived offset (0.0 to 1.6) that gives a deterministic spread
    between otherwise equal items. Higher scores rank first.
    """
    item_genres = {name.casefold() for name in media_item_genres(item)}
    minutes = media_item_runtime_minutes(item)
    release_year = media_item_year_number(item)
    haystack = media_item_search_blob(item)
    is_movie = item.media_type == "movie"
    is_show = item.media_type == "show"

    total = 0.0

    if genre:
        wanted_genre = genre.casefold()
        if wanted_genre != "all":
            if wanted_genre in item_genres:
                total += 40
            else:
                total += -20

    if search:
        if search.casefold() in haystack:
            total += 25
        else:
            total += -30

    if mode == "quick":
        if is_movie:
            # Shorter movies score higher; nothing is added from 180 minutes up.
            if minutes is not None:
                total += max(0, 180 - minutes) / 2
        else:
            total += 12
            if item.episodes:
                total += max(0, 48 - item.episodes) / 3
    elif mode == "deep":
        if is_movie:
            if minutes is not None:
                total += min(minutes, 240) / 2
        else:
            if item.seasons:
                total += min(item.seasons * 10, 50)
            if item.episodes:
                total += min(item.episodes / 3, 30)
    elif mode == "recent":
        if release_year is not None:
            total += max(0, release_year - 1990)
    elif mode == "comfort":
        if is_show:
            total += 24
            if item.episodes:
                total += min(item.episodes / 4, 25)
        if item_genres:
            total += 6
    else:
        if item_genres:
            total += 8
        if item.summary:
            total += 4
        if is_movie and minutes is not None:
            # Peaks at a 115-minute runtime and falls off linearly either side.
            total += max(0, 150 - abs(115 - minutes)) / 10
        if is_show and item.episodes:
            total += min(item.episodes / 6, 16)

    title_checksum = sum(map(ord, item.title.casefold()))
    total += (title_checksum % 17) / 10
    return total
|
|
|
|
|
|
def available_media_genres(movies: list[MediaItem], shows: list[MediaItem]) -> list[str]:
    """Return every distinct genre across *movies* and *shows*.

    Genres come from ``media_item_genres`` for each item and are returned
    sorted case-insensitively.
    """
    unique_genres = {
        genre_name
        for entry in [*movies, *shows]
        for genre_name in media_item_genres(entry)
    }
    return sorted(unique_genres, key=str.casefold)
|
|
|
|
|
|
def media_item_card_payload(item: MediaItem, reason: str = "") -> dict[str, Any]:
    """Return the JSON-friendly card dictionary for *item*.

    Missing text attributes become ``""`` and missing season/episode counts
    become ``0``. *reason* is passed through unchanged under ``"reason"``.
    """
    card: dict[str, Any] = {
        "title": item.title,
        "mediaType": item.media_type,
        "posterUrl": catalog_poster_path(item),
    }
    # Payload keys below share their names with the item attributes.
    for text_key in ("year", "genres", "rating", "runtime", "summary"):
        card[text_key] = getattr(item, text_key) or ""
    for count_key in ("seasons", "episodes"):
        card[count_key] = getattr(item, count_key) or 0
    card["reason"] = reason
    return card
|
|
|
|
|
|
def recommend_media_items(
    movies: list[MediaItem],
    shows: list[MediaItem],
    *,
    media_type: str = "all",
    genre: str = "all",
    mode: str = "balanced",
    search: str = "",
    limit: int = 6,
) -> dict[str, Any]:
    """Rank library items and return the top recommendations with a summary.

    Args:
        movies: Movie items in the library.
        shows: Show items in the library.
        media_type: "movie", "show" or "all"; unknown values behave like "all".
        genre: Genre filter forwarded to scoring; "all" disables it.
        mode: Recommendation mode forwarded to scoring and reason text.
        search: Free-text search forwarded to scoring.
        limit: Requested number of items, clamped to the range 1..12.

    Returns:
        ``{"items": [...card payloads...], "summary": "..."}``. When the
        selected pool is empty, ``items`` is empty and the summary says so.
    """
    combined = [*movies, *shows]
    if media_type == "movie":
        candidates = movies
    elif media_type == "show":
        candidates = shows
    else:
        candidates = combined

    if not candidates:
        return {"items": [], "summary": "No media available for recommendations."}

    def ranking_key(entry: MediaItem) -> tuple[float, int, str]:
        # Score first, then year, then title; the sort below is descending.
        return (
            recommendation_score(entry, mode, genre, search),
            media_item_year_number(entry) or 0,
            entry.title.casefold(),
        )

    ranked = list(candidates)
    ranked.sort(key=ranking_key, reverse=True)

    shown = max(1, min(limit, 12))
    items = [
        media_item_card_payload(entry, recommendation_reason(entry, mode, genre))
        for entry in ranked[:shown]
    ]

    if media_type == "all":
        media_label = "movies and shows"
    elif media_type == "movie":
        media_label = "movies"
    elif media_type == "show":
        media_label = "shows"
    else:
        media_label = "media"

    mode_name = mode.title()
    if search:
        summary = f"{mode_name} picks in {media_label} for “{search}”."
    elif genre and genre.casefold() != "all":
        summary = f"{mode_name} picks in {media_label} for {genre}."
    else:
        summary = f"{mode_name} picks from your {media_label}."
    return {"items": items, "summary": summary}
|
|
|
|
|
|
def media_item_value(item: MediaItem) -> str:
    """Return the multi-line detail text for *item*.

    The first line joins genres, rating, runtime (movies only) and
    season/episode counts (shows only) with " • "; the second line is the
    summary. The result is cut to 1024 characters, and
    "No details provided." is returned when there is nothing to show.
    """
    headline_parts = [part for part in (item.genres, item.rating) if part]
    if item.runtime and item.media_type == "movie":
        headline_parts.append(item.runtime)
    if item.media_type == "show":
        tallies = [
            f"{count} {noun}{'s' if count != 1 else ''}"
            for count, noun in ((item.seasons, "season"), (item.episodes, "episode"))
            if count
        ]
        if tallies:
            headline_parts.append(" · ".join(tallies))

    body: list[str] = []
    if headline_parts:
        body.append(" • ".join(headline_parts))
    if item.summary:
        body.append(item.summary)

    text = "\n".join(body)[:1024]
    return text if text else "No details provided."
|
|
|
|
|
|
def media_category_embeds(
    title: str,
    items: list[MediaItem],
    color: int,
    total_count: int,
    source_name: str,
) -> list[dict[str, Any]]:
    """Split *items* into embed dictionaries of ``MEDIA_ITEMS_PER_EMBED`` fields.

    Every embed carries the same title and color. Its description shows the
    total count and source, plus "showing first N" when *total_count* exceeds
    ``len(items)`` and a page marker when more than one page exists. At least
    one embed is always returned, even for an empty *items* list.
    """
    shown_count = len(items)
    is_truncated = max(total_count - shown_count, 0) > 0
    pages = max((shown_count + MEDIA_ITEMS_PER_EMBED - 1) // MEDIA_ITEMS_PER_EMBED, 1)

    def build_embed(page_number: int) -> dict[str, Any]:
        offset = (page_number - 1) * MEDIA_ITEMS_PER_EMBED
        description_parts = [f"{total_count} total from `{source_name}`"]
        if is_truncated:
            description_parts.append(f"showing first {shown_count}")
        if pages > 1:
            description_parts.append(f"page {page_number}/{pages}")
        return {
            "title": title,
            "description": " · ".join(description_parts),
            "color": color,
            "fields": [
                {
                    # Heading is cut to 256 characters before use as the field name.
                    "name": media_item_heading(entry)[:256],
                    "value": media_item_value(entry),
                    "inline": False,
                }
                for entry in items[offset : offset + MEDIA_ITEMS_PER_EMBED]
            ],
        }

    return [build_embed(page_number) for page_number in range(1, pages + 1)]
|
|
|
|
|
|
def render_media_catalog_payloads(
    movies: list[MediaItem],
    shows: list[MediaItem],
    movie_total: int,
    show_total: int,
    movie_source: str,
    show_source: str,
) -> list[dict[str, Any]]:
    """Build the message payloads for the embed-based catalog.

    The first embed is an overview with the totals and the current UTC time;
    it is followed by the movie and show category embeds (each skipped when
    its list is empty). Embeds are grouped into payloads of at most
    ``MAX_DISCORD_EMBEDS``; only the first payload carries the heading text.
    """
    generated_at = datetime.now(timezone.utc)
    overview: dict[str, Any] = {
        "title": "The Mithral Archive Media Catalog",
        "description": "Current movies and shows available in the archive.",
        "color": 0x5865F2,
        "fields": [
            {"name": "Movies", "value": str(movie_total), "inline": True},
            {"name": "Shows", "value": str(show_total), "inline": True},
            {"name": "Updated", "value": generated_at.strftime("%Y-%m-%d %H:%M UTC"), "inline": True},
        ],
        "timestamp": generated_at.isoformat(),
    }

    all_embeds: list[dict[str, Any]] = [overview]
    categories = (
        ("Movies", movies, 0xF59E0B, movie_total, movie_source),
        ("Shows", shows, 0x10B981, show_total, show_source),
    )
    for heading, entries, accent, total, source in categories:
        if entries:
            all_embeds.extend(media_category_embeds(heading, entries, accent, total, source))

    return [
        {
            "content": "**The Mithral Archive Media Catalog**" if offset == 0 else "",
            "embeds": all_embeds[offset : offset + MAX_DISCORD_EMBEDS],
        }
        for offset in range(0, len(all_embeds), MAX_DISCORD_EMBEDS)
    ]
|
|
|
|
|
|
def media_markdown_line(item: MediaItem) -> str:
    """Return the Markdown bullet for *item*.

    The bullet is the bold heading, followed by " - " and a "; "-separated
    list of genres, rating, runtime (movies only) and season/episode counts
    (shows only) when any of those are present.
    """
    heading = media_item_heading(item)
    annotations = [value for value in (item.genres, item.rating) if value]
    if item.runtime and item.media_type == "movie":
        annotations.append(item.runtime)
    if item.media_type == "show":
        for count, noun in ((item.seasons, "season"), (item.episodes, "episode")):
            if count:
                annotations.append(f"{count} {noun}{'s' if count != 1 else ''}")

    if not annotations:
        return f"- **{heading}**"
    return f"- **{heading}** - {'; '.join(annotations)}"
|
|
|
|
|
|
def render_media_catalog_markdown(movies: list[MediaItem], shows: list[MediaItem]) -> str:
    """Render the whole catalog as a Markdown document.

    The document starts with a title, the current UTC time and the two
    counts, followed by a "## Movies" and a "## Shows" section (each omitted
    when empty). Every item is one bullet, plus a second bullet line with the
    summary when the item has one. The result ends with exactly one newline.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    out = [
        "# The Mithral Archive Media Catalog",
        "",
        f"Updated: {stamp}",
        f"Movies: {len(movies)}",
        f"Shows: {len(shows)}",
        "",
    ]

    for heading, entries in (("## Movies", movies), ("## Shows", shows)):
        if not entries:
            continue
        out += [heading, ""]
        for entry in entries:
            out.append(media_markdown_line(entry))
            if entry.summary:
                out.append(f" - {entry.summary}")
        out.append("")

    return "\n".join(out).strip() + "\n"
|
|
|
|
|
|
def render_catalog_html(runtime: BotRuntime) -> bytes:
    """Render the standalone HTML catalog page and return it as UTF-8 bytes.

    The page lists every movie and show from ``load_media_library(runtime)``,
    offers client-side search/type/genre/sort controls, and shows
    pre-computed "balanced", "quick" and "deep" recommendations that the
    inline script switches between.

    All library metadata is treated as untrusted text: server-side markup is
    passed through ``html.escape``, the embedded JSON has ``<`` escaped so it
    cannot end the ``<script>`` element early, and the script escapes values
    before inserting them with ``innerHTML``.
    """
    movies, shows = load_media_library(runtime)
    genres = available_media_genres(movies, shows)
    default_recommendations = {
        "balanced": recommend_media_items(movies, shows, mode="balanced", limit=6),
        "quick": recommend_media_items(movies, shows, mode="quick", limit=6),
        "deep": recommend_media_items(movies, shows, mode="deep", limit=6),
    }
    updated_at = load_state(runtime.media_library_path).get("updated_at")
    updated = "Never"
    if updated_at:
        try:
            updated = datetime.fromisoformat(str(updated_at)).strftime("%Y-%m-%d %H:%M UTC")
        except ValueError:
            # Not an ISO timestamp: show the stored value as-is.
            updated = str(updated_at)

    def item_block(item: MediaItem) -> str:
        """Return the escaped ``<article>`` markup for one library item."""
        meta = []
        if item.year:
            meta.append(item.year)
        if item.genres:
            meta.append(item.genres)
        if item.rating:
            meta.append(item.rating)
        if item.runtime:
            meta.append(item.runtime)
        if item.media_type == "show":
            if item.seasons:
                meta.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
            if item.episodes:
                meta.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
        summary = f"<p>{html.escape(item.summary)}</p>" if item.summary else ""
        poster = (
            f'<div class="poster"><img src="{html.escape(catalog_poster_path(item))}" alt="{html.escape(item.title)} poster" loading="lazy"></div>'
            if item.source_id
            else '<div class="poster poster-empty">No poster</div>'
        )
        # The data-* attributes feed the client-side filter and sort code.
        return (
            f'<article class="item" data-type="{html.escape(item.media_type)}" '
            f'data-year="{html.escape(item.year or "")}" '
            f'data-genres="{html.escape("|".join(media_item_genres(item)).casefold())}" '
            f'data-sort-title="{html.escape(item.title.casefold())}" '
            f'data-sort-year="{html.escape(str(media_item_year_number(item) or 0))}" '
            f'data-search="{html.escape(media_item_search_blob(item))}">'
            f"{poster}"
            f'<div class="item-body"><h2>{html.escape(item.title)}</h2>'
            f'<div class="meta">{html.escape(" · ".join(meta))}</div>'
            f"{summary}</div></article>"
        )

    items = "\n".join(item_block(item) for item in [*movies, *shows])
    genre_options = "\n".join(
        f'<option value="{html.escape(genre)}">{html.escape(genre)}</option>' for genre in genres
    )
    # The HTML parser ends a <script> element at "</script" (and reacts to
    # "<!--") even inside a JavaScript string, so "<" is emitted as the
    # equivalent \u003c escape, which JavaScript decodes back to "<".
    recommendations_json = json.dumps(default_recommendations).replace("<", "\\u003c")
    body = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>The Mithral Archive Media Catalog</title>
<style>
:root {{ color-scheme: dark; --bg: #111214; --panel: #1b1d21; --panel-alt: #15171a; --line: #30343b; --text: #f1f2f4; --muted: #a2a8b3; --action: #e7e9ed; --action-text: #15171a; }}
* {{ box-sizing: border-box; }}
body {{ margin: 0; background: var(--bg); color: var(--text); font: 14px/1.45 ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", sans-serif; }}
header {{ border-bottom: 1px solid var(--line); background: var(--panel-alt); padding: 18px 22px; }}
h1 {{ margin: 0; font-size: 20px; letter-spacing: 0; }}
h2 {{ margin: 0; font-size: 15px; }}
.header-grid {{ display: grid; gap: 16px; }}
.stats {{ display: flex; flex-wrap: wrap; gap: 16px; color: var(--muted); }}
.stat strong {{ display: block; color: var(--text); font-size: 16px; }}
.toolbar {{ display: grid; grid-template-columns: minmax(220px, 1.2fr) repeat(3, minmax(120px, 0.5fr)); gap: 8px; }}
.layout {{ display: grid; grid-template-columns: 320px minmax(0, 1fr); min-height: calc(100vh - 96px); }}
aside {{ border-right: 1px solid var(--line); padding: 18px; background: var(--panel-alt); }}
.sidebar-section + .sidebar-section {{ margin-top: 18px; }}
.sidebar-label {{ margin-bottom: 8px; color: var(--muted); font-size: 12px; font-weight: 650; }}
main {{ min-width: 0; padding: 18px; display: grid; gap: 18px; }}
.recommendations {{ display: grid; gap: 12px; }}
.recommendation-actions {{ display: flex; flex-wrap: wrap; gap: 8px; }}
.recommendation-grid {{ display: grid; gap: 1px; background: var(--line); }}
.recommendation-card {{ display: grid; grid-template-columns: 64px minmax(0, 1fr); gap: 12px; background: var(--panel); padding: 12px 14px; }}
.recommendation-card strong {{ display: block; margin-bottom: 4px; font-size: 14px; }}
.recommendation-poster {{ width: 64px; aspect-ratio: 2 / 3; border: 1px solid var(--line); background: #14161a; overflow: hidden; display: flex; align-items: center; justify-content: center; color: var(--muted); font-size: 11px; }}
.recommendation-poster img {{ width: 100%; height: 100%; object-fit: cover; display: block; }}
.recommendation-card .reason {{ color: var(--muted); margin-top: 6px; }}
input, button, select {{ border: 1px solid var(--line); border-radius: 6px; background: #14161a; color: var(--text); padding: 8px 9px; font: inherit; }}
button.active {{ background: var(--action); color: var(--action-text); border-color: var(--action); }}
.button-row {{ display: flex; flex-wrap: wrap; gap: 8px; }}
.results-note {{ color: var(--muted); }}
.grid {{ display: grid; gap: 1px; background: var(--line); }}
.item {{ background: var(--panel); padding: 13px 14px; display: grid; grid-template-columns: 120px minmax(0, 1fr); gap: 14px; }}
.item[hidden] {{ display: none; }}
.poster {{ width: 120px; aspect-ratio: 2 / 3; border: 1px solid var(--line); background: #14161a; overflow: hidden; display: flex; align-items: center; justify-content: center; color: var(--muted); font-size: 12px; }}
.poster img {{ width: 100%; height: 100%; object-fit: cover; display: block; }}
.poster-empty {{ padding: 12px; text-align: center; }}
.item-body {{ min-width: 0; }}
.meta {{ color: var(--muted); }}
p {{ margin: 8px 0 0; color: #d5d8de; }}
@media (max-width: 980px) {{ .layout {{ grid-template-columns: 1fr; }} aside {{ border-right: 0; border-bottom: 1px solid var(--line); }} }}
@media (max-width: 720px) {{ .toolbar {{ grid-template-columns: 1fr 1fr; }} .toolbar input {{ grid-column: 1 / -1; }} .item {{ grid-template-columns: 1fr; }} .poster {{ width: 96px; }} }}
</style>
</head>
<body>
<header>
  <div class="header-grid">
    <h1>The Mithral Archive Media Catalog</h1>
    <div class="stats">
      <div class="stat"><strong>{len(movies)}</strong>Movies</div>
      <div class="stat"><strong>{len(shows)}</strong>Shows</div>
      <div class="stat"><strong>{html.escape(updated)}</strong>Updated</div>
    </div>
    <div class="toolbar">
      <input id="search" type="search" placeholder="Search title, genre, year, or summary" autocomplete="off">
      <select id="typeFilter">
        <option value="all">All media</option>
        <option value="movie">Movies</option>
        <option value="show">Shows</option>
      </select>
      <select id="genreFilter">
        <option value="all">All genres</option>
        {genre_options}
      </select>
      <select id="sortFilter">
        <option value="title">Title</option>
        <option value="year_desc">Newest year</option>
        <option value="year_asc">Oldest year</option>
      </select>
    </div>
  </div>
</header>
<div class="layout">
  <aside>
    <div class="sidebar-section">
      <div class="sidebar-label">Recommendation Mode</div>
      <div class="button-row">
        <button class="active" type="button" data-recommendation-mode="balanced">Balanced</button>
        <button type="button" data-recommendation-mode="quick">Quick</button>
        <button type="button" data-recommendation-mode="deep">Deep</button>
      </div>
    </div>
    <div class="sidebar-section recommendations">
      <div class="sidebar-label">Recommended From Your Library</div>
      <div id="recommendationSummary" class="results-note"></div>
      <div id="recommendations" class="recommendation-grid"></div>
    </div>
  </aside>
  <main>
    <div id="resultCount" class="results-note"></div>
    <div id="items" class="grid">{items}</div>
  </main>
</div>
<script>
const search = document.querySelector("#search");
const typeFilter = document.querySelector("#typeFilter");
const genreFilter = document.querySelector("#genreFilter");
const sortFilter = document.querySelector("#sortFilter");
const recommendationButtons = document.querySelectorAll("[data-recommendation-mode]");
const recommendations = {recommendations_json};
let recommendationMode = "balanced";
const itemsRoot = document.querySelector("#items");
const recommendationRoot = document.querySelector("#recommendations");
const recommendationSummary = document.querySelector("#recommendationSummary");
const resultCount = document.querySelector("#resultCount");
function escapeHtml(value) {{
  return String(value).replace(/[&<>"']/g, (character) => `&#${{character.charCodeAt(0)}};`);
}}
function renderRecommendations() {{
  const payload = recommendations[recommendationMode] || recommendations.balanced || {{ items: [], summary: "" }};
  recommendationSummary.textContent = payload.summary || "";
  recommendationRoot.innerHTML = "";
  payload.items.forEach((item) => {{
    const node = document.createElement("article");
    node.className = "recommendation-card";
    const counts = item.mediaType === "show"
      ? [item.seasons ? `${{item.seasons}} season${{item.seasons === 1 ? "" : "s"}}` : "", item.episodes ? `${{item.episodes}} episodes` : ""].filter(Boolean).join(" · ")
      : item.runtime;
    const meta = [item.year, item.genres, item.rating, counts].filter(Boolean).join(" · ");
    const poster = item.posterUrl
      ? `<div class="recommendation-poster"><img src="${{escapeHtml(item.posterUrl)}}" alt="${{escapeHtml(item.title)}} poster" loading="lazy"></div>`
      : `<div class="recommendation-poster">No poster</div>`;
    node.innerHTML = `${{poster}}<div><strong>${{escapeHtml(item.title)}}</strong><div class="meta">${{escapeHtml(meta)}}</div>${{item.summary ? `<p>${{escapeHtml(item.summary)}}</p>` : ""}}<div class="reason">${{escapeHtml(item.reason || "")}}</div></div>`;
    recommendationRoot.append(node);
  }});
  recommendationButtons.forEach((button) => button.classList.toggle("active", button.dataset.recommendationMode === recommendationMode));
}}
function applyFilter() {{
  const query = search.value.trim().toLowerCase();
  const type = typeFilter.value;
  const genre = genreFilter.value.toLowerCase();
  const sorted = [...document.querySelectorAll(".item")];
  sorted.forEach((item) => {{
    const typeMatch = type === "all" || item.dataset.type === type;
    const genreMatch = genre === "all" || item.dataset.genres.split("|").includes(genre);
    const searchMatch = !query || item.dataset.search.includes(query);
    item.hidden = !(typeMatch && genreMatch && searchMatch);
  }});
  const visible = sorted.filter((item) => !item.hidden);
  visible.sort((left, right) => {{
    if (sortFilter.value === "year_desc") return Number(right.dataset.sortYear) - Number(left.dataset.sortYear) || left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
    if (sortFilter.value === "year_asc") return Number(left.dataset.sortYear) - Number(right.dataset.sortYear) || left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
    return left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
  }});
  visible.forEach((item) => itemsRoot.append(item));
  resultCount.textContent = `${{visible.length}} result${{visible.length === 1 ? "" : "s"}}`;
}}
search.addEventListener("input", applyFilter);
typeFilter.addEventListener("change", applyFilter);
genreFilter.addEventListener("change", applyFilter);
sortFilter.addEventListener("change", applyFilter);
recommendationButtons.forEach((button) => button.addEventListener("click", () => {{
  recommendationMode = button.dataset.recommendationMode;
  renderRecommendations();
}}));
renderRecommendations();
applyFilter();
</script>
</body>
</html>
"""
    return body.encode("utf-8")
|
|
|
|
|
|
def publish_media_markdown_message(
    token: str,
    channel_id: str,
    movies: list[MediaItem],
    shows: list[MediaItem],
    catalog_url: str = "",
) -> str:
    """Post the catalog as a Markdown attachment and return the message id.

    The message text shows the movie and show counts, and mention parsing is
    disabled. When *catalog_url* is non-empty, an "Open Catalog" link button
    is attached.

    Raises:
        RuntimeError: If the response carries no message id.
    """
    attachment = render_media_catalog_markdown(movies, shows).encode("utf-8")
    headline = (
        "**The Mithral Archive Media Catalog**\n"
        f"{len(movies)} movies · {len(shows)} shows\n"
        "Attached as a compact Markdown list."
    )
    payload: dict[str, Any] = {
        "content": headline,
        "allowed_mentions": {"parse": []},
    }
    if catalog_url:
        link_button = {
            "type": 2,
            "style": 5,
            "label": "Open Catalog",
            "url": catalog_url,
        }
        payload["components"] = [{"type": 1, "components": [link_button]}]

    response = discord_multipart_request(
        "POST",
        token,
        f"/channels/{channel_id}/messages",
        payload,
        [("files[0]", "media-catalog.md", "text/markdown; charset=utf-8", attachment)],
    )
    message_id = str(response.get("id", "")).strip()
    if message_id:
        return message_id
    raise RuntimeError("Discord did not return a message id for the media catalog")
|
|
|
|
|
|
def publish_media_items(
    runtime: BotRuntime,
    channel_id: str,
    movies_all: list[MediaItem],
    shows_all: list[MediaItem],
    movie_source: str = "Dashboard library",
    show_source: str = "Dashboard library",
) -> dict[str, Any]:
    """Replace the published catalog message with a fresh Markdown one.

    Steps, in order: resolve and validate the target channel (falling back
    to the saved media channel when *channel_id* is blank), delete the
    previously recorded messages, post the new message, then persist the
    new state and the channel setting.

    Raises:
        RuntimeError: If ``runtime.dry_run`` is enabled.
        ValueError: If both *movies_all* and *shows_all* are empty.

    Returns:
        A summary with the channel id, the new message id, the counts and
        the format.
    """
    # Accepted for signature compatibility; not used by the Markdown format.
    del movie_source, show_source
    if runtime.dry_run:
        raise RuntimeError("Discord dry run is enabled; media catalog was parsed but not sent")

    settings = channel_settings(runtime)
    requested_channel = channel_id.strip() or settings["mediaChannelId"]
    channel = validate_channel_id(requested_channel, "Media")

    if not (movies_all or shows_all):
        raise ValueError("Add at least one movie or show before publishing")

    previous = load_state(runtime.media_state_path)
    previous_channel = str(previous.get("channel_id", "")).strip()
    stale_ids = [
        str(stale_id)
        for stale_id in previous.get("message_ids", [])
        if str(stale_id).strip()
    ]
    # Old messages are deleted from the channel they were recorded in, which
    # may differ from the channel being published to now.
    cleanup_channel = previous_channel or channel
    for stale_id in stale_ids:
        discord_delete_message(runtime.token, cleanup_channel, stale_id)

    message_id = publish_media_markdown_message(
        runtime.token,
        channel,
        movies_all,
        shows_all,
        catalog_url=settings.get("catalogUrl", ""),
    )

    movie_count = len(movies_all)
    show_count = len(shows_all)
    new_state = {
        "channel_id": channel,
        "message_ids": [message_id],
        "movie_count": movie_count,
        "show_count": show_count,
        "format": "markdown",
        "published_at": datetime.now(timezone.utc).isoformat(),
    }
    save_state(runtime.media_state_path, new_state)
    save_media_channel_setting(runtime, channel)

    return {
        "channelId": channel,
        "messageIds": [message_id],
        "movieCount": movie_count,
        "showCount": show_count,
        "displayedMovieCount": movie_count,
        "displayedShowCount": show_count,
        "format": "markdown",
    }
|
|
|
|
|
|
def media_catalog_status(runtime: BotRuntime) -> dict[str, Any]:
    """Return the dashboard status dictionary for the media catalog.

    Combines the last publish state (channel, message ids, counts, format,
    timestamp) with the channel settings, the ``lastChanges`` value from the
    Jellyfin settings, the available genres, three pre-computed
    recommendation sets and the JSON form of the library.
    """
    state = load_state(runtime.media_state_path)
    settings = channel_settings(runtime)
    movies, shows = load_media_library(runtime)

    published_channel = str(state.get("channel_id", "")).strip()
    stored_ids = state.get("message_ids")

    status: dict[str, Any] = {
        # Fall back to the configured media channel when nothing was published.
        "channelId": published_channel or settings["mediaChannelId"],
        "channels": settings,
        "messageIds": stored_ids if isinstance(stored_ids, list) else [],
    }
    for public_key, state_key in (("movieCount", "movie_count"), ("showCount", "show_count")):
        status[public_key] = state.get(state_key)
    status["format"] = state.get("format", "markdown")
    status["publishedAt"] = state.get("published_at")
    status["changes"] = jellyfin_settings(runtime).get("lastChanges")
    status["genres"] = available_media_genres(movies, shows)
    status["recommendations"] = {
        mode: recommend_media_items(movies, shows, mode=mode, limit=5)
        for mode in ("balanced", "quick", "deep")
    }
    status["library"] = media_library_to_jsonable(movies, shows)
    return status
|