352 lines
12 KiB
Python
352 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import socket
|
|
import ssl
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .core import (
|
|
BotRuntime,
|
|
CheckResult,
|
|
DEFAULT_INTERVAL_SECONDS,
|
|
DEFAULT_TIMEOUT_SECONDS,
|
|
Service,
|
|
clean_error,
|
|
env,
|
|
load_json,
|
|
)
|
|
from .db import load_document, save_document
|
|
from .discord_api import discord_request
|
|
from .storage import channel_settings, load_state, save_state, validate_channel_id
|
|
|
|
|
|
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
    """Parse a service's ``expectedStatuses`` setting.

    Accepted inputs:

    * ``None`` -- the default: no exact codes, range ``200``-``399``.
    * a comma-separated string such as ``"200, 204, 300-399"``.
    * a list whose entries are integers, numeric strings, or
      ``"low-high"`` range strings.

    Returns:
        ``(exact, minimum, maximum)``. A status is expected when it is in
        ``exact`` or when ``minimum <= status <= maximum``. When no range was
        supplied the pair is ``(0, -1)``, an empty range.

    Raises:
        ValueError: if ``raw`` has the wrong type, an entry is not an integer
            or range, a range bound is not numeric, or a range is reversed.
    """
    if raw is None:
        return set(), 200, 399

    if isinstance(raw, str):
        raw = [part.strip() for part in raw.split(",") if part.strip()]

    if not isinstance(raw, list):
        raise ValueError("expectedStatuses must be a list or comma-separated string")

    exact: set[int] = set()
    ranges: list[tuple[int, int]] = []

    for item in raw:
        # bool is a subclass of int; a JSON true/false here is a config error,
        # not the status code 1 or 0.
        if isinstance(item, bool) or not isinstance(item, (int, str)):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if isinstance(item, int):
            exact.add(item)
            continue
        if "-" in item:
            left, right = item.split("-", 1)
            try:
                low, high = int(left), int(right)
            except ValueError as exc:
                raise ValueError(f"Invalid expected status range: {item}") from exc
            if low > high:
                # A reversed range would silently match nothing.
                raise ValueError(f"Invalid expected status range: {item}")
            ranges.append((low, high))
            continue
        try:
            exact.add(int(item))
        except ValueError as exc:
            raise ValueError(f"Invalid expected status value: {item}") from exc

    if not ranges:
        return exact, 0, -1

    # Merge overlapping or adjacent ranges into disjoint intervals.
    ranges.sort()
    merged: list[tuple[int, int]] = [ranges[0]]
    for low, high in ranges[1:]:
        last_low, last_high = merged[-1]
        if low <= last_high + 1:
            merged[-1] = (last_low, max(last_high, high))
        else:
            merged.append((low, high))

    # The return shape holds only one range. Keep the widest interval as that
    # range and list the codes of the remaining intervals individually, so
    # statuses that fall between two configured ranges are not accepted.
    primary = max(merged, key=lambda span: span[1] - span[0])
    merged.remove(primary)
    for low, high in merged:
        # HTTP status codes have three digits, so expansion is limited to
        # 0-999 to keep a mistyped huge range from allocating a huge set.
        exact.update(range(max(low, 0), min(high, 999) + 1))

    return exact, primary[0], primary[1]
|
|
|
|
|
|
def _config_text(value: Any, default: str = "") -> str:
    """Return ``value`` as stripped text, or ``default`` when it is None or blank."""
    if value is None:
        return default
    return str(value).strip() or default


