938 lines
32 KiB
Python
938 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""Live Discord status message for The Mithral Archive."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import secrets
|
|
import signal
|
|
import socket
|
|
import ssl
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from http.cookies import SimpleCookie
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Base URL that every Discord REST path is appended to.
DISCORD_API = "https://discord.com/api/v10"

# Seconds between check cycles when CHECK_INTERVAL_SECONDS is not set.
DEFAULT_INTERVAL_SECONDS = 60

# Per-service request timeout used when a service omits "timeoutSeconds".
DEFAULT_TIMEOUT_SECONDS = 10

# NOTE(review): not referenced in the visible part of this file; presumably
# the per-message embed cap imposed by Discord -- confirm before relying on it.
MAX_DISCORD_EMBEDS = 10

# Largest JSON request body the dashboard is willing to read.
MAX_REQUEST_BYTES = 1_000_000

# Name of the cookie that carries the dashboard session id.
SESSION_COOKIE = "archive_bot_session"

# PBKDF2 work factor used when hashing a new dashboard password.
PBKDF2_ITERATIONS = 390_000
|
|
|
|
|
|
@dataclass(frozen=True)
class Service:
    """Immutable description of one HTTP endpoint to monitor."""

    # Human-readable label shown in the status embed.
    name: str
    # URL that is actually requested.
    url: str
    # URL reported alongside results (falls back to ``url`` when not configured).
    display_url: str
    # HTTP verb used for the probe, upper-cased by the config loader.
    method: str
    # Request timeout in seconds.
    timeout: float
    # Individual status codes that count as healthy.
    expected_statuses: set[int]
    # Inclusive status range that counts as healthy; min > max means "no range".
    expected_min: int
    expected_max: int
    # Optional text that must appear in the response body.
    keyword: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
class CheckResult:
    """Immutable outcome of probing a single service once."""

    # The service that was probed.
    service: Service
    # True when the probe met every expectation for the service.
    ok: bool
    # HTTP status received, or None when no response was obtained.
    status: int | None
    # Elapsed time of the probe in milliseconds.
    latency_ms: int | None
    # Short failure description, or None on success.
    error: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
class DashboardAuthConfig:
    """Immutable login settings for the dashboard."""

    # The single account name that may log in.
    username: str
    # Encoded password hash in the ``pbkdf2_sha256$...`` format.
    password_hash: str
    # Lifetime of a session, in seconds, counted from its last use.
    session_ttl_seconds: int
    # Whether the session cookie carries the ``Secure`` attribute.
    cookie_secure: bool
|
|
|
|
|
|
@dataclass
class DashboardSession:
    """Server-side record of one logged-in dashboard session.

    Deliberately mutable: ``expires_at`` is pushed forward on every use.
    """

    # Account name the session belongs to.
    username: str
    # Token the client must echo back in the X-CSRF-Token header on POSTs.
    csrf_token: str
    # Absolute expiry time as a ``time.time()`` timestamp.
    expires_at: float
|
|
|
|
|
|
class DashboardAuth:
    """In-memory session store and failed-login throttle for the dashboard.

    All state lives in two dictionaries guarded by ``self.lock``; nothing is
    persisted, so sessions and throttle counters vanish on restart.
    """

    # A key is locked out after this many failures inside the window.
    MAX_FAILED_LOGINS = 10
    # Length of the sliding throttle window, in seconds.
    LOGIN_WINDOW_SECONDS = 900

    def __init__(self, config: DashboardAuthConfig) -> None:
        self.config = config
        self.lock = threading.Lock()
        self.sessions: dict[str, DashboardSession] = {}
        self.failed_logins: dict[str, list[float]] = {}

    def login_allowed(self, key: str) -> bool:
        """Return True while ``key`` has fewer than the maximum recent failures.

        Expired attempts are dropped as a side effect. Keys with no remaining
        attempts are removed entirely, so merely *asking* about a key never
        leaves an entry behind.
        """
        window_start = time.time() - self.LOGIN_WINDOW_SECONDS
        with self.lock:
            attempts = [
                attempt
                for attempt in self.failed_logins.get(key, ())
                if attempt >= window_start
            ]
            if attempts:
                self.failed_logins[key] = attempts
            else:
                self.failed_logins.pop(key, None)
            return len(attempts) < self.MAX_FAILED_LOGINS

    def record_failed_login(self, key: str) -> None:
        """Record one failed attempt for ``key`` at the current time."""
        now = time.time()
        window_start = now - self.LOGIN_WINDOW_SECONDS
        with self.lock:
            # Throttle keys can contain caller-supplied text, so entries whose
            # newest attempt has aged out are purged here to bound memory use.
            # Attempts are appended chronologically, hence ``[-1]`` is newest.
            stale = [
                name
                for name, attempts in self.failed_logins.items()
                if not attempts or attempts[-1] < window_start
            ]
            for name in stale:
                del self.failed_logins[name]
            self.failed_logins.setdefault(key, []).append(now)

    def clear_failed_login(self, key: str) -> None:
        """Forget every recorded failure for ``key``."""
        with self.lock:
            self.failed_logins.pop(key, None)

    def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
        """Check credentials and open a session.

        Returns:
            ``(session_id, session)`` on success, ``None`` when the username
            or the password does not match.
        """
        # compare_digest() rejects non-ASCII ``str`` with TypeError, so the
        # comparison is done on bytes; "surrogatepass" keeps odd input from
        # raising during encoding.
        supplied = username.encode("utf-8", "surrogatepass")
        expected = self.config.username.encode("utf-8", "surrogatepass")
        if not hmac.compare_digest(supplied, expected):
            return None
        if not verify_password_hash(self.config.password_hash, password):
            return None

        now = time.time()
        session_id = secrets.token_urlsafe(32)
        session = DashboardSession(
            username=username,
            csrf_token=secrets.token_urlsafe(32),
            expires_at=now + self.config.session_ttl_seconds,
        )
        with self.lock:
            # Sessions that were never revisited would otherwise stay forever.
            expired = [
                known_id
                for known_id, known in self.sessions.items()
                if known.expires_at <= now
            ]
            for known_id in expired:
                del self.sessions[known_id]
            self.sessions[session_id] = session
        return session_id, session

    def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
        """Resolve a raw ``Cookie`` header to a live session.

        A valid session has its expiry pushed forward by the configured TTL.

        Returns:
            ``(session_id, session)``, or ``None`` when the header is missing
            or malformed, carries no session cookie, or names an unknown or
            expired session.
        """
        # Local import: the module only imports SimpleCookie from http.cookies.
        from http.cookies import CookieError

        if not cookie_header:
            return None
        cookie = SimpleCookie()
        try:
            cookie.load(cookie_header)
        except CookieError:
            # A client-supplied header with an illegal cookie name must not
            # turn into an unhandled exception in the request handler.
            return None
        morsel = cookie.get(SESSION_COOKIE)
        if morsel is None:
            return None

        session_id = morsel.value
        now = time.time()
        with self.lock:
            session = self.sessions.get(session_id)
            if session is None:
                return None
            if session.expires_at <= now:
                self.sessions.pop(session_id, None)
                return None
            session.expires_at = now + self.config.session_ttl_seconds
            return session_id, session

    def logout(self, session_id: str) -> None:
        """Discard the session with the given id, if it exists."""
        with self.lock:
            self.sessions.pop(session_id, None)
