TheOrb/archive_bot/media.py

1520 lines
58 KiB
Python

from __future__ import annotations
import hashlib
import html
import json
import os
import re
import socket
import urllib.parse
import urllib.parse
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any
from .core import (
BotRuntime,
DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS,
MAX_DISCORD_EMBEDS,
MEDIA_ITEMS_PER_EMBED,
MediaItem,
clean_error,
)
from .discord_api import discord_delete_message, discord_multipart_request
from .storage import (
channel_settings,
jellyfin_settings,
load_state,
save_catalog_url_setting,
save_media_channel_setting,
save_state,
validate_channel_id,
)
TMDB_IMAGE_BASE_URL = "https://image.tmdb.org/t/p/w342"
TMDB_POSTER_CACHE: dict[str, str | None] = {}
def parse_int_text(value: str) -> int | None:
digits = "".join(character for character in value if character.isdigit())
if not digits:
return None
try:
return int(digits)
except ValueError:
return None
def parse_year_text(value: str) -> str | None:
match = re.search(r"(18|19|20|21)\d{2}", value)
return match.group(0) if match else None
def clean_media_text(value: str, limit: int = 220) -> str | None:
cleaned = " ".join(str(value).replace("\r", " ").replace("\n", " ").split())
if not cleaned:
return None
if len(cleaned) <= limit:
return cleaned
return cleaned[: limit - 1].rstrip() + ""
def format_runtime(value: str) -> str | None:
cleaned = clean_media_text(value, 48)
if not cleaned:
return None
if not cleaned.isdigit():
return cleaned
minutes = int(cleaned)
if minutes <= 0:
return None
hours, remainder = divmod(minutes, 60)
if hours and remainder:
return f"{hours}h {remainder}m"
if hours:
return f"{hours}h"
return f"{remainder}m"
def media_item_to_jsonable(item: MediaItem) -> dict[str, Any]:
data: dict[str, Any] = {
"title": item.title,
"mediaType": item.media_type,
"sourceId": item.source_id or "",
"tmdbId": item.tmdb_id or "",
"imdbId": item.imdb_id or "",
"year": item.year or "",
"genres": item.genres or "",
"rating": item.rating or "",
"runtime": item.runtime or "",
"summary": item.summary or "",
}
if item.media_type == "show":
data["seasons"] = item.seasons if item.seasons is not None else ""
data["episodes"] = item.episodes if item.episodes is not None else ""
return data
def media_item_from_data(data: Any, media_type: str) -> MediaItem:
if not isinstance(data, dict):
raise ValueError(f"{media_type.title()} entries must be objects")
title = clean_media_text(str(data.get("title", "")), 120)
if not title:
raise ValueError(f"{media_type.title()} entries must include a title")
year = parse_year_text(str(data.get("year", "")))
seasons = parse_int_text(str(data.get("seasons", ""))) if media_type == "show" else None
episodes = parse_int_text(str(data.get("episodes", ""))) if media_type == "show" else None
return MediaItem(
title=title,
media_type=media_type,
source_id=clean_media_text(str(data.get("sourceId", "")), 120),
tmdb_id=clean_media_text(str(data.get("tmdbId", "")), 120),
imdb_id=clean_media_text(str(data.get("imdbId", "")), 120),
year=year,
genres=clean_media_text(str(data.get("genres", "")), 120),
rating=clean_media_text(str(data.get("rating", "")), 32),
runtime=format_runtime(str(data.get("runtime", ""))) if media_type == "movie" else None,
summary=clean_media_text(str(data.get("summary", ""))),
seasons=seasons,
episodes=episodes,
)
def media_items_from_data(raw_items: Any, media_type: str) -> list[MediaItem]:
if raw_items is None:
return []
if not isinstance(raw_items, list):
raise ValueError(f"{media_type.title()} library must be a list")
deduped: dict[tuple[str, str], MediaItem] = {}
for raw_item in raw_items:
item = media_item_from_data(raw_item, media_type)
deduped[(item.title.casefold(), item.year or "")] = item
return sorted(deduped.values(), key=lambda item: (item.title.casefold(), item.year or ""))
def media_library_to_jsonable(movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
return {
"movies": [media_item_to_jsonable(item) for item in movies],
"shows": [media_item_to_jsonable(item) for item in shows],
}
def load_media_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
data = load_state(runtime.media_library_path)
return (
media_items_from_data(data.get("movies", []), "movie"),
media_items_from_data(data.get("shows", []), "show"),
)
def save_media_library(runtime: BotRuntime, movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
payload = media_library_to_jsonable(movies, shows)
payload["updated_at"] = datetime.now(timezone.utc).isoformat()
save_state(runtime.media_library_path, payload)
return payload
def reset_media_library(runtime: BotRuntime) -> dict[str, Any]:
save_media_library(runtime, [], [])
state = load_state(runtime.settings_path)
for key in (
"jellyfin_last_sync_at",
"jellyfin_last_sync_error",
"jellyfin_last_published_at",
"jellyfin_last_fingerprint",
"jellyfin_last_published_fingerprint",
"jellyfin_last_changes",
):
state.pop(key, None)
state["updated_at"] = datetime.now(timezone.utc).isoformat()
save_state(runtime.settings_path, state)
return {
"library": media_library_to_jsonable([], []),
"movieCount": 0,
"showCount": 0,
"changes": {"added": [], "removed": [], "addedCount": 0, "removedCount": 0},
"jellyfin": jellyfin_settings(runtime),
}
def jellyfin_runtime(runtime_ticks: Any) -> str | None:
try:
ticks = int(runtime_ticks or 0)
except (TypeError, ValueError):
return None
if ticks <= 0:
return None
minutes = round(ticks / 10_000_000 / 60)
if minutes <= 0:
return None
hours, remainder = divmod(minutes, 60)
if hours and remainder:
return f"{hours}h {remainder}m"
if hours:
return f"{hours}h"
return f"{remainder}m"
def jellyfin_item_year(item: dict[str, Any]) -> str | None:
production_year = item.get("ProductionYear")
if production_year:
return parse_year_text(str(production_year))
return parse_year_text(str(item.get("PremiereDate", "")))
def jellyfin_item_summary(item: dict[str, Any]) -> str | None:
return clean_media_text(str(item.get("Overview", "") or item.get("ShortOverview", "")))
def jellyfin_movie_from_item(item: dict[str, Any]) -> MediaItem:
provider_ids = item.get("ProviderIds") if isinstance(item.get("ProviderIds"), dict) else {}
return MediaItem(
title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Movie",
media_type="movie",
source_id=clean_media_text(str(item.get("Id", "")), 120),
tmdb_id=clean_media_text(str(provider_ids.get("Tmdb", "")), 120),
imdb_id=clean_media_text(str(provider_ids.get("Imdb", "")), 120),
year=jellyfin_item_year(item),
genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120),
rating=clean_media_text(str(item.get("OfficialRating", "")), 32),
runtime=jellyfin_runtime(item.get("RunTimeTicks")),
summary=jellyfin_item_summary(item),
)
def jellyfin_show_from_item(item: dict[str, Any]) -> MediaItem:
provider_ids = item.get("ProviderIds") if isinstance(item.get("ProviderIds"), dict) else {}
return MediaItem(
title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Show",
media_type="show",
source_id=clean_media_text(str(item.get("Id", "")), 120),
tmdb_id=clean_media_text(str(provider_ids.get("Tmdb", "")), 120),
imdb_id=clean_media_text(str(provider_ids.get("Imdb", "")), 120),
year=jellyfin_item_year(item),
genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120),
rating=clean_media_text(str(item.get("OfficialRating", "")), 32),
summary=jellyfin_item_summary(item),
seasons=parse_int_text(str(item.get("ChildCount", ""))),
episodes=parse_int_text(str(item.get("RecursiveItemCount", ""))),
)
def jellyfin_request(settings: dict[str, Any], path: str, params: dict[str, Any]) -> dict[str, Any]:
base_url = str(settings.get("url", "")).strip().rstrip("/")
api_key = str(settings.get("apiKey", "")).strip()
if not base_url or not api_key:
raise ValueError("Jellyfin URL and API key are required")
query = urllib.parse.urlencode({key: value for key, value in params.items() if value is not None})
url = f"{base_url}{path}"
if query:
url = f"{url}?{query}"
request = urllib.request.Request(
url,
headers={
"Accept": "application/json",
"User-Agent": "ArchiveStatusBot/1.0",
"X-Emby-Token": api_key,
},
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Jellyfin API failed: HTTP {exc.code} {detail}") from exc
except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
raise RuntimeError(f"Jellyfin API failed: {clean_error(exc)}") from exc
def jellyfin_connection_settings(runtime: BotRuntime) -> dict[str, Any]:
state = load_state(runtime.settings_path)
return {
"url": str(state.get("jellyfin_url", "")).strip(),
"apiKey": str(state.get("jellyfin_api_key", "")).strip(),
}
def media_item_duration_seconds(item: MediaItem) -> int:
runtime = str(item.runtime or "").strip().lower()
if not runtime:
return 0
hours = 0
minutes = 0
hour_match = re.search(r"(\d+)\s*h", runtime)
minute_match = re.search(r"(\d+)\s*m", runtime)
if hour_match:
hours = int(hour_match.group(1))
if minute_match:
minutes = int(minute_match.group(1))
elif runtime.isdigit():
minutes = int(runtime)
return max(0, hours * 3600 + minutes * 60)
def jellyfin_stream_url(base_url: str, item_id: str, api_key: str) -> str:
query = urllib.parse.urlencode({"static": "true", "api_key": api_key})
return f"{base_url}/Videos/{urllib.parse.quote(item_id, safe='')}/stream?{query}"
def jellyfin_download_url(base_url: str, item_id: str, api_key: str) -> str:
query = urllib.parse.urlencode({"api_key": api_key})
return f"{base_url}/Items/{urllib.parse.quote(item_id, safe='')}/Download?{query}"
def jellyfin_first_episode(settings: dict[str, Any], series_id: str) -> dict[str, Any] | None:
data = jellyfin_request(
settings,
"/Items",
{
"ParentId": series_id,
"Recursive": "true",
"IncludeItemTypes": "Episode",
"Fields": "RunTimeTicks,ParentIndexNumber,IndexNumber",
"SortBy": "ParentIndexNumber,IndexNumber,SortName",
"SortOrder": "Ascending",
"Limit": 1,
},
)
items = data.get("Items", [])
if not isinstance(items, list):
raise RuntimeError("Jellyfin returned an invalid episode payload")
for item in items:
if isinstance(item, dict) and str(item.get("Id", "")).strip():
return item
return None
def resolve_jellyfin_playback_source(runtime: BotRuntime, item: MediaItem) -> dict[str, Any]:
settings = jellyfin_connection_settings(runtime)
base_url = str(settings.get("url", "")).strip().rstrip("/")
api_key = str(settings.get("apiKey", "")).strip()
if not base_url or not api_key:
raise ValueError("Jellyfin playback is not configured")
if not item.source_id:
raise ValueError(f"Media item has no Jellyfin source ID: {item.title}")
playback_item_id = item.source_id
playback_title = item.title
duration_seconds = media_item_duration_seconds(item)
episode_meta: dict[str, Any] | None = None
if item.media_type == "show":
episode = jellyfin_first_episode(settings, item.source_id)
if episode is None:
raise ValueError(f"No playable episodes found for show: {item.title}")
playback_item_id = clean_media_text(str(episode.get("Id", "")), 120) or item.source_id
season_number = int(episode.get("ParentIndexNumber") or 0)
episode_number = int(episode.get("IndexNumber") or 0)
episode_title = clean_media_text(str(episode.get("Name", "")), 120) or "Episode"
playback_title = f"{item.title} - S{season_number:02d}E{episode_number:02d} - {episode_title}"
ticks = episode.get("RunTimeTicks")
try:
duration_seconds = max(0, round(int(ticks or 0) / 10_000_000))
except (TypeError, ValueError):
duration_seconds = 0
episode_meta = {
"seasonNumber": season_number,
"episodeNumber": episode_number,
"episodeTitle": episode_title,
"seriesId": item.source_id,
}
return {
"itemId": playback_item_id,
"sourceId": item.source_id,
"title": playback_title,
"mediaType": item.media_type,
"streamUrl": jellyfin_stream_url(base_url, playback_item_id, api_key),
"downloadUrl": jellyfin_download_url(base_url, playback_item_id, api_key),
"durationSeconds": duration_seconds,
"posterUrl": media_item_card_payload(item).get("posterUrl", ""),
"year": item.year or "",
"tmdbId": item.tmdb_id or "",
"imdbId": item.imdb_id or "",
"episode": episode_meta,
}
def fetch_jellyfin_library_ids(settings: dict[str, Any], library_names: list[str]) -> list[str]:
if not library_names:
return []
requested = {name.casefold() for name in library_names}
data = jellyfin_request(settings, "/Library/MediaFolders", {})
items = data.get("Items", [])
if not isinstance(items, list):
raise RuntimeError("Jellyfin returned an invalid library folder payload")
matched: list[str] = []
available: list[str] = []
for item in items:
if not isinstance(item, dict):
continue
name = str(item.get("Name", "")).strip()
folder_id = str(item.get("Id", "")).strip()
if name:
available.append(name)
if name.casefold() in requested and folder_id:
matched.append(folder_id)
missing = sorted(set(library_names) - {name for name in available if name.casefold() in requested})
if missing:
raise ValueError(f"Jellyfin library not found: {', '.join(missing)}")
return matched
def fetch_jellyfin_items(settings: dict[str, Any], item_type: str) -> list[dict[str, Any]]:
parent_ids = settings.get("ParentIds")
if isinstance(parent_ids, list) and parent_ids:
all_items: list[dict[str, Any]] = []
for parent_id in parent_ids:
scoped_settings = dict(settings)
scoped_settings["ParentId"] = str(parent_id)
scoped_settings.pop("ParentIds", None)
all_items.extend(fetch_jellyfin_items(scoped_settings, item_type))
return all_items
parent_id_value = str(settings.get("ParentId", "")).strip() or None
items: list[dict[str, Any]] = []
start_index = 0
limit = 200
while True:
data = jellyfin_request(
settings,
"/Items",
{
"Recursive": "true",
"IncludeItemTypes": item_type,
"Fields": "Genres,Overview,OfficialRating,RecursiveItemCount,ChildCount,PremiereDate,RunTimeTicks,ProviderIds",
"SortBy": "SortName",
"SortOrder": "Ascending",
"EnableImages": "false",
"ParentId": parent_id_value,
"StartIndex": start_index,
"Limit": limit,
},
)
page = data.get("Items", [])
if not isinstance(page, list):
raise RuntimeError("Jellyfin returned an invalid Items payload")
items.extend(item for item in page if isinstance(item, dict))
total = int(data.get("TotalRecordCount", len(items)) or len(items))
if not page or len(items) >= total:
return items
start_index += limit
def jellyfin_item_dedupe_key(item: dict[str, Any], item_type: str) -> tuple[str, str]:
provider_ids = item.get("ProviderIds")
if isinstance(provider_ids, dict):
for provider in ("Tmdb", "Imdb", "Tvdb"):
value = str(provider_ids.get(provider, "")).strip().casefold()
if value:
return item_type.casefold(), f"{provider.casefold()}:{value}"
title = clean_media_text(str(item.get("Name", "")), 120) or ""
year = jellyfin_item_year(item) or ""
return item_type.casefold(), f"title:{title.casefold()}:{year}"
def dedupe_jellyfin_items(items: list[dict[str, Any]], item_type: str) -> list[dict[str, Any]]:
provider_deduped: dict[tuple[str, str], dict[str, Any]] = {}
for item in items:
key = jellyfin_item_dedupe_key(item, item_type)
current = provider_deduped.get(key)
if current is None:
provider_deduped[key] = item
continue
current_overview = str(current.get("Overview", "") or current.get("ShortOverview", ""))
next_overview = str(item.get("Overview", "") or item.get("ShortOverview", ""))
if len(next_overview) > len(current_overview):
provider_deduped[key] = item
title_deduped: dict[tuple[str, str], dict[str, Any]] = {}
for item in provider_deduped.values():
title = clean_media_text(str(item.get("Name", "")), 120) or ""
key = (title.casefold(), jellyfin_item_year(item) or "")
current = title_deduped.get(key)
if current is None:
title_deduped[key] = item
continue
current_has_provider = bool(current.get("ProviderIds"))
next_has_provider = bool(item.get("ProviderIds"))
if next_has_provider and not current_has_provider:
title_deduped[key] = item
continue
current_overview = str(current.get("Overview", "") or current.get("ShortOverview", ""))
next_overview = str(item.get("Overview", "") or item.get("ShortOverview", ""))
if len(next_overview) > len(current_overview):
title_deduped[key] = item
return sorted(
title_deduped.values(),
key=lambda item: (str(item.get("SortName", item.get("Name", ""))).casefold(), str(item.get("ProductionYear", ""))),
)
def fetch_jellyfin_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
state = load_state(runtime.settings_path)
settings = {
"url": str(state.get("jellyfin_url", "")).strip(),
"apiKey": str(state.get("jellyfin_api_key", "")).strip(),
}
library_names = jellyfin_settings(runtime).get("libraryNames", [])
if isinstance(library_names, list) and library_names:
settings["ParentIds"] = fetch_jellyfin_library_ids(settings, [str(name) for name in library_names])
movie_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Movie"), "Movie")
show_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Series"), "Series")
movies = [jellyfin_movie_from_item(item) for item in movie_items]
shows = [jellyfin_show_from_item(item) for item in show_items]
return (
sorted(movies, key=lambda item: (item.title.casefold(), item.year or "")),
sorted(shows, key=lambda item: (item.title.casefold(), item.year or "")),
)
def media_library_fingerprint(movies: list[MediaItem], shows: list[MediaItem]) -> str:
payload = media_library_to_jsonable(movies, shows)
body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
return hashlib.sha256(body).hexdigest()
def media_item_tracking_key(item: MediaItem) -> str:
return f"{item.media_type}:{item.title.casefold()}:{item.year or ''}"
def media_change_entry(item: MediaItem) -> dict[str, Any]:
return {
"title": item.title,
"year": item.year or "",
"mediaType": item.media_type,
}
def catalog_poster_path(item: MediaItem) -> str:
if not item.source_id:
return ""
return f"/catalog/poster/{urllib.parse.quote(item.source_id, safe='')}"
def tmdb_auth_headers() -> dict[str, str]:
bearer = os.getenv("TMDB_API_READ_TOKEN", "").strip()
if bearer:
return {
"Authorization": f"Bearer {bearer}",
"Accept": "application/json",
}
return {"Accept": "application/json"}
def tmdb_api_key() -> str:
return os.getenv("TMDB_API_KEY", "").strip()
def tmdb_enabled() -> bool:
return bool(os.getenv("TMDB_API_READ_TOKEN", "").strip() or tmdb_api_key())
def tmdb_request(path: str, params: dict[str, Any]) -> dict[str, Any]:
if not tmdb_enabled():
raise RuntimeError("TMDB credentials are not configured")
query_params = {key: value for key, value in params.items() if value not in {None, ""}}
api_key = tmdb_api_key()
if api_key:
query_params["api_key"] = api_key
query = urllib.parse.urlencode(query_params)
url = f"https://api.themoviedb.org/3{path}"
if query:
url = f"{url}?{query}"
request = urllib.request.Request(
url,
headers={
"User-Agent": "ArchiveStatusBot/1.0",
**tmdb_auth_headers(),
},
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=20) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"TMDB API failed: HTTP {exc.code} {detail}") from exc
except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
raise RuntimeError(f"TMDB API failed: {clean_error(exc)}") from exc
def tmdb_image_url(file_path: str | None) -> str | None:
if not file_path:
return None
return f"{TMDB_IMAGE_BASE_URL}{file_path}"
def tmdb_cache_key(item: MediaItem) -> str:
return "|".join(
[
item.media_type,
item.source_id or "",
item.tmdb_id or "",
item.imdb_id or "",
item.title.casefold(),
item.year or "",
]
)
def tmdb_search_poster_path(item: MediaItem) -> str | None:
if item.tmdb_id:
endpoint = "/movie" if item.media_type == "movie" else "/tv"
data = tmdb_request(f"{endpoint}/{urllib.parse.quote(item.tmdb_id, safe='')}", {})
return data.get("poster_path")
if item.imdb_id:
data = tmdb_request(
f"/find/{urllib.parse.quote(item.imdb_id, safe='')}",
{"external_source": "imdb_id"},
)
results_key = "movie_results" if item.media_type == "movie" else "tv_results"
results = data.get(results_key, [])
if isinstance(results, list):
for result in results:
if isinstance(result, dict) and result.get("poster_path"):
return str(result["poster_path"])
search_params: dict[str, Any] = {"query": item.title}
year = media_item_year_number(item)
if item.media_type == "movie":
endpoint = "/search/movie"
if year is not None:
search_params["year"] = year
else:
endpoint = "/search/tv"
if year is not None:
search_params["first_air_date_year"] = year
data = tmdb_request(endpoint, search_params)
results = data.get("results", [])
if not isinstance(results, list):
return None
for result in results:
if isinstance(result, dict) and result.get("poster_path"):
return str(result["poster_path"])
return None
def tmdb_fallback_poster_url(item: MediaItem) -> str | None:
if not tmdb_enabled():
return None
cache_key = tmdb_cache_key(item)
if cache_key in TMDB_POSTER_CACHE:
return TMDB_POSTER_CACHE[cache_key]
try:
poster_path = tmdb_search_poster_path(item)
except Exception:
TMDB_POSTER_CACHE[cache_key] = None
return None
url = tmdb_image_url(poster_path)
TMDB_POSTER_CACHE[cache_key] = url
return url
def fetch_jellyfin_image(runtime: BotRuntime, item_id: str) -> tuple[bytes, str] | None:
if not item_id.strip():
return None
state = load_state(runtime.settings_path)
base_url = str(state.get("jellyfin_url", "")).strip().rstrip("/")
api_key = str(state.get("jellyfin_api_key", "")).strip()
if not base_url or not api_key:
return None
url = f"{base_url}/Items/{urllib.parse.quote(item_id, safe='')}/Images/Primary?maxHeight=720&maxWidth=480&quality=90"
request = urllib.request.Request(
url,
headers={
"User-Agent": "ArchiveStatusBot/1.0",
"X-Emby-Token": api_key,
"Accept": "image/*",
},
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
content_type = response.headers.get_content_type() or "image/jpeg"
return response.read(), content_type
except urllib.error.HTTPError as exc:
if exc.code == 404:
return None
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Jellyfin image fetch failed: HTTP {exc.code} {detail}") from exc
except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
raise RuntimeError(f"Jellyfin image fetch failed: {clean_error(exc)}") from exc
def catalog_known_source_ids(runtime: BotRuntime) -> set[str]:
movies, shows = load_media_library(runtime)
return {
item.source_id.strip()
for item in [*movies, *shows]
if item.source_id and item.source_id.strip()
}
def catalog_item_for_source_id(runtime: BotRuntime, item_id: str) -> MediaItem | None:
movies, shows = load_media_library(runtime)
for item in [*movies, *shows]:
if item.source_id and item.source_id.strip() == item_id:
return item
return None
def media_library_changes(
previous_movies: list[MediaItem],
previous_shows: list[MediaItem],
current_movies: list[MediaItem],
current_shows: list[MediaItem],
) -> dict[str, Any]:
previous = {media_item_tracking_key(item): item for item in [*previous_movies, *previous_shows]}
current = {media_item_tracking_key(item): item for item in [*current_movies, *current_shows]}
added_keys = sorted(set(current) - set(previous), key=lambda key: (current[key].media_type, current[key].title.casefold()))
removed_keys = sorted(set(previous) - set(current), key=lambda key: (previous[key].media_type, previous[key].title.casefold()))
added = [media_change_entry(current[key]) for key in added_keys]
removed = [media_change_entry(previous[key]) for key in removed_keys]
return {
"added": added,
"removed": removed,
"addedCount": len(added),
"removedCount": len(removed),
}
def update_jellyfin_sync_state(
runtime: BotRuntime,
*,
fingerprint: str | None = None,
error: str | None = None,
published: bool = False,
changes: dict[str, Any] | None = None,
) -> None:
state = load_state(runtime.settings_path)
now = datetime.now(timezone.utc).isoformat()
state["jellyfin_last_sync_at"] = now
state["jellyfin_last_sync_error"] = error
if fingerprint is not None:
state["jellyfin_last_fingerprint"] = fingerprint
if published:
state["jellyfin_last_published_at"] = now
if fingerprint is not None:
state["jellyfin_last_published_fingerprint"] = fingerprint
if changes is not None:
state["jellyfin_last_changes"] = changes
save_state(runtime.settings_path, state)
def sync_jellyfin_library(runtime: BotRuntime, force_publish: bool = False) -> dict[str, Any]:
previous_movies, previous_shows = load_media_library(runtime)
movies, shows = fetch_jellyfin_library(runtime)
changes = media_library_changes(previous_movies, previous_shows, movies, shows)
fingerprint = media_library_fingerprint(movies, shows)
settings_state = load_state(runtime.settings_path)
changed = fingerprint != str(settings_state.get("jellyfin_last_fingerprint", ""))
publish_changed = fingerprint != str(settings_state.get("jellyfin_last_published_fingerprint", ""))
save_media_library(runtime, movies, shows)
published = False
result: dict[str, Any] = {}
if (publish_changed or force_publish) and not runtime.dry_run:
result = publish_media_items(
runtime=runtime,
channel_id=channel_settings(runtime)["mediaChannelId"],
movies_all=movies,
shows_all=shows,
)
published = True
update_jellyfin_sync_state(runtime, fingerprint=fingerprint, published=published, changes=changes)
return {
"changed": changed,
"published": published,
"changes": changes,
"movieCount": len(movies),
"showCount": len(shows),
"libraryNames": jellyfin_settings(runtime).get("libraryNames", []),
"library": media_library_to_jsonable(movies, shows),
"publishResult": result,
"jellyfin": jellyfin_settings(runtime),
}
def maybe_run_jellyfin_sync(runtime: BotRuntime) -> dict[str, Any] | None:
settings = jellyfin_settings(runtime)
if not settings["configured"] or not settings["autoSync"]:
return None
interval = int(os.getenv("JELLYFIN_SYNC_INTERVAL_SECONDS", str(DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS)))
state = load_state(runtime.settings_path)
last_sync = str(state.get("jellyfin_last_sync_at", "")).strip()
if last_sync:
try:
last = datetime.fromisoformat(last_sync)
if (datetime.now(timezone.utc) - last).total_seconds() < interval:
return None
except ValueError:
pass
try:
return sync_jellyfin_library(runtime)
except Exception as exc:
update_jellyfin_sync_state(runtime, error=str(exc)[:240])
raise
def media_item_heading(item: MediaItem) -> str:
if item.year:
return f"{item.title} ({item.year})"
return item.title
def media_item_genres(item: MediaItem) -> list[str]:
if not item.genres:
return []
return [genre.strip() for genre in item.genres.split(",") if genre.strip()]
def media_item_runtime_minutes(item: MediaItem) -> int | None:
if not item.runtime or item.media_type != "movie":
return None
cleaned = item.runtime.lower().replace(" ", "")
match = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?", cleaned)
if match and (match.group(1) or match.group(2)):
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
return (hours * 60) + minutes
if cleaned.isdigit():
return int(cleaned)
return None
def media_item_year_number(item: MediaItem) -> int | None:
if not item.year:
return None
try:
return int(item.year)
except ValueError:
return None
def media_item_search_blob(item: MediaItem) -> str:
fields = [
item.title,
item.year or "",
item.genres or "",
item.rating or "",
item.runtime or "",
item.summary or "",
]
return " ".join(fields).casefold()
def recommendation_presets() -> list[dict[str, str]]:
return [
{"id": "balanced", "label": "Balanced"},
{"id": "quick", "label": "Quick Watch"},
{"id": "deep", "label": "Deep Dive"},
{"id": "recent", "label": "Newer Picks"},
{"id": "comfort", "label": "Comfort Picks"},
]
def recommendation_reason(item: MediaItem, mode: str, genre: str) -> str:
reasons: list[str] = []
if genre and genre.lower() != "all":
reasons.append(f"matches {genre}")
if mode == "quick" and item.media_type == "movie" and item.runtime:
reasons.append(f"short runtime: {item.runtime}")
elif mode == "deep":
if item.media_type == "movie" and item.runtime:
reasons.append(f"long runtime: {item.runtime}")
elif item.media_type == "show" and item.seasons:
reasons.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
elif mode == "recent" and item.year:
reasons.append(f"newer release: {item.year}")
elif mode == "comfort":
if item.genres:
reasons.append(item.genres)
if item.media_type == "show" and item.episodes:
reasons.append(f"{item.episodes} episodes")
else:
if item.genres:
reasons.append(item.genres)
if item.media_type == "movie" and item.runtime:
reasons.append(item.runtime)
if item.media_type == "show" and item.episodes:
reasons.append(f"{item.episodes} episodes")
return " · ".join(reasons[:2]) or "from your library"
def recommendation_score(item: MediaItem, mode: str, genre: str, search: str) -> float:
score = 0.0
genres = {value.casefold() for value in media_item_genres(item)}
runtime_minutes = media_item_runtime_minutes(item)
year = media_item_year_number(item)
blob = media_item_search_blob(item)
if genre and genre.casefold() != "all":
score += 40 if genre.casefold() in genres else -20
if search:
score += 25 if search.casefold() in blob else -30
if mode == "quick":
if item.media_type == "movie":
if runtime_minutes is not None:
score += max(0, 180 - runtime_minutes) / 2
else:
score += 12
if item.episodes:
score += max(0, 48 - item.episodes) / 3
elif mode == "deep":
if item.media_type == "movie":
if runtime_minutes is not None:
score += min(runtime_minutes, 240) / 2
else:
if item.seasons:
score += min(item.seasons * 10, 50)
if item.episodes:
score += min(item.episodes / 3, 30)
elif mode == "recent":
if year is not None:
score += max(0, year - 1990)
elif mode == "comfort":
if item.media_type == "show":
score += 24
if item.episodes:
score += min(item.episodes / 4, 25)
if genres:
score += 6
else:
if genres:
score += 8
if item.summary:
score += 4
if item.media_type == "movie" and runtime_minutes is not None:
score += max(0, 150 - abs(115 - runtime_minutes)) / 10
if item.media_type == "show" and item.episodes:
score += min(item.episodes / 6, 16)
score += (sum(ord(character) for character in item.title.casefold()) % 17) / 10
return score
def available_media_genres(movies: list[MediaItem], shows: list[MediaItem]) -> list[str]:
genres: set[str] = set()
for item in [*movies, *shows]:
genres.update(media_item_genres(item))
return sorted(genres, key=str.casefold)
def media_item_card_payload(item: MediaItem, reason: str = "") -> dict[str, Any]:
return {
"title": item.title,
"mediaType": item.media_type,
"sourceId": item.source_id or "",
"tmdbId": item.tmdb_id or "",
"imdbId": item.imdb_id or "",
"posterUrl": catalog_poster_path(item),
"year": item.year or "",
"genres": item.genres or "",
"rating": item.rating or "",
"runtime": item.runtime or "",
"summary": item.summary or "",
"seasons": item.seasons or 0,
"episodes": item.episodes or 0,
"reason": reason,
}
def recommend_media_items(
movies: list[MediaItem],
shows: list[MediaItem],
*,
media_type: str = "all",
genre: str = "all",
mode: str = "balanced",
search: str = "",
limit: int = 6,
) -> dict[str, Any]:
pools = {
"movie": movies,
"show": shows,
"all": [*movies, *shows],
}
selected_pool = pools.get(media_type, pools["all"])
if not selected_pool:
return {"items": [], "summary": "No media available for recommendations."}
ranked = sorted(
selected_pool,
key=lambda item: (
recommendation_score(item, mode, genre, search),
media_item_year_number(item) or 0,
item.title.casefold(),
),
reverse=True,
)
items = [
media_item_card_payload(item, recommendation_reason(item, mode, genre))
for item in ranked[: max(1, min(limit, 12))]
]
media_label = {"all": "movies and shows", "movie": "movies", "show": "shows"}.get(media_type, "media")
if search:
summary = f"{mode.title()} picks in {media_label} for “{search}”."
elif genre and genre.casefold() != "all":
summary = f"{mode.title()} picks in {media_label} for {genre}."
else:
summary = f"{mode.title()} picks from your {media_label}."
return {"items": items, "summary": summary}
def media_item_value(item: MediaItem) -> str:
details: list[str] = []
if item.genres:
details.append(item.genres)
if item.rating:
details.append(item.rating)
if item.runtime and item.media_type == "movie":
details.append(item.runtime)
if item.media_type == "show":
counts = []
if item.seasons:
counts.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
if item.episodes:
counts.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
if counts:
details.append(" · ".join(counts))
lines = []
if details:
lines.append("".join(details))
if item.summary:
lines.append(item.summary)
return "\n".join(lines)[:1024] or "No details provided."
def media_category_embeds(
title: str,
items: list[MediaItem],
color: int,
total_count: int,
source_name: str,
) -> list[dict[str, Any]]:
embeds: list[dict[str, Any]] = []
omitted = max(total_count - len(items), 0)
page_count = max((len(items) + MEDIA_ITEMS_PER_EMBED - 1) // MEDIA_ITEMS_PER_EMBED, 1)
for page_index in range(page_count):
start = page_index * MEDIA_ITEMS_PER_EMBED
chunk = items[start : start + MEDIA_ITEMS_PER_EMBED]
description = f"{total_count} total from `{source_name}`"
if omitted:
description += f" · showing first {len(items)}"
if page_count > 1:
description += f" · page {page_index + 1}/{page_count}"
fields = [
{
"name": media_item_heading(item)[:256],
"value": media_item_value(item),
"inline": False,
}
for item in chunk
]
embeds.append(
{
"title": title,
"description": description,
"color": color,
"fields": fields,
}
)
return embeds
def render_media_catalog_payloads(
movies: list[MediaItem],
shows: list[MediaItem],
movie_total: int,
show_total: int,
movie_source: str,
show_source: str,
) -> list[dict[str, Any]]:
now = datetime.now(timezone.utc)
embeds: list[dict[str, Any]] = [
{
"title": "The Mithral Archive Media Catalog",
"description": "Current movies and shows available in the archive.",
"color": 0x5865F2,
"fields": [
{"name": "Movies", "value": str(movie_total), "inline": True},
{"name": "Shows", "value": str(show_total), "inline": True},
{"name": "Updated", "value": now.strftime("%Y-%m-%d %H:%M UTC"), "inline": True},
],
"timestamp": now.isoformat(),
}
]
if movies:
embeds.extend(media_category_embeds("Movies", movies, 0xF59E0B, movie_total, movie_source))
if shows:
embeds.extend(media_category_embeds("Shows", shows, 0x10B981, show_total, show_source))
payloads = []
for index in range(0, len(embeds), MAX_DISCORD_EMBEDS):
payloads.append(
{
"content": "**The Mithral Archive Media Catalog**" if index == 0 else "",
"embeds": embeds[index : index + MAX_DISCORD_EMBEDS],
}
)
return payloads
def media_markdown_line(item: MediaItem) -> str:
title = media_item_heading(item)
details: list[str] = []
if item.genres:
details.append(item.genres)
if item.rating:
details.append(item.rating)
if item.runtime and item.media_type == "movie":
details.append(item.runtime)
if item.media_type == "show":
if item.seasons:
details.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
if item.episodes:
details.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
suffix = f" - {'; '.join(details)}" if details else ""
return f"- **{title}**{suffix}"
def render_media_catalog_markdown(movies: list[MediaItem], shows: list[MediaItem]) -> str:
now = datetime.now(timezone.utc)
lines = [
"# The Mithral Archive Media Catalog",
"",
f"Updated: {now.strftime('%Y-%m-%d %H:%M UTC')}",
f"Movies: {len(movies)}",
f"Shows: {len(shows)}",
"",
]
if movies:
lines.extend(["## Movies", ""])
for item in movies:
lines.append(media_markdown_line(item))
if item.summary:
lines.append(f" - {item.summary}")
lines.append("")
if shows:
lines.extend(["## Shows", ""])
for item in shows:
lines.append(media_markdown_line(item))
if item.summary:
lines.append(f" - {item.summary}")
lines.append("")
return "\n".join(lines).strip() + "\n"
def render_catalog_html(runtime: BotRuntime) -> bytes:
movies, shows = load_media_library(runtime)
genres = available_media_genres(movies, shows)
default_recommendations = {
"balanced": recommend_media_items(movies, shows, mode="balanced", limit=6),
"quick": recommend_media_items(movies, shows, mode="quick", limit=6),
"deep": recommend_media_items(movies, shows, mode="deep", limit=6),
}
updated_at = load_state(runtime.media_library_path).get("updated_at")
updated = "Never"
if updated_at:
try:
updated = datetime.fromisoformat(str(updated_at)).strftime("%Y-%m-%d %H:%M UTC")
except ValueError:
updated = str(updated_at)
def item_block(item: MediaItem) -> str:
meta = []
if item.year:
meta.append(item.year)
if item.genres:
meta.append(item.genres)
if item.rating:
meta.append(item.rating)
if item.runtime:
meta.append(item.runtime)
if item.media_type == "show":
if item.seasons:
meta.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
if item.episodes:
meta.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
summary = f"<p>{html.escape(item.summary)}</p>" if item.summary else ""
poster = (
f'<div class="poster"><img src="{html.escape(catalog_poster_path(item))}" alt="{html.escape(item.title)} poster" loading="lazy"></div>'
if item.source_id
else '<div class="poster poster-empty">No poster</div>'
)
return (
f'<article class="item" data-type="{html.escape(item.media_type)}" '
f'data-year="{html.escape(item.year or "")}" '
f'data-genres="{html.escape("|".join(media_item_genres(item)).casefold())}" '
f'data-sort-title="{html.escape(item.title.casefold())}" '
f'data-sort-year="{html.escape(str(media_item_year_number(item) or 0))}" '
f'data-search="{html.escape(media_item_search_blob(item))}">'
f"{poster}"
f'<div class="item-body"><h2>{html.escape(item.title)}</h2>'
f'<div class="meta">{html.escape(" · ".join(meta))}</div>'
f"{summary}</div></article>"
)
items = "\n".join(item_block(item) for item in [*movies, *shows])
genre_options = "\n".join(
f'<option value="{html.escape(genre)}">{html.escape(genre)}</option>' for genre in genres
)
recommendations_json = json.dumps(default_recommendations)
body = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>The Mithral Archive Media Catalog</title>
<style>
:root {{ color-scheme: dark; --bg: #111214; --panel: #1b1d21; --panel-alt: #15171a; --line: #30343b; --text: #f1f2f4; --muted: #a2a8b3; --action: #e7e9ed; --action-text: #15171a; }}
* {{ box-sizing: border-box; }}
body {{ margin: 0; background: var(--bg); color: var(--text); font: 14px/1.45 ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", sans-serif; }}
header {{ border-bottom: 1px solid var(--line); background: var(--panel-alt); padding: 18px 22px; }}
h1 {{ margin: 0; font-size: 20px; letter-spacing: 0; }}
h2 {{ margin: 0; font-size: 15px; }}
.header-grid {{ display: grid; gap: 16px; }}
.stats {{ display: flex; flex-wrap: wrap; gap: 16px; color: var(--muted); }}
.stat strong {{ display: block; color: var(--text); font-size: 16px; }}
.toolbar {{ display: grid; grid-template-columns: minmax(220px, 1.2fr) repeat(3, minmax(120px, 0.5fr)); gap: 8px; }}
.layout {{ display: grid; grid-template-columns: 320px minmax(0, 1fr); min-height: calc(100vh - 96px); }}
aside {{ border-right: 1px solid var(--line); padding: 18px; background: var(--panel-alt); }}
.sidebar-section + .sidebar-section {{ margin-top: 18px; }}
.sidebar-label {{ margin-bottom: 8px; color: var(--muted); font-size: 12px; font-weight: 650; }}
main {{ min-width: 0; padding: 18px; display: grid; gap: 18px; }}
.recommendations {{ display: grid; gap: 12px; }}
.recommendation-actions {{ display: flex; flex-wrap: wrap; gap: 8px; }}
.recommendation-grid {{ display: grid; gap: 1px; background: var(--line); }}
.recommendation-card {{ display: grid; grid-template-columns: 64px minmax(0, 1fr); gap: 12px; background: var(--panel); padding: 12px 14px; }}
.recommendation-card strong {{ display: block; margin-bottom: 4px; font-size: 14px; }}
.recommendation-poster {{ width: 64px; aspect-ratio: 2 / 3; border: 1px solid var(--line); background: #14161a; overflow: hidden; display: flex; align-items: center; justify-content: center; color: var(--muted); font-size: 11px; }}
.recommendation-poster img {{ width: 100%; height: 100%; object-fit: cover; display: block; }}
.recommendation-card .reason {{ color: var(--muted); margin-top: 6px; }}
input, button, select {{ border: 1px solid var(--line); border-radius: 6px; background: #14161a; color: var(--text); padding: 8px 9px; font: inherit; }}
button.active {{ background: var(--action); color: var(--action-text); border-color: var(--action); }}
.button-row {{ display: flex; flex-wrap: wrap; gap: 8px; }}
.results-note {{ color: var(--muted); }}
.grid {{ display: grid; gap: 1px; background: var(--line); }}
.item {{ background: var(--panel); padding: 13px 14px; display: grid; grid-template-columns: 120px minmax(0, 1fr); gap: 14px; }}
.item[hidden] {{ display: none; }}
.poster {{ width: 120px; aspect-ratio: 2 / 3; border: 1px solid var(--line); background: #14161a; overflow: hidden; display: flex; align-items: center; justify-content: center; color: var(--muted); font-size: 12px; }}
.poster img {{ width: 100%; height: 100%; object-fit: cover; display: block; }}
.poster-empty {{ padding: 12px; text-align: center; }}
.item-body {{ min-width: 0; }}
.meta {{ color: var(--muted); }}
p {{ margin: 8px 0 0; color: #d5d8de; }}
@media (max-width: 980px) {{ .layout {{ grid-template-columns: 1fr; }} aside {{ border-right: 0; border-bottom: 1px solid var(--line); }} }}
@media (max-width: 720px) {{ .toolbar {{ grid-template-columns: 1fr 1fr; }} .toolbar input {{ grid-column: 1 / -1; }} .item {{ grid-template-columns: 1fr; }} .poster {{ width: 96px; }} }}
</style>
</head>
<body>
<header>
<div class="header-grid">
<h1>The Mithral Archive Media Catalog</h1>
<div class="stats">
<div class="stat"><strong>{len(movies)}</strong>Movies</div>
<div class="stat"><strong>{len(shows)}</strong>Shows</div>
<div class="stat"><strong>{html.escape(updated)}</strong>Updated</div>
</div>
<div class="toolbar">
<input id="search" type="search" placeholder="Search title, genre, year, or summary" autocomplete="off">
<select id="typeFilter">
<option value="all">All media</option>
<option value="movie">Movies</option>
<option value="show">Shows</option>
</select>
<select id="genreFilter">
<option value="all">All genres</option>
{genre_options}
</select>
<select id="sortFilter">
<option value="title">Title</option>
<option value="year_desc">Newest year</option>
<option value="year_asc">Oldest year</option>
</select>
</div>
</div>
</header>
<div class="layout">
<aside>
<div class="sidebar-section">
<div class="sidebar-label">Recommendation Mode</div>
<div class="button-row">
<button class="active" type="button" data-recommendation-mode="balanced">Balanced</button>
<button type="button" data-recommendation-mode="quick">Quick</button>
<button type="button" data-recommendation-mode="deep">Deep</button>
</div>
</div>
<div class="sidebar-section recommendations">
<div class="sidebar-label">Recommended From Your Library</div>
<div id="recommendationSummary" class="results-note"></div>
<div id="recommendations" class="recommendation-grid"></div>
</div>
</aside>
<main>
<div id="resultCount" class="results-note"></div>
<div id="items" class="grid">{items}</div>
</main>
</div>
<script>
const search = document.querySelector("#search");
const typeFilter = document.querySelector("#typeFilter");
const genreFilter = document.querySelector("#genreFilter");
const sortFilter = document.querySelector("#sortFilter");
const recommendationButtons = document.querySelectorAll("[data-recommendation-mode]");
const recommendations = {recommendations_json};
let recommendationMode = "balanced";
const itemsRoot = document.querySelector("#items");
const recommendationRoot = document.querySelector("#recommendations");
const recommendationSummary = document.querySelector("#recommendationSummary");
const resultCount = document.querySelector("#resultCount");
function renderRecommendations() {{
const payload = recommendations[recommendationMode] || recommendations.balanced || {{ items: [], summary: "" }};
recommendationSummary.textContent = payload.summary || "";
recommendationRoot.innerHTML = "";
payload.items.forEach((item) => {{
const node = document.createElement("article");
node.className = "recommendation-card";
const counts = item.mediaType === "show"
? [item.seasons ? `${{item.seasons}} season${{item.seasons === 1 ? "" : "s"}}` : "", item.episodes ? `${{item.episodes}} episodes` : ""].filter(Boolean).join(" · ")
: item.runtime;
const meta = [item.year, item.genres, item.rating, counts].filter(Boolean).join(" · ");
const poster = item.posterUrl
? `<div class="recommendation-poster"><img src="${{item.posterUrl}}" alt="${{item.title}} poster" loading="lazy"></div>`
: `<div class="recommendation-poster">No poster</div>`;
node.innerHTML = `${{poster}}<div><strong>${{item.title}}</strong><div class="meta">${{meta}}</div>${{item.summary ? `<p>${{item.summary}}</p>` : ""}}<div class="reason">${{item.reason || ""}}</div></div>`;
recommendationRoot.append(node);
}});
recommendationButtons.forEach((button) => button.classList.toggle("active", button.dataset.recommendationMode === recommendationMode));
}}
function applyFilter() {{
const query = search.value.trim().toLowerCase();
const type = typeFilter.value;
const genre = genreFilter.value.toLowerCase();
const sorted = [...document.querySelectorAll(".item")];
sorted.forEach((item) => {{
const typeMatch = type === "all" || item.dataset.type === type;
const genreMatch = genre === "all" || item.dataset.genres.split("|").includes(genre);
const searchMatch = !query || item.dataset.search.includes(query);
item.hidden = !(typeMatch && genreMatch && searchMatch);
}});
const visible = sorted.filter((item) => !item.hidden);
visible.sort((left, right) => {{
if (sortFilter.value === "year_desc") return Number(right.dataset.sortYear) - Number(left.dataset.sortYear) || left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
if (sortFilter.value === "year_asc") return Number(left.dataset.sortYear) - Number(right.dataset.sortYear) || left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
return left.dataset.sortTitle.localeCompare(right.dataset.sortTitle);
}});
visible.forEach((item) => itemsRoot.append(item));
resultCount.textContent = `${{visible.length}} result${{visible.length === 1 ? "" : "s"}}`;
}}
search.addEventListener("input", applyFilter);
typeFilter.addEventListener("change", applyFilter);
genreFilter.addEventListener("change", applyFilter);
sortFilter.addEventListener("change", applyFilter);
recommendationButtons.forEach((button) => button.addEventListener("click", () => {{
recommendationMode = button.dataset.recommendationMode;
renderRecommendations();
}}));
renderRecommendations();
applyFilter();
</script>
</body>
</html>
"""
return body.encode("utf-8")
def publish_media_markdown_message(
token: str,
channel_id: str,
movies: list[MediaItem],
shows: list[MediaItem],
catalog_url: str = "",
) -> str:
markdown = render_media_catalog_markdown(movies, shows)
payload: dict[str, Any] = {
"content": (
"**The Mithral Archive Media Catalog**\n"
f"{len(movies)} movies · {len(shows)} shows\n"
"Attached as a compact Markdown list."
),
"allowed_mentions": {"parse": []},
}
if catalog_url:
payload["components"] = [
{
"type": 1,
"components": [
{
"type": 2,
"style": 5,
"label": "Open Catalog",
"url": catalog_url,
}
],
}
]
message = discord_multipart_request(
"POST",
token,
f"/channels/{channel_id}/messages",
payload,
[("files[0]", "media-catalog.md", "text/markdown; charset=utf-8", markdown.encode("utf-8"))],
)
message_id = str(message.get("id", "")).strip()
if not message_id:
raise RuntimeError("Discord did not return a message id for the media catalog")
return message_id
def publish_media_items(
runtime: BotRuntime,
channel_id: str,
movies_all: list[MediaItem],
shows_all: list[MediaItem],
movie_source: str = "Dashboard library",
show_source: str = "Dashboard library",
) -> dict[str, Any]:
del movie_source, show_source
if runtime.dry_run:
raise RuntimeError("Discord dry run is enabled; media catalog was parsed but not sent")
settings = channel_settings(runtime)
channel = validate_channel_id(channel_id.strip() or settings["mediaChannelId"], "Media")
if not movies_all and not shows_all:
raise ValueError("Add at least one movie or show before publishing")
state = load_state(runtime.media_state_path)
old_channel = str(state.get("channel_id", "")).strip()
existing_ids = [str(message_id) for message_id in state.get("message_ids", []) if str(message_id).strip()]
delete_channel = old_channel or channel
for old_id in existing_ids:
discord_delete_message(runtime.token, delete_channel, old_id)
message_id = publish_media_markdown_message(
runtime.token,
channel,
movies_all,
shows_all,
catalog_url=settings.get("catalogUrl", ""),
)
save_state(
runtime.media_state_path,
{
"channel_id": channel,
"message_ids": [message_id],
"movie_count": len(movies_all),
"show_count": len(shows_all),
"format": "markdown",
"published_at": datetime.now(timezone.utc).isoformat(),
},
)
save_media_channel_setting(runtime, channel)
return {
"channelId": channel,
"messageIds": [message_id],
"movieCount": len(movies_all),
"showCount": len(shows_all),
"displayedMovieCount": len(movies_all),
"displayedShowCount": len(shows_all),
"format": "markdown",
}
def media_catalog_status(runtime: BotRuntime) -> dict[str, Any]:
state = load_state(runtime.media_state_path)
settings = channel_settings(runtime)
movies, shows = load_media_library(runtime)
return {
"channelId": str(state.get("channel_id", "")).strip() or settings["mediaChannelId"],
"channels": settings,
"messageIds": state.get("message_ids", []) if isinstance(state.get("message_ids"), list) else [],
"movieCount": state.get("movie_count"),
"showCount": state.get("show_count"),
"format": state.get("format", "markdown"),
"publishedAt": state.get("published_at"),
"changes": jellyfin_settings(runtime).get("lastChanges"),
"genres": available_media_genres(movies, shows),
"recommendations": {
"balanced": recommend_media_items(movies, shows, mode="balanced", limit=5),
"quick": recommend_media_items(movies, shows, mode="quick", limit=5),
"deep": recommend_media_items(movies, shows, mode="deep", limit=5),
},
"library": media_library_to_jsonable(movies, shows),
}