# TheOrb/archive_bot/status.py
#
# 352 lines
# 12 KiB
# Python

from __future__ import annotations

import http.client
import json
import socket
import ssl
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from .core import (
    BotRuntime,
    CheckResult,
    DEFAULT_INTERVAL_SECONDS,
    DEFAULT_TIMEOUT_SECONDS,
    Service,
    clean_error,
    env,
    load_json,
)
from .db import load_document, save_document
from .discord_api import discord_request
from .storage import channel_settings, load_state, save_state, validate_channel_id
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
    """Parse an ``expectedStatuses`` config value.

    Args:
        raw: ``None``, a comma-separated string (``"200, 204, 300-399"``) or a
            list whose entries are integers, numeric strings or ``"low-high"``
            range strings.

    Returns:
        ``(exact, minimum, maximum)`` where ``exact`` is the set of individually
        listed status codes and ``minimum``/``maximum`` is an inclusive range.
        When no range was given the range is the empty ``(0, -1)``.  When
        nothing at all was given (``None``, empty list, empty string) the
        default range 200-399 is returned.

    Raises:
        ValueError: if ``raw`` has the wrong type, or an entry is not an
            integer, a numeric string or a well-formed ascending range.

    Note:
        The return shape carries a single range, so several ranges are merged
        into the smallest span that encloses all of them.
    """
    if raw is None:
        return set(), 200, 399
    if isinstance(raw, str):
        raw = [part.strip() for part in raw.split(",") if part.strip()]
    if not isinstance(raw, list):
        raise ValueError("expectedStatuses must be a list or comma-separated string")

    exact: set[int] = set()
    range_min: int | None = None
    range_max: int | None = None
    for item in raw:
        # bool is a subclass of int; a JSON `true` must not be read as status 1.
        if isinstance(item, bool):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if isinstance(item, int):
            exact.add(item)
            continue
        if not isinstance(item, str):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if "-" in item:
            left, right = item.split("-", 1)
            try:
                low, high = int(left), int(right)
            except ValueError as exc:
                raise ValueError(f"Invalid expected status range: {item}") from exc
            if low > high:
                # A reversed range can never match; reject it instead of
                # silently producing a service that is always "down".
                raise ValueError(f"Invalid expected status range: {item}")
            range_min = low if range_min is None else min(range_min, low)
            range_max = high if range_max is None else max(range_max, high)
            continue
        try:
            exact.add(int(item))
        except ValueError as exc:
            raise ValueError(f"Invalid expected status value: {item}") from exc

    if range_min is None or range_max is None:
        if not exact:
            # Nothing was specified: behave exactly like a missing setting.
            return set(), 200, 399
        # Only exact codes were given: return an empty range.
        return exact, 0, -1
    return exact, range_min, range_max
def services_from_data(data: dict[str, Any]) -> list[Service]:
    """Validate a raw config mapping and convert it into ``Service`` objects.

    Args:
        data: Parsed configuration; must contain a non-empty ``services`` list
            of objects with at least ``name`` and an http(s) ``url``.

    Returns:
        One ``Service`` per configured entry, in config order.

    Raises:
        ValueError: if the config shape is wrong, a service lacks a name or a
            valid http(s) URL, ``timeoutSeconds`` is not a positive finite
            number, or ``expectedStatuses`` cannot be parsed.
    """
    raw_services = data.get("services") if isinstance(data, dict) else None
    if not isinstance(raw_services, list) or not raw_services:
        raise ValueError("Config must include a non-empty services array")

    services: list[Service] = []
    for index, item in enumerate(raw_services, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"Service #{index} must be an object")

        name = str(item.get("name", "")).strip()
        url = str(item.get("url", "")).strip()
        if not name or not url:
            raise ValueError(f"Service #{index} must include name and url")

        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            raise ValueError(f"Service {name} has an invalid http(s) URL")

        # Report bad timeouts as config errors (ValueError) instead of letting
        # a raw TypeError escape or a bad value reach the HTTP check later.
        try:
            timeout = float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS))
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Service {name} has an invalid timeoutSeconds value") from exc
        # The chained comparison is False for NaN, zero, negatives and infinity.
        if not 0 < timeout < float("inf"):
            raise ValueError(f"Service {name} must have a positive, finite timeoutSeconds")

        exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))

        raw_keyword = item.get("keyword")
        # A whitespace-only keyword is treated the same as no keyword.
        keyword = (str(raw_keyword).strip() or None) if raw_keyword else None

        services.append(
            Service(
                name=name,
                group=str(item.get("group", "Main Services")).strip() or "Main Services",
                url=url,
                display_url=str(item.get("displayUrl", url)).strip() or url,
                # Fall back to GET for a blank method, mirroring group/displayUrl.
                method=str(item.get("method", "GET")).strip().upper() or "GET",
                timeout=timeout,
                expected_statuses=exact,
                expected_min=minimum,
                expected_max=maximum,
                keyword=keyword,
            )
        )
    return services