|
|
|
|
|
|
class BotRuntime:
    """Settings plus the most recent check outcome, shared between threads.

    The ``last_*`` attributes are meant to be read and written while holding
    ``lock``.
    """

    def __init__(
        self,
        token: str,
        channel_id: str,
        config_path: Path,
        state_path: Path,
        dry_run: bool = False,
    ) -> None:
        # Fixed settings supplied by the caller.
        self.token, self.channel_id = token, channel_id
        self.config_path, self.state_path = config_path, state_path
        self.dry_run = dry_run

        # Mutable state describing the latest cycle.
        self.lock = threading.Lock()
        self.last_results: list[CheckResult] = []
        self.last_error: str | None = None
        self.last_message_id: str | None = None
        self.last_checked_at: datetime | None = None
|
|
|
|
|
|
def env(name: str, default: str | None = None) -> str:
    """Return the stripped value of an environment variable.

    Args:
        name: Variable to read.
        default: Value used when the variable is unset.

    Raises:
        SystemExit: If the resulting value is missing or blank.
    """
    raw = os.getenv(name, default)
    cleaned = "" if raw is None else raw.strip()
    if not cleaned:
        raise SystemExit(f"Missing required environment variable: {name}")
    return cleaned
|
|
|
|
|
|
def normalize_discord_token(token: str) -> str:
    """Strip whitespace, wrapping quotes and a leading ``Bot `` from a token."""
    candidate = token.strip().strip("\"'")
    has_scheme = candidate.lower().startswith("bot ")
    return candidate[4:].strip() if has_scheme else candidate
