#!/usr/bin/env python3
"""Live Discord status message for The Mithral Archive."""

from __future__ import annotations

import asyncio
import base64
import hashlib
import hmac
import http.client
import json
import os
import secrets
import signal
import socket
import ssl
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from http import HTTPStatus
from http.cookies import CookieError, SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any

DISCORD_API = "https://discord.com/api/v10"
DEFAULT_INTERVAL_SECONDS = 60
DEFAULT_TIMEOUT_SECONDS = 10
MAX_DISCORD_EMBEDS = 10
MAX_REQUEST_BYTES = 1_000_000
SESSION_COOKIE = "archive_bot_session"
PBKDF2_ITERATIONS = 390_000
# Failed-login throttle: at most MAX_FAILED_LOGINS failures per key inside
# a sliding window of LOGIN_WINDOW_SECONDS.
LOGIN_WINDOW_SECONDS = 900
MAX_FAILED_LOGINS = 10


@dataclass(frozen=True)
class Service:
    """One monitored HTTP endpoint as described in the services config."""

    name: str
    group: str
    url: str
    display_url: str
    method: str
    timeout: float
    expected_statuses: set[int]
    expected_min: int
    expected_max: int
    keyword: str | None


@dataclass(frozen=True)
class CheckResult:
    """Outcome of probing a single :class:`Service` once."""

    service: Service
    ok: bool
    status: int | None
    latency_ms: int | None
    error: str | None


@dataclass(frozen=True)
class DashboardAuthConfig:
    """Static credentials and cookie settings for the dashboard."""

    username: str
    password_hash: str
    session_ttl_seconds: int
    cookie_secure: bool


@dataclass
class DashboardSession:
    """A logged-in dashboard session; ``expires_at`` slides on each use."""

    username: str
    csrf_token: str
    expires_at: float


def constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings in constant time.

    ``hmac.compare_digest`` raises ``TypeError`` for ``str`` arguments that
    contain non-ASCII characters, so both sides are encoded to bytes first.
    ``surrogatepass`` keeps the encoding total (lone surrogates can arrive
    through JSON escapes) and injective.
    """
    return hmac.compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


class DashboardAuth:
    """In-memory session store and failed-login throttle for the dashboard."""

    def __init__(self, config: DashboardAuthConfig) -> None:
        self.config = config
        self.lock = threading.Lock()
        self.sessions: dict[str, DashboardSession] = {}
        self.failed_logins: dict[str, list[float]] = {}

    def _prune_failed_logins(self, now: float) -> None:
        """Drop failures older than the window. Caller must hold ``lock``.

        Every key is swept, not only the one being queried; otherwise keys
        built from attacker-chosen usernames would accumulate forever.
        """
        window_start = now - LOGIN_WINDOW_SECONDS
        for key in list(self.failed_logins):
            recent = [stamp for stamp in self.failed_logins[key] if stamp >= window_start]
            if recent:
                self.failed_logins[key] = recent
            else:
                del self.failed_logins[key]

    def login_allowed(self, key: str) -> bool:
        """Return True while ``key`` is under the failed-login limit."""
        now = time.time()
        with self.lock:
            self._prune_failed_logins(now)
            return len(self.failed_logins.get(key, ())) < MAX_FAILED_LOGINS

    def record_failed_login(self, key: str) -> None:
        """Register one failed attempt for ``key``."""
        now = time.time()
        with self.lock:
            self._prune_failed_logins(now)
            self.failed_logins.setdefault(key, []).append(now)

    def clear_failed_login(self, key: str) -> None:
        """Forget all failed attempts for ``key`` (after a successful login)."""
        with self.lock:
            self.failed_logins.pop(key, None)

    def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
        """Create a session for valid credentials.

        Returns ``(session_id, session)`` on success, ``None`` otherwise.
        Both checks always run so the response time does not reveal whether
        the username alone was correct.
        """
        username_ok = constant_time_equals(username, self.config.username)
        password_ok = verify_password_hash(self.config.password_hash, password)
        if not (username_ok and password_ok):
            return None
        now = time.time()
        session_id = secrets.token_urlsafe(32)
        session = DashboardSession(
            username=username,
            csrf_token=secrets.token_urlsafe(32),
            expires_at=now + self.config.session_ttl_seconds,
        )
        with self.lock:
            # Sessions that are never presented again would otherwise linger.
            expired = [sid for sid, item in self.sessions.items() if item.expires_at <= now]
            for sid in expired:
                del self.sessions[sid]
            self.sessions[session_id] = session
        return session_id, session

    def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
        """Resolve the session named by a ``Cookie`` header.

        Returns ``None`` for a missing, malformed, unknown or expired
        cookie. A valid session has its expiry extended by the TTL.
        """
        if not cookie_header:
            return None
        jar = SimpleCookie()
        try:
            jar.load(cookie_header)
        except CookieError:
            # Raised for keys the parser accepts but Morsel rejects; a
            # hostile header must not crash the request handler.
            return None
        morsel = jar.get(SESSION_COOKIE)
        if morsel is None:
            return None
        session_id = morsel.value
        now = time.time()
        with self.lock:
            session = self.sessions.get(session_id)
            if session is None:
                return None
            if session.expires_at <= now:
                self.sessions.pop(session_id, None)
                return None
            session.expires_at = now + self.config.session_ttl_seconds
            return session_id, session

    def logout(self, session_id: str) -> None:
        """Invalidate ``session_id`` if it exists."""
        with self.lock:
            self.sessions.pop(session_id, None)


class BotRuntime:
    """Shared mutable state between the check loop and the dashboard.

    The ``last_*`` attributes are guarded by ``lock``.
    """

    def __init__(
        self,
        token: str,
        channel_id: str,
        config_path: Path,
        state_path: Path,
        dry_run: bool = False,
    ) -> None:
        self.token = token
        self.channel_id = channel_id
        self.config_path = config_path
        self.state_path = state_path
        self.dry_run = dry_run
        self.lock = threading.Lock()
        self.last_results: list[CheckResult] = []
        self.last_error: str | None = None
        self.last_message_id: str | None = None
        self.last_checked_at: datetime | None = None


class DiscordGatewayManager:
    """Runs a discord.py client on a background thread to show the bot online."""

    def __init__(self, token: str) -> None:
        self.token = token
        self.thread: threading.Thread | None = None
        self.loop: asyncio.AbstractEventLoop | None = None
        self.client: Any = None
        self.ready = threading.Event()
        self._disconnecting = threading.Event()

    def start(self) -> None:
        """Start the gateway thread; a second call is a no-op."""
        if self.thread is not None:
            return
        self.thread = threading.Thread(target=self._run, name="discord-gateway", daemon=True)
        self.thread.start()

    def stop(self) -> None:
        """Ask the client to close and wait up to 15 seconds for the thread."""
        self._disconnecting.set()
        loop, client = self.loop, self.client
        if loop is not None and client is not None and not loop.is_closed():
            closing = client.close()
            try:
                asyncio.run_coroutine_threadsafe(closing, loop)
            except RuntimeError:
                # The loop was closed between the check and the call (for
                # example the gateway died on a bad token). Dispose of the
                # coroutine so it does not warn about never being awaited.
                closing.close()
        if self.thread is not None:
            self.thread.join(timeout=15)

    def _run(self) -> None:
        """Thread body: own an event loop and run the client until it stops."""
        try:
            import discord
        except ImportError:
            print(
                "discord.py is not installed. Install requirements.txt to keep the bot online.",
                file=sys.stderr,
                flush=True,
            )
            self.ready.set()
            return

        class GatewayClient(discord.Client):
            def __init__(self, manager: DiscordGatewayManager) -> None:
                intents = discord.Intents.default()
                intents.guilds = True
                super().__init__(intents=intents)
                self.manager = manager

            async def on_ready(self) -> None:
                await self.change_presence(status=discord.Status.online)
                user = self.user
                name = user.name if user is not None else "unknown"
                bot_id = user.id if user is not None else "unknown"
                print(f"Discord gateway connected as {name} ({bot_id})", flush=True)
                self.manager.ready.set()

            async def on_disconnect(self) -> None:
                self.manager.ready.clear()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.client = GatewayClient(self)
        try:
            self.loop.run_until_complete(self.client.start(self.token))
        except Exception as exc:
            if not self._disconnecting.is_set():
                print(f"Discord gateway stopped: {exc}", file=sys.stderr, flush=True)
        finally:
            # Unblock anything waiting on readiness even though we failed.
            self.ready.set()
            try:
                pending = asyncio.all_tasks(self.loop)
                for task in pending:
                    task.cancel()
                if pending:
                    self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            finally:
                self.loop.close()


def env(name: str, default: str | None = None) -> str:
    """Return a stripped, non-empty environment value or exit the process.

    Raises:
        SystemExit: if the variable (or its default) is missing or blank.
    """
    value = os.getenv(name, default)
    if value is None or not value.strip():
        raise SystemExit(f"Missing required environment variable: {name}")
    return value.strip()


def normalize_discord_token(token: str) -> str:
    """Strip whitespace, surrounding quotes and a leading ``Bot `` prefix."""
    cleaned = token.strip().strip("\"'")
    if cleaned.lower().startswith("bot "):
        cleaned = cleaned[4:].strip()
    return cleaned


def load_dotenv(path: Path = Path(".env")) -> None:
    """Load ``KEY=VALUE`` lines from ``path`` into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped.
    Variables already present in the environment are not overwritten.
    """
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        value = value.strip().strip("\"'")
        if key and key not in os.environ:
            os.environ[key] = value


def load_json(path: Path) -> dict[str, Any]:
    """Read a JSON object from ``path``.

    Raises:
        ValueError: if the file is missing, unreadable, not valid JSON, or
            its top-level value is not an object.
    """
    try:
        with path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except FileNotFoundError as exc:
        raise ValueError(f"Config file not found: {path}") from exc
    except PermissionError as exc:
        raise ValueError(f"Config file is not readable: {path}") from exc
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError(f"Config must be a JSON object: {path}")
    return data


def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
    """Parse an ``expectedStatuses`` config value.

    Accepts ``None``, a comma-separated string, or a list of integers,
    numeric strings and ``"low-high"`` range strings.

    Returns:
        ``(exact, minimum, maximum)``. ``minimum > maximum`` (``0, -1``)
        means "no range". A missing or empty value yields the default
        ``(set(), 200, 399)``.

    Note:
        ``Service`` stores a single range, so several ranges collapse to
        the envelope from the smallest low to the largest high.

    Raises:
        ValueError: on any entry that is not an integer or a valid range.
    """
    if isinstance(raw, str):
        raw = [part.strip() for part in raw.split(",") if part.strip()]
    elif raw is not None and not isinstance(raw, list):
        raise ValueError("expectedStatuses must be a list or comma-separated string")
    if not raw:
        # An empty list used to mean "nothing is acceptable" while being
        # reported back as 200-399; treat it like an absent value instead.
        return set(), 200, 399

    exact: set[int] = set()
    min_status = 999
    max_status = 0
    for item in raw:
        # bool is a subclass of int; True must not be read as status 1.
        if isinstance(item, bool):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if isinstance(item, int):
            exact.add(item)
            continue
        if not isinstance(item, str):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if "-" in item:
            left, right = item.split("-", 1)
            try:
                low, high = int(left), int(right)
            except ValueError as exc:
                raise ValueError(f"Invalid expected status range: {item}") from exc
            if low > high:
                raise ValueError(f"Invalid expected status range: {item}")
            min_status = min(min_status, low)
            max_status = max(max_status, high)
            continue
        try:
            exact.add(int(item))
        except ValueError as exc:
            raise ValueError(f"Invalid expected status value: {item}") from exc
    if min_status == 999 and max_status == 0:
        # No range entries were seen: encode "no range" as an empty interval.
        min_status, max_status = 0, -1
    return exact, min_status, max_status


def services_from_data(data: dict[str, Any]) -> list[Service]:
    """Validate a parsed config object and build the service list.

    Raises:
        ValueError: if ``services`` is missing or empty, or any entry has a
            missing name or url, a non-http(s) URL, a non-positive or
            non-numeric timeout, or invalid expected statuses.
    """
    raw_services = data.get("services")
    if not isinstance(raw_services, list) or not raw_services:
        raise ValueError("Config must include a non-empty services array")

    services: list[Service] = []
    for index, item in enumerate(raw_services, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"Service #{index} must be an object")
        name = str(item.get("name", "")).strip()
        url = str(item.get("url", "")).strip()
        if not name or not url:
            raise ValueError(f"Service #{index} must include name and url")
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            raise ValueError(f"Service {name} has an invalid http(s) URL")
        try:
            timeout = float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS))
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Service {name} has an invalid timeoutSeconds") from exc
        # Also rejects NaN and infinity, which sockets cannot use.
        if not 0 < timeout < float("inf"):
            raise ValueError(f"Service {name} timeoutSeconds must be a positive number")
        exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
        raw_keyword = item.get("keyword")
        keyword = (str(raw_keyword).strip() or None) if raw_keyword else None
        services.append(
            Service(
                name=name,
                group=str(item.get("group", "Main Services")).strip() or "Main Services",
                url=url,
                display_url=str(item.get("displayUrl", url)).strip() or url,
                method=str(item.get("method", "GET")).strip().upper(),
                timeout=timeout,
                expected_statuses=exact,
                expected_min=minimum,
                expected_max=maximum,
                keyword=keyword,
            )
        )
    return services


def load_services(path: Path) -> list[Service]:
    """Load and validate the services config at ``path``."""
    return services_from_data(load_json(path))


def save_services_config(path: Path, data: dict[str, Any]) -> None:
    """Validate ``data`` and write it to ``path``.

    The file is written to a sibling ``.tmp`` file and renamed into place.
    If the rename fails, the target is overwritten directly as a fallback.

    Raises:
        ValueError: if ``data`` is not a valid services config.
    """
    services_from_data(data)
    path.parent.mkdir(parents=True, exist_ok=True)
    temporary = path.with_suffix(f"{path.suffix}.tmp")
    with temporary.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=2)
        handle.write("\n")
    try:
        temporary.replace(path)
    except OSError:
        with path.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
            handle.write("\n")
        temporary.unlink(missing_ok=True)


def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
    """Convert services back into the JSON shape used by the config file."""
    output: list[dict[str, Any]] = []
    for service in services:
        expected: list[int | str] = sorted(service.expected_statuses)
        if service.expected_min <= service.expected_max:
            expected.append(f"{service.expected_min}-{service.expected_max}")
        item: dict[str, Any] = {
            "name": service.name,
            "group": service.group,
            "url": service.url,
            "displayUrl": service.display_url,
            "method": service.method,
            "timeoutSeconds": service.timeout,
            "expectedStatuses": expected or ["200-399"],
        }
        if service.keyword:
            item["keyword"] = service.keyword
        output.append(item)
    return output


def status_expected(service: Service, status: int) -> bool:
    """Return True if ``status`` is an exact match or inside the range."""
    if status in service.expected_statuses:
        return True
    return service.expected_min <= status <= service.expected_max


def check_service(service: Service) -> CheckResult:
    """Probe ``service`` once and classify the response.

    Never raises for network-level failures: they are reported as a failed
    ``CheckResult`` so one broken endpoint cannot abort a whole cycle.
    """
    started = time.monotonic()

    def elapsed_ms() -> int:
        return int((time.monotonic() - started) * 1000)

    # env() would terminate the process on a blank value; a misconfigured
    # optional header must not take the bot down.
    user_agent = os.getenv("HTTP_USER_AGENT", "").strip() or "ArchiveStatusBot/1.0"
    request = urllib.request.Request(
        service.url,
        headers={"User-Agent": user_agent},
        method=service.method,
    )
    try:
        context = ssl.create_default_context()
        with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
            # The body is only needed for the keyword probe; cap it at 1 MB.
            body = response.read(1_000_000) if service.keyword else b""
            status = int(response.status)
    except urllib.error.HTTPError as exc:
        # Must precede OSError: HTTPError is a URLError, which is an OSError.
        status = int(exc.code)
        ok = status_expected(service, status)
        return CheckResult(service, ok, status, elapsed_ms(), None if ok else f"HTTP {status}")
    except (OSError, http.client.HTTPException) as exc:
        # OSError covers URLError, timeouts, TLS errors and connection
        # resets; HTTPException covers malformed or truncated responses.
        return CheckResult(service, False, None, elapsed_ms(), clean_error(exc))

    latency_ms = elapsed_ms()
    ok = status_expected(service, status)
    if ok and service.keyword:
        text = body.decode("utf-8", errors="ignore")
        if service.keyword not in text:
            return CheckResult(service, False, status, latency_ms, "keyword missing")
    return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")


def clean_error(exc: BaseException) -> str:
    """Return a short (at most 120 characters) description of ``exc``."""
    reason = getattr(exc, "reason", None)
    if reason:
        return str(reason)[:120]
    return str(exc)[:120] or exc.__class__.__name__


class DiscordAPIError(RuntimeError):
    """A non-2xx response from the Discord REST API.

    Subclasses ``RuntimeError`` so existing ``except RuntimeError`` callers
    keep working; ``status`` exposes the HTTP status code.
    """

    def __init__(self, method: str, path: str, status: int, detail: str) -> None:
        super().__init__(f"Discord API {method} {path} failed: {status} {detail}")
        self.status = status


def discord_request(
    method: str,
    token: str,
    path: str,
    payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Call the Discord REST API and return the decoded JSON body.

    Returns ``{}`` for an empty response body.

    Raises:
        DiscordAPIError: (a ``RuntimeError``) on an HTTP error response.
    """
    body = None
    headers = {
        "Authorization": f"Bot {token}",
        "User-Agent": "ArchiveStatusBot/1.0",
    }
    if payload is not None:
        body = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(
        f"{DISCORD_API}{path}",
        data=body,
        headers=headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            data = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        raise DiscordAPIError(method, path, int(exc.code), detail) from exc
    if not data:
        return {}
    return json.loads(data.decode("utf-8"))


def discord_bot_identity(token: str) -> dict[str, Any]:
    """Fetch the bot's own user object (``GET /users/@me``)."""
    return discord_request("GET", token, "/users/@me")


def load_state(path: Path) -> dict[str, Any]:
    """Read the persisted state; missing or corrupt files yield ``{}``."""
    if not path.exists():
        return {}
    try:
        with path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}