def services_from_data(data: dict[str, Any]) -> list[Service]:
    """Validate a config mapping and build its ``Service`` entries.

    Args:
        data: Config mapping with a non-empty ``services`` list of objects.
            Each object must provide ``name`` and an http(s) ``url``; it may
            provide ``group``, ``displayUrl``, ``method``, ``timeoutSeconds``,
            ``expectedStatuses`` and ``keyword``.

    Returns:
        One ``Service`` per entry, in config order.

    Raises:
        ValueError: if the config or any entry is malformed.
    """
    raw_services = data.get("services") if isinstance(data, dict) else None
    if not isinstance(raw_services, list) or not raw_services:
        raise ValueError("Config must include a non-empty services array")

    services: list[Service] = []
    for index, item in enumerate(raw_services, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"Service #{index} must be an object")

        # An explicit JSON null is treated like a missing key; str(None)
        # would otherwise yield the literal text "None".
        name = _config_text(item.get("name"))
        url = _config_text(item.get("url"))
        if not name or not url:
            raise ValueError(f"Service #{index} must include name and url")

        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            raise ValueError(f"Service {name} has an invalid http(s) URL")

        raw_timeout = item.get("timeoutSeconds")
        if raw_timeout is None:
            raw_timeout = DEFAULT_TIMEOUT_SECONDS
        try:
            timeout = float(raw_timeout)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Service {name} has an invalid timeoutSeconds") from exc
        # Negative, NaN and infinite timeouts are rejected here with a message
        # that names the service (the chained comparison is False for NaN).
        if not 0 <= timeout < float("inf"):
            raise ValueError(f"Service {name} has an invalid timeoutSeconds")

        keyword = None
        if item.get("keyword"):
            # A whitespace-only keyword is normalised to "no keyword".
            keyword = str(item["keyword"]).strip() or None

        exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
        services.append(
            Service(
                name=name,
                group=_config_text(item.get("group"), "Main Services"),
                url=url,
                display_url=_config_text(item.get("displayUrl"), url),
                method=_config_text(item.get("method"), "GET").upper(),
                timeout=timeout,
                expected_statuses=exact,
                expected_min=minimum,
                expected_max=maximum,
                keyword=keyword,
            )
        )

    return services
|
|
|
|
|
|
def load_services(path: Path) -> list[Service]:
    """Load the service list for ``path``.

    ``load_document`` is consulted first; when it returns ``None`` the config
    is read with ``load_json`` instead. The result is validated and converted
    by ``services_from_data``.
    """
    document = load_document(path)
    config = load_json(path) if document is None else document
    return services_from_data(config)
|
|
|
|
|
|
def save_services_config(path: Path, data: dict[str, Any]) -> None:
    """Validate ``data`` and persist it via ``save_document`` and as JSON at ``path``.

    The JSON file is written to a sibling ``.tmp`` file and then moved over
    ``path``. If the move fails with ``OSError`` the file is written in place
    instead. The temporary file is removed on every exit path, including
    failures.

    Raises:
        ValueError: if ``services_from_data`` rejects ``data``.
    """
    services_from_data(data)  # validation only; the result is discarded
    save_document(path, data)

    payload = json.dumps(data, indent=2) + "\n"
    path.parent.mkdir(parents=True, exist_ok=True)
    temporary = path.with_suffix(f"{path.suffix}.tmp")
    try:
        temporary.write_text(payload, encoding="utf-8")
        try:
            temporary.replace(path)
        except OSError:
            # Best-effort fallback when the move is not possible: overwrite
            # the target directly.
            path.write_text(payload, encoding="utf-8")
    finally:
        # After a successful replace the temp file no longer exists;
        # missing_ok makes this a no-op in that case.
        temporary.unlink(missing_ok=True)
|
|
|
|
|
|
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
    """Convert services to JSON-friendly dicts using the config key names.

    ``expectedStatuses`` lists the exact codes in ascending order, followed by
    a ``"min-max"`` string when the service's range is non-empty; if that
    leaves nothing, ``["200-399"]`` is emitted. ``keyword`` is included only
    when set.
    """
    serialised: list[dict[str, Any]] = []
    for svc in services:
        statuses: list[int | str] = sorted(svc.expected_statuses)
        has_range = svc.expected_min <= svc.expected_max
        if has_range:
            statuses.append(f"{svc.expected_min}-{svc.expected_max}")

        entry: dict[str, Any] = {
            "name": svc.name,
            "group": svc.group,
            "url": svc.url,
            "displayUrl": svc.display_url,
            "method": svc.method,
            "timeoutSeconds": svc.timeout,
            "expectedStatuses": statuses if statuses else ["200-399"],
        }
        if svc.keyword:
            entry["keyword"] = svc.keyword
        serialised.append(entry)
    return serialised