|
|
|
|
|
|
def load_dotenv(path: Path = Path(".env")) -> None:
    """Copy ``KEY=value`` pairs from a dotenv file into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped.
    Variables that already exist in the environment are never overwritten.
    A missing file is silently ignored.
    """
    if not path.exists():
        return

    content = path.read_text(encoding="utf-8")
    for raw_line in content.splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue
        name = name.strip()
        if not name or name in os.environ:
            continue
        # Surrounding quote characters are trimmed from the value.
        os.environ[name] = raw_value.strip().strip("\"'")
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Read a JSON config file whose top level must be an object.

    Raises:
        ValueError: If the file is missing, unreadable, not valid JSON, or
            its top-level value is not an object.
    """
    try:
        with path.open("r", encoding="utf-8") as stream:
            parsed = json.load(stream)
    except (FileNotFoundError, PermissionError, json.JSONDecodeError) as exc:
        # Translate each low-level failure into one config-oriented message.
        if isinstance(exc, FileNotFoundError):
            problem = f"Config file not found: {path}"
        elif isinstance(exc, PermissionError):
            problem = f"Config file is not readable: {path}"
        else:
            problem = f"Invalid JSON in {path}: {exc}"
        raise ValueError(problem) from exc

    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"Config must be a JSON object: {path}")
|
|
|
|
|
|
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
    """Parse the ``expectedStatuses`` setting of a service.

    Args:
        raw: ``None``, a comma-separated string, or a list whose entries are
            integers, numeric strings, or ``"low-high"`` range strings.

    Returns:
        ``(exact, minimum, maximum)``. A status is acceptable when it is in
        ``exact`` or inside the inclusive ``minimum..maximum`` range. When no
        range applies the pair is ``(0, -1)``, which matches nothing.
        ``None`` and an empty list/string both mean the default ``200-399``.

    Raises:
        ValueError: If ``raw`` has the wrong type, an entry cannot be parsed,
            or a range is reversed.
    """
    if raw is None:
        return set(), 200, 399

    if isinstance(raw, str):
        raw = [part.strip() for part in raw.split(",") if part.strip()]

    if not isinstance(raw, list):
        raise ValueError("expectedStatuses must be a list or comma-separated string")

    if not raw:
        # Nothing configured: use the default the serializer also falls back
        # to, instead of producing a service that can never be healthy.
        return set(), 200, 399

    exact: set[int] = set()
    spans: list[tuple[int, int]] = []

    for item in raw:
        # bool is an int subclass; True/False are not meaningful status codes.
        if isinstance(item, bool) or not isinstance(item, (int, str)):
            raise ValueError("expectedStatuses entries must be integers or ranges")
        if isinstance(item, int):
            exact.add(item)
            continue
        if "-" in item:
            left, right = item.split("-", 1)
            try:
                low, high = int(left), int(right)
            except ValueError as exc:
                raise ValueError(f"Invalid expected status range: {item}") from exc
            if low > high:
                raise ValueError(f"Invalid expected status range: {item}")
            spans.append((low, high))
            continue
        try:
            exact.add(int(item))
        except ValueError as exc:
            raise ValueError(f"Invalid expected status value: {item}") from exc

    if not spans:
        return exact, 0, -1

    # Merge overlapping or adjacent ranges so that contiguous input still
    # comes back as one clean range.
    merged: list[list[int]] = []
    for low, high in sorted(spans):
        if merged and low <= merged[-1][1] + 1:
            merged[-1][1] = max(merged[-1][1], high)
        else:
            merged.append([low, high])

    # Only one range fits in the return value. Keep the widest one there and
    # spell the others out as exact codes, so the gaps between disjoint
    # ranges are NOT accepted (e.g. "200-204,500-503" must reject 404).
    widest = max(merged, key=lambda span: span[1] - span[0])
    for span in merged:
        if span is not widest:
            # HTTP status codes are three digits; clamping keeps an absurd
            # range from expanding into a huge set.
            exact.update(range(max(span[0], 100), min(span[1], 999) + 1))
    return exact, widest[0], widest[1]
|
|
|
|
|
|
def services_from_data(data: dict[str, Any]) -> list[Service]:
    """Validate a parsed config object and build its ``Service`` entries.

    Args:
        data: Parsed config; must contain a non-empty ``"services"`` list of
            objects with at least ``name`` and an http(s) ``url``.

    Returns:
        One ``Service`` per configured entry, in config order.

    Raises:
        ValueError: On any structural or value problem, including a timeout
            that is not a positive, finite number.
    """
    raw_services = data.get("services")
    if not isinstance(raw_services, list) or not raw_services:
        raise ValueError("Config must include a non-empty services array")

    services: list[Service] = []
    for index, item in enumerate(raw_services, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"Service #{index} must be an object")

        name = str(item.get("name", "")).strip()
        url = str(item.get("url", "")).strip()
        if not name or not url:
            raise ValueError(f"Service #{index} must include name and url")

        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in {"http", "https"} or not parsed.netloc:
            raise ValueError(f"Service {name} has an invalid http(s) URL")

        # Reject bad timeouts while loading: a null value would otherwise
        # leak a TypeError, and a negative one only fails later, inside the
        # request itself.
        raw_timeout = item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)
        try:
            timeout = float(raw_timeout)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Service {name} has an invalid timeoutSeconds: {raw_timeout!r}") from exc
        # The chained comparison is False for NaN, infinity, zero and negatives.
        if not 0 < timeout < float("inf"):
            raise ValueError(f"Service {name} must have a positive, finite timeoutSeconds")

        # A blank method falls back to GET rather than producing a request
        # with an empty verb.
        method = str(item.get("method", "GET")).strip().upper() or "GET"

        exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
        services.append(
            Service(
                name=name,
                url=url,
                display_url=str(item.get("displayUrl", url)).strip() or url,
                method=method,
                timeout=timeout,
                expected_statuses=exact,
                expected_min=minimum,
                expected_max=maximum,
                keyword=(str(item["keyword"]).strip() if item.get("keyword") else None),
            )
        )

    return services
|
|
|
|
|
|
def load_services(path: Path) -> list[Service]:
    """Read the config file at ``path`` and return its validated services."""
    raw_config = load_json(path)
    return services_from_data(raw_config)
|
|
|
|
|
|
def save_services_config(path: Path, data: dict[str, Any]) -> None:
    """Validate ``data`` and write it to ``path`` as indented JSON.

    The file is written to a sibling ``.tmp`` file and then moved into place.
    If the move fails with ``OSError`` the target is overwritten directly and
    the temporary file is removed.

    Raises:
        ValueError: If ``data`` is not a valid services config.
    """
    # Called purely for its validation; the parsed result is not needed.
    services_from_data(data)
    path.parent.mkdir(parents=True, exist_ok=True)

    def write_config(target: Path) -> None:
        """Serialize ``data`` into ``target`` with a trailing newline."""
        with target.open("w", encoding="utf-8") as stream:
            json.dump(data, stream, indent=2)
            stream.write("\n")

    scratch = path.with_suffix(f"{path.suffix}.tmp")
    write_config(scratch)
    try:
        scratch.replace(path)
    except OSError:
        write_config(path)
        scratch.unlink(missing_ok=True)
|
|
|
|
|
|
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
    """Convert services back into the JSON shape used by the config file."""

    def describe(service) -> dict[str, Any]:
        """Build the JSON object for a single service."""
        statuses: list[int | str] = sorted(service.expected_statuses)
        # A range with min > max means "no range" and is left out.
        if service.expected_min <= service.expected_max:
            statuses.append(f"{service.expected_min}-{service.expected_max}")
        entry: dict[str, Any] = {
            "name": service.name,
            "url": service.url,
            "displayUrl": service.display_url,
            "method": service.method,
            "timeoutSeconds": service.timeout,
            "expectedStatuses": statuses if statuses else ["200-399"],
        }
        if service.keyword:
            entry["keyword"] = service.keyword
        return entry

    return [describe(service) for service in services]
|
|
|
|
|
|
def status_expected(service: Service, status: int) -> bool:
    """Return True when ``status`` is one the service treats as healthy.

    A status qualifies if it is listed explicitly or falls inside the
    service's inclusive ``expected_min..expected_max`` range.
    """
    within_range = service.expected_min <= status <= service.expected_max
    return status in service.expected_statuses or within_range
|
|
|
|
|
|
def check_service(service: Service) -> CheckResult:
    """Probe one service over HTTP and classify the outcome.

    The request carries a ``User-Agent`` taken from ``HTTP_USER_AGENT`` (or a
    built-in default when that is unset or blank). A response is healthy when
    its status is expected and, if the service defines a keyword, the first
    1,000,000 bytes of the body contain it.

    Returns:
        A ``CheckResult``. Failures to obtain any response are reported with
        ``status=None`` and a short error text; this function does not raise
        for network-level problems, so one bad endpoint cannot abort a cycle.
    """
    # Local import: the module's import block does not bring http.client into scope.
    import http.client

    def elapsed_ms() -> int:
        """Milliseconds since the probe started."""
        return int((time.monotonic() - started) * 1000)

    started = time.monotonic()
    # Read the variable directly: a blank value must fall back to the
    # default, not terminate the process the way ``env()`` would.
    user_agent = (os.getenv("HTTP_USER_AGENT") or "").strip() or "ArchiveStatusBot/1.0"
    body = b""

    try:
        request = urllib.request.Request(
            service.url,
            headers={"User-Agent": user_agent},
            method=service.method,
        )
        context = ssl.create_default_context()
        with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
            if service.keyword:
                body = response.read(1_000_000)
            status = int(response.status)
    except urllib.error.HTTPError as exc:
        # An HTTP error status is still a response: judge it by its code.
        status = int(exc.code)
        exc.close()  # the error object wraps the open response
        ok = status_expected(service, status)
        return CheckResult(service, ok, status, elapsed_ms(), None if ok else f"HTTP {status}")
    except (OSError, http.client.HTTPException, ValueError) as exc:
        # OSError covers URLError, timeouts, TLS failures and connection
        # resets raised while reading the response. HTTPException covers
        # malformed or truncated responses. ValueError covers arguments that
        # urllib rejects, such as an out-of-range timeout.
        return CheckResult(service, False, None, elapsed_ms(), clean_error(exc))

    latency_ms = elapsed_ms()
    if not status_expected(service, status):
        return CheckResult(service, False, status, latency_ms, f"HTTP {status}")

    # errors="ignore" cannot raise, so no exception handling is needed here.
    if service.keyword and service.keyword not in body.decode("utf-8", errors="ignore"):
        return CheckResult(service, False, status, latency_ms, "keyword missing")

    return CheckResult(service, True, status, latency_ms, None)
|
|
|
|
|
|
def clean_error(exc: BaseException) -> str:
    """Return a description of ``exc`` capped at 120 characters.

    Prefers the exception's ``reason`` attribute when it has a truthy one,
    otherwise its message, otherwise its class name.
    """
    cause = getattr(exc, "reason", None)
    if not cause:
        summary = str(exc)[:120]
        return summary if summary else exc.__class__.__name__
    return str(cause)[:120]
|
|
|
|
|
|
def discord_request(
    method: str,
    token: str,
    path: str,
    payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Send one authenticated request to the Discord REST API.

    Args:
        method: HTTP verb.
        token: Bot token, sent as ``Authorization: Bot <token>``.
        path: API path appended to ``DISCORD_API``.
        payload: Optional object sent as a JSON body.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is empty.

    Raises:
        RuntimeError: If Discord answers with an HTTP error status; the
            original ``HTTPError`` is attached as the cause.
    """
    request_headers = {
        "Authorization": f"Bot {token}",
        "User-Agent": "ArchiveStatusBot/1.0",
    }
    encoded = None
    if payload is not None:
        encoded = json.dumps(payload).encode("utf-8")
        request_headers["Content-Type"] = "application/json"

    request = urllib.request.Request(
        f"{DISCORD_API}{path}",
        data=encoded,
        headers=request_headers,
        method=method,
    )

    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            raw = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc

    return json.loads(raw.decode("utf-8")) if raw else {}