def save_state(path: Path, state: dict[str, Any]) -> None:
    """Write ``state`` to ``path`` via a temporary file and rename."""
    path.parent.mkdir(parents=True, exist_ok=True)
    temporary = path.with_suffix(f"{path.suffix}.tmp")
    with temporary.open("w", encoding="utf-8") as handle:
        json.dump(state, handle, indent=2, sort_keys=True)
        handle.write("\n")
    temporary.replace(path)


def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
    """Build the Discord embed list summarising ``results``.

    Services are grouped by ``Service.group`` in first-seen order. Each
    group produces a name column and a status column, with a blank
    full-width field between groups.
    """
    checked_at = datetime.now(timezone.utc)
    online = sum(1 for result in results if result.ok)
    total = len(results)

    if total == 0:
        color = 0x6B7280
        summary = "No services configured."
    elif online == total:
        color = 0x10B981
        summary = f"🟢 Operational · {online}/{total} online"
    elif online > 0:
        color = 0xF59E0B
        offline = total - online
        attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
        summary = f"🟡 Degraded · {online}/{total} online · {attention}"
    else:
        color = 0xEF4444
        summary = f"🔴 Outage · {online}/{total} online"

    groups: dict[str, list[CheckResult]] = {}
    for result in results:
        groups.setdefault(result.service.group, []).append(result)

    fields: list[dict[str, Any]] = []
    for group_name, group_results in groups.items():
        service_lines = [f"**{result.service.name}**" for result in group_results]
        state_lines = ["🟢 Online" if result.ok else "🔴 Issue" for result in group_results]
        # The values are cut to 1024 characters before being sent.
        fields.append({"name": group_name, "value": "\n".join(service_lines)[:1024] or "None", "inline": True})
        fields.append({"name": "Status", "value": "\n".join(state_lines)[:1024] or "None", "inline": True})
        # Zero-width spacer forces the next group onto a new row.
        fields.append({"name": "\u200b", "value": "\u200b", "inline": False})
    if fields and fields[-1]["name"] == "\u200b":
        fields.pop()

    interval = os.getenv("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
    return [
        {
            "title": "The Mithral Archive",
            "description": summary,
            "color": color,
            "fields": fields[:25],  # only the first 25 fields are sent
            "footer": {
                "text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"
            },
            "timestamp": checked_at.isoformat(),
        }
    ]


def upsert_status_message(
    token: str,
    channel_id: str,
    state_path: Path,
    results: list[CheckResult],
) -> str:
    """Edit the stored status message, or post a new one, and return its id.

    A replacement message is posted only when the edit fails for a reason
    that retrying cannot fix (for example, the message was deleted).
    Rate limits (429) and server errors (5xx) are re-raised so a transient
    outage does not leave a trail of duplicate status messages.

    Raises:
        RuntimeError: on a transient edit failure, a failed post, or when
            Discord returns no message id.
    """
    state = load_state(state_path)
    message_id = str(state.get("message_id") or "").strip()
    payload = {"content": "", "embeds": render_embeds(results)}
    if message_id:
        try:
            discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
        except DiscordAPIError as exc:
            if exc.status == 429 or exc.status >= 500:
                raise
            print(f"Could not edit existing status message, creating a new one: {exc}", file=sys.stderr)
        else:
            return message_id
    message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
    new_id = str(message.get("id", "")).strip()
    if not new_id:
        raise RuntimeError("Discord did not return a message id")
    save_state(state_path, {"message_id": new_id})
    return new_id


def fake_preview_results(services: list[Service]) -> list[CheckResult]:
    """Fabricate results for ``--preview``: all healthy except the last."""
    last_index = len(services) - 1
    results: list[CheckResult] = []
    for index, service in enumerate(services):
        healthy = index != last_index
        results.append(
            CheckResult(
                service=service,
                ok=healthy,
                status=200 if healthy else 502,
                latency_ms=42 + (index * 31),
                error=None if healthy else "HTTP 502",
            )
        )
    return results


def print_preview(services: list[Service]) -> None:
    """Print the Discord payload that would be sent, using fake results."""
    payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))}
    print(json.dumps(payload, indent=2))


