2237 lines
81 KiB
Python
2237 lines
81 KiB
Python
#!/usr/bin/env python3
|
|
"""Live Discord status message for The Mithral Archive."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import secrets
|
|
import signal
|
|
import socket
|
|
import ssl
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from http.cookies import SimpleCookie
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
DISCORD_API = "https://discord.com/api/v10"
|
|
DEFAULT_INTERVAL_SECONDS = 60
|
|
DEFAULT_TIMEOUT_SECONDS = 10
|
|
MAX_DISCORD_EMBEDS = 10
|
|
MAX_REQUEST_BYTES = 8_000_000
|
|
SESSION_COOKIE = "archive_bot_session"
|
|
PBKDF2_ITERATIONS = 390_000
|
|
MEDIA_ITEMS_PER_EMBED = 10
|
|
DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS = 900
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Service:
|
|
name: str
|
|
group: str
|
|
url: str
|
|
display_url: str
|
|
method: str
|
|
timeout: float
|
|
expected_statuses: set[int]
|
|
expected_min: int
|
|
expected_max: int
|
|
keyword: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CheckResult:
|
|
service: Service
|
|
ok: bool
|
|
status: int | None
|
|
latency_ms: int | None
|
|
error: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MediaItem:
|
|
title: str
|
|
media_type: str
|
|
year: str | None = None
|
|
genres: str | None = None
|
|
rating: str | None = None
|
|
runtime: str | None = None
|
|
summary: str | None = None
|
|
seasons: int | None = None
|
|
episodes: int | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class DashboardAuthConfig:
|
|
username: str
|
|
password_hash: str
|
|
session_ttl_seconds: int
|
|
cookie_secure: bool
|
|
|
|
|
|
@dataclass
|
|
class DashboardSession:
|
|
username: str
|
|
csrf_token: str
|
|
expires_at: float
|
|
|
|
|
|
class DashboardAuth:
|
|
def __init__(self, config: DashboardAuthConfig) -> None:
|
|
self.config = config
|
|
self.lock = threading.Lock()
|
|
self.sessions: dict[str, DashboardSession] = {}
|
|
self.failed_logins: dict[str, list[float]] = {}
|
|
|
|
def login_allowed(self, key: str) -> bool:
|
|
now = time.time()
|
|
window_start = now - 900
|
|
with self.lock:
|
|
attempts = [attempt for attempt in self.failed_logins.get(key, []) if attempt >= window_start]
|
|
self.failed_logins[key] = attempts
|
|
return len(attempts) < 10
|
|
|
|
def record_failed_login(self, key: str) -> None:
|
|
now = time.time()
|
|
with self.lock:
|
|
self.failed_logins.setdefault(key, []).append(now)
|
|
|
|
def clear_failed_login(self, key: str) -> None:
|
|
with self.lock:
|
|
self.failed_logins.pop(key, None)
|
|
|
|
def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
|
|
if not hmac.compare_digest(username, self.config.username):
|
|
return None
|
|
if not verify_password_hash(self.config.password_hash, password):
|
|
return None
|
|
|
|
session_id = secrets.token_urlsafe(32)
|
|
session = DashboardSession(
|
|
username=username,
|
|
csrf_token=secrets.token_urlsafe(32),
|
|
expires_at=time.time() + self.config.session_ttl_seconds,
|
|
)
|
|
with self.lock:
|
|
self.sessions[session_id] = session
|
|
return session_id, session
|
|
|
|
def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
|
|
if not cookie_header:
|
|
return None
|
|
cookie = SimpleCookie()
|
|
cookie.load(cookie_header)
|
|
morsel = cookie.get(SESSION_COOKIE)
|
|
if morsel is None:
|
|
return None
|
|
|
|
session_id = morsel.value
|
|
now = time.time()
|
|
with self.lock:
|
|
session = self.sessions.get(session_id)
|
|
if session is None:
|
|
return None
|
|
if session.expires_at <= now:
|
|
self.sessions.pop(session_id, None)
|
|
return None
|
|
session.expires_at = now + self.config.session_ttl_seconds
|
|
return session_id, session
|
|
|
|
def logout(self, session_id: str) -> None:
|
|
with self.lock:
|
|
self.sessions.pop(session_id, None)
|
|
|
|
|
|
class BotRuntime:
|
|
def __init__(
|
|
self,
|
|
token: str,
|
|
channel_id: str,
|
|
config_path: Path,
|
|
state_path: Path,
|
|
media_state_path: Path,
|
|
media_library_path: Path,
|
|
settings_path: Path,
|
|
dry_run: bool = False,
|
|
) -> None:
|
|
self.token = token
|
|
self.default_channel_id = channel_id
|
|
self.config_path = config_path
|
|
self.state_path = state_path
|
|
self.media_state_path = media_state_path
|
|
self.media_library_path = media_library_path
|
|
self.settings_path = settings_path
|
|
self.dry_run = dry_run
|
|
self.lock = threading.Lock()
|
|
self.last_results: list[CheckResult] = []
|
|
self.last_error: str | None = None
|
|
self.last_message_id: str | None = None
|
|
self.last_checked_at: datetime | None = None
|
|
|
|
|
|
class DiscordGatewayManager:
|
|
def __init__(self, token: str) -> None:
|
|
self.token = token
|
|
self.thread: threading.Thread | None = None
|
|
self.loop: asyncio.AbstractEventLoop | None = None
|
|
self.client: Any = None
|
|
self.ready = threading.Event()
|
|
self._disconnecting = threading.Event()
|
|
|
|
def start(self) -> None:
|
|
if self.thread is not None:
|
|
return
|
|
self.thread = threading.Thread(target=self._run, name="discord-gateway", daemon=True)
|
|
self.thread.start()
|
|
|
|
def stop(self) -> None:
|
|
self._disconnecting.set()
|
|
if self.loop is not None and self.client is not None:
|
|
asyncio.run_coroutine_threadsafe(self.client.close(), self.loop)
|
|
if self.thread is not None:
|
|
self.thread.join(timeout=15)
|
|
|
|
def _run(self) -> None:
|
|
try:
|
|
import discord
|
|
except ImportError:
|
|
print("discord.py is not installed. Install requirements.txt to keep the bot online.", file=sys.stderr, flush=True)
|
|
self.ready.set()
|
|
return
|
|
|
|
class GatewayClient(discord.Client):
|
|
def __init__(self, manager: DiscordGatewayManager) -> None:
|
|
intents = discord.Intents.default()
|
|
intents.guilds = True
|
|
super().__init__(intents=intents)
|
|
self.manager = manager
|
|
|
|
async def on_ready(self) -> None:
|
|
await self.change_presence(status=discord.Status.online)
|
|
user = self.user
|
|
name = user.name if user is not None else "unknown"
|
|
bot_id = user.id if user is not None else "unknown"
|
|
print(f"Discord gateway connected as {name} ({bot_id})", flush=True)
|
|
self.manager.ready.set()
|
|
|
|
async def on_disconnect(self) -> None:
|
|
self.manager.ready.clear()
|
|
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
self.client = GatewayClient(self)
|
|
|
|
try:
|
|
self.loop.run_until_complete(self.client.start(self.token))
|
|
except Exception as exc:
|
|
if not self._disconnecting.is_set():
|
|
print(f"Discord gateway stopped: {exc}", file=sys.stderr, flush=True)
|
|
finally:
|
|
self.ready.set()
|
|
try:
|
|
pending = asyncio.all_tasks(self.loop)
|
|
for task in pending:
|
|
task.cancel()
|
|
if pending:
|
|
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
|
finally:
|
|
self.loop.close()
|
|
|
|
|
|
def env(name: str, default: str | None = None) -> str:
|
|
value = os.getenv(name, default)
|
|
if value is None or not value.strip():
|
|
raise SystemExit(f"Missing required environment variable: {name}")
|
|
return value.strip()
|
|
|
|
|
|
def normalize_discord_token(token: str) -> str:
|
|
cleaned = token.strip().strip("\"'")
|
|
if cleaned.lower().startswith("bot "):
|
|
cleaned = cleaned[4:].strip()
|
|
return cleaned
|
|
|
|
|
|
def load_dotenv(path: Path = Path(".env")) -> None:
|
|
if not path.exists():
|
|
return
|
|
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith("#") or "=" not in stripped:
|
|
continue
|
|
|
|
key, value = stripped.split("=", 1)
|
|
key = key.strip()
|
|
value = value.strip().strip("\"'")
|
|
if key and key not in os.environ:
|
|
os.environ[key] = value
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
|
|
try:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
except FileNotFoundError as exc:
|
|
raise ValueError(f"Config file not found: {path}") from exc
|
|
except PermissionError as exc:
|
|
raise ValueError(f"Config file is not readable: {path}") from exc
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
|
|
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f"Config must be a JSON object: {path}")
|
|
return data
|
|
|
|
|
|
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
|
|
if raw is None:
|
|
return set(), 200, 399
|
|
|
|
if isinstance(raw, str):
|
|
raw = [part.strip() for part in raw.split(",") if part.strip()]
|
|
|
|
if not isinstance(raw, list):
|
|
raise ValueError("expectedStatuses must be a list or comma-separated string")
|
|
|
|
exact: set[int] = set()
|
|
min_status = 999
|
|
max_status = 0
|
|
|
|
for item in raw:
|
|
if isinstance(item, int):
|
|
exact.add(item)
|
|
continue
|
|
if not isinstance(item, str):
|
|
raise ValueError("expectedStatuses entries must be integers or ranges")
|
|
if "-" in item:
|
|
left, right = item.split("-", 1)
|
|
try:
|
|
min_status = min(min_status, int(left))
|
|
max_status = max(max_status, int(right))
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid expected status range: {item}") from exc
|
|
continue
|
|
try:
|
|
exact.add(int(item))
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid expected status value: {item}") from exc
|
|
|
|
if min_status == 999 and max_status == 0:
|
|
min_status, max_status = 0, -1
|
|
return exact, min_status, max_status
|
|
|
|
|
|
def services_from_data(data: dict[str, Any]) -> list[Service]:
|
|
raw_services = data.get("services")
|
|
if not isinstance(raw_services, list) or not raw_services:
|
|
raise ValueError("Config must include a non-empty services array")
|
|
|
|
services: list[Service] = []
|
|
for index, item in enumerate(raw_services, start=1):
|
|
if not isinstance(item, dict):
|
|
raise ValueError(f"Service #{index} must be an object")
|
|
|
|
name = str(item.get("name", "")).strip()
|
|
url = str(item.get("url", "")).strip()
|
|
if not name or not url:
|
|
raise ValueError(f"Service #{index} must include name and url")
|
|
|
|
parsed = urllib.parse.urlparse(url)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
raise ValueError(f"Service {name} has an invalid http(s) URL")
|
|
|
|
exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
|
|
services.append(
|
|
Service(
|
|
name=name,
|
|
group=str(item.get("group", "Main Services")).strip() or "Main Services",
|
|
url=url,
|
|
display_url=str(item.get("displayUrl", url)).strip() or url,
|
|
method=str(item.get("method", "GET")).strip().upper(),
|
|
timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)),
|
|
expected_statuses=exact,
|
|
expected_min=minimum,
|
|
expected_max=maximum,
|
|
keyword=(str(item["keyword"]).strip() if item.get("keyword") else None),
|
|
)
|
|
)
|
|
|
|
return services
|
|
|
|
|
|
def load_services(path: Path) -> list[Service]:
|
|
return services_from_data(load_json(path))
|
|
|
|
|
|
def save_services_config(path: Path, data: dict[str, Any]) -> None:
|
|
services_from_data(data)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
temporary = path.with_suffix(f"{path.suffix}.tmp")
|
|
with temporary.open("w", encoding="utf-8") as handle:
|
|
json.dump(data, handle, indent=2)
|
|
handle.write("\n")
|
|
try:
|
|
temporary.replace(path)
|
|
except OSError:
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
json.dump(data, handle, indent=2)
|
|
handle.write("\n")
|
|
temporary.unlink(missing_ok=True)
|
|
|
|
|
|
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for service in services:
|
|
expected: list[int | str] = sorted(service.expected_statuses)
|
|
if service.expected_min <= service.expected_max:
|
|
expected.append(f"{service.expected_min}-{service.expected_max}")
|
|
item: dict[str, Any] = {
|
|
"name": service.name,
|
|
"group": service.group,
|
|
"url": service.url,
|
|
"displayUrl": service.display_url,
|
|
"method": service.method,
|
|
"timeoutSeconds": service.timeout,
|
|
"expectedStatuses": expected or ["200-399"],
|
|
}
|
|
if service.keyword:
|
|
item["keyword"] = service.keyword
|
|
output.append(item)
|
|
return output
|
|
|
|
|
|
def status_expected(service: Service, status: int) -> bool:
|
|
if status in service.expected_statuses:
|
|
return True
|
|
return service.expected_min <= status <= service.expected_max
|
|
|
|
|
|
def check_service(service: Service) -> CheckResult:
|
|
started = time.monotonic()
|
|
headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
|
|
request = urllib.request.Request(service.url, headers=headers, method=service.method)
|
|
|
|
try:
|
|
context = ssl.create_default_context()
|
|
with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
|
|
body = response.read(1_000_000) if service.keyword else b""
|
|
status = int(response.status)
|
|
except urllib.error.HTTPError as exc:
|
|
status = int(exc.code)
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
ok = status_expected(service, status)
|
|
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
|
|
except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc:
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
return CheckResult(service, False, None, latency_ms, clean_error(exc))
|
|
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
ok = status_expected(service, status)
|
|
|
|
if ok and service.keyword:
|
|
try:
|
|
text = body.decode("utf-8", errors="ignore")
|
|
except UnicodeDecodeError:
|
|
text = ""
|
|
if service.keyword not in text:
|
|
ok = False
|
|
return CheckResult(service, False, status, latency_ms, "keyword missing")
|
|
|
|
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
|
|
|
|
|
|
def clean_error(exc: BaseException) -> str:
|
|
reason = getattr(exc, "reason", None)
|
|
if reason:
|
|
return str(reason)[:120]
|
|
return str(exc)[:120] or exc.__class__.__name__
|
|
|
|
|
|
def discord_request(
|
|
method: str,
|
|
token: str,
|
|
path: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
body = None
|
|
headers = {
|
|
"Authorization": f"Bot {token}",
|
|
"User-Agent": "ArchiveStatusBot/1.0",
|
|
}
|
|
|
|
if payload is not None:
|
|
body = json.dumps(payload).encode("utf-8")
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
request = urllib.request.Request(
|
|
f"{DISCORD_API}{path}",
|
|
data=body,
|
|
headers=headers,
|
|
method=method,
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=20) as response:
|
|
data = response.read()
|
|
if not data:
|
|
return {}
|
|
return json.loads(data.decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
detail = exc.read().decode("utf-8", errors="ignore")
|
|
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
|
|
|
|
|
|
def discord_multipart_request(
|
|
method: str,
|
|
token: str,
|
|
path: str,
|
|
payload: dict[str, Any],
|
|
files: list[tuple[str, str, str, bytes]],
|
|
) -> dict[str, Any]:
|
|
boundary = f"----ArchiveBot{secrets.token_hex(16)}"
|
|
body = bytearray()
|
|
|
|
def add_part(name: str, content: bytes, content_type: str, filename: str | None = None) -> None:
|
|
body.extend(f"--{boundary}\r\n".encode("ascii"))
|
|
disposition = f'Content-Disposition: form-data; name="{name}"'
|
|
if filename is not None:
|
|
disposition += f'; filename="{filename}"'
|
|
body.extend(f"{disposition}\r\n".encode("utf-8"))
|
|
body.extend(f"Content-Type: {content_type}\r\n\r\n".encode("ascii"))
|
|
body.extend(content)
|
|
body.extend(b"\r\n")
|
|
|
|
add_part("payload_json", json.dumps(payload).encode("utf-8"), "application/json")
|
|
for field_name, filename, content_type, content in files:
|
|
add_part(field_name, content, content_type, filename)
|
|
body.extend(f"--{boundary}--\r\n".encode("ascii"))
|
|
|
|
request = urllib.request.Request(
|
|
f"{DISCORD_API}{path}",
|
|
data=bytes(body),
|
|
headers={
|
|
"Authorization": f"Bot {token}",
|
|
"User-Agent": "ArchiveStatusBot/1.0",
|
|
"Content-Type": f"multipart/form-data; boundary={boundary}",
|
|
},
|
|
method=method,
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=30) as response:
|
|
data = response.read()
|
|
if not data:
|
|
return {}
|
|
return json.loads(data.decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
detail = exc.read().decode("utf-8", errors="ignore")
|
|
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
|
|
|
|
|
|
def discord_delete_message(token: str, channel_id: str, message_id: str) -> None:
|
|
try:
|
|
discord_request("DELETE", token, f"/channels/{channel_id}/messages/{message_id}")
|
|
except RuntimeError as exc:
|
|
print(f"Could not delete old media catalog message {message_id}: {exc}", file=sys.stderr)
|
|
|
|
|
|
def discord_bot_identity(token: str) -> dict[str, Any]:
|
|
return discord_request("GET", token, "/users/@me")
|
|
|
|
|
|
def load_state(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def save_state(path: Path, state: dict[str, Any]) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
temporary = path.with_suffix(f"{path.suffix}.tmp")
|
|
with temporary.open("w", encoding="utf-8") as handle:
|
|
json.dump(state, handle, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
temporary.replace(path)
|
|
|
|
|
|
def validate_channel_id(value: str, label: str) -> str:
|
|
channel_id = value.strip()
|
|
if not channel_id:
|
|
raise ValueError(f"{label} channel ID is required")
|
|
if not channel_id.isdigit():
|
|
raise ValueError(f"{label} channel ID must be a Discord numeric channel ID")
|
|
return channel_id
|
|
|
|
|
|
def channel_settings(runtime: BotRuntime) -> dict[str, str]:
|
|
data = load_state(runtime.settings_path)
|
|
status_channel = str(data.get("status_channel_id", "")).strip() or runtime.default_channel_id
|
|
media_channel = str(data.get("media_channel_id", "")).strip() or status_channel
|
|
catalog_url = str(data.get("catalog_url", "")).strip() or os.getenv("PUBLIC_CATALOG_URL", "").strip()
|
|
return {
|
|
"statusChannelId": status_channel,
|
|
"mediaChannelId": media_channel,
|
|
"catalogUrl": catalog_url,
|
|
}
|
|
|
|
|
|
def validate_catalog_url(value: str) -> str:
|
|
catalog_url = value.strip()
|
|
if not catalog_url:
|
|
return ""
|
|
parsed = urllib.parse.urlparse(catalog_url)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
raise ValueError("Catalog URL must be a valid http(s) URL")
|
|
return catalog_url
|
|
|
|
|
|
def save_channel_settings(runtime: BotRuntime, status_channel_id: str, media_channel_id: str, catalog_url: str = "") -> dict[str, str]:
|
|
status_channel = validate_channel_id(status_channel_id, "Status")
|
|
media_channel = validate_channel_id(media_channel_id or status_channel, "Media")
|
|
state = load_state(runtime.settings_path)
|
|
state.update(
|
|
{
|
|
"status_channel_id": status_channel,
|
|
"media_channel_id": media_channel,
|
|
"catalog_url": validate_catalog_url(catalog_url),
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
)
|
|
save_state(runtime.settings_path, state)
|
|
return channel_settings(runtime)
|
|
|
|
|
|
def save_media_channel_setting(runtime: BotRuntime, media_channel_id: str) -> dict[str, str]:
|
|
media_channel = validate_channel_id(media_channel_id, "Media")
|
|
state = load_state(runtime.settings_path)
|
|
state["media_channel_id"] = media_channel
|
|
state["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.settings_path, state)
|
|
return channel_settings(runtime)
|
|
|
|
|
|
def jellyfin_settings(runtime: BotRuntime) -> dict[str, Any]:
|
|
data = load_state(runtime.settings_path)
|
|
api_key = str(data.get("jellyfin_api_key", "")).strip()
|
|
library_names = data.get("jellyfin_library_names", [])
|
|
if not isinstance(library_names, list):
|
|
library_names = []
|
|
return {
|
|
"url": str(data.get("jellyfin_url", "")).strip(),
|
|
"configured": bool(api_key),
|
|
"libraryNames": [str(name) for name in library_names if str(name).strip()],
|
|
"autoSync": bool(data.get("jellyfin_auto_sync", False)),
|
|
"lastSyncAt": data.get("jellyfin_last_sync_at"),
|
|
"lastSyncError": data.get("jellyfin_last_sync_error"),
|
|
"lastPublishedAt": data.get("jellyfin_last_published_at"),
|
|
"lastFingerprint": data.get("jellyfin_last_fingerprint"),
|
|
"lastPublishedFingerprint": data.get("jellyfin_last_published_fingerprint"),
|
|
"lastChanges": data.get("jellyfin_last_changes"),
|
|
}
|
|
|
|
|
|
def save_jellyfin_settings(runtime: BotRuntime, data: dict[str, Any]) -> dict[str, Any]:
|
|
state = load_state(runtime.settings_path)
|
|
url = str(data.get("url", "")).strip().rstrip("/")
|
|
if url:
|
|
parsed = urllib.parse.urlparse(url)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
raise ValueError("Jellyfin URL must be a valid http(s) URL")
|
|
|
|
api_key = str(data.get("apiKey", "")).strip()
|
|
state["jellyfin_url"] = url
|
|
if api_key:
|
|
state["jellyfin_api_key"] = api_key
|
|
elif data.get("clearApiKey"):
|
|
state["jellyfin_api_key"] = ""
|
|
raw_library_names = data.get("libraryNames", [])
|
|
if isinstance(raw_library_names, str):
|
|
library_names = [name.strip() for name in raw_library_names.split(",") if name.strip()]
|
|
elif isinstance(raw_library_names, list):
|
|
library_names = [str(name).strip() for name in raw_library_names if str(name).strip()]
|
|
else:
|
|
raise ValueError("Jellyfin libraries must be a list or comma-separated string")
|
|
state["jellyfin_library_names"] = library_names
|
|
state["jellyfin_auto_sync"] = bool(data.get("autoSync", False))
|
|
state["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.settings_path, state)
|
|
return jellyfin_settings(runtime)
|
|
|
|
|
|
def save_catalog_url_setting(runtime: BotRuntime, catalog_url: str) -> dict[str, str]:
|
|
state = load_state(runtime.settings_path)
|
|
state["catalog_url"] = validate_catalog_url(catalog_url)
|
|
state["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.settings_path, state)
|
|
return channel_settings(runtime)
|
|
|
|
|
|
def parse_int_text(value: str) -> int | None:
|
|
digits = "".join(character for character in value if character.isdigit())
|
|
if not digits:
|
|
return None
|
|
try:
|
|
return int(digits)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def parse_year_text(value: str) -> str | None:
|
|
match = re.search(r"(18|19|20|21)\d{2}", value)
|
|
return match.group(0) if match else None
|
|
|
|
|
|
def clean_media_text(value: str, limit: int = 220) -> str | None:
|
|
cleaned = " ".join(str(value).replace("\r", " ").replace("\n", " ").split())
|
|
if not cleaned:
|
|
return None
|
|
if len(cleaned) <= limit:
|
|
return cleaned
|
|
return cleaned[: limit - 1].rstrip() + "…"
|
|
|
|
|
|
def format_runtime(value: str) -> str | None:
|
|
cleaned = clean_media_text(value, 48)
|
|
if not cleaned:
|
|
return None
|
|
if not cleaned.isdigit():
|
|
return cleaned
|
|
|
|
minutes = int(cleaned)
|
|
if minutes <= 0:
|
|
return None
|
|
hours, remainder = divmod(minutes, 60)
|
|
if hours and remainder:
|
|
return f"{hours}h {remainder}m"
|
|
if hours:
|
|
return f"{hours}h"
|
|
return f"{remainder}m"
|
|
|
|
|
|
def media_item_to_jsonable(item: MediaItem) -> dict[str, Any]:
|
|
data: dict[str, Any] = {
|
|
"title": item.title,
|
|
"mediaType": item.media_type,
|
|
"year": item.year or "",
|
|
"genres": item.genres or "",
|
|
"rating": item.rating or "",
|
|
"runtime": item.runtime or "",
|
|
"summary": item.summary or "",
|
|
}
|
|
if item.media_type == "show":
|
|
data["seasons"] = item.seasons if item.seasons is not None else ""
|
|
data["episodes"] = item.episodes if item.episodes is not None else ""
|
|
return data
|
|
|
|
|
|
def media_item_from_data(data: Any, media_type: str) -> MediaItem:
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f"{media_type.title()} entries must be objects")
|
|
|
|
title = clean_media_text(str(data.get("title", "")), 120)
|
|
if not title:
|
|
raise ValueError(f"{media_type.title()} entries must include a title")
|
|
|
|
year = parse_year_text(str(data.get("year", "")))
|
|
seasons = parse_int_text(str(data.get("seasons", ""))) if media_type == "show" else None
|
|
episodes = parse_int_text(str(data.get("episodes", ""))) if media_type == "show" else None
|
|
|
|
return MediaItem(
|
|
title=title,
|
|
media_type=media_type,
|
|
year=year,
|
|
genres=clean_media_text(str(data.get("genres", "")), 120),
|
|
rating=clean_media_text(str(data.get("rating", "")), 32),
|
|
runtime=format_runtime(str(data.get("runtime", ""))) if media_type == "movie" else None,
|
|
summary=clean_media_text(str(data.get("summary", ""))),
|
|
seasons=seasons,
|
|
episodes=episodes,
|
|
)
|
|
|
|
|
|
def media_items_from_data(raw_items: Any, media_type: str) -> list[MediaItem]:
|
|
if raw_items is None:
|
|
return []
|
|
if not isinstance(raw_items, list):
|
|
raise ValueError(f"{media_type.title()} library must be a list")
|
|
|
|
deduped: dict[tuple[str, str], MediaItem] = {}
|
|
for raw_item in raw_items:
|
|
item = media_item_from_data(raw_item, media_type)
|
|
deduped[(item.title.casefold(), item.year or "")] = item
|
|
|
|
return sorted(deduped.values(), key=lambda item: (item.title.casefold(), item.year or ""))
|
|
|
|
|
|
def media_library_to_jsonable(movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
|
|
return {
|
|
"movies": [media_item_to_jsonable(item) for item in movies],
|
|
"shows": [media_item_to_jsonable(item) for item in shows],
|
|
}
|
|
|
|
|
|
def load_media_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
|
|
data = load_state(runtime.media_library_path)
|
|
return (
|
|
media_items_from_data(data.get("movies", []), "movie"),
|
|
media_items_from_data(data.get("shows", []), "show"),
|
|
)
|
|
|
|
|
|
def save_media_library(runtime: BotRuntime, movies: list[MediaItem], shows: list[MediaItem]) -> dict[str, Any]:
|
|
payload = media_library_to_jsonable(movies, shows)
|
|
payload["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.media_library_path, payload)
|
|
return payload
|
|
|
|
|
|
def reset_media_library(runtime: BotRuntime) -> dict[str, Any]:
|
|
save_media_library(runtime, [], [])
|
|
state = load_state(runtime.settings_path)
|
|
for key in (
|
|
"jellyfin_last_sync_at",
|
|
"jellyfin_last_sync_error",
|
|
"jellyfin_last_published_at",
|
|
"jellyfin_last_fingerprint",
|
|
"jellyfin_last_published_fingerprint",
|
|
"jellyfin_last_changes",
|
|
):
|
|
state.pop(key, None)
|
|
state["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.settings_path, state)
|
|
return {
|
|
"library": media_library_to_jsonable([], []),
|
|
"movieCount": 0,
|
|
"showCount": 0,
|
|
"changes": {"added": [], "removed": [], "addedCount": 0, "removedCount": 0},
|
|
"jellyfin": jellyfin_settings(runtime),
|
|
}
|
|
|
|
|
|
def jellyfin_runtime(runtime_ticks: Any) -> str | None:
|
|
try:
|
|
ticks = int(runtime_ticks or 0)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if ticks <= 0:
|
|
return None
|
|
minutes = round(ticks / 10_000_000 / 60)
|
|
if minutes <= 0:
|
|
return None
|
|
hours, remainder = divmod(minutes, 60)
|
|
if hours and remainder:
|
|
return f"{hours}h {remainder}m"
|
|
if hours:
|
|
return f"{hours}h"
|
|
return f"{remainder}m"
|
|
|
|
|
|
def jellyfin_item_year(item: dict[str, Any]) -> str | None:
|
|
production_year = item.get("ProductionYear")
|
|
if production_year:
|
|
return parse_year_text(str(production_year))
|
|
return parse_year_text(str(item.get("PremiereDate", "")))
|
|
|
|
|
|
def jellyfin_item_summary(item: dict[str, Any]) -> str | None:
|
|
return clean_media_text(str(item.get("Overview", "") or item.get("ShortOverview", "")))
|
|
|
|
|
|
def jellyfin_movie_from_item(item: dict[str, Any]) -> MediaItem:
|
|
return MediaItem(
|
|
title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Movie",
|
|
media_type="movie",
|
|
year=jellyfin_item_year(item),
|
|
genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120),
|
|
rating=clean_media_text(str(item.get("OfficialRating", "")), 32),
|
|
runtime=jellyfin_runtime(item.get("RunTimeTicks")),
|
|
summary=jellyfin_item_summary(item),
|
|
)
|
|
|
|
|
|
def jellyfin_show_from_item(item: dict[str, Any]) -> MediaItem:
|
|
return MediaItem(
|
|
title=clean_media_text(str(item.get("Name", "")), 120) or "Untitled Show",
|
|
media_type="show",
|
|
year=jellyfin_item_year(item),
|
|
genres=clean_media_text(", ".join(str(genre) for genre in item.get("Genres", []) if genre), 120),
|
|
rating=clean_media_text(str(item.get("OfficialRating", "")), 32),
|
|
summary=jellyfin_item_summary(item),
|
|
seasons=parse_int_text(str(item.get("ChildCount", ""))),
|
|
episodes=parse_int_text(str(item.get("RecursiveItemCount", ""))),
|
|
)
|
|
|
|
|
|
def jellyfin_request(settings: dict[str, Any], path: str, params: dict[str, Any]) -> dict[str, Any]:
|
|
base_url = str(settings.get("url", "")).strip().rstrip("/")
|
|
api_key = str(settings.get("apiKey", "")).strip()
|
|
if not base_url or not api_key:
|
|
raise ValueError("Jellyfin URL and API key are required")
|
|
|
|
query = urllib.parse.urlencode({key: value for key, value in params.items() if value is not None})
|
|
url = f"{base_url}{path}"
|
|
if query:
|
|
url = f"{url}?{query}"
|
|
request = urllib.request.Request(
|
|
url,
|
|
headers={
|
|
"Accept": "application/json",
|
|
"User-Agent": "ArchiveStatusBot/1.0",
|
|
"X-Emby-Token": api_key,
|
|
},
|
|
method="GET",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=30) as response:
|
|
return json.loads(response.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
detail = exc.read().decode("utf-8", errors="ignore")
|
|
raise RuntimeError(f"Jellyfin API failed: HTTP {exc.code} {detail}") from exc
|
|
except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
|
|
raise RuntimeError(f"Jellyfin API failed: {clean_error(exc)}") from exc
|
|
|
|
|
|
def fetch_jellyfin_library_ids(settings: dict[str, Any], library_names: list[str]) -> list[str]:
|
|
if not library_names:
|
|
return []
|
|
|
|
requested = {name.casefold() for name in library_names}
|
|
data = jellyfin_request(settings, "/Library/MediaFolders", {})
|
|
items = data.get("Items", [])
|
|
if not isinstance(items, list):
|
|
raise RuntimeError("Jellyfin returned an invalid library folder payload")
|
|
|
|
matched: list[str] = []
|
|
available: list[str] = []
|
|
for item in items:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
name = str(item.get("Name", "")).strip()
|
|
folder_id = str(item.get("Id", "")).strip()
|
|
if name:
|
|
available.append(name)
|
|
if name.casefold() in requested and folder_id:
|
|
matched.append(folder_id)
|
|
|
|
missing = sorted(set(library_names) - {name for name in available if name.casefold() in requested})
|
|
if missing:
|
|
raise ValueError(f"Jellyfin library not found: {', '.join(missing)}")
|
|
return matched
|
|
|
|
|
|
def fetch_jellyfin_items(settings: dict[str, Any], item_type: str) -> list[dict[str, Any]]:
|
|
parent_ids = settings.get("ParentIds")
|
|
if isinstance(parent_ids, list) and parent_ids:
|
|
all_items: list[dict[str, Any]] = []
|
|
for parent_id in parent_ids:
|
|
scoped_settings = dict(settings)
|
|
scoped_settings["ParentId"] = str(parent_id)
|
|
scoped_settings.pop("ParentIds", None)
|
|
all_items.extend(fetch_jellyfin_items(scoped_settings, item_type))
|
|
return all_items
|
|
|
|
parent_id_value = str(settings.get("ParentId", "")).strip() or None
|
|
items: list[dict[str, Any]] = []
|
|
start_index = 0
|
|
limit = 200
|
|
while True:
|
|
data = jellyfin_request(
|
|
settings,
|
|
"/Items",
|
|
{
|
|
"Recursive": "true",
|
|
"IncludeItemTypes": item_type,
|
|
"Fields": "Genres,Overview,OfficialRating,RecursiveItemCount,ChildCount,PremiereDate,RunTimeTicks,ProviderIds",
|
|
"SortBy": "SortName",
|
|
"SortOrder": "Ascending",
|
|
"EnableImages": "false",
|
|
"ParentId": parent_id_value,
|
|
"StartIndex": start_index,
|
|
"Limit": limit,
|
|
},
|
|
)
|
|
page = data.get("Items", [])
|
|
if not isinstance(page, list):
|
|
raise RuntimeError("Jellyfin returned an invalid Items payload")
|
|
items.extend(item for item in page if isinstance(item, dict))
|
|
total = int(data.get("TotalRecordCount", len(items)) or len(items))
|
|
if not page or len(items) >= total:
|
|
return items
|
|
start_index += limit
|
|
|
|
|
|
def jellyfin_item_dedupe_key(item: dict[str, Any], item_type: str) -> tuple[str, str]:
|
|
provider_ids = item.get("ProviderIds")
|
|
if isinstance(provider_ids, dict):
|
|
for provider in ("Tmdb", "Imdb", "Tvdb"):
|
|
value = str(provider_ids.get(provider, "")).strip().casefold()
|
|
if value:
|
|
return item_type.casefold(), f"{provider.casefold()}:{value}"
|
|
|
|
title = clean_media_text(str(item.get("Name", "")), 120) or ""
|
|
year = jellyfin_item_year(item) or ""
|
|
return item_type.casefold(), f"title:{title.casefold()}:{year}"
|
|
|
|
|
|
def dedupe_jellyfin_items(items: list[dict[str, Any]], item_type: str) -> list[dict[str, Any]]:
|
|
provider_deduped: dict[tuple[str, str], dict[str, Any]] = {}
|
|
for item in items:
|
|
key = jellyfin_item_dedupe_key(item, item_type)
|
|
current = provider_deduped.get(key)
|
|
if current is None:
|
|
provider_deduped[key] = item
|
|
continue
|
|
current_overview = str(current.get("Overview", "") or current.get("ShortOverview", ""))
|
|
next_overview = str(item.get("Overview", "") or item.get("ShortOverview", ""))
|
|
if len(next_overview) > len(current_overview):
|
|
provider_deduped[key] = item
|
|
|
|
title_deduped: dict[tuple[str, str], dict[str, Any]] = {}
|
|
for item in provider_deduped.values():
|
|
title = clean_media_text(str(item.get("Name", "")), 120) or ""
|
|
key = (title.casefold(), jellyfin_item_year(item) or "")
|
|
current = title_deduped.get(key)
|
|
if current is None:
|
|
title_deduped[key] = item
|
|
continue
|
|
current_has_provider = bool(current.get("ProviderIds"))
|
|
next_has_provider = bool(item.get("ProviderIds"))
|
|
if next_has_provider and not current_has_provider:
|
|
title_deduped[key] = item
|
|
continue
|
|
current_overview = str(current.get("Overview", "") or current.get("ShortOverview", ""))
|
|
next_overview = str(item.get("Overview", "") or item.get("ShortOverview", ""))
|
|
if len(next_overview) > len(current_overview):
|
|
title_deduped[key] = item
|
|
|
|
return sorted(title_deduped.values(), key=lambda item: (str(item.get("SortName", item.get("Name", ""))).casefold(), str(item.get("ProductionYear", ""))))
|
|
|
|
|
|
def fetch_jellyfin_library(runtime: BotRuntime) -> tuple[list[MediaItem], list[MediaItem]]:
|
|
state = load_state(runtime.settings_path)
|
|
settings = {
|
|
"url": str(state.get("jellyfin_url", "")).strip(),
|
|
"apiKey": str(state.get("jellyfin_api_key", "")).strip(),
|
|
}
|
|
library_names = jellyfin_settings(runtime).get("libraryNames", [])
|
|
if isinstance(library_names, list) and library_names:
|
|
settings["ParentIds"] = fetch_jellyfin_library_ids(settings, [str(name) for name in library_names])
|
|
movie_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Movie"), "Movie")
|
|
show_items = dedupe_jellyfin_items(fetch_jellyfin_items(settings, "Series"), "Series")
|
|
movies = [jellyfin_movie_from_item(item) for item in movie_items]
|
|
shows = [jellyfin_show_from_item(item) for item in show_items]
|
|
return (
|
|
sorted(movies, key=lambda item: (item.title.casefold(), item.year or "")),
|
|
sorted(shows, key=lambda item: (item.title.casefold(), item.year or "")),
|
|
)
|
|
|
|
|
|
def media_library_fingerprint(movies: list[MediaItem], shows: list[MediaItem]) -> str:
|
|
payload = media_library_to_jsonable(movies, shows)
|
|
body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
|
return hashlib.sha256(body).hexdigest()
|
|
|
|
|
|
def media_item_tracking_key(item: MediaItem) -> str:
|
|
return f"{item.media_type}:{item.title.casefold()}:{item.year or ''}"
|
|
|
|
|
|
def media_change_entry(item: MediaItem) -> dict[str, Any]:
|
|
return {
|
|
"title": item.title,
|
|
"year": item.year or "",
|
|
"mediaType": item.media_type,
|
|
}
|
|
|
|
|
|
def media_library_changes(
|
|
previous_movies: list[MediaItem],
|
|
previous_shows: list[MediaItem],
|
|
current_movies: list[MediaItem],
|
|
current_shows: list[MediaItem],
|
|
) -> dict[str, Any]:
|
|
previous = {media_item_tracking_key(item): item for item in [*previous_movies, *previous_shows]}
|
|
current = {media_item_tracking_key(item): item for item in [*current_movies, *current_shows]}
|
|
added_keys = sorted(set(current) - set(previous), key=lambda key: (current[key].media_type, current[key].title.casefold()))
|
|
removed_keys = sorted(set(previous) - set(current), key=lambda key: (previous[key].media_type, previous[key].title.casefold()))
|
|
added = [media_change_entry(current[key]) for key in added_keys]
|
|
removed = [media_change_entry(previous[key]) for key in removed_keys]
|
|
return {
|
|
"added": added,
|
|
"removed": removed,
|
|
"addedCount": len(added),
|
|
"removedCount": len(removed),
|
|
}
|
|
|
|
|
|
def update_jellyfin_sync_state(
|
|
runtime: BotRuntime,
|
|
*,
|
|
fingerprint: str | None = None,
|
|
error: str | None = None,
|
|
published: bool = False,
|
|
changes: dict[str, Any] | None = None,
|
|
) -> None:
|
|
state = load_state(runtime.settings_path)
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
state["jellyfin_last_sync_at"] = now
|
|
state["jellyfin_last_sync_error"] = error
|
|
if fingerprint is not None:
|
|
state["jellyfin_last_fingerprint"] = fingerprint
|
|
if published:
|
|
state["jellyfin_last_published_at"] = now
|
|
if fingerprint is not None:
|
|
state["jellyfin_last_published_fingerprint"] = fingerprint
|
|
if changes is not None:
|
|
state["jellyfin_last_changes"] = changes
|
|
save_state(runtime.settings_path, state)
|
|
|
|
|
|
def sync_jellyfin_library(runtime: BotRuntime, force_publish: bool = False) -> dict[str, Any]:
|
|
previous_movies, previous_shows = load_media_library(runtime)
|
|
movies, shows = fetch_jellyfin_library(runtime)
|
|
changes = media_library_changes(previous_movies, previous_shows, movies, shows)
|
|
fingerprint = media_library_fingerprint(movies, shows)
|
|
settings_state = load_state(runtime.settings_path)
|
|
changed = fingerprint != str(settings_state.get("jellyfin_last_fingerprint", ""))
|
|
publish_changed = fingerprint != str(settings_state.get("jellyfin_last_published_fingerprint", ""))
|
|
save_media_library(runtime, movies, shows)
|
|
|
|
published = False
|
|
result: dict[str, Any] = {}
|
|
if (publish_changed or force_publish) and not runtime.dry_run:
|
|
result = publish_media_items(
|
|
runtime=runtime,
|
|
channel_id=channel_settings(runtime)["mediaChannelId"],
|
|
movies_all=movies,
|
|
shows_all=shows,
|
|
)
|
|
published = True
|
|
|
|
update_jellyfin_sync_state(runtime, fingerprint=fingerprint, published=published, changes=changes)
|
|
return {
|
|
"changed": changed,
|
|
"published": published,
|
|
"changes": changes,
|
|
"movieCount": len(movies),
|
|
"showCount": len(shows),
|
|
"libraryNames": jellyfin_settings(runtime).get("libraryNames", []),
|
|
"library": media_library_to_jsonable(movies, shows),
|
|
"publishResult": result,
|
|
"jellyfin": jellyfin_settings(runtime),
|
|
}
|
|
|
|
|
|
def maybe_run_jellyfin_sync(runtime: BotRuntime) -> dict[str, Any] | None:
|
|
settings = jellyfin_settings(runtime)
|
|
if not settings["configured"] or not settings["autoSync"]:
|
|
return None
|
|
|
|
interval = int(os.getenv("JELLYFIN_SYNC_INTERVAL_SECONDS", str(DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS)))
|
|
state = load_state(runtime.settings_path)
|
|
last_sync = str(state.get("jellyfin_last_sync_at", "")).strip()
|
|
if last_sync:
|
|
try:
|
|
last = datetime.fromisoformat(last_sync)
|
|
if (datetime.now(timezone.utc) - last).total_seconds() < interval:
|
|
return None
|
|
except ValueError:
|
|
pass
|
|
|
|
try:
|
|
return sync_jellyfin_library(runtime)
|
|
except Exception as exc:
|
|
update_jellyfin_sync_state(runtime, error=str(exc)[:240])
|
|
raise
|
|
|
|
|
|
def media_item_heading(item: MediaItem) -> str:
|
|
if item.year:
|
|
return f"{item.title} ({item.year})"
|
|
return item.title
|
|
|
|
|
|
def media_item_value(item: MediaItem) -> str:
|
|
details: list[str] = []
|
|
if item.genres:
|
|
details.append(item.genres)
|
|
if item.rating:
|
|
details.append(item.rating)
|
|
if item.runtime and item.media_type == "movie":
|
|
details.append(item.runtime)
|
|
if item.media_type == "show":
|
|
counts = []
|
|
if item.seasons:
|
|
counts.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
|
|
if item.episodes:
|
|
counts.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
|
|
if counts:
|
|
details.append(" · ".join(counts))
|
|
|
|
lines = []
|
|
if details:
|
|
lines.append(" • ".join(details))
|
|
if item.summary:
|
|
lines.append(item.summary)
|
|
return "\n".join(lines)[:1024] or "No details provided."
|
|
|
|
|
|
def media_category_embeds(
|
|
title: str,
|
|
items: list[MediaItem],
|
|
color: int,
|
|
total_count: int,
|
|
source_name: str,
|
|
) -> list[dict[str, Any]]:
|
|
embeds: list[dict[str, Any]] = []
|
|
omitted = max(total_count - len(items), 0)
|
|
page_count = max((len(items) + MEDIA_ITEMS_PER_EMBED - 1) // MEDIA_ITEMS_PER_EMBED, 1)
|
|
|
|
for page_index in range(page_count):
|
|
start = page_index * MEDIA_ITEMS_PER_EMBED
|
|
chunk = items[start : start + MEDIA_ITEMS_PER_EMBED]
|
|
description = f"{total_count} total from `{source_name}`"
|
|
if omitted:
|
|
description += f" · showing first {len(items)}"
|
|
if page_count > 1:
|
|
description += f" · page {page_index + 1}/{page_count}"
|
|
|
|
fields = [
|
|
{
|
|
"name": media_item_heading(item)[:256],
|
|
"value": media_item_value(item),
|
|
"inline": False,
|
|
}
|
|
for item in chunk
|
|
]
|
|
embeds.append(
|
|
{
|
|
"title": title,
|
|
"description": description,
|
|
"color": color,
|
|
"fields": fields,
|
|
}
|
|
)
|
|
|
|
return embeds
|
|
|
|
|
|
def render_media_catalog_payloads(
|
|
movies: list[MediaItem],
|
|
shows: list[MediaItem],
|
|
movie_total: int,
|
|
show_total: int,
|
|
movie_source: str,
|
|
show_source: str,
|
|
) -> list[dict[str, Any]]:
|
|
now = datetime.now(timezone.utc)
|
|
embeds: list[dict[str, Any]] = [
|
|
{
|
|
"title": "The Mithral Archive Media Catalog",
|
|
"description": "Current movies and shows available in the archive.",
|
|
"color": 0x5865F2,
|
|
"fields": [
|
|
{"name": "Movies", "value": str(movie_total), "inline": True},
|
|
{"name": "Shows", "value": str(show_total), "inline": True},
|
|
{"name": "Updated", "value": now.strftime("%Y-%m-%d %H:%M UTC"), "inline": True},
|
|
],
|
|
"timestamp": now.isoformat(),
|
|
}
|
|
]
|
|
if movies:
|
|
embeds.extend(media_category_embeds("Movies", movies, 0xF59E0B, movie_total, movie_source))
|
|
if shows:
|
|
embeds.extend(media_category_embeds("Shows", shows, 0x10B981, show_total, show_source))
|
|
|
|
payloads = []
|
|
for index in range(0, len(embeds), MAX_DISCORD_EMBEDS):
|
|
payloads.append(
|
|
{
|
|
"content": "**The Mithral Archive Media Catalog**" if index == 0 else "",
|
|
"embeds": embeds[index : index + MAX_DISCORD_EMBEDS],
|
|
}
|
|
)
|
|
return payloads
|
|
|
|
|
|
def media_markdown_line(item: MediaItem) -> str:
|
|
title = media_item_heading(item)
|
|
details: list[str] = []
|
|
if item.genres:
|
|
details.append(item.genres)
|
|
if item.rating:
|
|
details.append(item.rating)
|
|
if item.runtime and item.media_type == "movie":
|
|
details.append(item.runtime)
|
|
if item.media_type == "show":
|
|
if item.seasons:
|
|
details.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
|
|
if item.episodes:
|
|
details.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
|
|
|
|
suffix = f" - {'; '.join(details)}" if details else ""
|
|
return f"- **{title}**{suffix}"
|
|
|
|
|
|
def render_media_catalog_markdown(movies: list[MediaItem], shows: list[MediaItem]) -> str:
|
|
now = datetime.now(timezone.utc)
|
|
lines = [
|
|
"# The Mithral Archive Media Catalog",
|
|
"",
|
|
f"Updated: {now.strftime('%Y-%m-%d %H:%M UTC')}",
|
|
f"Movies: {len(movies)}",
|
|
f"Shows: {len(shows)}",
|
|
"",
|
|
]
|
|
|
|
if movies:
|
|
lines.extend(["## Movies", ""])
|
|
for item in movies:
|
|
lines.append(media_markdown_line(item))
|
|
if item.summary:
|
|
lines.append(f" - {item.summary}")
|
|
lines.append("")
|
|
|
|
if shows:
|
|
lines.extend(["## Shows", ""])
|
|
for item in shows:
|
|
lines.append(media_markdown_line(item))
|
|
if item.summary:
|
|
lines.append(f" - {item.summary}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines).strip() + "\n"
|
|
|
|
|
|
def render_catalog_html(runtime: BotRuntime) -> bytes:
|
|
movies, shows = load_media_library(runtime)
|
|
updated_at = load_state(runtime.media_library_path).get("updated_at")
|
|
updated = "Never"
|
|
if updated_at:
|
|
try:
|
|
updated = datetime.fromisoformat(str(updated_at)).strftime("%Y-%m-%d %H:%M UTC")
|
|
except ValueError:
|
|
updated = str(updated_at)
|
|
|
|
def item_block(item: MediaItem) -> str:
|
|
meta = []
|
|
if item.year:
|
|
meta.append(item.year)
|
|
if item.genres:
|
|
meta.append(item.genres)
|
|
if item.rating:
|
|
meta.append(item.rating)
|
|
if item.runtime:
|
|
meta.append(item.runtime)
|
|
if item.media_type == "show":
|
|
if item.seasons:
|
|
meta.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
|
|
if item.episodes:
|
|
meta.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
|
|
summary = f"<p>{html.escape(item.summary)}</p>" if item.summary else ""
|
|
return (
|
|
f'<article class="item" data-type="{html.escape(item.media_type)}" '
|
|
f'data-search="{html.escape((item.title + " " + (item.genres or "") + " " + (item.year or "")).casefold())}">'
|
|
f"<h2>{html.escape(item.title)}</h2>"
|
|
f'<div class="meta">{html.escape(" · ".join(meta))}</div>'
|
|
f"{summary}</article>"
|
|
)
|
|
|
|
items = "\n".join(item_block(item) for item in [*movies, *shows])
|
|
body = f"""<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8">
|
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
<title>The Mithral Archive Media Catalog</title>
|
|
<style>
|
|
:root {{ color-scheme: dark; --bg: #111214; --panel: #1b1d21; --line: #30343b; --text: #f1f2f4; --muted: #a2a8b3; --action: #e7e9ed; --action-text: #15171a; }}
|
|
* {{ box-sizing: border-box; }}
|
|
body {{ margin: 0; background: var(--bg); color: var(--text); font: 14px/1.45 ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", sans-serif; }}
|
|
header {{ position: sticky; top: 0; z-index: 1; background: #15171a; border-bottom: 1px solid var(--line); padding: 14px 18px; }}
|
|
h1 {{ margin: 0 0 10px; font-size: 20px; letter-spacing: 0; }}
|
|
.controls {{ display: grid; grid-template-columns: minmax(180px, 1fr) auto auto auto; gap: 8px; align-items: center; }}
|
|
input, button {{ border: 1px solid var(--line); border-radius: 6px; background: #14161a; color: var(--text); padding: 8px 9px; font: inherit; }}
|
|
button.active {{ background: var(--action); color: var(--action-text); border-color: var(--action); }}
|
|
main {{ padding: 18px; display: grid; gap: 1px; background: var(--line); }}
|
|
.item {{ background: var(--panel); padding: 13px 14px; }}
|
|
.item[hidden] {{ display: none; }}
|
|
h2 {{ margin: 0 0 4px; font-size: 16px; }}
|
|
.meta {{ color: var(--muted); }}
|
|
p {{ margin: 8px 0 0; color: #d5d8de; }}
|
|
.stats {{ color: var(--muted); margin-bottom: 8px; }}
|
|
@media (max-width: 720px) {{ .controls {{ grid-template-columns: 1fr 1fr; }} .controls input {{ grid-column: 1 / -1; }} }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<header>
|
|
<h1>The Mithral Archive Media Catalog</h1>
|
|
<div class="stats">{len(movies)} movies · {len(shows)} shows · Updated {html.escape(updated)}</div>
|
|
<div class="controls">
|
|
<input id="search" type="search" placeholder="Search title, genre, or year" autocomplete="off">
|
|
<button class="active" type="button" data-filter="all">All</button>
|
|
<button type="button" data-filter="movie">Movies</button>
|
|
<button type="button" data-filter="show">Shows</button>
|
|
</div>
|
|
</header>
|
|
<main id="items">{items}</main>
|
|
<script>
|
|
const search = document.querySelector("#search");
|
|
const buttons = document.querySelectorAll("[data-filter]");
|
|
let filter = "all";
|
|
function applyFilter() {{
|
|
const query = search.value.trim().toLowerCase();
|
|
document.querySelectorAll(".item").forEach((item) => {{
|
|
const typeMatch = filter === "all" || item.dataset.type === filter;
|
|
const searchMatch = !query || item.dataset.search.includes(query);
|
|
item.hidden = !(typeMatch && searchMatch);
|
|
}});
|
|
}}
|
|
search.addEventListener("input", applyFilter);
|
|
buttons.forEach((button) => button.addEventListener("click", () => {{
|
|
filter = button.dataset.filter;
|
|
buttons.forEach((item) => item.classList.toggle("active", item === button));
|
|
applyFilter();
|
|
}}));
|
|
</script>
|
|
</body>
|
|
</html>
|
|
"""
|
|
return body.encode("utf-8")
|
|
|
|
|
|
def publish_media_markdown_message(
|
|
token: str,
|
|
channel_id: str,
|
|
movies: list[MediaItem],
|
|
shows: list[MediaItem],
|
|
catalog_url: str = "",
|
|
) -> str:
|
|
markdown = render_media_catalog_markdown(movies, shows)
|
|
payload = {
|
|
"content": (
|
|
"**The Mithral Archive Media Catalog**\n"
|
|
f"{len(movies)} movies · {len(shows)} shows\n"
|
|
"Attached as a compact Markdown list."
|
|
),
|
|
"allowed_mentions": {"parse": []},
|
|
}
|
|
if catalog_url:
|
|
payload["components"] = [
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 2,
|
|
"style": 5,
|
|
"label": "Open Catalog",
|
|
"url": catalog_url,
|
|
}
|
|
],
|
|
}
|
|
]
|
|
message = discord_multipart_request(
|
|
"POST",
|
|
token,
|
|
f"/channels/{channel_id}/messages",
|
|
payload,
|
|
[("files[0]", "media-catalog.md", "text/markdown; charset=utf-8", markdown.encode("utf-8"))],
|
|
)
|
|
message_id = str(message.get("id", "")).strip()
|
|
if not message_id:
|
|
raise RuntimeError("Discord did not return a message id for the media catalog")
|
|
return message_id
|
|
|
|
|
|
def publish_media_items(
|
|
runtime: BotRuntime,
|
|
channel_id: str,
|
|
movies_all: list[MediaItem],
|
|
shows_all: list[MediaItem],
|
|
movie_source: str = "Dashboard library",
|
|
show_source: str = "Dashboard library",
|
|
) -> dict[str, Any]:
|
|
if runtime.dry_run:
|
|
raise RuntimeError("Discord dry run is enabled; media catalog was parsed but not sent")
|
|
|
|
settings = channel_settings(runtime)
|
|
channel = validate_channel_id(channel_id.strip() or settings["mediaChannelId"], "Media")
|
|
|
|
if not movies_all and not shows_all:
|
|
raise ValueError("Add at least one movie or show before publishing")
|
|
|
|
state = load_state(runtime.media_state_path)
|
|
old_channel = str(state.get("channel_id", "")).strip()
|
|
existing_ids = [
|
|
str(message_id)
|
|
for message_id in state.get("message_ids", [])
|
|
if str(message_id).strip()
|
|
]
|
|
delete_channel = old_channel or channel
|
|
for old_id in existing_ids:
|
|
discord_delete_message(runtime.token, delete_channel, old_id)
|
|
|
|
message_id = publish_media_markdown_message(
|
|
runtime.token,
|
|
channel,
|
|
movies_all,
|
|
shows_all,
|
|
catalog_url=settings.get("catalogUrl", ""),
|
|
)
|
|
|
|
save_state(
|
|
runtime.media_state_path,
|
|
{
|
|
"channel_id": channel,
|
|
"message_ids": [message_id],
|
|
"movie_count": len(movies_all),
|
|
"show_count": len(shows_all),
|
|
"format": "markdown",
|
|
"published_at": datetime.now(timezone.utc).isoformat(),
|
|
},
|
|
)
|
|
save_media_channel_setting(runtime, channel)
|
|
|
|
return {
|
|
"channelId": channel,
|
|
"messageIds": [message_id],
|
|
"movieCount": len(movies_all),
|
|
"showCount": len(shows_all),
|
|
"displayedMovieCount": len(movies_all),
|
|
"displayedShowCount": len(shows_all),
|
|
"format": "markdown",
|
|
}
|
|
|
|
|
|
def media_catalog_status(runtime: BotRuntime) -> dict[str, Any]:
|
|
state = load_state(runtime.media_state_path)
|
|
settings = channel_settings(runtime)
|
|
movies, shows = load_media_library(runtime)
|
|
return {
|
|
"channelId": str(state.get("channel_id", "")).strip() or settings["mediaChannelId"],
|
|
"channels": settings,
|
|
"messageIds": state.get("message_ids", []) if isinstance(state.get("message_ids"), list) else [],
|
|
"movieCount": state.get("movie_count"),
|
|
"showCount": state.get("show_count"),
|
|
"format": state.get("format", "markdown"),
|
|
"publishedAt": state.get("published_at"),
|
|
"changes": jellyfin_settings(runtime).get("lastChanges"),
|
|
"library": media_library_to_jsonable(movies, shows),
|
|
}
|
|
|
|
|
|
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
|
|
checked_at = datetime.now(timezone.utc)
|
|
online = sum(1 for result in results if result.ok)
|
|
total = len(results)
|
|
degraded = 0 < online < total
|
|
|
|
if total == 0:
|
|
color = 0x6B7280
|
|
summary = "No services configured."
|
|
elif online == total:
|
|
color = 0x10B981
|
|
summary = f"🟢 Operational · {online}/{total} online"
|
|
elif degraded:
|
|
color = 0xF59E0B
|
|
offline = total - online
|
|
attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
|
|
summary = f"🟡 Degraded · {online}/{total} online · {attention}"
|
|
else:
|
|
color = 0xEF4444
|
|
summary = f"🔴 Outage · {online}/{total} online"
|
|
|
|
groups: dict[str, list[CheckResult]] = {}
|
|
for result in results:
|
|
groups.setdefault(result.service.group, []).append(result)
|
|
|
|
fields = []
|
|
for group_name, group_results in groups.items():
|
|
service_lines = []
|
|
state_lines = []
|
|
for result in group_results:
|
|
icon = "🟢" if result.ok else "🔴"
|
|
label = "Online" if result.ok else "Issue"
|
|
service_lines.append(f"**{result.service.name}**")
|
|
state_lines.append(f"{icon} {label}")
|
|
|
|
fields.append({
|
|
"name": group_name,
|
|
"value": "\n".join(service_lines)[:1024] or "None",
|
|
"inline": True,
|
|
})
|
|
fields.append({
|
|
"name": "Status",
|
|
"value": "\n".join(state_lines)[:1024] or "None",
|
|
"inline": True,
|
|
})
|
|
fields.append({
|
|
"name": "\u200b",
|
|
"value": "\u200b",
|
|
"inline": False,
|
|
})
|
|
|
|
if fields and fields[-1]["name"] == "\u200b":
|
|
fields.pop()
|
|
|
|
interval = os.getenv("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
|
|
return [
|
|
{
|
|
"title": "The Mithral Archive",
|
|
"description": summary,
|
|
"color": color,
|
|
"fields": fields[:25],
|
|
"footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"},
|
|
"timestamp": checked_at.isoformat(),
|
|
}
|
|
]
|
|
|
|
|
|
def upsert_status_message(
|
|
token: str,
|
|
channel_id: str,
|
|
state_path: Path,
|
|
results: list[CheckResult],
|
|
) -> str:
|
|
state = load_state(state_path)
|
|
message_id = str(state.get("message_id", "")).strip()
|
|
payload = {"content": "", "embeds": render_embeds(results)}
|
|
|
|
if message_id:
|
|
try:
|
|
discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
|
|
return message_id
|
|
except RuntimeError as exc:
|
|
print(f"Could not edit existing status message, creating a new one: {exc}", file=sys.stderr)
|
|
|
|
message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
|
|
new_id = str(message.get("id", "")).strip()
|
|
if not new_id:
|
|
raise RuntimeError("Discord did not return a message id")
|
|
|
|
save_state(state_path, {"message_id": new_id})
|
|
return new_id
|
|
|
|
|
|
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
|
|
results: list[CheckResult] = []
|
|
for index, service in enumerate(services):
|
|
results.append(
|
|
CheckResult(
|
|
service=service,
|
|
ok=index != len(services) - 1,
|
|
status=200 if index != len(services) - 1 else 502,
|
|
latency_ms=42 + (index * 31),
|
|
error=None if index != len(services) - 1 else "HTTP 502",
|
|
)
|
|
)
|
|
return results
|
|
|
|
|
|
def print_preview(services: list[Service]) -> None:
|
|
payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))}
|
|
print(json.dumps(payload, indent=2))
|
|
|
|
|
|
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
|
|
return {
|
|
"name": result.service.name,
|
|
"group": result.service.group,
|
|
"url": result.service.url,
|
|
"displayUrl": result.service.display_url,
|
|
"ok": result.ok,
|
|
"status": result.status,
|
|
"latencyMs": result.latency_ms,
|
|
"error": result.error,
|
|
}
|
|
|
|
|
|
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
|
|
services = load_services(runtime.config_path)
|
|
results = [check_service(service) for service in services]
|
|
if runtime.dry_run:
|
|
message_id = "dry-run"
|
|
else:
|
|
channel_id = validate_channel_id(channel_settings(runtime)["statusChannelId"], "Status")
|
|
message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results)
|
|
|
|
with runtime.lock:
|
|
runtime.last_results = results
|
|
runtime.last_message_id = message_id
|
|
runtime.last_checked_at = datetime.now(timezone.utc)
|
|
runtime.last_error = None
|
|
|
|
return message_id, results
|
|
|
|
|
|
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
|
|
services = load_services(runtime.config_path)
|
|
settings = channel_settings(runtime)
|
|
with runtime.lock:
|
|
results = list(runtime.last_results)
|
|
return {
|
|
"services": services_to_jsonable(services),
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
"lastError": runtime.last_error,
|
|
"lastMessageId": runtime.last_message_id,
|
|
"lastCheckedAt": runtime.last_checked_at.isoformat() if runtime.last_checked_at else None,
|
|
"channelId": settings["statusChannelId"],
|
|
"channels": settings,
|
|
}
|
|
|
|
|
|
def bool_env(name: str, default: bool = False) -> bool:
|
|
raw = os.getenv(name)
|
|
if raw is None:
|
|
return default
|
|
return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def password_hash(password: str) -> str:
|
|
salt = secrets.token_bytes(16)
|
|
digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
|
|
salt_text = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=")
|
|
digest_text = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
|
|
return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt_text}${digest_text}"
|
|
|
|
|
|
def decode_urlsafe_base64(value: str) -> bytes:
|
|
padding = "=" * (-len(value) % 4)
|
|
return base64.urlsafe_b64decode(value + padding)
|
|
|
|
|
|
def verify_password_hash(encoded: str, password: str) -> bool:
|
|
try:
|
|
algorithm, iterations, salt_text, digest_text = encoded.split("$", 3)
|
|
if algorithm != "pbkdf2_sha256":
|
|
return False
|
|
salt = decode_urlsafe_base64(salt_text)
|
|
expected = decode_urlsafe_base64(digest_text)
|
|
actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(iterations))
|
|
except (ValueError, TypeError):
|
|
return False
|
|
return hmac.compare_digest(actual, expected)
|
|
|
|
|
|
def dashboard_auth_from_env() -> DashboardAuth | None:
|
|
if bool_env("DASHBOARD_AUTH_DISABLED", False):
|
|
return None
|
|
|
|
username = os.getenv("DASHBOARD_USERNAME", "").strip()
|
|
encoded_hash = os.getenv("DASHBOARD_PASSWORD_HASH", "").strip()
|
|
if not username or not encoded_hash:
|
|
return None
|
|
|
|
ttl = int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800"))
|
|
secure = bool_env("DASHBOARD_COOKIE_SECURE", False)
|
|
return DashboardAuth(
|
|
DashboardAuthConfig(
|
|
username=username,
|
|
password_hash=encoded_hash,
|
|
session_ttl_seconds=ttl,
|
|
cookie_secure=secure,
|
|
)
|
|
)
|
|
|
|
|
|
def print_password_hash() -> None:
|
|
import getpass
|
|
|
|
first = getpass.getpass("Dashboard password: ")
|
|
second = getpass.getpass("Confirm password: ")
|
|
if first != second:
|
|
raise SystemExit("Passwords did not match")
|
|
if len(first) < 12:
|
|
raise SystemExit("Use at least 12 characters")
|
|
print(password_hash(first))
|
|
|
|
|
|
def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
|
|
dashboard_path = Path(__file__).with_name("dashboard.html")
|
|
|
|
class DashboardHandler(BaseHTTPRequestHandler):
|
|
server_version = "ArchiveStatusDashboard/1.0"
|
|
|
|
def log_message(self, format: str, *args: Any) -> None:
|
|
print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)
|
|
|
|
def do_GET(self) -> None:
|
|
path = urllib.parse.urlparse(self.path).path
|
|
if path in {"/", "/dashboard"}:
|
|
self.send_dashboard()
|
|
return
|
|
if path == "/catalog":
|
|
self.send_catalog()
|
|
return
|
|
if path == "/favicon.ico":
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
return
|
|
if path == "/api/session":
|
|
session = self.require_auth()
|
|
if session is None:
|
|
return
|
|
_session_id, data = session
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"username": data.username,
|
|
"csrfToken": data.csrf_token,
|
|
},
|
|
)
|
|
return
|
|
if path == "/api/status":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, runtime_status(runtime))
|
|
return
|
|
if path == "/api/media":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, media_catalog_status(runtime))
|
|
return
|
|
if path == "/api/settings":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, {"channels": channel_settings(runtime)})
|
|
return
|
|
if path == "/api/jellyfin":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, {"jellyfin": jellyfin_settings(runtime)})
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def do_POST(self) -> None:
|
|
path = urllib.parse.urlparse(self.path).path
|
|
if path == "/api/login":
|
|
self.handle_login()
|
|
return
|
|
session = self.require_auth(require_csrf=True)
|
|
if session is None:
|
|
return
|
|
if path == "/api/logout":
|
|
self.handle_logout(session[0])
|
|
return
|
|
if path == "/api/check":
|
|
self.handle_check()
|
|
return
|
|
if path == "/api/services":
|
|
self.handle_services()
|
|
return
|
|
if path == "/api/media":
|
|
self.handle_media_catalog()
|
|
return
|
|
if path == "/api/media/library":
|
|
self.handle_media_library()
|
|
return
|
|
if path == "/api/media/reset":
|
|
self.handle_media_reset()
|
|
return
|
|
if path == "/api/settings":
|
|
self.handle_settings()
|
|
return
|
|
if path == "/api/jellyfin/settings":
|
|
self.handle_jellyfin_settings()
|
|
return
|
|
if path == "/api/jellyfin/sync":
|
|
self.handle_jellyfin_sync()
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
|
|
if auth is None:
|
|
return "disabled", DashboardSession("local", "disabled", time.time() + 3600)
|
|
|
|
session = auth.session_from_cookie(self.headers.get("Cookie"))
|
|
if session is None:
|
|
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
|
|
return None
|
|
|
|
if require_csrf:
|
|
csrf = self.headers.get("X-CSRF-Token", "")
|
|
if not hmac.compare_digest(csrf, session[1].csrf_token):
|
|
self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
|
|
return None
|
|
|
|
return session
|
|
|
|
def cookie_attributes(self, max_age: int) -> str:
|
|
attrs = [
|
|
"Path=/",
|
|
"HttpOnly",
|
|
"SameSite=Strict",
|
|
f"Max-Age={max_age}",
|
|
]
|
|
if auth is not None and auth.config.cookie_secure:
|
|
attrs.append("Secure")
|
|
return "; ".join(attrs)
|
|
|
|
def set_session_cookie(self, session_id: str) -> None:
|
|
ttl = auth.config.session_ttl_seconds if auth is not None else 3600
|
|
self.send_header(
|
|
"Set-Cookie",
|
|
f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}",
|
|
)
|
|
|
|
def clear_session_cookie(self) -> None:
|
|
self.send_header(
|
|
"Set-Cookie",
|
|
f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}",
|
|
)
|
|
|
|
def send_dashboard(self) -> None:
|
|
try:
|
|
body = dashboard_path.read_bytes()
|
|
except FileNotFoundError:
|
|
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
|
|
return
|
|
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def send_catalog(self) -> None:
|
|
body = render_catalog_html(runtime)
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def read_json(self) -> dict[str, Any]:
|
|
raw_length = self.headers.get("Content-Length", "0")
|
|
try:
|
|
length = int(raw_length)
|
|
except ValueError as exc:
|
|
raise ValueError("Invalid Content-Length") from exc
|
|
if length > MAX_REQUEST_BYTES:
|
|
raise ValueError("Request body is too large")
|
|
|
|
body = self.rfile.read(length)
|
|
try:
|
|
data = json.loads(body.decode("utf-8"))
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"Invalid JSON: {exc}") from exc
|
|
if not isinstance(data, dict):
|
|
raise ValueError("JSON body must be an object")
|
|
return data
|
|
|
|
def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
|
|
body = json.dumps(payload, indent=2).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_login(self) -> None:
|
|
if auth is None:
|
|
self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
|
|
return
|
|
|
|
try:
|
|
data = self.read_json()
|
|
except ValueError as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
username = str(data.get("username", ""))
|
|
password = str(data.get("password", ""))
|
|
throttle_key = f"{self.client_address[0]}:{username}"
|
|
if not auth.login_allowed(throttle_key):
|
|
self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."})
|
|
return
|
|
|
|
login = auth.login(username, password)
|
|
if login is None:
|
|
auth.record_failed_login(throttle_key)
|
|
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
|
|
return
|
|
|
|
auth.clear_failed_login(throttle_key)
|
|
session_id, session = login
|
|
payload = {
|
|
"username": session.username,
|
|
"csrfToken": session.csrf_token,
|
|
}
|
|
body = json.dumps(payload, indent=2).encode("utf-8")
|
|
self.send_response(HTTPStatus.OK)
|
|
self.set_session_cookie(session_id)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_logout(self, session_id: str) -> None:
|
|
if auth is not None:
|
|
auth.logout(session_id)
|
|
body = b'{\n "ok": true\n}'
|
|
self.send_response(HTTPStatus.OK)
|
|
self.clear_session_cookie()
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_check(self) -> None:
|
|
try:
|
|
message_id, results = run_check_cycle(runtime)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"messageId": message_id,
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
},
|
|
)
|
|
|
|
def handle_services(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
services_from_data(data)
|
|
save_services_config(runtime.config_path, data)
|
|
message_id, results = run_check_cycle(runtime)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"messageId": message_id,
|
|
"services": data.get("services", []),
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
},
|
|
)
|
|
|
|
def handle_media_catalog(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
if "movies" in data or "shows" in data:
|
|
if "catalogUrl" in data:
|
|
save_catalog_url_setting(runtime, str(data.get("catalogUrl", "")))
|
|
movies = media_items_from_data(data.get("movies", []), "movie")
|
|
shows = media_items_from_data(data.get("shows", []), "show")
|
|
save_media_library(runtime, movies, shows)
|
|
result = publish_media_items(
|
|
runtime=runtime,
|
|
channel_id=str(data.get("channelId", "")),
|
|
movies_all=movies,
|
|
shows_all=shows,
|
|
)
|
|
self.send_json(HTTPStatus.OK, result)
|
|
return
|
|
|
|
raise ValueError("Publish requires the saved Jellyfin media library")
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, result)
|
|
|
|
def handle_media_library(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
movies = media_items_from_data(data.get("movies", []), "movie")
|
|
shows = media_items_from_data(data.get("shows", []), "show")
|
|
saved = save_media_library(runtime, movies, shows)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"library": {
|
|
"movies": saved.get("movies", []),
|
|
"shows": saved.get("shows", []),
|
|
},
|
|
"movieCount": len(movies),
|
|
"showCount": len(shows),
|
|
},
|
|
)
|
|
|
|
def handle_media_reset(self) -> None:
|
|
try:
|
|
result = reset_media_library(runtime)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, result)
|
|
|
|
def handle_settings(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
channels = data.get("channels", data)
|
|
if not isinstance(channels, dict):
|
|
raise ValueError("Settings payload must include a channels object")
|
|
result = save_channel_settings(
|
|
runtime,
|
|
str(channels.get("statusChannelId", "")),
|
|
str(channels.get("mediaChannelId", "")),
|
|
str(channels.get("catalogUrl", "")),
|
|
)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, {"channels": result})
|
|
|
|
def handle_jellyfin_settings(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
result = save_jellyfin_settings(runtime, data)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, {"jellyfin": result})
|
|
|
|
def handle_jellyfin_sync(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
if data:
|
|
save_jellyfin_settings(runtime, data)
|
|
result = sync_jellyfin_library(runtime, force_publish=bool(data.get("forcePublish", False)))
|
|
except Exception as exc:
|
|
update_jellyfin_sync_state(runtime, error=str(exc)[:240])
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc), "jellyfin": jellyfin_settings(runtime)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, result)
|
|
|
|
return DashboardHandler
|
|
|
|
|
|
def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
|
|
if not bool_env("DASHBOARD_ENABLED", False):
|
|
return None
|
|
|
|
host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1"
|
|
port = int(os.getenv("DASHBOARD_PORT", "8787"))
|
|
auth = dashboard_auth_from_env()
|
|
server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth))
|
|
thread = threading.Thread(target=server.serve_forever, name="dashboard", daemon=True)
|
|
thread.start()
|
|
auth_note = "without auth" if auth is None else "with password sessions"
|
|
print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
|
|
return server
|
|
|
|
|
|
def main() -> int:
|
|
load_dotenv()
|
|
|
|
if "--hash-password" in sys.argv:
|
|
print_password_hash()
|
|
return 0
|
|
|
|
if "--preview" in sys.argv:
|
|
config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
|
|
print_preview(load_services(config_path))
|
|
return 0
|
|
|
|
token = env("DISCORD_BOT_TOKEN")
|
|
token = normalize_discord_token(token)
|
|
channel_id = os.getenv("DISCORD_CHANNEL_ID", "").strip()
|
|
config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
|
|
state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))
|
|
media_state_path = Path(env("MEDIA_CATALOG_STATE", "state/media-catalog.json"))
|
|
media_library_path = Path(env("MEDIA_LIBRARY_STATE", "state/media-library.json"))
|
|
settings_path = Path(env("BOT_SETTINGS_STATE", "state/bot-settings.json"))
|
|
interval = int(env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)))
|
|
runtime = BotRuntime(token, channel_id, config_path, state_path, media_state_path, media_library_path, settings_path, dry_run=bool_env("DISCORD_DRY_RUN", False))
|
|
gateway = None
|
|
if bool_env("DISCORD_GATEWAY_ENABLED", True) and not runtime.dry_run:
|
|
gateway = DiscordGatewayManager(token)
|
|
gateway.start()
|
|
dashboard = maybe_start_dashboard(runtime)
|
|
|
|
if runtime.dry_run:
|
|
print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
|
|
else:
|
|
try:
|
|
identity = discord_bot_identity(token)
|
|
username = identity.get("username", "unknown")
|
|
bot_id = identity.get("id", "unknown")
|
|
print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
|
|
except Exception as exc:
|
|
print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)
|
|
|
|
stopped = False
|
|
|
|
def stop(_signum: int, _frame: Any) -> None:
|
|
nonlocal stopped
|
|
stopped = True
|
|
|
|
signal.signal(signal.SIGINT, stop)
|
|
signal.signal(signal.SIGTERM, stop)
|
|
|
|
while not stopped:
|
|
try:
|
|
message_id, results = run_check_cycle(runtime)
|
|
online = sum(1 for result in results if result.ok)
|
|
print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
|
|
sync_result = maybe_run_jellyfin_sync(runtime)
|
|
if sync_result is not None:
|
|
action = "published" if sync_result["published"] else "checked"
|
|
print(
|
|
f"Jellyfin sync {action}: {sync_result['movieCount']} movies, {sync_result['showCount']} shows",
|
|
flush=True,
|
|
)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
print(f"Status update failed: {exc}", file=sys.stderr, flush=True)
|
|
|
|
for _ in range(interval):
|
|
if stopped:
|
|
break
|
|
time.sleep(1)
|
|
|
|
if dashboard is not None:
|
|
dashboard.shutdown()
|
|
if gateway is not None:
|
|
gateway.stop()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|