def load_services(path: Path) -> list[Service]:
    """Load and validate the service list associated with *path*.

    ``load_document`` is consulted first; when it returns ``None`` the data is
    read with ``load_json`` instead.  The result is validated and converted by
    ``services_from_data``, which raises ``ValueError`` on a bad config.
    """
    document = load_document(path)
    source = load_json(path) if document is None else document
    return services_from_data(source)
def save_services_config(path: Path, data: dict[str, Any]) -> None:
    """Validate *data* and persist it via ``save_document`` and as JSON at *path*.

    The JSON file is written to a sibling ``.tmp`` file and then moved over
    *path*.  If the move fails with ``OSError`` the file is written in place
    as a best-effort fallback.  The temporary file is always removed, even when
    serialisation or the fallback write raises.

    Raises:
        ValueError: if *data* is not a valid services config (nothing is
            written in that case).
        OSError: if neither the move nor the in-place write succeeds.
    """

    def write_json(target: Path) -> None:
        """Serialise *data* to *target* as indented JSON with a trailing newline."""
        with target.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
            handle.write("\n")

    # Called for validation only; raises before anything is persisted.
    services_from_data(data)
    save_document(path, data)

    path.parent.mkdir(parents=True, exist_ok=True)
    temporary = path.with_suffix(f"{path.suffix}.tmp")
    try:
        write_json(temporary)
        try:
            temporary.replace(path)
        except OSError:
            # The target could not be replaced by a rename; fall back to
            # overwriting it directly rather than failing the save.
            write_json(path)
    finally:
        # No-op after a successful replace; otherwise cleans up the leftover.
        temporary.unlink(missing_ok=True)
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
    """Convert ``Service`` objects back into config-shaped dictionaries.

    Exact status codes are emitted in ascending order, followed by the
    ``"min-max"`` range when the range is non-empty.  A service with neither
    is written with the default ``["200-399"]``.  ``keyword`` is included only
    when it is set.
    """

    def serialize(service: Service) -> dict[str, Any]:
        statuses: list[int | str] = sorted(service.expected_statuses)
        if service.expected_max >= service.expected_min:
            statuses.append(f"{service.expected_min}-{service.expected_max}")
        entry: dict[str, Any] = {
            "name": service.name,
            "group": service.group,
            "url": service.url,
            "displayUrl": service.display_url,
            "method": service.method,
            "timeoutSeconds": service.timeout,
            "expectedStatuses": statuses if statuses else ["200-399"],
        }
        if service.keyword:
            entry["keyword"] = service.keyword
        return entry

    return [serialize(service) for service in services]
def status_expected(service: Service, status: int) -> bool:
    """Return whether *status* counts as healthy for *service*.

    A status is accepted when it is one of the service's exact expected codes
    or lies within its inclusive ``expected_min``..``expected_max`` range.
    """
    within_range = service.expected_min <= status <= service.expected_max
    return status in service.expected_statuses or within_range