def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
    """Flatten a ``CheckResult`` into the dashboard's JSON shape."""
    return {
        "name": result.service.name,
        "group": result.service.group,
        "url": result.service.url,
        "displayUrl": result.service.display_url,
        "ok": result.ok,
        "status": result.status,
        "latencyMs": result.latency_ms,
        "error": result.error,
    }


def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
    """Reload the config, probe every service and publish the result.

    Returns ``(message_id, results)``; the id is ``"dry-run"`` in dry-run
    mode. On success the runtime's ``last_*`` fields are updated and
    ``last_error`` is cleared.
    """
    services = load_services(runtime.config_path)
    results = [check_service(service) for service in services]
    if runtime.dry_run:
        message_id = "dry-run"
    else:
        message_id = upsert_status_message(runtime.token, runtime.channel_id, runtime.state_path, results)
    with runtime.lock:
        runtime.last_results = results
        runtime.last_message_id = message_id
        runtime.last_checked_at = datetime.now(timezone.utc)
        runtime.last_error = None
    return message_id, results


def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
    """Snapshot the configured services and the latest check outcome.

    Raises:
        ValueError: if the services config cannot be loaded.
    """
    services = load_services(runtime.config_path)
    # Read every shared field under the lock so the snapshot is consistent
    # with a check cycle finishing on another thread.
    with runtime.lock:
        results = list(runtime.last_results)
        last_error = runtime.last_error
        last_message_id = runtime.last_message_id
        last_checked_at = runtime.last_checked_at
    return {
        "services": services_to_jsonable(services),
        "results": [result_to_jsonable(result) for result in results],
        "lastError": last_error,
        "lastMessageId": last_message_id,
        "lastCheckedAt": last_checked_at.isoformat() if last_checked_at else None,
        "channelId": runtime.channel_id,
    }