|
|
|
|
|
|
def discord_bot_identity(token: str) -> dict[str, Any]:
    """Fetch the ``/users/@me`` object for the account behind ``token``."""
    return discord_request(method="GET", token=token, path="/users/@me")
|
|
|
|
|
|
def load_state(path: Path) -> dict[str, Any]:
    """Read the persisted state object, tolerating a missing or corrupt file.

    Returns:
        The stored dictionary, or ``{}`` when the file does not exist, holds
        invalid JSON, or its top-level value is not an object.
    """
    if not path.exists():
        return {}
    try:
        with path.open("r", encoding="utf-8") as stream:
            loaded = json.load(stream)
    except json.JSONDecodeError:
        return {}
    if isinstance(loaded, dict):
        return loaded
    return {}
|
|
|
|
|
|
def save_state(path: Path, state: dict[str, Any]) -> None:
    """Persist ``state`` as key-sorted, indented JSON.

    The content is written to a sibling ``.tmp`` file which is then moved
    over ``path``; parent directories are created when needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    scratch = path.with_suffix(f"{path.suffix}.tmp")
    with scratch.open("w", encoding="utf-8") as stream:
        json.dump(state, stream, indent=2, sort_keys=True)
        stream.write("\n")
    scratch.replace(path)
|
|
|
|
|
|
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
    """Build the single-embed payload that summarizes a check cycle.

    The embed colour and headline reflect whether all, some or none of the
    services are online. Two inline fields list service names and their
    states, each cut to 1024 characters. The footer shows the refresh
    interval read from ``CHECK_INTERVAL_SECONDS`` and the current UTC time.
    """
    now_utc = datetime.now(timezone.utc)
    total = len(results)
    online = len([entry for entry in results if entry.ok])

    if online == total:
        color = 0x10B981
        summary = f"🟢 Operational · {online}/{total} online"
    elif 0 < online < total:
        offline = total - online
        if offline == 1:
            attention = "1 service needs attention"
        else:
            attention = f"{offline} services need attention"
        color = 0xF59E0B
        summary = f"🟡 Degraded · {online}/{total} online · {attention}"
    else:
        color = 0xEF4444
        summary = f"🔴 Outage · {online}/{total} online"

    names_column = "\n".join(f"**{entry.service.name}**" for entry in results)
    states_column = "\n".join("🟢 Online" if entry.ok else "🔴 Issue" for entry in results)

    interval = os.getenv("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
    stamp = now_utc.strftime('%Y-%m-%d %H:%M:%S')

    service_field = {
        "name": "Service",
        "value": names_column[:1024] or "No services configured.",
        "inline": True,
    }
    status_field = {
        "name": "Status",
        "value": states_column[:1024] or "n/a",
        "inline": True,
    }
    embed = {
        "title": "The Mithral Archive",
        "description": summary,
        "color": color,
        "fields": [service_field, status_field],
        "footer": {"text": f"Refreshes every {interval}s • Last checked {stamp} UTC"},
        "timestamp": now_utc.isoformat(),
    }
    return [embed]
|
|
|
|
|
|
def upsert_status_message(
    token: str,
    channel_id: str,
    state_path: Path,
    results: list[CheckResult],
) -> str:
    """Edit the existing status message, or post a new one when it is gone.

    The id of the current message is read from the state file. If editing
    fails because the message no longer exists or cannot be edited (HTTP 404
    or 403), a fresh message is posted and its id is saved. Any other edit
    failure, such as rate limiting or a server error, is re-raised so the
    next cycle retries the same message instead of posting a duplicate.

    Returns:
        The id of the message that now shows the status.

    Raises:
        RuntimeError: If Discord rejects the request, or returns no id for a
            newly posted message.
    """
    state = load_state(state_path)
    message_id = str(state.get("message_id", "")).strip()
    # Message ids are decimal digit strings; anything else (e.g. a stored
    # null rendered as "None") is treated as "no message yet".
    if not (message_id.isascii() and message_id.isdigit()):
        message_id = ""
    payload = {"content": "", "embeds": render_embeds(results)}

    if message_id:
        try:
            discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
        except RuntimeError as exc:
            # discord_request chains the original HTTPError as the cause,
            # which is the only place the status code is available.
            cause = exc.__cause__
            replaceable = isinstance(cause, urllib.error.HTTPError) and cause.code in (403, 404)
            if not replaceable:
                raise
            print(f"Could not edit existing status message, creating a new one: {exc}", file=sys.stderr)
        else:
            return message_id

    message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
    new_id = str(message.get("id", "")).strip()
    if not new_id:
        raise RuntimeError("Discord did not return a message id")

    save_state(state_path, {"message_id": new_id})
    return new_id
|
|
|
|
|
|
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
    """Fabricate results for a preview without contacting any service.

    Every service is shown as online with HTTP 200 except the last one in
    the list, which is shown as failing with HTTP 502. Latencies are
    synthetic and grow with the service's position.
    """
    last_position = len(services) - 1
    preview: list[CheckResult] = []
    for position, service in enumerate(services):
        failing = position == last_position
        preview.append(
            CheckResult(
                service=service,
                ok=not failing,
                status=502 if failing else 200,
                latency_ms=42 + (position * 31),
                error="HTTP 502" if failing else None,
            )
        )
    return preview
|
|
|
|
|
|
def print_preview(services: list[Service]) -> None:
    """Print the Discord payload that fabricated preview results would produce."""
    sample_results = fake_preview_results(services)
    payload = {"content": "", "embeds": render_embeds(sample_results)}
    print(json.dumps(payload, indent=2))
|
|
|
|
|
|
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
    """Flatten a check result into a JSON-serializable dictionary."""
    target = result.service
    flattened: dict[str, Any] = {
        "name": target.name,
        "url": target.url,
        "displayUrl": target.display_url,
    }
    flattened.update(
        {
            "ok": result.ok,
            "status": result.status,
            "latencyMs": result.latency_ms,
            "error": result.error,
        }
    )
    return flattened
|
|
|
|
|
|
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
    """Run one full cycle: load config, probe services, publish, record.

    Services are probed one after another. Unless the runtime is in dry-run
    mode, the Discord status message is created or edited. On success the
    runtime's ``last_*`` fields are refreshed and ``last_error`` is cleared.

    Returns:
        ``(message_id, results)``; the id is ``"dry-run"`` in dry-run mode.
    """
    services = load_services(runtime.config_path)
    results: list[CheckResult] = []
    for service in services:
        results.append(check_service(service))

    message_id = (
        "dry-run"
        if runtime.dry_run
        else upsert_status_message(runtime.token, runtime.channel_id, runtime.state_path, results)
    )

    with runtime.lock:
        runtime.last_results = results
        runtime.last_message_id = message_id
        runtime.last_checked_at = datetime.now(timezone.utc)
        runtime.last_error = None

    return message_id, results
|
|
|
|
|
|
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
    """Return a JSON-serializable snapshot for the dashboard.

    The configured services are re-read from the config file on every call;
    the check outcome comes from the runtime's ``last_*`` fields.

    Raises:
        ValueError: If the config file cannot be loaded or is invalid.
    """
    services = load_services(runtime.config_path)
    # Copy every shared field in one critical section. Reading some of them
    # after releasing the lock could mix values from two different cycles.
    with runtime.lock:
        results = list(runtime.last_results)
        last_error = runtime.last_error
        last_message_id = runtime.last_message_id
        last_checked_at = runtime.last_checked_at
    return {
        "services": services_to_jsonable(services),
        "results": [result_to_jsonable(result) for result in results],
        "lastError": last_error,
        "lastMessageId": last_message_id,
        "lastCheckedAt": last_checked_at.isoformat() if last_checked_at else None,
        "channelId": runtime.channel_id,
    }
|
|
|
|
|
|
def bool_env(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag.

    Returns ``default`` when the variable is unset. Otherwise returns True
    only for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive, ignoring
    surrounding whitespace).
    """
    value = os.environ.get(name)
    if value is not None:
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return default
|
|
|
|
|
|
def password_hash(password: str) -> str:
    """Hash ``password`` with PBKDF2-HMAC-SHA256 and a fresh 16-byte salt.

    Returns:
        ``pbkdf2_sha256$<iterations>$<salt>$<digest>`` where salt and digest
        are URL-safe base64 without ``=`` padding.
    """

    def encode(raw: bytes) -> str:
        """URL-safe base64 text of ``raw`` with the padding removed."""
        return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

    salt = secrets.token_bytes(16)
    derived = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
    return "$".join(["pbkdf2_sha256", str(PBKDF2_ITERATIONS), encode(salt), encode(derived)])
