#!/usr/bin/env python3 """Live Discord status message for The Mithral Archive.""" from __future__ import annotations import base64 import asyncio import csv import hashlib import hmac import html import io import json import os import re import secrets import signal import socket import ssl import sys import threading import time import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from datetime import datetime, timezone from http.cookies import SimpleCookie from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any DISCORD_API = "https://discord.com/api/v10" DEFAULT_INTERVAL_SECONDS = 60 DEFAULT_TIMEOUT_SECONDS = 10 MAX_DISCORD_EMBEDS = 10 MAX_REQUEST_BYTES = 8_000_000 SESSION_COOKIE = "archive_bot_session" PBKDF2_ITERATIONS = 390_000 MEDIA_ITEMS_PER_EMBED = 10 DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS = 900 @dataclass(frozen=True) class Service: name: str group: str url: str display_url: str method: str timeout: float expected_statuses: set[int] expected_min: int expected_max: int keyword: str | None @dataclass(frozen=True) class CheckResult: service: Service ok: bool status: int | None latency_ms: int | None error: str | None @dataclass(frozen=True) class MediaItem: title: str media_type: str year: str | None = None genres: str | None = None rating: str | None = None runtime: str | None = None summary: str | None = None seasons: int | None = None episodes: int | None = None @dataclass(frozen=True) class DashboardAuthConfig: username: str password_hash: str session_ttl_seconds: int cookie_secure: bool @dataclass class DashboardSession: username: str csrf_token: str expires_at: float class DashboardAuth: def __init__(self, config: DashboardAuthConfig) -> None: self.config = config self.lock = threading.Lock() self.sessions: dict[str, DashboardSession] = {} self.failed_logins: dict[str, list[float]] = {} def login_allowed(self, key: str) -> bool: now = time.time() window_start = now - 900 with self.lock: attempts = [attempt for attempt in self.failed_logins.get(key, []) if attempt >= window_start] self.failed_logins[key] = attempts return len(attempts) < 10 def record_failed_login(self, key: str) -> None: now = time.time() with self.lock: self.failed_logins.setdefault(key, []).append(now) def clear_failed_login(self, key: str) -> None: with self.lock: self.failed_logins.pop(key, None) def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None: if not hmac.compare_digest(username, self.config.username): return None if not verify_password_hash(self.config.password_hash, password): return None session_id = secrets.token_urlsafe(32) session = DashboardSession( username=username, csrf_token=secrets.token_urlsafe(32), expires_at=time.time() + self.config.session_ttl_seconds, ) with self.lock: self.sessions[session_id] = session return session_id, session def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None: if not cookie_header: return None cookie = SimpleCookie() cookie.load(cookie_header) morsel = cookie.get(SESSION_COOKIE) if morsel is None: return None session_id = morsel.value now = time.time() with self.lock: session = self.sessions.get(session_id) if session is None: return None if session.expires_at <= now: self.sessions.pop(session_id, None) return None session.expires_at = now + self.config.session_ttl_seconds return session_id, session def logout(self, session_id: str) -> None: with self.lock: self.sessions.pop(session_id, None) class BotRuntime: def __init__( self, token: str, channel_id: str, config_path: Path, state_path: Path, media_state_path: Path, media_library_path: Path, settings_path: Path, dry_run: bool = False, ) -> None: self.token = token self.default_channel_id = channel_id self.config_path = config_path self.state_path = state_path self.media_state_path = media_state_path self.media_library_path = media_library_path self.settings_path = settings_path self.dry_run = dry_run self.lock = threading.Lock() self.last_results: list[CheckResult] = [] self.last_error: str | None = None self.last_message_id: str | None = None self.last_checked_at: datetime | None = None class DiscordGatewayManager: def __init__(self, token: str) -> None: self.token = token self.thread: threading.Thread | None = None self.loop: asyncio.AbstractEventLoop | None = None self.client: Any = None self.ready = threading.Event() self._disconnecting = threading.Event() def start(self) -> None: if self.thread is not None: return self.thread = threading.Thread(target=self._run, name="discord-gateway", daemon=True) self.thread.start() def stop(self) -> None: self._disconnecting.set() if self.loop is not None and self.client is not None: asyncio.run_coroutine_threadsafe(self.client.close(), self.loop) if self.thread is not None: self.thread.join(timeout=15) def _run(self) -> None: try: import discord except ImportError: print("discord.py is not installed. Install requirements.txt to keep the bot online.", file=sys.stderr, flush=True) self.ready.set() return class GatewayClient(discord.Client): def __init__(self, manager: DiscordGatewayManager) -> None: intents = discord.Intents.default() intents.guilds = True super().__init__(intents=intents) self.manager = manager async def on_ready(self) -> None: await self.change_presence(status=discord.Status.online) user = self.user name = user.name if user is not None else "unknown" bot_id = user.id if user is not None else "unknown" print(f"Discord gateway connected as {name} ({bot_id})", flush=True) self.manager.ready.set() async def on_disconnect(self) -> None: self.manager.ready.clear() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.client = GatewayClient(self) try: self.loop.run_until_complete(self.client.start(self.token)) except Exception as exc: if not self._disconnecting.is_set(): print(f"Discord gateway stopped: {exc}", file=sys.stderr, flush=True) finally: self.ready.set() try: pending = asyncio.all_tasks(self.loop) for task in pending: task.cancel() if pending: self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) finally: self.loop.close() def env(name: str, default: str | None = None) -> str: value = os.getenv(name, default) if value is None or not value.strip(): raise SystemExit(f"Missing required environment variable: {name}") return value.strip() def normalize_discord_token(token: str) -> str: cleaned = token.strip().strip("\"'") if cleaned.lower().startswith("bot "): cleaned = cleaned[4:].strip() return cleaned def load_dotenv(path: Path = Path(".env")) -> None: if not path.exists(): return for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue key, value = stripped.split("=", 1) key = key.strip() value = value.strip().strip("\"'") if key and key not in os.environ: os.environ[key] = value def load_json(path: Path) -> dict[str, Any]: try: with path.open("r", encoding="utf-8") as handle: data = json.load(handle) except FileNotFoundError as exc: raise ValueError(f"Config file not found: {path}") from exc except PermissionError as exc: raise ValueError(f"Config file is not readable: {path}") from exc except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in {path}: {exc}") from exc if not isinstance(data, dict): raise ValueError(f"Config must be a JSON object: {path}") return data def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]: if raw is None: return set(), 200, 399 if isinstance(raw, str): raw = [part.strip() for part in raw.split(",") if part.strip()] if not isinstance(raw, list): raise ValueError("expectedStatuses must be a list or comma-separated string") exact: set[int] = set() min_status = 999 max_status = 0 for item in raw: if isinstance(item, int): exact.add(item) continue if not isinstance(item, str): raise ValueError("expectedStatuses entries must be integers or ranges") if "-" in item: left, right = item.split("-", 1) try: min_status = min(min_status, int(left)) max_status = max(max_status, int(right)) except ValueError as exc: raise ValueError(f"Invalid expected status range: {item}") from exc continue try: exact.add(int(item)) except ValueError as exc: raise ValueError(f"Invalid expected status value: {item}") from exc if min_status == 999 and max_status == 0: min_status, max_status = 0, -1 return exact, min_status, max_status def services_from_data(data: dict[str, Any]) -> list[Service]: raw_services = data.get("services") if not isinstance(raw_services, list) or not raw_services: raise ValueError("Config must include a non-empty services array") services: list[Service] = [] for index, item in enumerate(raw_services, start=1): if not isinstance(item, dict): raise ValueError(f"Service #{index} must be an object") name = str(item.get("name", "")).strip() url = str(item.get("url", "")).strip() if not name or not url: raise ValueError(f"Service #{index} must include name and url") parsed = urllib.parse.urlparse(url) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError(f"Service {name} has an invalid http(s) URL") exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses")) services.append( Service( name=name, group=str(item.get("group", "Main Services")).strip() or "Main Services", url=url, display_url=str(item.get("displayUrl", url)).strip() or url, method=str(item.get("method", "GET")).strip().upper(), timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)), expected_statuses=exact, expected_min=minimum, expected_max=maximum, keyword=(str(item["keyword"]).strip() if item.get("keyword") else None), ) ) return services def load_services(path: Path) -> list[Service]: return services_from_data(load_json(path)) def save_services_config(path: Path, data: dict[str, Any]) -> None: services_from_data(data) path.parent.mkdir(parents=True, exist_ok=True) temporary = path.with_suffix(f"{path.suffix}.tmp") with temporary.open("w", encoding="utf-8") as handle: json.dump(data, handle, indent=2) handle.write("\n") try: temporary.replace(path) except OSError: with path.open("w", encoding="utf-8") as handle: json.dump(data, handle, indent=2) handle.write("\n") temporary.unlink(missing_ok=True) def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]: output: list[dict[str, Any]] = [] for service in services: expected: list[int | str] = sorted(service.expected_statuses) if service.expected_min <= service.expected_max: expected.append(f"{service.expected_min}-{service.expected_max}") item: dict[str, Any] = { "name": service.name, "group": service.group, "url": service.url, "displayUrl": service.display_url, "method": service.method, "timeoutSeconds": service.timeout, "expectedStatuses": expected or ["200-399"], } if service.keyword: item["keyword"] = service.keyword output.append(item) return output def status_expected(service: Service, status: int) -> bool: if status in service.expected_statuses: return True return service.expected_min <= status <= service.expected_max def check_service(service: Service) -> CheckResult: started = time.monotonic() headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")} request = urllib.request.Request(service.url, headers=headers, method=service.method) try: context = ssl.create_default_context() with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response: body = response.read(1_000_000) if service.keyword else b"" status = int(response.status) except urllib.error.HTTPError as exc: status = int(exc.code) latency_ms = int((time.monotonic() - started) * 1000) ok = status_expected(service, status) return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}") except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc: latency_ms = int((time.monotonic() - started) * 1000) return CheckResult(service, False, None, latency_ms, clean_error(exc)) latency_ms = int((time.monotonic() - started) * 1000) ok = status_expected(service, status) if ok and service.keyword: try: text = body.decode("utf-8", errors="ignore") except UnicodeDecodeError: text = "" if service.keyword not in text: ok = False return CheckResult(service, False, status, latency_ms, "keyword missing") return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}") def clean_error(exc: BaseException) -> str: reason = getattr(exc, "reason", None) if reason: return str(reason)[:120] return str(exc)[:120] or exc.__class__.__name__ def discord_request( method: str, token: str, path: str, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: body = None headers = { "Authorization": f"Bot {token}", "User-Agent": "ArchiveStatusBot/1.0", } if payload is not None: body = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" request = urllib.request.Request( f"{DISCORD_API}{path}", data=body, headers=headers, method=method, ) try: with urllib.request.urlopen(request, timeout=20) as response: data = response.read() if not data: return {} return json.loads(data.decode("utf-8")) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc def discord_multipart_request( method: str, token: str, path: str, payload: dict[str, Any], files: list[tuple[str, str, str, bytes]], ) -> dict[str, Any]: boundary = f"----ArchiveBot{secrets.token_hex(16)}" body = bytearray() def add_part(name: str, content: bytes, content_type: str, filename: str | None = None) -> None: body.extend(f"--{boundary}\r\n".encode("ascii")) disposition = f'Content-Disposition: form-data; name="{name}"' if filename is not None: disposition += f'; filename="{filename}"' body.extend(f"{disposition}\r\n".encode("utf-8")) body.extend(f"Content-Type: {content_type}\r\n\r\n".encode("ascii")) body.extend(content) body.extend(b"\r\n") add_part("payload_json", json.dumps(payload).encode("utf-8"), "application/json") for field_name, filename, content_type, content in files: add_part(field_name, content, content_type, filename) body.extend(f"--{boundary}--\r\n".encode("ascii")) request = urllib.request.Request( f"{DISCORD_API}{path}", data=bytes(body), headers={ "Authorization": f"Bot {token}", "User-Agent": "ArchiveStatusBot/1.0", "Content-Type": f"multipart/form-data; boundary={boundary}", }, method=method, ) try: with urllib.request.urlopen(request, timeout=30) as response: data = response.read() if not data: return {} return json.loads(data.decode("utf-8")) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc def discord_delete_message(token: str, channel_id: str, message_id: str) -> None: try: discord_request("DELETE", token, f"/channels/{channel_id}/messages/{message_id}") except RuntimeError as exc: print(f"Could not delete old media catalog message {message_id}: {exc}", file=sys.stderr) def discord_bot_identity(token: str) -> dict[str, Any]: return discord_request("GET", token, "/users/@me") def load_state(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: with path.open("r", encoding="utf-8") as handle: data = json.load(handle) except json.JSONDecodeError: return {} return data if isinstance(data, dict) else {} def save_state(path: Path, state: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) temporary = path.with_suffix(f"{path.suffix}.tmp") with temporary.open("w", encoding="utf-8") as handle: json.dump(state, handle, indent=2, sort_keys=True) handle.write("\n") temporary.replace(path) def validate_channel_id(value: str, label: str) -> str: channel_id = value.strip() if not channel_id: raise ValueError(f"{label} channel ID is required") if not channel_id.isdigit(): raise ValueError(f"{label} channel ID must be a Discord numeric channel ID") return channel_id def channel_settings(runtime: BotRuntime) -> dict[str, str]: data = load_state(runtime.settings_path) status_channel = str(data.get("status_channel_id", "")).strip() or runtime.default_channel_id media_channel = str(data.get("media_channel_id", "")).strip() or status_channel catalog_url = str(data.get("catalog_url", "")).strip() or os.getenv("PUBLIC_CATALOG_URL", "").strip() return { "statusChannelId": status_channel, "mediaChannelId": media_channel, "catalogUrl": catalog_url, } def validate_catalog_url(value: str) -> str: catalog_url = value.strip() if not catalog_url: return "" parsed = urllib.parse.urlparse(catalog_url) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError("Catalog URL must be a valid http(s) URL") return catalog_url def save_channel_settings(runtime: BotRuntime, status_channel_id: str, media_channel_id: str, catalog_url: str = "") -> dict[str, str]: status_channel = validate_channel_id(status_channel_id, "Status") media_channel = validate_channel_id(media_channel_id or status_channel, "Media") state = load_state(runtime.settings_path) state.update( { "status_channel_id": status_channel, "media_channel_id": media_channel, "catalog_url": validate_catalog_url(catalog_url), "updated_at": datetime.now(timezone.utc).isoformat(), } ) save_state(runtime.settings_path, state) return channel_settings(runtime) def save_media_channel_setting(runtime: BotRuntime, media_channel_id: str) -> dict[str, str]: media_channel = validate_channel_id(media_channel_id, "Media") state = load_state(runtime.settings_path) state["media_channel_id"] = media_channel state["updated_at"] = datetime.now(timezone.utc).isoformat() save_state(runtime.settings_path, state) return channel_settings(runtime) def jellyfin_settings(runtime: BotRuntime) -> dict[str, Any]: data = load_state(runtime.settings_path) api_key = str(data.get("jellyfin_api_key", "")).strip() return { "url": str(data.get("jellyfin_url", "")).strip(), "configured": bool(api_key), "autoSync": bool(data.get("jellyfin_auto_sync", False)), "lastSyncAt": data.get("jellyfin_last_sync_at"), "lastSyncError": data.get("jellyfin_last_sync_error"), "lastPublishedAt": data.get("jellyfin_last_published_at"), "lastFingerprint": data.get("jellyfin_last_fingerprint"), "lastPublishedFingerprint": data.get("jellyfin_last_published_fingerprint"), } def save_jellyfin_settings(runtime: BotRuntime, data: dict[str, Any]) -> dict[str, Any]: state = load_state(runtime.settings_path) url = str(data.get("url", "")).strip().rstrip("/") if url: parsed = urllib.parse.urlparse(url) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError("Jellyfin URL must be a valid http(s) URL") api_key = str(data.get("apiKey", "")).strip() state["jellyfin_url"] = url if api_key: state["jellyfin_api_key"] = api_key elif data.get("clearApiKey"): state["jellyfin_api_key"] = "" state["jellyfin_auto_sync"] = bool(data.get("autoSync", False)) state["updated_at"] = datetime.now(timezone.utc).isoformat() save_state(runtime.settings_path, state) return jellyfin_settings(runtime) def save_catalog_url_setting(runtime: BotRuntime, catalog_url: str) -> dict[str, str]: state = load_state(runtime.settings_path) state["catalog_url"] = validate_catalog_url(catalog_url) state["updated_at"] = datetime.now(timezone.utc).isoformat() save_state(runtime.settings_path, state) return channel_settings(runtime) def normalize_csv_key(value: str) -> str: return "".join(character for character in value.lower() if character.isalnum()) def value_from_row(row: dict[str, str], aliases: list[str]) -> str: for alias in aliases: value = row.get(alias, "").strip() if value: return value return "" def parse_int_text(value: str) -> int | None: digits = "".join(character for character in value if character.isdigit()) if not digits: return None try: return int(digits) except ValueError: return None def parse_year_text(value: str) -> str | None: match = re.search(r"(18|19|20|21)\d{2}", value) return match.group(0) if match else None def clean_media_text(value: str, limit: int = 220) -> str | None: cleaned = " ".join(str(value).replace("\r", " ").replace("\n", " ").split()) if not cleaned: return None if len(cleaned) <= limit: return cleaned return cleaned[: limit - 1].rstrip() + "…" def format_runtime(value: str) -> str | None: cleaned = clean_media_text(value, 48) if not cleaned: return None if not cleaned.isdigit(): return cleaned minutes = int(cleaned) if minutes <= 0: return None hours, remainder = divmod(minutes, 60) if hours and remainder: return f"{hours}h {remainder}m" if hours: return f"{hours}h" return f"{remainder}m" def parse_media_csv(csv_text: str, media_type: str, filename: str) -> list[MediaItem]: text = csv_text.lstrip("\ufeff") if not text.strip(): return [] try: dialect = csv.Sniffer().sniff(text[:4096], delimiters=",;\t|") except csv.Error: dialect = csv.excel reader = csv.DictReader(io.StringIO(text), dialect=dialect) if not reader.fieldnames: raise ValueError(f"{filename} does not have a header row") title_aliases = [ "title", "name", "movie", "movietitle", "sorttitle", "originaltitle", ] if media_type == "show": title_aliases = [ "showtitle", "seriestitle", "series", "show", "grandparenttitle", "parenttitle", "title", "name", ] year_aliases = ["year", "releaseyear", "productionyear", "releasedate", "premiered", "date"] genre_aliases = ["genres", "genre", "tags", "categories"] rating_aliases = ["rating", "contentrating", "agerating", "certification", "mpaarating"] runtime_aliases = ["runtime", "duration", "runtimeminutes", "length", "durationminutes"] summary_aliases = ["summary", "overview", "description", "plot", "tagline"] season_count_aliases = ["seasoncount", "seasons"] season_number_aliases = ["season", "seasonnumber", "seasonindex", "parentindex"] episode_count_aliases = ["episodecount", "episodes"] episode_number_aliases = ["episode", "episodenumber", "episodeindex", "index"] items: list[MediaItem] = [] show_groups: dict[tuple[str, str], dict[str, Any]] = {} for raw_row in reader: row = { normalize_csv_key(str(key)): str(value or "") for key, value in raw_row.items() if key is not None } title = clean_media_text(value_from_row(row, title_aliases), 120) if not title: continue year = parse_year_text(value_from_row(row, year_aliases)) genres = clean_media_text(value_from_row(row, genre_aliases), 120) rating = clean_media_text(value_from_row(row, rating_aliases), 32) runtime = format_runtime(value_from_row(row, runtime_aliases)) summary = clean_media_text(value_from_row(row, summary_aliases)) season_count = parse_int_text(value_from_row(row, season_count_aliases)) season_number = parse_int_text(value_from_row(row, season_number_aliases)) episode_count = parse_int_text(value_from_row(row, episode_count_aliases)) episode_number = parse_int_text(value_from_row(row, episode_number_aliases)) if media_type == "show": key = (title.casefold(), year or "") group = show_groups.setdefault( key, { "title": title, "year": year, "genres": genres, "rating": rating, "runtime": runtime, "summary": summary, "seasons": set(), "episodes": 0, "explicit_season_count": None, "explicit_episode_count": None, }, ) if not group.get("genres") and genres: group["genres"] = genres if not group.get("rating") and rating: group["rating"] = rating if not group.get("runtime") and runtime: group["runtime"] = runtime if not group.get("summary") and summary: group["summary"] = summary if season_number is not None: group["seasons"].add(season_number) if episode_number is not None: group["episodes"] += 1 if season_count is not None: current = group.get("explicit_season_count") group["explicit_season_count"] = max(current or 0, season_count) if episode_count is not None: current = group.get("explicit_episode_count") group["explicit_episode_count"] = max(current or 0, episode_count) continue items.append( MediaItem( title=title, media_type=media_type, year=year, genres=genres, rating=rating, runtime=runtime, summary=summary, ) ) if media_type == "show": for group in show_groups.values(): seasons = group.get("explicit_season_count") or len(group["seasons"]) or None episodes = group.get("explicit_episode_count") or group["episodes"] or None items.append( MediaItem( title=group["title"], media_type=media_type, year=group["year"], genres=group["genres"], rating=group["rating"], runtime=group["runtime"], summary=group["summary"], seasons=seasons, episodes=episodes, ) ) deduped: dict[tuple[str, str], MediaItem] = {} for item in items: deduped.setdefault((item.title.casefold(), item.year or ""), item) parsed = sorted(deduped.values(), key=lambda item: (item.title.casefold(), item.year or "")) if not parsed: raise ValueError(f"{filename} did not contain any rows with a recognizable title/name column") return parsed def media_item_to_jsonable(item: MediaItem) -> dict[str, Any]: data: dict[str, Any] = { "title": item.title, "mediaType": item.media_type, "year": item.year or "", "genres": item.genres or "", "rating": item.rating or "", "runtime": item.runtime or "", "summary": item.summary or "", } if item.media_type == "show": data["seasons"] = item.seasons if item.seasons is not None else "" data["episodes"] = item.episodes if item.episodes is not None else "" return data def media_item_from_data(data: Any, media_type: str) -> MediaItem: if not isinstance(data, dict): raise ValueError(f"{media_type.title()} entries must be objects") title = clean_media_text(str(data.get("title", "")), 120) if not title: raise ValueError(f"{media_type.title()} entries must include a title") year = parse_year_text(str(data.get("year", ""))) seasons = parse_int_text(str(data.get("seasons", ""))) if media_type == "show" else None episodes = parse_int_text(str(data.get("episodes", ""))) if media_type == "show" else None return MediaItem( title=title, media_type=media_type, year=year, genres=clean_media_text(str(data.get("genres", "")), 120), rating=clean_media_text(str(data.get("rating", "")), 32), runtime=format_runtime(str(data.get("runtime", ""))) if media_type == "movie" else None, summary=clean_media_text(str(data.get("summary", ""))), seasons=seasons, episodes=episodes, ) def media_items_from_data(raw_items: Any, media_type: str) -> list[MediaItem]: if raw_items is None: return [] if not isinstance(raw_items, list): raise ValueError(f"{media_type.title()} library must be a list") deduped: dict[tuple[str, str], MediaItem] = {} for raw_item in raw_items: item = media_item_from_data(raw_item, media_type) deduped[(item.title.casefold(), item.year or "")] = item return sorted(deduped.values(), key=lambda item: (item.title.casefold(), item.year or "")) def media_library_to_jsonable(movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]: return { "movies": [media_item_to_jsonable(item) for item in movies], "shows": [media_item_to_jsonable(item) for item in shows], } def load_media_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]: data = load_state(runtime.media_library_path) return ( media_items_from_data(data.get("movies", []), "movie"), media_items_from_data(data.get("shows", []), "show"), ) def save_media_library(runtime: BotRuntime, movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]: payload = media_library_to_jsonable(movies, shows) payload["updated_at"] = datetime.now(timezone.utc).isoformat() save_state(runtime.media_library_path, payload) return payload def import_media_csvs( runtime: BotRuntime, movies_csv: str, shows_csv: str, movie_filename: str, show_filename: str, ) -> dict[str, Any]: current_movies, current_shows = load_media_library(runtime) movies = parse_media_csv(movies_csv, "movie", movie_filename) if movies_csv.strip() else current_movies shows = parse_media_csv(shows_csv, "show", show_filename) if shows_csv.strip() else current_shows save_media_library(runtime, movies, shows) return { "library": media_library_to_jsonable(movies, shows), "movieCount": len(movies), "showCount": len(shows), } def jellyfin_runtime(runtime_ticks: Any) -> str | None: try: ticks = int(runtime_ticks or 0) except (TypeError, ValueError): return None if ticks <= 0: return None minutes = round(ticks / 10_000_000 / 60) if minutes <= 0: return None hours, remainder = divmod(minutes, 60) if hours and remainder: return f"{hours}h {remainder}m" if hours: return f"{hours}h" return f"{remainder}m" def jellyfin_item_year(item: dict[str, Any]) -> str | None: production_year = item.get("ProductionYear") if production_year: return parse_year_text(str(production_year)) return parse_year_text(str(item.get("PremiereDate", ""))) def jellyfin_item_summary(item: dict[str, Any]) -> str | None: return clean_media_text(str(item.get("Overview", "") or item.get("ShortOverview", ""))) def jellyfin_movie_from_item(item: dict[str, Any]) -> MediaItem: return MediaItem( title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Movie", media_type="movie", year=jellyfin_item_year(item), genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120), rating=clean_media_text(str(item.get("OfficialRating", "")), 32), runtime=jellyfin_runtime(item.get("RunTimeTicks")), summary=jellyfin_item_summary(item), ) def jellyfin_show_from_item(item: dict[str, Any]) -> MediaItem: return MediaItem( title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Show", media_type="show", year=jellyfin_item_year(item), genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120), rating=clean_media_text(str(item.get("OfficialRating", "")), 32), summary=jellyfin_item_summary(item), seasons=parse_int_text(str(item.get("ChildCount", ""))), episodes=parse_int_text(str(item.get("RecursiveItemCount", ""))), ) def jellyfin_request(settings: dict[str, Any], path: str, params: dict[str, Any]) -> dict[str, Any]: base_url = str(settings.get("url", "")).strip().rstrip("/") api_key = str(settings.get("apiKey", "")).strip() if not base_url or not api_key: raise ValueError("Jellyfin URL and API key are required") query = urllib.parse.urlencode({key: value for key, value in params.items() if value is not None}) url = f"{base_url}{path}" if query: url = f"{url}?{query}" request = urllib.request.Request( url, headers={ "Accept": "application/json", "User-Agent": "ArchiveStatusBot/1.0", "X-Emby-Token": api_key, }, method="GET", ) try: with urllib.request.urlopen(request, timeout=30) as response: return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"Jellyfin API failed: HTTP {exc.code} {detail}") from exc except (urllib.error.URLError, TimeoutError, socket.timeout) as exc: raise RuntimeError(f"Jellyfin API failed: {clean_error(exc)}") from exc def fetch_jellyfin_items(settings: dict[str, Any], item_type: str) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] start_index = 0 limit = 200 while True: data = jellyfin_request( settings, "/Items", { "Recursive": "true", "IncludeItemTypes": item_type, "Fields": "Genres,Overview,OfficialRating,RecursiveItemCount,ChildCount,PremiereDate,RunTimeTicks,ProviderIds", "SortBy": "SortName", "SortOrder": "Ascending", "EnableImages": "false", "StartIndex": start_index, "Limit": limit, }, ) page = data.get("Items", []) if not isinstance(page, list): raise RuntimeError("Jellyfin returned an invalid Items payload") items.extend(item for item in page if isinstance(item, dict)) total = int(data.get("TotalRecordCount", len(items)) or len(items)) if not page or len(items) >= total: return items start_index += limit def jellyfin_item_dedupe_key(item: dict[str, Any], item_type: str) -> tuple[str, str]: provider_ids = item.get("ProviderIds") if isinstance(provider_ids, dict): for provider in ("Tmdb", "Imdb", "Tvdb"): value = str(provider_ids.get(provider, "")).strip().casefold() if value: return item_type.casefold(), f"{provider.casefold()}:{value}" title = clean_media_text(str(item.get("Name", "")), 120) or "" year = jellyfin_item_year(item) or "" return item_type.casefold(), f"title:{title.casefold()}:{year}" def dedupe_jellyfin_items(items: list[dict[str, Any]], item_type: str) -> list[dict[str, Any]]: provider_deduped: dict[tuple[str, str], dict[str, Any]] = {} for item in items: key = jellyfin_item_dedupe_key(item, item_type) current = provider_deduped.get(key) if current is None: provider_deduped[key] = item continue current_overview = str(current.get("Overview", "") or current.get("ShortOverview", "")) next_overview = str(item.get("Overview", "") or item.get("ShortOverview", "")) if len(next_overview) > len(current_overview): provider_deduped[key] = item title_deduped: dict[tuple[str, str], dict[str, Any]] = {} for item in provider_deduped.values(): title = clean_media_text(str(item.get("Name", "")), 120) or "" key = (title.casefold(), jellyfin_item_year(item) or "") current = title_deduped.get(key) if current is None: title_deduped[key] = item continue current_has_provider = bool(current.get("ProviderIds")) next_has_provider = bool(item.get("ProviderIds")) if next_has_provider and not current_has_provider: title_deduped[key] = item continue current_overview = str(current.get("Overview", "") or current.get("ShortOverview", "")) next_overview = str(item.get("Overview", "") or item.get("ShortOverview", "")) if len(next_overview) > len(current_overview): title_deduped[key] = item return sorted(title_deduped.values(), key=lambda item: (str(item.get("SortName", item.get("Name", ""))).casefold(), str(item.get("ProductionYear", "")))) def fetch_jellyfin_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]: state = load_state(runtime.settings_path) settings = { "url": str(state.get("jellyfin_url", "")).strip(), "apiKey": str(state.get("jellyfin_api_key", "")).strip(), } movie_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Movie"), "Movie") show_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Series"), "Series") movies = [jellyfin_movie_from_item(item) for item in movie_items] shows = [jellyfin_show_from_item(item) for item in show_items] return ( sorted(movies, key=lambda item: (item.title.casefold(), item.year or "")), sorted(shows, key=lambda item: (item.title.casefold(), item.year or "")), ) def media_library_fingerprint(movies: list[MediaItem], shows: list[MediaItem]) -> str: payload = media_library_to_jsonable(movies, shows) body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") return hashlib.sha256(body).hexdigest() def update_jellyfin_sync_state( runtime: BotRuntime, *, fingerprint: str | None = None, error: str | None = None, published: bool = False, ) -> None: state = load_state(runtime.settings_path) now = datetime.now(timezone.utc).isoformat() state["jellyfin_last_sync_at"] = now state["jellyfin_last_sync_error"] = error if fingerprint is not None: state["jellyfin_last_fingerprint"] = fingerprint if published: state["jellyfin_last_published_at"] = now if fingerprint is not None: state["jellyfin_last_published_fingerprint"] = fingerprint save_state(runtime.settings_path, state) def sync_jellyfin_library(runtime: BotRuntime, force_publish: bool = False) -> dict[str, Any]: movies, shows = fetch_jellyfin_library(runtime) fingerprint = media_library_fingerprint(movies, shows) settings_state = load_state(runtime.settings_path) changed = fingerprint != str(settings_state.get("jellyfin_last_fingerprint", "")) publish_changed = fingerprint != str(settings_state.get("jellyfin_last_published_fingerprint", "")) save_media_library(runtime, movies, shows) published = False result: dict[str, Any] = {} if (publish_changed or force_publish) and not runtime.dry_run: result = publish_media_items( runtime=runtime, channel_id=channel_settings(runtime)["mediaChannelId"], movies_all=movies, shows_all=shows, ) published = True update_jellyfin_sync_state(runtime, fingerprint=fingerprint, published=published) return { "changed": changed, "published": published, "movieCount": len(movies), "showCount": len(shows), "library": media_library_to_jsonable(movies, shows), "publishResult": result, "jellyfin": jellyfin_settings(runtime), } def maybe_run_jellyfin_sync(runtime: BotRuntime) -> dict[str, Any] | None: settings = jellyfin_settings(runtime) if not settings["configured"] or not settings["autoSync"]: return None interval = int(os.getenv("JELLYFIN_SYNC_INTERVAL_SECONDS", str(DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS))) state = load_state(runtime.settings_path) last_sync = str(state.get("jellyfin_last_sync_at", "")).strip() if last_sync: try: last = datetime.fromisoformat(last_sync) if (datetime.now(timezone.utc) - last).total_seconds() < interval: return None except ValueError: pass try: return sync_jellyfin_library(runtime) except Exception as exc: update_jellyfin_sync_state(runtime, error=str(exc)[:240]) raise def media_item_heading(item: MediaItem) -> str: if item.year: return f"{item.title} ({item.year})" return item.title def media_item_value(item: MediaItem) -> str: details: list[str] = [] if item.genres: details.append(item.genres) if item.rating: details.append(item.rating) if item.runtime and item.media_type == "movie": details.append(item.runtime) if item.media_type == "show": counts = [] if item.seasons: counts.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}") if item.episodes: counts.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}") if counts: details.append(" · ".join(counts)) lines = [] if details: lines.append(" • ".join(details)) if item.summary: lines.append(item.summary) return "\n".join(lines)[:1024] or "No details provided." def media_category_embeds( title: str, items: list[MediaItem], color: int, total_count: int, source_name: str, ) -> list[dict[str, Any]]: embeds: list[dict[str, Any]] = [] omitted = max(total_count - len(items), 0) page_count = max((len(items) + MEDIA_ITEMS_PER_EMBED - 1) // MEDIA_ITEMS_PER_EMBED, 1) for page_index in range(page_count): start = page_index * MEDIA_ITEMS_PER_EMBED chunk = items[start : start + MEDIA_ITEMS_PER_EMBED] description = f"{total_count} total from `{source_name}`" if omitted: description += f" · showing first {len(items)}" if page_count > 1: description += f" · page {page_index + 1}/{page_count}" fields = [ { "name": media_item_heading(item)[:256], "value": media_item_value(item), "inline": False, } for item in chunk ] embeds.append( { "title": title, "description": description, "color": color, "fields": fields, } ) return embeds def render_media_catalog_payloads( movies: list[MediaItem], shows: list[MediaItem], movie_total: int, show_total: int, movie_source: str, show_source: str, ) -> list[dict[str, Any]]: now = datetime.now(timezone.utc) embeds: list[dict[str, Any]] = [ { "title": "The Mithral Archive Media Catalog", "description": "Current movies and shows available in the archive.", "color": 0x5865F2, "fields": [ {"name": "Movies", "value": str(movie_total), "inline": True}, {"name": "Shows", "value": str(show_total), "inline": True}, {"name": "Updated", "value": now.strftime("%Y-%m-%d %H:%M UTC"), "inline": True}, ], "timestamp": now.isoformat(), } ] if movies: embeds.extend(media_category_embeds("Movies", movies, 0xF59E0B, movie_total, movie_source)) if shows: embeds.extend(media_category_embeds("Shows", shows, 0x10B981, show_total, show_source)) payloads = [] for index in range(0, len(embeds), MAX_DISCORD_EMBEDS): payloads.append( { "content": "**The Mithral Archive Media Catalog**" if index == 0 else "", "embeds": embeds[index : index + MAX_DISCORD_EMBEDS], } ) return payloads def media_markdown_line(item: MediaItem) -> str: title = media_item_heading(item) details: list[str] = [] if item.genres: details.append(item.genres) if item.rating: details.append(item.rating) if item.runtime and item.media_type == "movie": details.append(item.runtime) if item.media_type == "show": if item.seasons: details.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}") if item.episodes: details.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}") suffix = f" - {'; '.join(details)}" if details else "" return f"- **{title}**{suffix}" def render_media_catalog_markdown(movies: list[MediaItem], shows: list[MediaItem]) -> str: now = datetime.now(timezone.utc) lines = [ "# The Mithral Archive Media Catalog", "", f"Updated: {now.strftime('%Y-%m-%d %H:%M UTC')}", f"Movies: {len(movies)}", f"Shows: {len(shows)}", "", ] if movies: lines.extend(["## Movies", ""]) for item in movies: lines.append(media_markdown_line(item)) if item.summary: lines.append(f" - {item.summary}") lines.append("") if shows: lines.extend(["## Shows", ""]) for item in shows: lines.append(media_markdown_line(item)) if item.summary: lines.append(f" - {item.summary}") lines.append("") return "\n".join(lines).strip() + "\n" def render_catalog_html(runtime: BotRuntime) -> bytes: movies, shows = load_media_library(runtime) updated_at = load_state(runtime.media_library_path).get("updated_at") updated = "Never" if updated_at: try: updated = datetime.fromisoformat(str(updated_at)).strftime("%Y-%m-%d %H:%M UTC") except ValueError: updated = str(updated_at) def item_block(item: MediaItem) -> str: meta = [] if item.year: meta.append(item.year) if item.genres: meta.append(item.genres) if item.rating: meta.append(item.rating) if item.runtime: meta.append(item.runtime) if item.media_type == "show": if item.seasons: meta.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}") if item.episodes: meta.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}") summary = f"
{html.escape(item.summary)}
" if item.summary else "" return ( f'