|
|
|
|
|
|
def status_expected(service: Service, status: int) -> bool:
    """Return True when ``status`` is an exact expected code or lies in the expected range."""
    in_exact = status in service.expected_statuses
    return in_exact or service.expected_min <= status <= service.expected_max
|
|
|
|
|
|
def check_service(service: Service) -> CheckResult:
    """Perform one HTTP check of ``service`` and report the outcome.

    The request uses the service's method and timeout and a ``User-Agent``
    taken from ``HTTP_USER_AGENT``. The result is OK when the response status
    is expected (see ``status_expected``) and, if the service defines a
    keyword, the keyword occurs in the first 1,000,000 bytes of the body.
    The keyword is not checked for responses that ``urllib`` reports as
    ``HTTPError``.

    Network, TLS and HTTP protocol failures are reported as a failed result
    with no status rather than raised.
    """
    # Local import: http.client is already loaded by urllib.request; it is
    # only needed here for the HTTPException base class.
    import http.client

    started = time.monotonic()

    def elapsed_ms() -> int:
        return int((time.monotonic() - started) * 1000)

    headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
    request = urllib.request.Request(service.url, headers=headers, method=service.method)

    try:
        context = ssl.create_default_context()
        with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
            # The body is only needed (and only read) for keyword checks.
            body = response.read(1_000_000) if service.keyword else b""
            status = int(response.status)
    except urllib.error.HTTPError as exc:
        status = int(exc.code)
        latency_ms = elapsed_ms()
        # HTTPError wraps the open response; close it so the connection is
        # released promptly instead of waiting for garbage collection.
        exc.close()
        ok = status_expected(service, status)
        return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
    except (OSError, http.client.HTTPException) as exc:
        # OSError covers URLError, timeouts, SSLError and connection resets
        # raised while reading the body; HTTPException covers malformed or
        # truncated responses. Either way the service is reported as down.
        return CheckResult(service, False, None, elapsed_ms(), clean_error(exc))

    latency_ms = elapsed_ms()
    ok = status_expected(service, status)

    if ok and service.keyword:
        # errors="ignore" never raises, so no decode error handling is needed.
        text = body.decode("utf-8", errors="ignore")
        if service.keyword not in text:
            return CheckResult(service, False, status, latency_ms, "keyword missing")

    return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
|
|
|
|
|
|
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
    """Build the single status embed for a list of check results.

    The description and colour summarise how many services are online. Each
    service group contributes two inline fields (service names and their
    states), with a non-inline zero-width spacer field between groups. At
    most 25 fields are emitted and each field value is cut to 1024 characters.
    """
    checked_at = datetime.now(timezone.utc)
    total = len(results)
    online = len([entry for entry in results if entry.ok])

    if not total:
        color, summary = 0x6B7280, "No services configured."
    elif online == total:
        color, summary = 0x10B981, f"🟢 Operational · {online}/{total} online"
    elif online > 0:
        # Some, but not all, services are up.
        color = 0xF59E0B
        offline = total - online
        if offline == 1:
            attention = "1 service needs attention"
        else:
            attention = f"{offline} services need attention"
        summary = f"🟡 Degraded · {online}/{total} online · {attention}"
    else:
        color, summary = 0xEF4444, f"🔴 Outage · {online}/{total} online"

    # Group results by service group, keeping first-seen group order.
    grouped: dict[str, list[CheckResult]] = {}
    for entry in results:
        key = entry.service.group
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(entry)

    fields: list[dict[str, Any]] = []
    for group_name, members in grouped.items():
        if fields:
            # Spacer between groups; none after the last one.
            fields.append({"name": "\u200b", "value": "\u200b", "inline": False})
        names = "\n".join(f"**{member.service.name}**" for member in members)
        states = "\n".join("🟢 Online" if member.ok else "🔴 Issue" for member in members)
        fields.append({"name": group_name, "value": names[:1024] or "None", "inline": True})
        fields.append({"name": "Status", "value": states[:1024] or "None", "inline": True})

    interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
    embed = {
        "title": "The Mithral Archive",
        "description": summary,
        "color": color,
        "fields": fields[:25],
        "footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"},
        "timestamp": checked_at.isoformat(),
    }
    return [embed]