|
|
|
|
|
|
def decode_urlsafe_base64(value: str) -> bytes:
    """Decode URL-safe base64 text whose ``=`` padding may have been removed."""
    missing = (4 - len(value) % 4) % 4
    padded = value + "=" * missing
    return base64.urlsafe_b64decode(padded)
|
|
|
|
|
|
def verify_password_hash(encoded: str, password: str) -> bool:
    """Check ``password`` against a ``pbkdf2_sha256$...`` encoded hash.

    Returns:
        True when the password reproduces the stored digest. False when it
        does not, when the scheme is not ``pbkdf2_sha256``, or when the
        encoded value is malformed.
    """
    try:
        scheme, rounds, salt_text, digest_text = encoded.split("$", 3)
        if scheme != "pbkdf2_sha256":
            return False
        salt = decode_urlsafe_base64(salt_text)
        stored = decode_urlsafe_base64(digest_text)
        computed = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(rounds))
    except (TypeError, ValueError):
        # Wrong number of fields, bad base64 or a bad iteration count.
        return False
    # Constant-time comparison of the two digests.
    return hmac.compare_digest(computed, stored)
|
|
|
|
|
|
def dashboard_auth_from_env() -> DashboardAuth | None:
    """Build the dashboard authenticator from environment variables.

    Returns:
        ``None`` when ``DASHBOARD_AUTH_DISABLED`` is truthy; otherwise a
        ``DashboardAuth`` configured from ``DASHBOARD_USERNAME``,
        ``DASHBOARD_PASSWORD_HASH``, ``DASHBOARD_SESSION_TTL_SECONDS``
        (default 28800) and ``DASHBOARD_COOKIE_SECURE`` (default off).

    Raises:
        SystemExit: If the username or password hash variable is missing.
    """
    if bool_env("DASHBOARD_AUTH_DISABLED", False):
        return None

    config = DashboardAuthConfig(
        username=env("DASHBOARD_USERNAME"),
        password_hash=env("DASHBOARD_PASSWORD_HASH"),
        session_ttl_seconds=int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800")),
        cookie_secure=bool_env("DASHBOARD_COOKIE_SECURE", False),
    )
    return DashboardAuth(config)
