1564 lines
54 KiB
Python
1564 lines
54 KiB
Python
#!/usr/bin/env python3
|
|
"""Live Discord status message for The Mithral Archive."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import asyncio
|
|
import csv
|
|
import hashlib
|
|
import hmac
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import secrets
|
|
import signal
|
|
import socket
|
|
import ssl
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from http.cookies import SimpleCookie
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
DISCORD_API = "https://discord.com/api/v10"
|
|
DEFAULT_INTERVAL_SECONDS = 60
|
|
DEFAULT_TIMEOUT_SECONDS = 10
|
|
MAX_DISCORD_EMBEDS = 10
|
|
MAX_REQUEST_BYTES = 8_000_000
|
|
SESSION_COOKIE = "archive_bot_session"
|
|
PBKDF2_ITERATIONS = 390_000
|
|
MEDIA_ITEMS_PER_EMBED = 10
|
|
MAX_MEDIA_ITEMS_PER_CATEGORY = 240
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Service:
|
|
name: str
|
|
group: str
|
|
url: str
|
|
display_url: str
|
|
method: str
|
|
timeout: float
|
|
expected_statuses: set[int]
|
|
expected_min: int
|
|
expected_max: int
|
|
keyword: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CheckResult:
|
|
service: Service
|
|
ok: bool
|
|
status: int | None
|
|
latency_ms: int | None
|
|
error: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MediaItem:
|
|
title: str
|
|
media_type: str
|
|
year: str | None = None
|
|
genres: str | None = None
|
|
rating: str | None = None
|
|
runtime: str | None = None
|
|
summary: str | None = None
|
|
seasons: int | None = None
|
|
episodes: int | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class DashboardAuthConfig:
|
|
username: str
|
|
password_hash: str
|
|
session_ttl_seconds: int
|
|
cookie_secure: bool
|
|
|
|
|
|
@dataclass
|
|
class DashboardSession:
|
|
username: str
|
|
csrf_token: str
|
|
expires_at: float
|
|
|
|
|
|
class DashboardAuth:
|
|
def __init__(self, config: DashboardAuthConfig) -> None:
|
|
self.config = config
|
|
self.lock = threading.Lock()
|
|
self.sessions: dict[str, DashboardSession] = {}
|
|
self.failed_logins: dict[str, list[float]] = {}
|
|
|
|
def login_allowed(self, key: str) -> bool:
|
|
now = time.time()
|
|
window_start = now - 900
|
|
with self.lock:
|
|
attempts = [attempt for attempt in self.failed_logins.get(key, []) if attempt >= window_start]
|
|
self.failed_logins[key] = attempts
|
|
return len(attempts) < 10
|
|
|
|
def record_failed_login(self, key: str) -> None:
|
|
now = time.time()
|
|
with self.lock:
|
|
self.failed_logins.setdefault(key, []).append(now)
|
|
|
|
def clear_failed_login(self, key: str) -> None:
|
|
with self.lock:
|
|
self.failed_logins.pop(key, None)
|
|
|
|
def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
|
|
if not hmac.compare_digest(username, self.config.username):
|
|
return None
|
|
if not verify_password_hash(self.config.password_hash, password):
|
|
return None
|
|
|
|
session_id = secrets.token_urlsafe(32)
|
|
session = DashboardSession(
|
|
username=username,
|
|
csrf_token=secrets.token_urlsafe(32),
|
|
expires_at=time.time() + self.config.session_ttl_seconds,
|
|
)
|
|
with self.lock:
|
|
self.sessions[session_id] = session
|
|
return session_id, session
|
|
|
|
def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
|
|
if not cookie_header:
|
|
return None
|
|
cookie = SimpleCookie()
|
|
cookie.load(cookie_header)
|
|
morsel = cookie.get(SESSION_COOKIE)
|
|
if morsel is None:
|
|
return None
|
|
|
|
session_id = morsel.value
|
|
now = time.time()
|
|
with self.lock:
|
|
session = self.sessions.get(session_id)
|
|
if session is None:
|
|
return None
|
|
if session.expires_at <= now:
|
|
self.sessions.pop(session_id, None)
|
|
return None
|
|
session.expires_at = now + self.config.session_ttl_seconds
|
|
return session_id, session
|
|
|
|
def logout(self, session_id: str) -> None:
|
|
with self.lock:
|
|
self.sessions.pop(session_id, None)
|
|
|
|
|
|
class BotRuntime:
|
|
def __init__(
|
|
self,
|
|
token: str,
|
|
channel_id: str,
|
|
config_path: Path,
|
|
state_path: Path,
|
|
media_state_path: Path,
|
|
settings_path: Path,
|
|
dry_run: bool = False,
|
|
) -> None:
|
|
self.token = token
|
|
self.default_channel_id = channel_id
|
|
self.config_path = config_path
|
|
self.state_path = state_path
|
|
self.media_state_path = media_state_path
|
|
self.settings_path = settings_path
|
|
self.dry_run = dry_run
|
|
self.lock = threading.Lock()
|
|
self.last_results: list[CheckResult] = []
|
|
self.last_error: str | None = None
|
|
self.last_message_id: str | None = None
|
|
self.last_checked_at: datetime | None = None
|
|
|
|
|
|
class DiscordGatewayManager:
|
|
def __init__(self, token: str) -> None:
|
|
self.token = token
|
|
self.thread: threading.Thread | None = None
|
|
self.loop: asyncio.AbstractEventLoop | None = None
|
|
self.client: Any = None
|
|
self.ready = threading.Event()
|
|
self._disconnecting = threading.Event()
|
|
|
|
def start(self) -> None:
|
|
if self.thread is not None:
|
|
return
|
|
self.thread = threading.Thread(target=self._run, name="discord-gateway", daemon=True)
|
|
self.thread.start()
|
|
|
|
def stop(self) -> None:
|
|
self._disconnecting.set()
|
|
if self.loop is not None and self.client is not None:
|
|
asyncio.run_coroutine_threadsafe(self.client.close(), self.loop)
|
|
if self.thread is not None:
|
|
self.thread.join(timeout=15)
|
|
|
|
def _run(self) -> None:
|
|
try:
|
|
import discord
|
|
except ImportError:
|
|
print("discord.py is not installed. Install requirements.txt to keep the bot online.", file=sys.stderr, flush=True)
|
|
self.ready.set()
|
|
return
|
|
|
|
class GatewayClient(discord.Client):
|
|
def __init__(self, manager: DiscordGatewayManager) -> None:
|
|
intents = discord.Intents.default()
|
|
intents.guilds = True
|
|
super().__init__(intents=intents)
|
|
self.manager = manager
|
|
|
|
async def on_ready(self) -> None:
|
|
await self.change_presence(status=discord.Status.online)
|
|
user = self.user
|
|
name = user.name if user is not None else "unknown"
|
|
bot_id = user.id if user is not None else "unknown"
|
|
print(f"Discord gateway connected as {name} ({bot_id})", flush=True)
|
|
self.manager.ready.set()
|
|
|
|
async def on_disconnect(self) -> None:
|
|
self.manager.ready.clear()
|
|
|
|
self.loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(self.loop)
|
|
self.client = GatewayClient(self)
|
|
|
|
try:
|
|
self.loop.run_until_complete(self.client.start(self.token))
|
|
except Exception as exc:
|
|
if not self._disconnecting.is_set():
|
|
print(f"Discord gateway stopped: {exc}", file=sys.stderr, flush=True)
|
|
finally:
|
|
self.ready.set()
|
|
try:
|
|
pending = asyncio.all_tasks(self.loop)
|
|
for task in pending:
|
|
task.cancel()
|
|
if pending:
|
|
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
|
finally:
|
|
self.loop.close()
|
|
|
|
|
|
def env(name: str, default: str | None = None) -> str:
|
|
value = os.getenv(name, default)
|
|
if value is None or not value.strip():
|
|
raise SystemExit(f"Missing required environment variable: {name}")
|
|
return value.strip()
|
|
|
|
|
|
def normalize_discord_token(token: str) -> str:
|
|
cleaned = token.strip().strip("\"'")
|
|
if cleaned.lower().startswith("bot "):
|
|
cleaned = cleaned[4:].strip()
|
|
return cleaned
|
|
|
|
|
|
def load_dotenv(path: Path = Path(".env")) -> None:
|
|
if not path.exists():
|
|
return
|
|
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith("#") or "=" not in stripped:
|
|
continue
|
|
|
|
key, value = stripped.split("=", 1)
|
|
key = key.strip()
|
|
value = value.strip().strip("\"'")
|
|
if key and key not in os.environ:
|
|
os.environ[key] = value
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
|
|
try:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
except FileNotFoundError as exc:
|
|
raise ValueError(f"Config file not found: {path}") from exc
|
|
except PermissionError as exc:
|
|
raise ValueError(f"Config file is not readable: {path}") from exc
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
|
|
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f"Config must be a JSON object: {path}")
|
|
return data
|
|
|
|
|
|
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
|
|
if raw is None:
|
|
return set(), 200, 399
|
|
|
|
if isinstance(raw, str):
|
|
raw = [part.strip() for part in raw.split(",") if part.strip()]
|
|
|
|
if not isinstance(raw, list):
|
|
raise ValueError("expectedStatuses must be a list or comma-separated string")
|
|
|
|
exact: set[int] = set()
|
|
min_status = 999
|
|
max_status = 0
|
|
|
|
for item in raw:
|
|
if isinstance(item, int):
|
|
exact.add(item)
|
|
continue
|
|
if not isinstance(item, str):
|
|
raise ValueError("expectedStatuses entries must be integers or ranges")
|
|
if "-" in item:
|
|
left, right = item.split("-", 1)
|
|
try:
|
|
min_status = min(min_status, int(left))
|
|
max_status = max(max_status, int(right))
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid expected status range: {item}") from exc
|
|
continue
|
|
try:
|
|
exact.add(int(item))
|
|
except ValueError as exc:
|
|
raise ValueError(f"Invalid expected status value: {item}") from exc
|
|
|
|
if min_status == 999 and max_status == 0:
|
|
min_status, max_status = 0, -1
|
|
return exact, min_status, max_status
|
|
|
|
|
|
def services_from_data(data: dict[str, Any]) -> list[Service]:
|
|
raw_services = data.get("services")
|
|
if not isinstance(raw_services, list) or not raw_services:
|
|
raise ValueError("Config must include a non-empty services array")
|
|
|
|
services: list[Service] = []
|
|
for index, item in enumerate(raw_services, start=1):
|
|
if not isinstance(item, dict):
|
|
raise ValueError(f"Service #{index} must be an object")
|
|
|
|
name = str(item.get("name", "")).strip()
|
|
url = str(item.get("url", "")).strip()
|
|
if not name or not url:
|
|
raise ValueError(f"Service #{index} must include name and url")
|
|
|
|
parsed = urllib.parse.urlparse(url)
|
|
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
|
|
raise ValueError(f"Service {name} has an invalid http(s) URL")
|
|
|
|
exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
|
|
services.append(
|
|
Service(
|
|
name=name,
|
|
group=str(item.get("group", "Main Services")).strip() or "Main Services",
|
|
url=url,
|
|
display_url=str(item.get("displayUrl", url)).strip() or url,
|
|
method=str(item.get("method", "GET")).strip().upper(),
|
|
timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)),
|
|
expected_statuses=exact,
|
|
expected_min=minimum,
|
|
expected_max=maximum,
|
|
keyword=(str(item["keyword"]).strip() if item.get("keyword") else None),
|
|
)
|
|
)
|
|
|
|
return services
|
|
|
|
|
|
def load_services(path: Path) -> list[Service]:
|
|
return services_from_data(load_json(path))
|
|
|
|
|
|
def save_services_config(path: Path, data: dict[str, Any]) -> None:
|
|
services_from_data(data)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
temporary = path.with_suffix(f"{path.suffix}.tmp")
|
|
with temporary.open("w", encoding="utf-8") as handle:
|
|
json.dump(data, handle, indent=2)
|
|
handle.write("\n")
|
|
try:
|
|
temporary.replace(path)
|
|
except OSError:
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
json.dump(data, handle, indent=2)
|
|
handle.write("\n")
|
|
temporary.unlink(missing_ok=True)
|
|
|
|
|
|
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for service in services:
|
|
expected: list[int | str] = sorted(service.expected_statuses)
|
|
if service.expected_min <= service.expected_max:
|
|
expected.append(f"{service.expected_min}-{service.expected_max}")
|
|
item: dict[str, Any] = {
|
|
"name": service.name,
|
|
"group": service.group,
|
|
"url": service.url,
|
|
"displayUrl": service.display_url,
|
|
"method": service.method,
|
|
"timeoutSeconds": service.timeout,
|
|
"expectedStatuses": expected or ["200-399"],
|
|
}
|
|
if service.keyword:
|
|
item["keyword"] = service.keyword
|
|
output.append(item)
|
|
return output
|
|
|
|
|
|
def status_expected(service: Service, status: int) -> bool:
|
|
if status in service.expected_statuses:
|
|
return True
|
|
return service.expected_min <= status <= service.expected_max
|
|
|
|
|
|
def check_service(service: Service) -> CheckResult:
|
|
started = time.monotonic()
|
|
headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
|
|
request = urllib.request.Request(service.url, headers=headers, method=service.method)
|
|
|
|
try:
|
|
context = ssl.create_default_context()
|
|
with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
|
|
body = response.read(1_000_000) if service.keyword else b""
|
|
status = int(response.status)
|
|
except urllib.error.HTTPError as exc:
|
|
status = int(exc.code)
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
ok = status_expected(service, status)
|
|
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
|
|
except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc:
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
return CheckResult(service, False, None, latency_ms, clean_error(exc))
|
|
|
|
latency_ms = int((time.monotonic() - started) * 1000)
|
|
ok = status_expected(service, status)
|
|
|
|
if ok and service.keyword:
|
|
try:
|
|
text = body.decode("utf-8", errors="ignore")
|
|
except UnicodeDecodeError:
|
|
text = ""
|
|
if service.keyword not in text:
|
|
ok = False
|
|
return CheckResult(service, False, status, latency_ms, "keyword missing")
|
|
|
|
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
|
|
|
|
|
|
def clean_error(exc: BaseException) -> str:
|
|
reason = getattr(exc, "reason", None)
|
|
if reason:
|
|
return str(reason)[:120]
|
|
return str(exc)[:120] or exc.__class__.__name__
|
|
|
|
|
|
def discord_request(
|
|
method: str,
|
|
token: str,
|
|
path: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
body = None
|
|
headers = {
|
|
"Authorization": f"Bot {token}",
|
|
"User-Agent": "ArchiveStatusBot/1.0",
|
|
}
|
|
|
|
if payload is not None:
|
|
body = json.dumps(payload).encode("utf-8")
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
request = urllib.request.Request(
|
|
f"{DISCORD_API}{path}",
|
|
data=body,
|
|
headers=headers,
|
|
method=method,
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=20) as response:
|
|
data = response.read()
|
|
if not data:
|
|
return {}
|
|
return json.loads(data.decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
detail = exc.read().decode("utf-8", errors="ignore")
|
|
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
|
|
|
|
|
|
def discord_delete_message(token: str, channel_id: str, message_id: str) -> None:
|
|
try:
|
|
discord_request("DELETE", token, f"/channels/{channel_id}/messages/{message_id}")
|
|
except RuntimeError as exc:
|
|
print(f"Could not delete old media catalog message {message_id}: {exc}", file=sys.stderr)
|
|
|
|
|
|
def discord_bot_identity(token: str) -> dict[str, Any]:
|
|
return discord_request("GET", token, "/users/@me")
|
|
|
|
|
|
def load_state(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def save_state(path: Path, state: dict[str, Any]) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
temporary = path.with_suffix(f"{path.suffix}.tmp")
|
|
with temporary.open("w", encoding="utf-8") as handle:
|
|
json.dump(state, handle, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
temporary.replace(path)
|
|
|
|
|
|
def validate_channel_id(value: str, label: str) -> str:
|
|
channel_id = value.strip()
|
|
if not channel_id:
|
|
raise ValueError(f"{label} channel ID is required")
|
|
if not channel_id.isdigit():
|
|
raise ValueError(f"{label} channel ID must be a Discord numeric channel ID")
|
|
return channel_id
|
|
|
|
|
|
def channel_settings(runtime: BotRuntime) -> dict[str, str]:
|
|
data = load_state(runtime.settings_path)
|
|
status_channel = str(data.get("status_channel_id", "")).strip() or runtime.default_channel_id
|
|
media_channel = str(data.get("media_channel_id", "")).strip() or status_channel
|
|
return {
|
|
"statusChannelId": status_channel,
|
|
"mediaChannelId": media_channel,
|
|
}
|
|
|
|
|
|
def save_channel_settings(runtime: BotRuntime, status_channel_id: str, media_channel_id: str) -> dict[str, str]:
|
|
status_channel = validate_channel_id(status_channel_id, "Status")
|
|
media_channel = validate_channel_id(media_channel_id or status_channel, "Media")
|
|
state = load_state(runtime.settings_path)
|
|
state.update(
|
|
{
|
|
"status_channel_id": status_channel,
|
|
"media_channel_id": media_channel,
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
)
|
|
save_state(runtime.settings_path, state)
|
|
return channel_settings(runtime)
|
|
|
|
|
|
def save_media_channel_setting(runtime: BotRuntime, media_channel_id: str) -> dict[str, str]:
|
|
media_channel = validate_channel_id(media_channel_id, "Media")
|
|
state = load_state(runtime.settings_path)
|
|
state["media_channel_id"] = media_channel
|
|
state["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
save_state(runtime.settings_path, state)
|
|
return channel_settings(runtime)
|
|
|
|
|
|
def normalize_csv_key(value: str) -> str:
|
|
return "".join(character for character in value.lower() if character.isalnum())
|
|
|
|
|
|
def value_from_row(row: dict[str, str], aliases: list[str]) -> str:
|
|
for alias in aliases:
|
|
value = row.get(alias, "").strip()
|
|
if value:
|
|
return value
|
|
return ""
|
|
|
|
|
|
def parse_int_text(value: str) -> int | None:
|
|
digits = "".join(character for character in value if character.isdigit())
|
|
if not digits:
|
|
return None
|
|
try:
|
|
return int(digits)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def parse_year_text(value: str) -> str | None:
|
|
match = re.search(r"(18|19|20|21)\d{2}", value)
|
|
return match.group(0) if match else None
|
|
|
|
|
|
def clean_media_text(value: str, limit: int = 220) -> str | None:
|
|
cleaned = " ".join(str(value).replace("\r", " ").replace("\n", " ").split())
|
|
if not cleaned:
|
|
return None
|
|
if len(cleaned) <= limit:
|
|
return cleaned
|
|
return cleaned[: limit - 1].rstrip() + "…"
|
|
|
|
|
|
def format_runtime(value: str) -> str | None:
|
|
cleaned = clean_media_text(value, 48)
|
|
if not cleaned:
|
|
return None
|
|
if not cleaned.isdigit():
|
|
return cleaned
|
|
|
|
minutes = int(cleaned)
|
|
if minutes <= 0:
|
|
return None
|
|
hours, remainder = divmod(minutes, 60)
|
|
if hours and remainder:
|
|
return f"{hours}h {remainder}m"
|
|
if hours:
|
|
return f"{hours}h"
|
|
return f"{remainder}m"
|
|
|
|
|
|
def parse_media_csv(csv_text: str, media_type: str, filename: str) -> list[MediaItem]:
|
|
text = csv_text.lstrip("\ufeff")
|
|
if not text.strip():
|
|
return []
|
|
|
|
try:
|
|
dialect = csv.Sniffer().sniff(text[:4096], delimiters=",;\t|")
|
|
except csv.Error:
|
|
dialect = csv.excel
|
|
|
|
reader = csv.DictReader(io.StringIO(text), dialect=dialect)
|
|
if not reader.fieldnames:
|
|
raise ValueError(f"{filename} does not have a header row")
|
|
|
|
title_aliases = [
|
|
"title",
|
|
"name",
|
|
"movie",
|
|
"movietitle",
|
|
"sorttitle",
|
|
"originaltitle",
|
|
]
|
|
if media_type == "show":
|
|
title_aliases = [
|
|
"showtitle",
|
|
"seriestitle",
|
|
"series",
|
|
"show",
|
|
"grandparenttitle",
|
|
"parenttitle",
|
|
"title",
|
|
"name",
|
|
]
|
|
|
|
year_aliases = ["year", "releaseyear", "productionyear", "releasedate", "premiered", "date"]
|
|
genre_aliases = ["genres", "genre", "tags", "categories"]
|
|
rating_aliases = ["rating", "contentrating", "agerating", "certification", "mpaarating"]
|
|
runtime_aliases = ["runtime", "duration", "runtimeminutes", "length", "durationminutes"]
|
|
summary_aliases = ["summary", "overview", "description", "plot", "tagline"]
|
|
season_count_aliases = ["seasoncount", "seasons"]
|
|
season_number_aliases = ["season", "seasonnumber", "seasonindex", "parentindex"]
|
|
episode_count_aliases = ["episodecount", "episodes"]
|
|
episode_number_aliases = ["episode", "episodenumber", "episodeindex", "index"]
|
|
|
|
items: list[MediaItem] = []
|
|
show_groups: dict[tuple[str, str], dict[str, Any]] = {}
|
|
|
|
for raw_row in reader:
|
|
row = {
|
|
normalize_csv_key(str(key)): str(value or "")
|
|
for key, value in raw_row.items()
|
|
if key is not None
|
|
}
|
|
title = clean_media_text(value_from_row(row, title_aliases), 120)
|
|
if not title:
|
|
continue
|
|
|
|
year = parse_year_text(value_from_row(row, year_aliases))
|
|
genres = clean_media_text(value_from_row(row, genre_aliases), 120)
|
|
rating = clean_media_text(value_from_row(row, rating_aliases), 32)
|
|
runtime = format_runtime(value_from_row(row, runtime_aliases))
|
|
summary = clean_media_text(value_from_row(row, summary_aliases))
|
|
season_count = parse_int_text(value_from_row(row, season_count_aliases))
|
|
season_number = parse_int_text(value_from_row(row, season_number_aliases))
|
|
episode_count = parse_int_text(value_from_row(row, episode_count_aliases))
|
|
episode_number = parse_int_text(value_from_row(row, episode_number_aliases))
|
|
|
|
if media_type == "show":
|
|
key = (title.casefold(), year or "")
|
|
group = show_groups.setdefault(
|
|
key,
|
|
{
|
|
"title": title,
|
|
"year": year,
|
|
"genres": genres,
|
|
"rating": rating,
|
|
"runtime": runtime,
|
|
"summary": summary,
|
|
"seasons": set(),
|
|
"episodes": 0,
|
|
"explicit_season_count": None,
|
|
"explicit_episode_count": None,
|
|
},
|
|
)
|
|
if not group.get("genres") and genres:
|
|
group["genres"] = genres
|
|
if not group.get("rating") and rating:
|
|
group["rating"] = rating
|
|
if not group.get("runtime") and runtime:
|
|
group["runtime"] = runtime
|
|
if not group.get("summary") and summary:
|
|
group["summary"] = summary
|
|
if season_number is not None:
|
|
group["seasons"].add(season_number)
|
|
if episode_number is not None:
|
|
group["episodes"] += 1
|
|
if season_count is not None:
|
|
current = group.get("explicit_season_count")
|
|
group["explicit_season_count"] = max(current or 0, season_count)
|
|
if episode_count is not None:
|
|
current = group.get("explicit_episode_count")
|
|
group["explicit_episode_count"] = max(current or 0, episode_count)
|
|
continue
|
|
|
|
items.append(
|
|
MediaItem(
|
|
title=title,
|
|
media_type=media_type,
|
|
year=year,
|
|
genres=genres,
|
|
rating=rating,
|
|
runtime=runtime,
|
|
summary=summary,
|
|
)
|
|
)
|
|
|
|
if media_type == "show":
|
|
for group in show_groups.values():
|
|
seasons = group.get("explicit_season_count") or len(group["seasons"]) or None
|
|
episodes = group.get("explicit_episode_count") or group["episodes"] or None
|
|
items.append(
|
|
MediaItem(
|
|
title=group["title"],
|
|
media_type=media_type,
|
|
year=group["year"],
|
|
genres=group["genres"],
|
|
rating=group["rating"],
|
|
runtime=group["runtime"],
|
|
summary=group["summary"],
|
|
seasons=seasons,
|
|
episodes=episodes,
|
|
)
|
|
)
|
|
|
|
deduped: dict[tuple[str, str], MediaItem] = {}
|
|
for item in items:
|
|
deduped.setdefault((item.title.casefold(), item.year or ""), item)
|
|
|
|
parsed = sorted(deduped.values(), key=lambda item: (item.title.casefold(), item.year or ""))
|
|
if not parsed:
|
|
raise ValueError(f"{filename} did not contain any rows with a recognizable title/name column")
|
|
return parsed
|
|
|
|
|
|
def media_item_heading(item: MediaItem) -> str:
|
|
if item.year:
|
|
return f"{item.title} ({item.year})"
|
|
return item.title
|
|
|
|
|
|
def media_item_value(item: MediaItem) -> str:
|
|
details: list[str] = []
|
|
if item.genres:
|
|
details.append(item.genres)
|
|
if item.rating:
|
|
details.append(item.rating)
|
|
if item.runtime and item.media_type == "movie":
|
|
details.append(item.runtime)
|
|
if item.media_type == "show":
|
|
counts = []
|
|
if item.seasons:
|
|
counts.append(f"{item.seasons} season{'s' if item.seasons != 1 else ''}")
|
|
if item.episodes:
|
|
counts.append(f"{item.episodes} episode{'s' if item.episodes != 1 else ''}")
|
|
if counts:
|
|
details.append(" · ".join(counts))
|
|
|
|
lines = []
|
|
if details:
|
|
lines.append(" • ".join(details))
|
|
if item.summary:
|
|
lines.append(item.summary)
|
|
return "\n".join(lines)[:1024] or "No details provided."
|
|
|
|
|
|
def media_category_embeds(
|
|
title: str,
|
|
items: list[MediaItem],
|
|
color: int,
|
|
total_count: int,
|
|
source_name: str,
|
|
) -> list[dict[str, Any]]:
|
|
embeds: list[dict[str, Any]] = []
|
|
omitted = max(total_count - len(items), 0)
|
|
page_count = max((len(items) + MEDIA_ITEMS_PER_EMBED - 1) // MEDIA_ITEMS_PER_EMBED, 1)
|
|
|
|
for page_index in range(page_count):
|
|
start = page_index * MEDIA_ITEMS_PER_EMBED
|
|
chunk = items[start : start + MEDIA_ITEMS_PER_EMBED]
|
|
description = f"{total_count} total from `{source_name}`"
|
|
if omitted:
|
|
description += f" · showing first {len(items)}"
|
|
if page_count > 1:
|
|
description += f" · page {page_index + 1}/{page_count}"
|
|
|
|
fields = [
|
|
{
|
|
"name": media_item_heading(item)[:256],
|
|
"value": media_item_value(item),
|
|
"inline": False,
|
|
}
|
|
for item in chunk
|
|
]
|
|
embeds.append(
|
|
{
|
|
"title": title,
|
|
"description": description,
|
|
"color": color,
|
|
"fields": fields,
|
|
}
|
|
)
|
|
|
|
return embeds
|
|
|
|
|
|
def render_media_catalog_payloads(
|
|
movies: list[MediaItem],
|
|
shows: list[MediaItem],
|
|
movie_total: int,
|
|
show_total: int,
|
|
movie_source: str,
|
|
show_source: str,
|
|
) -> list[dict[str, Any]]:
|
|
now = datetime.now(timezone.utc)
|
|
embeds: list[dict[str, Any]] = [
|
|
{
|
|
"title": "The Mithral Archive Media Catalog",
|
|
"description": "Current movies and shows available in the archive.",
|
|
"color": 0x5865F2,
|
|
"fields": [
|
|
{"name": "Movies", "value": str(movie_total), "inline": True},
|
|
{"name": "Shows", "value": str(show_total), "inline": True},
|
|
{"name": "Updated", "value": now.strftime("%Y-%m-%d %H:%M UTC"), "inline": True},
|
|
],
|
|
"timestamp": now.isoformat(),
|
|
}
|
|
]
|
|
if movies:
|
|
embeds.extend(media_category_embeds("Movies", movies, 0xF59E0B, movie_total, movie_source))
|
|
if shows:
|
|
embeds.extend(media_category_embeds("Shows", shows, 0x10B981, show_total, show_source))
|
|
|
|
payloads = []
|
|
for index in range(0, len(embeds), MAX_DISCORD_EMBEDS):
|
|
payloads.append(
|
|
{
|
|
"content": "**The Mithral Archive Media Catalog**" if index == 0 else "",
|
|
"embeds": embeds[index : index + MAX_DISCORD_EMBEDS],
|
|
}
|
|
)
|
|
return payloads
|
|
|
|
|
|
def publish_media_catalog(
|
|
runtime: BotRuntime,
|
|
channel_id: str,
|
|
movies_csv: str,
|
|
shows_csv: str,
|
|
movie_filename: str,
|
|
show_filename: str,
|
|
) -> dict[str, Any]:
|
|
if runtime.dry_run:
|
|
raise RuntimeError("Discord dry run is enabled; media catalog was parsed but not sent")
|
|
|
|
settings = channel_settings(runtime)
|
|
channel = validate_channel_id(channel_id.strip() or settings["mediaChannelId"], "Media")
|
|
|
|
movies_all = parse_media_csv(movies_csv, "movie", movie_filename) if movies_csv.strip() else []
|
|
shows_all = parse_media_csv(shows_csv, "show", show_filename) if shows_csv.strip() else []
|
|
if not movies_all and not shows_all:
|
|
raise ValueError("Upload at least one Movies.csv or Shows.csv file")
|
|
|
|
limit = int(os.getenv("MEDIA_CATALOG_MAX_ITEMS", str(MAX_MEDIA_ITEMS_PER_CATEGORY)))
|
|
movies = movies_all[:limit]
|
|
shows = shows_all[:limit]
|
|
payloads = render_media_catalog_payloads(
|
|
movies=movies,
|
|
shows=shows,
|
|
movie_total=len(movies_all),
|
|
show_total=len(shows_all),
|
|
movie_source=movie_filename or "Movies.csv",
|
|
show_source=show_filename or "Shows.csv",
|
|
)
|
|
|
|
state = load_state(runtime.media_state_path)
|
|
old_channel = str(state.get("channel_id", "")).strip()
|
|
existing_ids = [
|
|
str(message_id)
|
|
for message_id in state.get("message_ids", [])
|
|
if str(message_id).strip()
|
|
]
|
|
if old_channel and old_channel != channel:
|
|
for old_id in existing_ids:
|
|
discord_delete_message(runtime.token, old_channel, old_id)
|
|
existing_ids = []
|
|
|
|
message_ids: list[str] = []
|
|
for index, payload in enumerate(payloads):
|
|
if index < len(existing_ids):
|
|
discord_request("PATCH", runtime.token, f"/channels/{channel}/messages/{existing_ids[index]}", payload)
|
|
message_ids.append(existing_ids[index])
|
|
continue
|
|
message = discord_request("POST", runtime.token, f"/channels/{channel}/messages", payload)
|
|
message_id = str(message.get("id", "")).strip()
|
|
if not message_id:
|
|
raise RuntimeError("Discord did not return a message id for the media catalog")
|
|
message_ids.append(message_id)
|
|
|
|
for old_id in existing_ids[len(payloads) :]:
|
|
discord_delete_message(runtime.token, channel, old_id)
|
|
|
|
save_state(
|
|
runtime.media_state_path,
|
|
{
|
|
"channel_id": channel,
|
|
"message_ids": message_ids,
|
|
"movie_count": len(movies_all),
|
|
"show_count": len(shows_all),
|
|
"published_at": datetime.now(timezone.utc).isoformat(),
|
|
},
|
|
)
|
|
save_media_channel_setting(runtime, channel)
|
|
|
|
return {
|
|
"channelId": channel,
|
|
"messageIds": message_ids,
|
|
"movieCount": len(movies_all),
|
|
"showCount": len(shows_all),
|
|
"displayedMovieCount": len(movies),
|
|
"displayedShowCount": len(shows),
|
|
}
|
|
|
|
|
|
def media_catalog_status(runtime: BotRuntime) -> dict[str, Any]:
|
|
state = load_state(runtime.media_state_path)
|
|
settings = channel_settings(runtime)
|
|
return {
|
|
"channelId": str(state.get("channel_id", "")).strip() or settings["mediaChannelId"],
|
|
"messageIds": state.get("message_ids", []) if isinstance(state.get("message_ids"), list) else [],
|
|
"movieCount": state.get("movie_count"),
|
|
"showCount": state.get("show_count"),
|
|
"publishedAt": state.get("published_at"),
|
|
"maxItemsPerCategory": int(os.getenv("MEDIA_CATALOG_MAX_ITEMS", str(MAX_MEDIA_ITEMS_PER_CATEGORY))),
|
|
}
|
|
|
|
|
|
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
|
|
checked_at = datetime.now(timezone.utc)
|
|
online = sum(1 for result in results if result.ok)
|
|
total = len(results)
|
|
degraded = 0 < online < total
|
|
|
|
if total == 0:
|
|
color = 0x6B7280
|
|
summary = "No services configured."
|
|
elif online == total:
|
|
color = 0x10B981
|
|
summary = f"🟢 Operational · {online}/{total} online"
|
|
elif degraded:
|
|
color = 0xF59E0B
|
|
offline = total - online
|
|
attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
|
|
summary = f"🟡 Degraded · {online}/{total} online · {attention}"
|
|
else:
|
|
color = 0xEF4444
|
|
summary = f"🔴 Outage · {online}/{total} online"
|
|
|
|
groups: dict[str, list[CheckResult]] = {}
|
|
for result in results:
|
|
groups.setdefault(result.service.group, []).append(result)
|
|
|
|
fields = []
|
|
for group_name, group_results in groups.items():
|
|
service_lines = []
|
|
state_lines = []
|
|
for result in group_results:
|
|
icon = "🟢" if result.ok else "🔴"
|
|
label = "Online" if result.ok else "Issue"
|
|
service_lines.append(f"**{result.service.name}**")
|
|
state_lines.append(f"{icon} {label}")
|
|
|
|
fields.append({
|
|
"name": group_name,
|
|
"value": "\n".join(service_lines)[:1024] or "None",
|
|
"inline": True,
|
|
})
|
|
fields.append({
|
|
"name": "Status",
|
|
"value": "\n".join(state_lines)[:1024] or "None",
|
|
"inline": True,
|
|
})
|
|
fields.append({
|
|
"name": "\u200b",
|
|
"value": "\u200b",
|
|
"inline": False,
|
|
})
|
|
|
|
if fields and fields[-1]["name"] == "\u200b":
|
|
fields.pop()
|
|
|
|
interval = os.getenv("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
|
|
return [
|
|
{
|
|
"title": "The Mithral Archive",
|
|
"description": summary,
|
|
"color": color,
|
|
"fields": fields[:25],
|
|
"footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"},
|
|
"timestamp": checked_at.isoformat(),
|
|
}
|
|
]
|
|
|
|
|
|
def upsert_status_message(
|
|
token: str,
|
|
channel_id: str,
|
|
state_path: Path,
|
|
results: list[CheckResult],
|
|
) -> str:
|
|
state = load_state(state_path)
|
|
message_id = str(state.get("message_id", "")).strip()
|
|
payload = {"content": "", "embeds": render_embeds(results)}
|
|
|
|
if message_id:
|
|
try:
|
|
discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
|
|
return message_id
|
|
except RuntimeError as exc:
|
|
print(f"Could not edit existing status message, creating a new one: {exc}", file=sys.stderr)
|
|
|
|
message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
|
|
new_id = str(message.get("id", "")).strip()
|
|
if not new_id:
|
|
raise RuntimeError("Discord did not return a message id")
|
|
|
|
save_state(state_path, {"message_id": new_id})
|
|
return new_id
|
|
|
|
|
|
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
|
|
results: list[CheckResult] = []
|
|
for index, service in enumerate(services):
|
|
results.append(
|
|
CheckResult(
|
|
service=service,
|
|
ok=index != len(services) - 1,
|
|
status=200 if index != len(services) - 1 else 502,
|
|
latency_ms=42 + (index * 31),
|
|
error=None if index != len(services) - 1 else "HTTP 502",
|
|
)
|
|
)
|
|
return results
|
|
|
|
|
|
def print_preview(services: list[Service]) -> None:
|
|
payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))}
|
|
print(json.dumps(payload, indent=2))
|
|
|
|
|
|
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
|
|
return {
|
|
"name": result.service.name,
|
|
"group": result.service.group,
|
|
"url": result.service.url,
|
|
"displayUrl": result.service.display_url,
|
|
"ok": result.ok,
|
|
"status": result.status,
|
|
"latencyMs": result.latency_ms,
|
|
"error": result.error,
|
|
}
|
|
|
|
|
|
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
|
|
services = load_services(runtime.config_path)
|
|
results = [check_service(service) for service in services]
|
|
if runtime.dry_run:
|
|
message_id = "dry-run"
|
|
else:
|
|
channel_id = validate_channel_id(channel_settings(runtime)["statusChannelId"], "Status")
|
|
message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results)
|
|
|
|
with runtime.lock:
|
|
runtime.last_results = results
|
|
runtime.last_message_id = message_id
|
|
runtime.last_checked_at = datetime.now(timezone.utc)
|
|
runtime.last_error = None
|
|
|
|
return message_id, results
|
|
|
|
|
|
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
|
|
services = load_services(runtime.config_path)
|
|
settings = channel_settings(runtime)
|
|
with runtime.lock:
|
|
results = list(runtime.last_results)
|
|
return {
|
|
"services": services_to_jsonable(services),
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
"lastError": runtime.last_error,
|
|
"lastMessageId": runtime.last_message_id,
|
|
"lastCheckedAt": runtime.last_checked_at.isoformat() if runtime.last_checked_at else None,
|
|
"channelId": settings["statusChannelId"],
|
|
"channels": settings,
|
|
}
|
|
|
|
|
|
def bool_env(name: str, default: bool = False) -> bool:
|
|
raw = os.getenv(name)
|
|
if raw is None:
|
|
return default
|
|
return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def password_hash(password: str) -> str:
|
|
salt = secrets.token_bytes(16)
|
|
digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
|
|
salt_text = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=")
|
|
digest_text = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
|
|
return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt_text}${digest_text}"
|
|
|
|
|
|
def decode_urlsafe_base64(value: str) -> bytes:
|
|
padding = "=" * (-len(value) % 4)
|
|
return base64.urlsafe_b64decode(value + padding)
|
|
|
|
|
|
def verify_password_hash(encoded: str, password: str) -> bool:
|
|
try:
|
|
algorithm, iterations, salt_text, digest_text = encoded.split("$", 3)
|
|
if algorithm != "pbkdf2_sha256":
|
|
return False
|
|
salt = decode_urlsafe_base64(salt_text)
|
|
expected = decode_urlsafe_base64(digest_text)
|
|
actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(iterations))
|
|
except (ValueError, TypeError):
|
|
return False
|
|
return hmac.compare_digest(actual, expected)
|
|
|
|
|
|
def dashboard_auth_from_env() -> DashboardAuth | None:
|
|
if bool_env("DASHBOARD_AUTH_DISABLED", False):
|
|
return None
|
|
|
|
username = os.getenv("DASHBOARD_USERNAME", "").strip()
|
|
encoded_hash = os.getenv("DASHBOARD_PASSWORD_HASH", "").strip()
|
|
if not username or not encoded_hash:
|
|
return None
|
|
|
|
ttl = int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800"))
|
|
secure = bool_env("DASHBOARD_COOKIE_SECURE", False)
|
|
return DashboardAuth(
|
|
DashboardAuthConfig(
|
|
username=username,
|
|
password_hash=encoded_hash,
|
|
session_ttl_seconds=ttl,
|
|
cookie_secure=secure,
|
|
)
|
|
)
|
|
|
|
|
|
def print_password_hash() -> None:
|
|
import getpass
|
|
|
|
first = getpass.getpass("Dashboard password: ")
|
|
second = getpass.getpass("Confirm password: ")
|
|
if first != second:
|
|
raise SystemExit("Passwords did not match")
|
|
if len(first) < 12:
|
|
raise SystemExit("Use at least 12 characters")
|
|
print(password_hash(first))
|
|
|
|
|
|
def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
|
|
dashboard_path = Path(__file__).with_name("dashboard.html")
|
|
|
|
class DashboardHandler(BaseHTTPRequestHandler):
|
|
server_version = "ArchiveStatusDashboard/1.0"
|
|
|
|
def log_message(self, format: str, *args: Any) -> None:
|
|
print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)
|
|
|
|
def do_GET(self) -> None:
|
|
if self.path in {"/", "/dashboard"}:
|
|
self.send_dashboard()
|
|
return
|
|
if self.path == "/favicon.ico":
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
return
|
|
if self.path == "/api/session":
|
|
session = self.require_auth()
|
|
if session is None:
|
|
return
|
|
_session_id, data = session
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"username": data.username,
|
|
"csrfToken": data.csrf_token,
|
|
},
|
|
)
|
|
return
|
|
if self.path == "/api/status":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, runtime_status(runtime))
|
|
return
|
|
if self.path == "/api/media":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, media_catalog_status(runtime))
|
|
return
|
|
if self.path == "/api/settings":
|
|
if self.require_auth() is None:
|
|
return
|
|
self.send_json(HTTPStatus.OK, {"channels": channel_settings(runtime)})
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def do_POST(self) -> None:
|
|
if self.path == "/api/login":
|
|
self.handle_login()
|
|
return
|
|
session = self.require_auth(require_csrf=True)
|
|
if session is None:
|
|
return
|
|
if self.path == "/api/logout":
|
|
self.handle_logout(session[0])
|
|
return
|
|
if self.path == "/api/check":
|
|
self.handle_check()
|
|
return
|
|
if self.path == "/api/services":
|
|
self.handle_services()
|
|
return
|
|
if self.path == "/api/media":
|
|
self.handle_media_catalog()
|
|
return
|
|
if self.path == "/api/settings":
|
|
self.handle_settings()
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
|
|
if auth is None:
|
|
return "disabled", DashboardSession("local", "disabled", time.time() + 3600)
|
|
|
|
session = auth.session_from_cookie(self.headers.get("Cookie"))
|
|
if session is None:
|
|
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
|
|
return None
|
|
|
|
if require_csrf:
|
|
csrf = self.headers.get("X-CSRF-Token", "")
|
|
if not hmac.compare_digest(csrf, session[1].csrf_token):
|
|
self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
|
|
return None
|
|
|
|
return session
|
|
|
|
def cookie_attributes(self, max_age: int) -> str:
|
|
attrs = [
|
|
"Path=/",
|
|
"HttpOnly",
|
|
"SameSite=Strict",
|
|
f"Max-Age={max_age}",
|
|
]
|
|
if auth is not None and auth.config.cookie_secure:
|
|
attrs.append("Secure")
|
|
return "; ".join(attrs)
|
|
|
|
def set_session_cookie(self, session_id: str) -> None:
|
|
ttl = auth.config.session_ttl_seconds if auth is not None else 3600
|
|
self.send_header(
|
|
"Set-Cookie",
|
|
f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}",
|
|
)
|
|
|
|
def clear_session_cookie(self) -> None:
|
|
self.send_header(
|
|
"Set-Cookie",
|
|
f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}",
|
|
)
|
|
|
|
def send_dashboard(self) -> None:
|
|
try:
|
|
body = dashboard_path.read_bytes()
|
|
except FileNotFoundError:
|
|
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
|
|
return
|
|
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def read_json(self) -> dict[str, Any]:
|
|
raw_length = self.headers.get("Content-Length", "0")
|
|
try:
|
|
length = int(raw_length)
|
|
except ValueError as exc:
|
|
raise ValueError("Invalid Content-Length") from exc
|
|
if length > MAX_REQUEST_BYTES:
|
|
raise ValueError("Request body is too large")
|
|
|
|
body = self.rfile.read(length)
|
|
try:
|
|
data = json.loads(body.decode("utf-8"))
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"Invalid JSON: {exc}") from exc
|
|
if not isinstance(data, dict):
|
|
raise ValueError("JSON body must be an object")
|
|
return data
|
|
|
|
def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
|
|
body = json.dumps(payload, indent=2).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_login(self) -> None:
|
|
if auth is None:
|
|
self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
|
|
return
|
|
|
|
try:
|
|
data = self.read_json()
|
|
except ValueError as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
username = str(data.get("username", ""))
|
|
password = str(data.get("password", ""))
|
|
throttle_key = f"{self.client_address[0]}:{username}"
|
|
if not auth.login_allowed(throttle_key):
|
|
self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."})
|
|
return
|
|
|
|
login = auth.login(username, password)
|
|
if login is None:
|
|
auth.record_failed_login(throttle_key)
|
|
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
|
|
return
|
|
|
|
auth.clear_failed_login(throttle_key)
|
|
session_id, session = login
|
|
payload = {
|
|
"username": session.username,
|
|
"csrfToken": session.csrf_token,
|
|
}
|
|
body = json.dumps(payload, indent=2).encode("utf-8")
|
|
self.send_response(HTTPStatus.OK)
|
|
self.set_session_cookie(session_id)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_logout(self, session_id: str) -> None:
|
|
if auth is not None:
|
|
auth.logout(session_id)
|
|
body = b'{\n "ok": true\n}'
|
|
self.send_response(HTTPStatus.OK)
|
|
self.clear_session_cookie()
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def handle_check(self) -> None:
|
|
try:
|
|
message_id, results = run_check_cycle(runtime)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"messageId": message_id,
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
},
|
|
)
|
|
|
|
def handle_services(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
services_from_data(data)
|
|
save_services_config(runtime.config_path, data)
|
|
message_id, results = run_check_cycle(runtime)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(
|
|
HTTPStatus.OK,
|
|
{
|
|
"messageId": message_id,
|
|
"services": data.get("services", []),
|
|
"results": [result_to_jsonable(result) for result in results],
|
|
},
|
|
)
|
|
|
|
def handle_media_catalog(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
result = publish_media_catalog(
|
|
runtime=runtime,
|
|
channel_id=str(data.get("channelId", "")),
|
|
movies_csv=str(data.get("moviesCsv", "")),
|
|
shows_csv=str(data.get("showsCsv", "")),
|
|
movie_filename=str(data.get("movieFileName", "Movies.csv") or "Movies.csv"),
|
|
show_filename=str(data.get("showFileName", "Shows.csv") or "Shows.csv"),
|
|
)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, result)
|
|
|
|
def handle_settings(self) -> None:
|
|
try:
|
|
data = self.read_json()
|
|
channels = data.get("channels", data)
|
|
if not isinstance(channels, dict):
|
|
raise ValueError("Settings payload must include a channels object")
|
|
result = save_channel_settings(
|
|
runtime,
|
|
str(channels.get("statusChannelId", "")),
|
|
str(channels.get("mediaChannelId", "")),
|
|
)
|
|
except Exception as exc:
|
|
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
|
|
return
|
|
|
|
self.send_json(HTTPStatus.OK, {"channels": result})
|
|
|
|
return DashboardHandler
|
|
|
|
|
|
def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
|
|
if not bool_env("DASHBOARD_ENABLED", False):
|
|
return None
|
|
|
|
host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1"
|
|
port = int(os.getenv("DASHBOARD_PORT", "8787"))
|
|
auth = dashboard_auth_from_env()
|
|
server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth))
|
|
thread = threading.Thread(target=server.serve_forever, name="dashboard", daemon=True)
|
|
thread.start()
|
|
auth_note = "without auth" if auth is None else "with password sessions"
|
|
print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
|
|
return server
|
|
|
|
|
|
def main() -> int:
|
|
load_dotenv()
|
|
|
|
if "--hash-password" in sys.argv:
|
|
print_password_hash()
|
|
return 0
|
|
|
|
if "--preview" in sys.argv:
|
|
config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
|
|
print_preview(load_services(config_path))
|
|
return 0
|
|
|
|
token = env("DISCORD_BOT_TOKEN")
|
|
token = normalize_discord_token(token)
|
|
channel_id = os.getenv("DISCORD_CHANNEL_ID", "").strip()
|
|
config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
|
|
state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))
|
|
media_state_path = Path(env("MEDIA_CATALOG_STATE", "state/media-catalog.json"))
|
|
settings_path = Path(env("BOT_SETTINGS_STATE", "state/bot-settings.json"))
|
|
interval = int(env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)))
|
|
runtime = BotRuntime(token, channel_id, config_path, state_path, media_state_path, settings_path, dry_run=bool_env("DISCORD_DRY_RUN", False))
|
|
gateway = None
|
|
if bool_env("DISCORD_GATEWAY_ENABLED", True) and not runtime.dry_run:
|
|
gateway = DiscordGatewayManager(token)
|
|
gateway.start()
|
|
dashboard = maybe_start_dashboard(runtime)
|
|
|
|
if runtime.dry_run:
|
|
print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
|
|
else:
|
|
try:
|
|
identity = discord_bot_identity(token)
|
|
username = identity.get("username", "unknown")
|
|
bot_id = identity.get("id", "unknown")
|
|
print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
|
|
except Exception as exc:
|
|
print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)
|
|
|
|
stopped = False
|
|
|
|
def stop(_signum: int, _frame: Any) -> None:
|
|
nonlocal stopped
|
|
stopped = True
|
|
|
|
signal.signal(signal.SIGINT, stop)
|
|
signal.signal(signal.SIGTERM, stop)
|
|
|
|
while not stopped:
|
|
try:
|
|
message_id, results = run_check_cycle(runtime)
|
|
online = sum(1 for result in results if result.ok)
|
|
print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
|
|
except Exception as exc:
|
|
with runtime.lock:
|
|
runtime.last_error = str(exc)
|
|
print(f"Status update failed: {exc}", file=sys.stderr, flush=True)
|
|
|
|
for _ in range(interval):
|
|
if stopped:
|
|
break
|
|
time.sleep(1)
|
|
|
|
if dashboard is not None:
|
|
dashboard.shutdown()
|
|
if gateway is not None:
|
|
gateway.stop()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|