def bool_env(name: str, default: bool = False) -> bool:
    """Read a boolean flag; ``1``, ``true``, ``yes`` and ``on`` are truthy."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def password_hash(password: str) -> str:
    """Hash ``password`` as ``pbkdf2_sha256$<iterations>$<salt>$<digest>``.

    Salt and digest are URL-safe base64 without padding.
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
    salt_text = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=")
    digest_text = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
    return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt_text}${digest_text}"


def decode_urlsafe_base64(value: str) -> bytes:
    """Decode URL-safe base64 whose ``=`` padding may have been stripped."""
    padding = "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(value + padding)


def verify_password_hash(encoded: str, password: str) -> bool:
    """Check ``password`` against a hash produced by :func:`password_hash`.

    Any malformed hash yields ``False`` rather than an exception.
    """
    try:
        algorithm, iterations, salt_text, digest_text = encoded.split("$", 3)
        if algorithm != "pbkdf2_sha256":
            return False
        salt = decode_urlsafe_base64(salt_text)
        expected = decode_urlsafe_base64(digest_text)
        actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(iterations))
    except (ValueError, TypeError, OverflowError):
        # ValueError also covers bad base64, an unencodable password and a
        # non-positive iteration count; OverflowError an absurdly large one.
        return False
    return hmac.compare_digest(actual, expected)