def check_service(service: Service) -> CheckResult:
    """Perform one HTTP health check against *service*.

    The request uses the service's method and timeout and a ``User-Agent``
    taken from the ``HTTP_USER_AGENT`` setting.  When a keyword is configured,
    up to 1,000,000 bytes of the body are read and must contain it.

    Returns:
        A ``CheckResult`` holding the outcome, HTTP status (``None`` when no
        response was obtained), latency in milliseconds and an error string
        (``None`` on success).  Transport-level failures are reported in the
        result rather than raised, so one unreachable service cannot abort the
        checks of the others.
    """
    started = time.monotonic()

    def elapsed_ms() -> int:
        """Milliseconds elapsed since the check started."""
        return int((time.monotonic() - started) * 1000)

    headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
    request = urllib.request.Request(service.url, headers=headers, method=service.method)
    try:
        context = ssl.create_default_context()
        with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
            # Only download the body when it is needed for the keyword check.
            body = response.read(1_000_000) if service.keyword else b""
            status = int(response.status)
    except urllib.error.HTTPError as exc:
        # urllib raises for 4xx/5xx, but such a status may still be "expected".
        status = int(exc.code)
        latency_ms = elapsed_ms()
        ok = status_expected(service, status)
        return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
    except (OSError, http.client.HTTPException) as exc:
        # OSError covers URLError, timeouts, SSL errors and connection resets
        # raised while reading; HTTPException covers malformed or truncated
        # responses (e.g. IncompleteRead, BadStatusLine).
        return CheckResult(service, False, None, elapsed_ms(), clean_error(exc))

    latency_ms = elapsed_ms()
    ok = status_expected(service, status)
    if ok and service.keyword:
        # errors="ignore" drops undecodable bytes, so decoding cannot raise.
        text = body.decode("utf-8", errors="ignore")
        if service.keyword not in text:
            return CheckResult(service, False, status, latency_ms, "keyword missing")
    return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
    """Build the Discord embed list that summarises *results*.

    The single embed contains an overall summary line (operational, degraded,
    outage, or "no services"), and for every service group a pair of inline
    columns (service names and their states) separated from the next group by
    a blank spacer field.  The footer shows the refresh interval from the
    ``CHECK_INTERVAL_SECONDS`` setting and the UTC check time.

    Groups are added only while a complete name/status pair still fits in the
    embed's field limit, so a group is never rendered with its name column but
    without its status column.
    """
    max_fields = 25  # Discord's per-embed field limit
    max_value_length = 1024  # Discord's per-field value limit

    checked_at = datetime.now(timezone.utc)
    online = sum(1 for result in results if result.ok)
    total = len(results)

    if total == 0:
        color = 0x6B7280
        summary = "No services configured."
    elif online == total:
        color = 0x10B981
        summary = f"🟢 Operational · {online}/{total} online"
    elif online > 0:
        color = 0xF59E0B
        offline = total - online
        attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
        summary = f"🟡 Degraded · {online}/{total} online · {attention}"
    else:
        color = 0xEF4444
        summary = f"🔴 Outage · {online}/{total} online"

    # Group results by service group, preserving first-seen order.
    groups: dict[str, list[CheckResult]] = {}
    for result in results:
        groups.setdefault(result.service.group, []).append(result)

    fields: list[dict[str, Any]] = []
    for group_name, group_results in groups.items():
        # Each group needs its name column and its status column together.
        if len(fields) + 2 > max_fields:
            break
        names = "\n".join(f"**{result.service.name}**" for result in group_results)
        states = "\n".join(
            f"{'🟢' if result.ok else '🔴'} {'Online' if result.ok else 'Issue'}"
            for result in group_results
        )
        fields.append({"name": group_name, "value": names[:max_value_length] or "None", "inline": True})
        fields.append({"name": "Status", "value": states[:max_value_length] or "None", "inline": True})
        # Zero-width-space field acts as a row break between groups.
        fields.append({"name": "\u200b", "value": "\u200b", "inline": False})
    if fields:
        # The last appended field is always a spacer; drop the trailing one.
        fields.pop()

    interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
    checked_label = checked_at.strftime("%Y-%m-%d %H:%M:%S")
    return [
        {
            "title": "The Mithral Archive",
            "description": summary,
            "color": color,
            "fields": fields,
            "footer": {"text": f"Refreshes every {interval}s • Last checked {checked_label} UTC"},
            "timestamp": checked_at.isoformat(),
        }
    ]