|
|
|
|
|
|
def print_password_hash() -> None:
    """Prompt twice for a password and print its encoded hash.

    Raises:
        SystemExit: If the two entries differ or the password is shorter
            than 12 characters.
    """
    from getpass import getpass

    entered = getpass("Dashboard password: ")
    confirmed = getpass("Confirm password: ")
    if entered != confirmed:
        raise SystemExit("Passwords did not match")
    if len(entered) < 12:
        raise SystemExit("Use at least 12 characters")
    print(password_hash(entered))
|
|
|
|
|
|
def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
    """Create the request-handler class for the dashboard HTTP server.

    The class closes over ``runtime`` and ``auth``. When ``auth`` is ``None``
    every request is treated as an authenticated local session.

    Routes:
        GET  ``/``, ``/dashboard``  serve ``dashboard.html`` from this file's directory
        GET  ``/favicon.ico``       empty 204
        GET  ``/api/session``       current username and CSRF token
        GET  ``/api/status``        configured services and latest results
        POST ``/api/login``         start a session (throttled per client and username)
        POST ``/api/logout``        end the session
        POST ``/api/check``         run a check cycle immediately
        POST ``/api/services``      replace the services config, then run a cycle

    Every POST except login requires a session and a matching
    ``X-CSRF-Token`` header.
    """
    dashboard_path = Path(__file__).with_name("dashboard.html")

    class DashboardHandler(BaseHTTPRequestHandler):
        server_version = "ArchiveStatusDashboard/1.0"

        def log_message(self, format: str, *args: Any) -> None:
            """Send access-log lines to stdout with a ``[dashboard]`` prefix."""
            print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)

        def do_GET(self) -> None:
            """Dispatch GET requests by exact path."""
            if self.path in {"/", "/dashboard"}:
                self.send_dashboard()
                return
            if self.path == "/favicon.ico":
                self.send_response(HTTPStatus.NO_CONTENT)
                self.end_headers()
                return
            if self.path == "/api/session":
                session = self.require_auth()
                if session is None:
                    return
                _session_id, data = session
                self.send_json(
                    HTTPStatus.OK,
                    {
                        "username": data.username,
                        "csrfToken": data.csrf_token,
                    },
                )
                return
            if self.path == "/api/status":
                if self.require_auth() is None:
                    return
                try:
                    status_payload = runtime_status(runtime)
                except (ValueError, OSError) as exc:
                    # The config is re-read on every call; a broken file must
                    # produce a JSON error, not an unhandled exception.
                    self.send_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": str(exc)})
                    return
                self.send_json(HTTPStatus.OK, status_payload)
                return
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_POST(self) -> None:
            """Dispatch POST requests; all but login need a session and CSRF token."""
            if self.path == "/api/login":
                self.handle_login()
                return
            session = self.require_auth(require_csrf=True)
            if session is None:
                return
            if self.path == "/api/logout":
                self.handle_logout(session[0])
                return
            if self.path == "/api/check":
                self.handle_check()
                return
            if self.path == "/api/services":
                self.handle_services()
                return
            self.send_error(HTTPStatus.NOT_FOUND)

        def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
            """Return the caller's session, or answer 401/403 and return None."""
            if auth is None:
                return "disabled", DashboardSession("local", "disabled", time.time() + 3600)

            session = auth.session_from_cookie(self.headers.get("Cookie"))
            if session is None:
                self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
                return None

            if require_csrf:
                csrf = self.headers.get("X-CSRF-Token", "")
                # Compare as bytes: compare_digest() raises TypeError for
                # non-ASCII str, and header values are client-controlled.
                supplied = csrf.encode("utf-8", "surrogatepass")
                expected = session[1].csrf_token.encode("utf-8", "surrogatepass")
                if not hmac.compare_digest(supplied, expected):
                    self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
                    return None

            return session

        def cookie_attributes(self, max_age: int) -> str:
            """Return the attribute string shared by every session cookie."""
            attrs = [
                "Path=/",
                "HttpOnly",
                "SameSite=Strict",
                f"Max-Age={max_age}",
            ]
            if auth is not None and auth.config.cookie_secure:
                attrs.append("Secure")
            return "; ".join(attrs)

        def set_session_cookie(self, session_id: str) -> None:
            """Emit a Set-Cookie header carrying ``session_id``."""
            ttl = auth.config.session_ttl_seconds if auth is not None else 3600
            self.send_header(
                "Set-Cookie",
                f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}",
            )

        def clear_session_cookie(self) -> None:
            """Emit a Set-Cookie header that expires the session cookie."""
            self.send_header(
                "Set-Cookie",
                f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}",
            )

        def send_dashboard(self) -> None:
            """Serve dashboard.html, or a 500 when the file is missing."""
            try:
                body = dashboard_path.read_bytes()
            except FileNotFoundError:
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
                return

            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def read_json(self) -> dict[str, Any]:
            """Read and decode the request body as a JSON object.

            Raises:
                ValueError: If Content-Length is invalid or negative, the
                    body is too large, or it is not a JSON object.
            """
            raw_length = self.headers.get("Content-Length", "0")
            try:
                length = int(raw_length)
            except ValueError as exc:
                raise ValueError("Invalid Content-Length") from exc
            if length < 0:
                # A negative size would make read() wait for end-of-stream.
                raise ValueError("Invalid Content-Length")
            if length > MAX_REQUEST_BYTES:
                raise ValueError("Request body is too large")

            body = self.rfile.read(length)
            try:
                data = json.loads(body.decode("utf-8"))
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON: {exc}") from exc
            if not isinstance(data, dict):
                raise ValueError("JSON body must be an object")
            return data

        def send_json(self, status: HTTPStatus, payload: dict[str, Any], *, emit_headers: Any = None) -> None:
            """Send ``payload`` as an uncached JSON response.

            Args:
                status: HTTP status of the response.
                payload: Object to serialize.
                emit_headers: Optional zero-argument callable invoked after the
                    status line to add extra headers (used for cookies).
            """
            body = json.dumps(payload, indent=2).encode("utf-8")
            self.send_response(status)
            if emit_headers is not None:
                emit_headers()
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Cache-Control", "no-store")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def fail(self, status: HTTPStatus, exc: BaseException) -> None:
            """Record ``exc`` as the runtime's last error and report it as JSON."""
            with runtime.lock:
                runtime.last_error = str(exc)
            self.send_json(status, {"error": str(exc)})

        def handle_login(self) -> None:
            """Verify credentials, apply throttling, and start a session."""
            if auth is None:
                self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
                return

            try:
                data = self.read_json()
            except ValueError as exc:
                self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
                return

            username = str(data.get("username", ""))
            password = str(data.get("password", ""))
            throttle_key = f"{self.client_address[0]}:{username}"
            if not auth.login_allowed(throttle_key):
                self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."})
                return

            login = auth.login(username, password)
            if login is None:
                auth.record_failed_login(throttle_key)
                self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
                return

            auth.clear_failed_login(throttle_key)
            session_id, session = login
            self.send_json(
                HTTPStatus.OK,
                {
                    "username": session.username,
                    "csrfToken": session.csrf_token,
                },
                emit_headers=lambda: self.set_session_cookie(session_id),
            )

        def handle_logout(self, session_id: str) -> None:
            """Drop the session and expire its cookie."""
            if auth is not None:
                auth.logout(session_id)
            self.send_json(HTTPStatus.OK, {"ok": True}, emit_headers=self.clear_session_cookie)

        def handle_check(self) -> None:
            """Run a check cycle now and return its results."""
            try:
                message_id, results = run_check_cycle(runtime)
            except Exception as exc:
                self.fail(HTTPStatus.BAD_GATEWAY, exc)
                return

            self.send_json(
                HTTPStatus.OK,
                {
                    "messageId": message_id,
                    "results": [result_to_jsonable(result) for result in results],
                },
            )

        def handle_services(self) -> None:
            """Validate and save a new services config, then run a cycle.

            Invalid input is a 400 and a failed write is a 500. A failed
            check cycle is a 502, because by then the config has already
            been saved and the request itself was fine.
            """
            try:
                data = self.read_json()
                # save_services_config validates before it writes anything.
                save_services_config(runtime.config_path, data)
            except (ValueError, TypeError) as exc:
                self.fail(HTTPStatus.BAD_REQUEST, exc)
                return
            except OSError as exc:
                self.fail(HTTPStatus.INTERNAL_SERVER_ERROR, exc)
                return

            try:
                message_id, results = run_check_cycle(runtime)
            except Exception as exc:
                self.fail(HTTPStatus.BAD_GATEWAY, exc)
                return

            self.send_json(
                HTTPStatus.OK,
                {
                    "messageId": message_id,
                    "services": data.get("services", []),
                    "results": [result_to_jsonable(result) for result in results],
                },
            )

    return DashboardHandler