def dashboard_auth_from_env() -> DashboardAuth | None:
    """Build dashboard auth from the environment.

    Returns ``None`` (no authentication) when ``DASHBOARD_AUTH_DISABLED``
    is truthy or when the username or password hash is not configured.
    """
    if bool_env("DASHBOARD_AUTH_DISABLED", False):
        return None
    username = os.getenv("DASHBOARD_USERNAME", "").strip()
    encoded_hash = os.getenv("DASHBOARD_PASSWORD_HASH", "").strip()
    if not username or not encoded_hash:
        return None
    ttl = int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800"))
    secure = bool_env("DASHBOARD_COOKIE_SECURE", False)
    return DashboardAuth(
        DashboardAuthConfig(
            username=username,
            password_hash=encoded_hash,
            session_ttl_seconds=ttl,
            cookie_secure=secure,
        )
    )


def print_password_hash() -> None:
    """Prompt twice for a password and print its hash.

    Raises:
        SystemExit: if the entries differ or are shorter than 12 characters.
    """
    import getpass

    first = getpass.getpass("Dashboard password: ")
    second = getpass.getpass("Confirm password: ")
    if first != second:
        raise SystemExit("Passwords did not match")
    if len(first) < 12:
        raise SystemExit("Use at least 12 characters")
    print(password_hash(first))