def upsert_status_message(
    token: str,
    channel_id: str,
    state_path: Path,
    results: list[CheckResult],
) -> str:
    """Edit the stored status message, or post a new one, and return its id.

    The message id remembered in the state file is edited in place when
    present.  If that edit raises ``RuntimeError`` (or no id is stored) a new
    message is posted to *channel_id* and its id is saved to *state_path*.

    Raises:
        RuntimeError: if the response to the new message carries no id.
    """
    stored = load_state(state_path)
    existing_id = str(stored.get("message_id", "")).strip()
    payload = {"content": "", "embeds": render_embeds(results)}

    if existing_id:
        edit_route = f"/channels/{channel_id}/messages/{existing_id}"
        try:
            discord_request("PATCH", token, edit_route, payload)
        except RuntimeError as exc:
            print(f"Could not edit existing status message, creating a new one: {exc}", flush=True)
        else:
            return existing_id

    created = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
    created_id = str(created.get("id", "")).strip()
    if created_id:
        save_state(state_path, {"message_id": created_id})
        return created_id
    raise RuntimeError("Discord did not return a message id")
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
    """Fabricate check results for previewing the embed without network calls.

    Every service is reported healthy (HTTP 200) except the last one, which is
    reported as failing with HTTP 502.  Latencies start at 42 ms and grow by
    31 ms per service.
    """
    last_position = len(services) - 1

    def fabricate(position: int, service: Service) -> CheckResult:
        healthy = position != last_position
        return CheckResult(
            service=service,
            ok=healthy,
            status=200 if healthy else 502,
            latency_ms=42 + (position * 31),
            error=None if healthy else "HTTP 502",
        )

    return [fabricate(position, service) for position, service in enumerate(services)]
def print_preview(services: list[Service]) -> None:
    """Print, as indented JSON, the message payload built from fake results."""
    embeds = render_embeds(fake_preview_results(services))
    rendered = json.dumps({"content": "", "embeds": embeds}, indent=2)
    print(rendered)
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
    """Flatten a check result and its service into a JSON-serialisable dict."""
    service = result.service
    payload: dict[str, Any] = {
        "name": service.name,
        "group": service.group,
        "url": service.url,
        "displayUrl": service.display_url,
    }
    payload["ok"] = result.ok
    payload["status"] = result.status
    payload["latencyMs"] = result.latency_ms
    payload["error"] = result.error
    return payload
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
    """Check every configured service once and publish the outcome.

    Unless the runtime is in dry-run mode, the status message in the configured
    status channel is created or updated.  In dry-run mode the message id is
    the literal ``"dry-run"``.  The runtime's last results, message id and
    check time are then recorded under ``runtime.lock`` and its last error is
    cleared.

    Returns:
        ``(message_id, results)``.
    """
    results = list(map(check_service, load_services(runtime.config_path)))

    if not runtime.dry_run:
        configured_channel = channel_settings(runtime)["statusChannelId"]
        channel_id = validate_channel_id(configured_channel, "Status")
        message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results)
    else:
        message_id = "dry-run"

    with runtime.lock:
        runtime.last_results = results
        runtime.last_message_id = message_id
        runtime.last_checked_at = datetime.now(timezone.utc)
        runtime.last_error = None

    return message_id, results
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
    """Return a JSON-serialisable snapshot of the bot's configuration and state.

    The configured services and channel settings are loaded fresh; the last
    results, error, message id and check time are read from *runtime* while
    holding ``runtime.lock``.
    """
    services = load_services(runtime.config_path)
    settings = channel_settings(runtime)

    # Read every mutable runtime field under the lock, then build the payload.
    with runtime.lock:
        results = list(runtime.last_results)
        last_error = runtime.last_error
        last_message_id = runtime.last_message_id
        last_checked_at = runtime.last_checked_at

    checked_at_text = last_checked_at.isoformat() if last_checked_at else None
    return {
        "services": services_to_jsonable(services),
        "results": [result_to_jsonable(result) for result in results],
        "lastError": last_error,
        "lastMessageId": last_message_id,
        "lastCheckedAt": checked_at_text,
        "channelId": settings["statusChannelId"],
        "channels": settings,
    }