from __future__ import annotations import json import socket import ssl import time import urllib.error import urllib.parse import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Any from .core import ( BotRuntime, CheckResult, DEFAULT_INTERVAL_SECONDS, DEFAULT_TIMEOUT_SECONDS, Service, clean_error, env, load_json, ) from .db import load_document, save_document from .discord_api import discord_request from .storage import channel_settings, load_state, save_state, validate_channel_id def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]: if raw is None: return set(), 200, 399 if isinstance(raw, str): raw = [part.strip() for part in raw.split(",") if part.strip()] if not isinstance(raw, list): raise ValueError("expectedStatuses must be a list or comma-separated string") exact: set[int] = set() min_status = 999 max_status = 0 for item in raw: if isinstance(item, int): exact.add(item) continue if not isinstance(item, str): raise ValueError("expectedStatuses entries must be integers or ranges") if "-" in item: left, right = item.split("-", 1) try: min_status = min(min_status, int(left)) max_status = max(max_status, int(right)) except ValueError as exc: raise ValueError(f"Invalid expected status range: {item}") from exc continue try: exact.add(int(item)) except ValueError as exc: raise ValueError(f"Invalid expected status value: {item}") from exc if min_status == 999 and max_status == 0: min_status, max_status = 0, -1 return exact, min_status, max_status def services_from_data(data: dict[str, Any]) -> list[Service]: raw_services = data.get("services") if not isinstance(raw_services, list) or not raw_services: raise ValueError("Config must include a non-empty services array") services: list[Service] = [] for index, item in enumerate(raw_services, start=1): if not isinstance(item, dict): raise ValueError(f"Service #{index} must be an object") name = str(item.get("name", "")).strip() url = str(item.get("url", "")).strip() if not name or not url: raise ValueError(f"Service #{index} must include name and url") parsed = urllib.parse.urlparse(url) if parsed.scheme not in {"http", "https"} or not parsed.netloc: raise ValueError(f"Service {name} has an invalid http(s) URL") exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses")) services.append( Service( name=name, group=str(item.get("group", "Main Services")).strip() or "Main Services", url=url, display_url=str(item.get("displayUrl", url)).strip() or url, method=str(item.get("method", "GET")).strip().upper(), timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)), expected_statuses=exact, expected_min=minimum, expected_max=maximum, keyword=(str(item["keyword"]).strip() if item.get("keyword") else None), ) ) return services def load_services(path: Path) -> list[Service]: data = load_document(path) if data is None: data = load_json(path) return services_from_data(data) def save_services_config(path: Path, data: dict[str, Any]) -> None: services_from_data(data) save_document(path, data) path.parent.mkdir(parents=True, exist_ok=True) temporary = path.with_suffix(f"{path.suffix}.tmp") with temporary.open("w", encoding="utf-8") as handle: json.dump(data, handle, indent=2) handle.write("\n") try: temporary.replace(path) except OSError: with path.open("w", encoding="utf-8") as handle: json.dump(data, handle, indent=2) handle.write("\n") temporary.unlink(missing_ok=True) def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]: output: list[dict[str, Any]] = [] for service in services: expected: list[int | str] = sorted(service.expected_statuses) if service.expected_min <= service.expected_max: expected.append(f"{service.expected_min}-{service.expected_max}") item: dict[str, Any] = { "name": service.name, "group": service.group, "url": service.url, "displayUrl": service.display_url, "method": service.method, "timeoutSeconds": service.timeout, "expectedStatuses": expected or ["200-399"], } if service.keyword: item["keyword"] = service.keyword output.append(item) return output def status_expected(service: Service, status: int) -> bool: if status in service.expected_statuses: return True return service.expected_min <= status <= service.expected_max def check_service(service: Service) -> CheckResult: started = time.monotonic() headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")} request = urllib.request.Request(service.url, headers=headers, method=service.method) try: context = ssl.create_default_context() with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response: body = response.read(1_000_000) if service.keyword else b"" status = int(response.status) except urllib.error.HTTPError as exc: status = int(exc.code) latency_ms = int((time.monotonic() - started) * 1000) ok = status_expected(service, status) return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}") except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc: latency_ms = int((time.monotonic() - started) * 1000) return CheckResult(service, False, None, latency_ms, clean_error(exc)) latency_ms = int((time.monotonic() - started) * 1000) ok = status_expected(service, status) if ok and service.keyword: try: text = body.decode("utf-8", errors="ignore") except UnicodeDecodeError: text = "" if service.keyword not in text: return CheckResult(service, False, status, latency_ms, "keyword missing") return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}") def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]: checked_at = datetime.now(timezone.utc) online = sum(1 for result in results if result.ok) total = len(results) degraded = 0 < online < total if total == 0: color = 0x6B7280 summary = "No services configured." elif online == total: color = 0x10B981 summary = f"🟢 Operational · {online}/{total} online" elif degraded: color = 0xF59E0B offline = total - online attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention" summary = f"🟡 Degraded · {online}/{total} online · {attention}" else: color = 0xEF4444 summary = f"🔴 Outage · {online}/{total} online" groups: dict[str, list[CheckResult]] = {} for result in results: groups.setdefault(result.service.group, []).append(result) fields = [] for group_name, group_results in groups.items(): service_lines = [] state_lines = [] for result in group_results: icon = "🟢" if result.ok else "🔴" label = "Online" if result.ok else "Issue" service_lines.append(f"**{result.service.name}**") state_lines.append(f"{icon} {label}") fields.append( { "name": group_name, "value": "\n".join(service_lines)[:1024] or "None", "inline": True, } ) fields.append( { "name": "Status", "value": "\n".join(state_lines)[:1024] or "None", "inline": True, } ) fields.append( { "name": "\u200b", "value": "\u200b", "inline": False, } ) if fields and fields[-1]["name"] == "\u200b": fields.pop() interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip() return [ { "title": "The Mithral Archive", "description": summary, "color": color, "fields": fields[:25], "footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"}, "timestamp": checked_at.isoformat(), } ] def upsert_status_message( token: str, channel_id: str, state_path: Path, results: list[CheckResult], ) -> str: state = load_state(state_path) message_id = str(state.get("message_id", "")).strip() payload = {"content": "", "embeds": render_embeds(results)} if message_id: try: discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload) return message_id except RuntimeError as exc: print(f"Could not edit existing status message, creating a new one: {exc}", flush=True) message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload) new_id = str(message.get("id", "")).strip() if not new_id: raise RuntimeError("Discord did not return a message id") save_state(state_path, {"message_id": new_id}) return new_id def fake_preview_results(services: list[Service]) -> list[CheckResult]: results: list[CheckResult] = [] for index, service in enumerate(services): results.append( CheckResult( service=service, ok=index != len(services) - 1, status=200 if index != len(services) - 1 else 502, latency_ms=42 + (index * 31), error=None if index != len(services) - 1 else "HTTP 502", ) ) return results def print_preview(services: list[Service]) -> None: payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))} print(json.dumps(payload, indent=2)) def result_to_jsonable(result: CheckResult) -> dict[str, Any]: return { "name": result.service.name, "group": result.service.group, "url": result.service.url, "displayUrl": result.service.display_url, "ok": result.ok, "status": result.status, "latencyMs": result.latency_ms, "error": result.error, } def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]: services = load_services(runtime.config_path) results = [check_service(service) for service in services] if runtime.dry_run: message_id = "dry-run" else: channel_id = validate_channel_id(channel_settings(runtime)["statusChannelId"], "Status") message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results) with runtime.lock: runtime.last_results = results runtime.last_message_id = message_id runtime.last_checked_at = datetime.now(timezone.utc) runtime.last_error = None return message_id, results def runtime_status(runtime: BotRuntime) -> dict[str, Any]: services = load_services(runtime.config_path) settings = channel_settings(runtime) with runtime.lock: results = list(runtime.last_results) return { "services": services_to_jsonable(services), "results": [result_to_jsonable(result) for result in results], "lastError": runtime.last_error, "lastMessageId": runtime.last_message_id, "lastCheckedAt": runtime.last_checked_at.isoformat() if runtime.last_checked_at else None, "channelId": settings["statusChannelId"], "channels": settings, }