def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
    """Create the request handler class bound to ``runtime`` and ``auth``.

    With ``auth`` set to ``None`` every request is treated as an
    authenticated local session and CSRF checks are skipped.
    """
    dashboard_path = Path(__file__).with_name("dashboard.html")

    class DashboardHandler(BaseHTTPRequestHandler):
        server_version = "ArchiveStatusDashboard/1.0"

        def log_message(self, format: str, *args: Any) -> None:
            print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)

        def route(self) -> str:
            """Return the request path without query string or fragment."""
            return urllib.parse.urlsplit(self.path).path

        def do_GET(self) -> None:
            route = self.route()
            if route in {"/", "/dashboard"}:
                self.send_dashboard()
                return
            if route == "/favicon.ico":
                self.send_response(HTTPStatus.NO_CONTENT)
                self.end_headers()
                return
            if route == "/api/session":
                session = self.require_auth()
                if session is None:
                    return
                _session_id, data = session
                self.send_json(
                    HTTPStatus.OK,
                    {"username": data.username, "csrfToken": data.csrf_token},
                )
                return
            if route == "/api/status":
                if self.require_auth() is None:
                    return
                try:
                    status = runtime_status(runtime)
                except ValueError as exc:
                    # A broken config file must yield a readable error, not
                    # an unhandled exception and a dropped connection.
                    self.send_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": str(exc)})
                    return
                self.send_json(HTTPStatus.OK, status)
                return
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_POST(self) -> None:
            route = self.route()
            if route == "/api/login":
                self.handle_login()
                return
            session = self.require_auth(require_csrf=True)
            if session is None:
                return
            if route == "/api/logout":
                self.handle_logout(session[0])
                return
            if route == "/api/check":
                self.handle_check()
                return
            if route == "/api/services":
                self.handle_services()
                return
            self.send_error(HTTPStatus.NOT_FOUND)

        def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
            """Return the caller's session, or send 401/403 and return None."""
            if auth is None:
                return "disabled", DashboardSession("local", "disabled", time.time() + 3600)
            session = auth.session_from_cookie(self.headers.get("Cookie"))
            if session is None:
                self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
                return None
            if require_csrf:
                csrf = self.headers.get("X-CSRF-Token", "")
                # Header values may contain non-ASCII characters, which
                # hmac.compare_digest rejects for str arguments.
                if not constant_time_equals(csrf, session[1].csrf_token):
                    self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
                    return None
            return session

        def cookie_attributes(self, max_age: int) -> str:
            attrs = ["Path=/", "HttpOnly", "SameSite=Strict", f"Max-Age={max_age}"]
            if auth is not None and auth.config.cookie_secure:
                attrs.append("Secure")
            return "; ".join(attrs)

        def session_cookie(self, session_id: str, max_age: int) -> str:
            """Build the ``Set-Cookie`` value for the session cookie."""
            return f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(max_age)}"

        def set_session_cookie(self, session_id: str) -> None:
            ttl = auth.config.session_ttl_seconds if auth is not None else 3600
            self.send_header("Set-Cookie", self.session_cookie(session_id, ttl))

        def clear_session_cookie(self) -> None:
            self.send_header("Set-Cookie", self.session_cookie("", 0))

        def send_dashboard(self) -> None:
            try:
                body = dashboard_path.read_bytes()
            except FileNotFoundError:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
                return
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def read_json(self) -> dict[str, Any]:
            """Read the request body as a JSON object.

            Raises:
                ValueError: on a bad or oversized Content-Length, a body
                    that is not UTF-8 JSON, or a non-object top level.
            """
            raw_length = self.headers.get("Content-Length", "0")
            try:
                length = int(raw_length)
            except ValueError as exc:
                raise ValueError("Invalid Content-Length") from exc
            if length < 0:
                # read(-1) would block until the client closes the socket.
                raise ValueError("Invalid Content-Length")
            if length > MAX_REQUEST_BYTES:
                raise ValueError("Request body is too large")
            body = self.rfile.read(length)
            try:
                data = json.loads(body.decode("utf-8"))
            except UnicodeDecodeError as exc:
                raise ValueError("Request body must be UTF-8") from exc
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON: {exc}") from exc
            if not isinstance(data, dict):
                raise ValueError("JSON body must be an object")
            return data

        def send_json(
            self,
            status: HTTPStatus,
            payload: dict[str, Any],
            *,
            set_cookie: str | None = None,
        ) -> None:
            """Send ``payload`` as an uncached JSON response.

            ``set_cookie``, when given, is emitted as a ``Set-Cookie``
            header ahead of the content headers.
            """
            body = json.dumps(payload, indent=2).encode("utf-8")
            self.send_response(status)
            if set_cookie is not None:
                self.send_header("Set-Cookie", set_cookie)
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Cache-Control", "no-store")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def record_error(self, exc: BaseException) -> str:
            """Store ``exc`` as the runtime's last error and return its text."""
            message = str(exc)
            with runtime.lock:
                runtime.last_error = message
            return message

        def handle_login(self) -> None:
            if auth is None:
                self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
                return
            try:
                data = self.read_json()
            except ValueError as exc:
                self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
                return
            username = str(data.get("username", ""))
            password = str(data.get("password", ""))
            throttle_key = f"{self.client_address[0]}:{username}"
            if not auth.login_allowed(throttle_key):
                self.send_json(
                    HTTPStatus.TOO_MANY_REQUESTS,
                    {"error": "Too many login attempts. Try again later."},
                )
                return
            login = auth.login(username, password)
            if login is None:
                auth.record_failed_login(throttle_key)
                self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
                return
            auth.clear_failed_login(throttle_key)
            session_id, session = login
            self.send_json(
                HTTPStatus.OK,
                {"username": session.username, "csrfToken": session.csrf_token},
                set_cookie=self.session_cookie(session_id, auth.config.session_ttl_seconds),
            )

        def handle_logout(self, session_id: str) -> None:
            if auth is not None:
                auth.logout(session_id)
            self.send_json(HTTPStatus.OK, {"ok": True}, set_cookie=self.session_cookie("", 0))

        def handle_check(self) -> None:
            try:
                message_id, results = run_check_cycle(runtime)
            except Exception as exc:
                self.send_json(HTTPStatus.BAD_GATEWAY, {"error": self.record_error(exc)})
                return
            self.send_json(
                HTTPStatus.OK,
                {
                    "messageId": message_id,
                    "results": [result_to_jsonable(result) for result in results],
                },
            )

        def handle_services(self) -> None:
            """Replace the services config, then run a check immediately."""
            try:
                data = self.read_json()
                # save_services_config validates before touching the file.
                save_services_config(runtime.config_path, data)
            except ValueError as exc:
                self.send_json(HTTPStatus.BAD_REQUEST, {"error": self.record_error(exc)})
                return
            except OSError as exc:
                self.send_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": self.record_error(exc)})
                return
            try:
                message_id, results = run_check_cycle(runtime)
            except Exception as exc:
                # The config was saved; only the follow-up check failed.
                self.send_json(HTTPStatus.BAD_GATEWAY, {"error": self.record_error(exc)})
                return
            self.send_json(
                HTTPStatus.OK,
                {
                    "messageId": message_id,
                    "services": data.get("services", []),
                    "results": [result_to_jsonable(result) for result in results],
                },
            )

    return DashboardHandler