|
|
|
|
|
|
def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
    """Start the dashboard in a daemon thread when it is enabled.

    Reads ``DASHBOARD_ENABLED``, ``DASHBOARD_HOST`` (default ``127.0.0.1``)
    and ``DASHBOARD_PORT`` (default ``8787``).

    Returns:
        The running server, or ``None`` when the dashboard is disabled.
    """
    if not bool_env("DASHBOARD_ENABLED", False):
        return None

    host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip()
    if not host:
        host = "127.0.0.1"
    port = int(os.getenv("DASHBOARD_PORT", "8787"))

    auth = dashboard_auth_from_env()
    handler_class = make_dashboard_handler(runtime, auth)
    server = ThreadingHTTPServer((host, port), handler_class)
    worker = threading.Thread(target=server.serve_forever, name="dashboard", daemon=True)
    worker.start()

    auth_note = "with password sessions" if auth is not None else "without auth"
    print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
    return server
|
|
|
|
|
|
def main() -> int:
    """Entry point: run a one-shot command or the periodic check loop.

    ``--hash-password`` prompts for a password and prints its hash.
    ``--preview`` prints a sample Discord payload for the configured
    services. Without either flag the bot starts the optional dashboard,
    then runs a check cycle every ``CHECK_INTERVAL_SECONDS`` until it
    receives SIGINT or SIGTERM.

    Returns:
        Process exit code (always 0; configuration errors raise SystemExit).
    """
    load_dotenv()

    if "--hash-password" in sys.argv:
        print_password_hash()
        return 0

    if "--preview" in sys.argv:
        config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
        print_preview(load_services(config_path))
        return 0

    token = normalize_discord_token(env("DISCORD_BOT_TOKEN"))
    channel_id = env("DISCORD_CHANNEL_ID")
    config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
    state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))

    raw_interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS))
    try:
        interval = int(raw_interval)
    except ValueError:
        raise SystemExit(
            f"CHECK_INTERVAL_SECONDS must be a whole number of seconds, got {raw_interval!r}"
        ) from None
    if interval < 1:
        # Zero or a negative value would skip the sleep below entirely and
        # hammer the services and Discord in a tight loop.
        raise SystemExit("CHECK_INTERVAL_SECONDS must be at least 1")

    runtime = BotRuntime(token, channel_id, config_path, state_path, dry_run=bool_env("DISCORD_DRY_RUN", False))
    dashboard = maybe_start_dashboard(runtime)

    if runtime.dry_run:
        print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
    else:
        # Best-effort sanity check; a failure here is reported but not fatal.
        try:
            identity = discord_bot_identity(token)
            username = identity.get("username", "unknown")
            bot_id = identity.get("id", "unknown")
            print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
        except Exception as exc:
            print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)

    stopped = False

    def stop(_signum: int, _frame: Any) -> None:
        """Signal handler: ask the main loop to finish."""
        nonlocal stopped
        stopped = True

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    while not stopped:
        try:
            message_id, results = run_check_cycle(runtime)
            online = sum(1 for result in results if result.ok)
            print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
        except Exception as exc:
            # Keep the loop alive; the error is exposed through the runtime.
            with runtime.lock:
                runtime.last_error = str(exc)
            print(f"Status update failed: {exc}", file=sys.stderr, flush=True)

        # Sleep in one-second slices so a stop signal is honoured promptly.
        for _ in range(interval):
            if stopped:
                break
            time.sleep(1)

    if dashboard is not None:
        dashboard.shutdown()
        # Release the listening socket as well as stopping the serve loop.
        dashboard.server_close()

    return 0
|
|
|
|
|
|
# Run the bot when executed as a script; main()'s return value becomes the exit code.
if __name__ == "__main__":
    sys.exit(main())
|