|
|
|
|
|
|
def upsert_status_message(
    token: str,
    channel_id: str,
    state_path: Path,
    results: list[CheckResult],
) -> str:
    """Edit the stored status message, or post a new one, and return its id.

    The message id is read from the state at ``state_path``. If editing that
    message raises ``RuntimeError`` (or no id is stored), a new message is
    posted and its id is saved back to the state.

    Raises:
        RuntimeError: if the response to the new message contains no id.
    """
    stored = load_state(state_path)
    existing_id = str(stored.get("message_id", "")).strip()
    payload = {"content": "", "embeds": render_embeds(results)}

    if existing_id:
        try:
            discord_request("PATCH", token, f"/channels/{channel_id}/messages/{existing_id}", payload)
        except RuntimeError as exc:
            print(f"Could not edit existing status message, creating a new one: {exc}", flush=True)
        else:
            return existing_id

    created = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
    created_id = str(created.get("id", "")).strip()
    if created_id:
        save_state(state_path, {"message_id": created_id})
        return created_id
    raise RuntimeError("Discord did not return a message id")
|
|
|
|
|
|
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
    """Fabricate check results for a preview without contacting any service.

    Every service is reported as online with HTTP 200 except the last one,
    which is reported as failing with HTTP 502. Latencies start at 42 ms and
    grow by 31 ms per position.
    """
    last_position = len(services) - 1
    return [
        CheckResult(
            service=service,
            ok=position != last_position,
            status=502 if position == last_position else 200,
            latency_ms=42 + (position * 31),
            error="HTTP 502" if position == last_position else None,
        )
        for position, service in enumerate(services)
    ]
|
|
|
|
|
|
def print_preview(services: list[Service]) -> None:
    """Print the message payload rendered from fabricated preview results as indented JSON."""
    embeds = render_embeds(fake_preview_results(services))
    preview = {"content": "", "embeds": embeds}
    print(json.dumps(preview, indent=2))
|
|
|
|
|
|
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
    """Flatten a check result and its service's identity into a JSON-friendly dict."""
    target = result.service
    identity = {
        "name": target.name,
        "group": target.group,
        "url": target.url,
        "displayUrl": target.display_url,
    }
    outcome = {
        "ok": result.ok,
        "status": result.status,
        "latencyMs": result.latency_ms,
        "error": result.error,
    }
    return {**identity, **outcome}
|
|
|
|
|
|
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
    """Check every configured service once and publish the outcome.

    In dry-run mode nothing is sent and the message id is ``"dry-run"``;
    otherwise the status message in the configured status channel is updated.
    The results, message id and check time are then stored on ``runtime``
    under its lock, and ``last_error`` is cleared.

    Returns:
        ``(message_id, results)``.
    """
    results = list(map(check_service, load_services(runtime.config_path)))

    message_id = "dry-run"
    if not runtime.dry_run:
        settings = channel_settings(runtime)
        channel_id = validate_channel_id(settings["statusChannelId"], "Status")
        message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results)

    with runtime.lock:
        runtime.last_results = results
        runtime.last_message_id = message_id
        runtime.last_checked_at = datetime.now(timezone.utc)
        runtime.last_error = None

    return message_id, results
|
|
|
|
|
|
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
    """Return a JSON-friendly snapshot of the configured services and last check cycle.

    The configured services and channel settings are loaded fresh. The
    last-cycle fields are copied from ``runtime`` while holding
    ``runtime.lock``, so they all come from the same moment.
    """
    services = load_services(runtime.config_path)
    settings = channel_settings(runtime)

    # Copy every shared field under the lock that run_check_cycle holds while
    # updating them. This also ensures last_checked_at is read only once
    # before isoformat() is called on it.
    with runtime.lock:
        results = list(runtime.last_results)
        last_error = runtime.last_error
        last_message_id = runtime.last_message_id
        last_checked_at = runtime.last_checked_at

    return {
        "services": services_to_jsonable(services),
        "results": [result_to_jsonable(result) for result in results],
        "lastError": last_error,
        "lastMessageId": last_message_id,
        "lastCheckedAt": last_checked_at.isoformat() if last_checked_at else None,
        "channelId": settings["statusChannelId"],
        "channels": settings,
    }
|