def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
    """Start the dashboard on a daemon thread if ``DASHBOARD_ENABLED`` is set.

    Returns the running server, or ``None`` when the dashboard is disabled.
    """
    if not bool_env("DASHBOARD_ENABLED", False):
        return None
    host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1"
    port = int(os.getenv("DASHBOARD_PORT", "8787"))
    auth = dashboard_auth_from_env()
    server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth))
    thread = threading.Thread(target=server.serve_forever, name="dashboard", daemon=True)
    thread.start()
    auth_note = "without auth" if auth is None else "with password sessions"
    print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
    return server


def main() -> int:
    """Entry point: handle CLI flags, then run the check loop until signalled."""
    load_dotenv()
    if "--hash-password" in sys.argv:
        print_password_hash()
        return 0
    if "--preview" in sys.argv:
        config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
        print_preview(load_services(config_path))
        return 0

    token = normalize_discord_token(env("DISCORD_BOT_TOKEN"))
    channel_id = env("DISCORD_CHANNEL_ID")
    config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
    state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))
    raw_interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS))
    try:
        # Clamp to one second: a zero or negative interval would skip the
        # sleep loop entirely and hammer every service and Discord.
        interval = max(1, int(raw_interval))
    except ValueError:
        raise SystemExit(f"CHECK_INTERVAL_SECONDS must be an integer, got {raw_interval!r}") from None

    runtime = BotRuntime(token, channel_id, config_path, state_path, dry_run=bool_env("DISCORD_DRY_RUN", False))
    gateway = None
    if bool_env("DISCORD_GATEWAY_ENABLED", True) and not runtime.dry_run:
        gateway = DiscordGatewayManager(token)
        gateway.start()
    dashboard = maybe_start_dashboard(runtime)

    if runtime.dry_run:
        print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
    else:
        try:
            identity = discord_bot_identity(token)
            username = identity.get("username", "unknown")
            bot_id = identity.get("id", "unknown")
            print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
        except Exception as exc:
            print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)

    stopped = False

    def stop(_signum: int, _frame: Any) -> None:
        nonlocal stopped
        stopped = True

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    while not stopped:
        try:
            message_id, results = run_check_cycle(runtime)
            online = sum(1 for result in results if result.ok)
            print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
        except Exception as exc:
            with runtime.lock:
                runtime.last_error = str(exc)
            print(f"Status update failed: {exc}", file=sys.stderr, flush=True)
        # Sleep in one-second steps so a signal is honoured promptly.
        for _ in range(interval):
            if stopped:
                break
            time.sleep(1)

    if dashboard is not None:
        dashboard.shutdown()
        dashboard.server_close()
    if gateway is not None:
        gateway.stop()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())