Initial Archive status bot

This commit is contained in:
MiTHRAL 2026-05-13 22:06:44 -04:00
commit c415e82500
12 changed files with 2264 additions and 0 deletions

9
.dockerignore Normal file
View file

@ -0,0 +1,9 @@
.env
.git
.gitignore
__pycache__/
*.py[cod]
state/
services.json
compose.local.yaml
preview.html

14
.env.deploy.example Normal file
View file

@ -0,0 +1,14 @@
DISCORD_BOT_TOKEN=replace-with-your-discord-bot-token
DISCORD_CHANNEL_ID=1504278732070981683
ARCHIVE_STATUS_CONFIG=services.json
ARCHIVE_STATUS_STATE=state/status-message.json
CHECK_INTERVAL_SECONDS=60
HTTP_USER_AGENT=ArchiveStatusBot/1.0
DISCORD_DRY_RUN=false
DASHBOARD_ENABLED=true
DASHBOARD_HOST=0.0.0.0
DASHBOARD_PORT=8787
DASHBOARD_USERNAME=admin
DASHBOARD_PASSWORD_HASH=replace-with-generated-pbkdf2-hash
DASHBOARD_SESSION_TTL_SECONDS=28800
DASHBOARD_COOKIE_SECURE=true

14
.env.example Normal file
View file

@ -0,0 +1,14 @@
DISCORD_BOT_TOKEN=replace-with-your-discord-bot-token
DISCORD_CHANNEL_ID=1504278732070981683
ARCHIVE_STATUS_CONFIG=services.json
ARCHIVE_STATUS_STATE=state/status-message.json
CHECK_INTERVAL_SECONDS=60
HTTP_USER_AGENT=ArchiveStatusBot/1.0
DISCORD_DRY_RUN=false
DASHBOARD_ENABLED=true
DASHBOARD_HOST=0.0.0.0
DASHBOARD_PORT=8787
DASHBOARD_USERNAME=admin
DASHBOARD_PASSWORD_HASH=replace-with-generated-pbkdf2-hash
DASHBOARD_SESSION_TTL_SECONDS=28800
DASHBOARD_COOKIE_SECURE=false

7
.gitignore vendored Normal file
View file

@ -0,0 +1,7 @@
.env
services.json
state/
__pycache__/
*.py[cod]
.agents/
.codex/

14
Dockerfile Normal file
View file

@ -0,0 +1,14 @@
FROM python:3.12-alpine
WORKDIR /app
COPY status_bot.py /app/status_bot.py
COPY dashboard.html /app/dashboard.html
COPY services.example.json /app/services.json
RUN adduser -D -u 1000 -h /app archive-status
RUN mkdir -p /app/state && chown -R archive-status:archive-status /app
USER archive-status
CMD ["python", "/app/status_bot.py"]

178
README.md Normal file
View file

@ -0,0 +1,178 @@
# Archive Bot
An AIO Discord bot foundation for The Mithral Archive. The first module is status monitoring: it checks configured URLs and keeps one live status message updated in the Discord `status` channel.
The message uses a summary embed plus one small embed per service, so each service gets its own green or red Discord color bar.
It does not need Discord gateway intents or slash commands for the status module. It only needs a bot token, the `status` channel ID, and permission to send/edit its own messages.
The bot also includes a small web dashboard for editing monitored services and forcing immediate Discord refreshes.
## Discord Bot Setup
Create a Discord application and bot in the Discord Developer Portal, then invite it with:
```text
https://discord.com/oauth2/authorize?client_id=YOUR_CLIENT_ID&permissions=84992&scope=bot
```
Required channel permissions:
- View Channel
- Send Messages
- Embed Links
- Read Message History
The current `status` channel ID is:
```text
1504278732070981683
```
## Local Setup
```sh
cp .env.example .env
cp services.example.json services.json
```
Edit `.env` and set:
- `DISCORD_BOT_TOKEN`
- `DASHBOARD_USERNAME`
- `DASHBOARD_PASSWORD_HASH`
Generate the dashboard password hash:
```sh
python3 status_bot.py --hash-password
```
Paste the output into `DASHBOARD_PASSWORD_HASH`. The dashboard does not store a reusable browser token; it uses an HttpOnly session cookie and CSRF token after login.
Dashboard auth includes:
- PBKDF2-SHA256 password hashing
- HttpOnly `SameSite=Strict` session cookie
- CSRF token required for write actions
- basic failed-login throttling
Edit `services.json` with the URLs you want displayed. Keep private/internal URLs out of Git if they should not be shared.
Preview the Discord embed payload:
```sh
ARCHIVE_STATUS_CONFIG=services.json python3 status_bot.py --preview
```
Open `preview.html` in a browser to see an approximate Discord-style render. Paste the JSON from `--preview` into the textarea and render it to test changes before sending anything to Discord.
Run it:
```sh
python3 status_bot.py
```
Local runs automatically read `.env` from the current directory.
For dashboard testing without touching Discord, set:
```env
DISCORD_DRY_RUN=true
ARCHIVE_STATUS_STATE=state/status-message.json
```
Open the dashboard:
```text
http://127.0.0.1:8787
```
Sign in with `DASHBOARD_USERNAME` and the password you used when generating `DASHBOARD_PASSWORD_HASH`.
## Docker Setup
```sh
cp .env.deploy.example .env
cp services.example.json services.json
docker compose up -d --build
```
The bot stores the Discord message ID in `state/status-message.json`. Keep that file mounted so the bot edits the same message after restarts.
The deploy compose joins your existing reverse-proxy network:
```yaml
networks:
mediaserver_default:
external: true
```
If that network does not already exist on the deploy host, create it once:
```sh
docker network create mediaserver_default
```
The dashboard is exposed inside Docker on port `8787` for your reverse proxy. It is not published directly to the host by default.
Use this target from your proxy:
```text
http://archive-status-bot:8787
```
For HTTPS behind a reverse proxy, set:
```env
DASHBOARD_COOKIE_SECURE=true
```
Leave it `false` only for direct localhost HTTP testing.
For direct local Docker testing without a proxy:
```sh
docker compose -f compose.yaml -f compose.local.yaml up -d --build
```
Then open:
```text
http://127.0.0.1:8787
```
## Dashboard
The dashboard currently supports:
- viewing monitored services
- adding/removing service rows
- editing check URL, display URL, expected statuses, timeout, and keyword
- saving `services.json`
- forcing an immediate check and Discord message update
The sidebar leaves room for future modules like polls, webhooks, automations, and service integrations without changing the bot shape later.
## Service Config
Each service supports:
- `name`: label shown in Discord
- `url`: URL checked by the bot
- `displayUrl`: URL linked in the embed
- `method`: optional, defaults to `GET`
- `timeoutSeconds`: optional, defaults to `10`
- `expectedStatuses`: optional list such as `["200-399"]` or `[200, 204]`
- `keyword`: optional text that must appear in the response body
Example:
```json
{
"name": "Jellyfin",
"url": "https://jellyfin.mithraic.cloud/health",
"displayUrl": "https://jellyfin.mithraic.cloud",
"expectedStatuses": ["200-399"]
}
```

4
compose.local.yaml Normal file
View file

@ -0,0 +1,4 @@
services:
archive-status-bot:
ports:
- 127.0.0.1:8787:8787

18
compose.yaml Normal file
View file

@ -0,0 +1,18 @@
services:
archive-status-bot:
build: .
container_name: archive-status-bot
restart: unless-stopped
env_file:
- .env
expose:
- "8787"
volumes:
- ./services.json:/app/services.json
- ./state:/app/state
networks:
- mediaserver_default
networks:
mediaserver_default:
external: true

648
dashboard.html Normal file
View file

@ -0,0 +1,648 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Archive Bot Dashboard</title>
<style>
:root {
color-scheme: dark;
--bg: #111214;
--panel: #1b1d21;
--panel-2: #202328;
--line: #30343b;
--line-strong: #3d424b;
--text: #f1f2f4;
--muted: #a2a8b3;
--faint: #757d8a;
--ok: #10b981;
--warn: #f59e0b;
--bad: #ef4444;
--action: #e7e9ed;
--action-text: #15171a;
}
* {
box-sizing: border-box;
}
body {
margin: 0;
background: var(--bg);
color: var(--text);
font: 14px/1.45 ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", sans-serif;
}
button,
input,
select {
font: inherit;
}
button {
border: 1px solid var(--line-strong);
border-radius: 6px;
background: var(--panel-2);
color: var(--text);
padding: 8px 10px;
font-weight: 650;
cursor: pointer;
}
button:hover {
border-color: #5a606b;
}
button.primary {
background: var(--action);
border-color: var(--action);
color: var(--action-text);
}
button.danger {
color: #fecaca;
}
input,
select {
width: 100%;
border: 1px solid var(--line);
border-radius: 6px;
background: #14161a;
color: var(--text);
padding: 8px 9px;
min-height: 36px;
}
input:focus,
select:focus {
outline: 2px solid #5f6875;
outline-offset: 1px;
}
.app {
min-height: 100vh;
display: grid;
grid-template-columns: 240px 1fr;
}
aside {
border-right: 1px solid var(--line);
background: #15171a;
padding: 18px 14px;
}
.brand {
margin: 0 0 20px;
font-size: 15px;
font-weight: 750;
}
nav {
display: grid;
gap: 4px;
}
.nav-item {
display: flex;
justify-content: space-between;
gap: 12px;
border-radius: 6px;
padding: 8px 9px;
color: var(--muted);
}
.nav-item.active {
background: var(--panel);
color: var(--text);
}
.nav-item.disabled {
color: var(--faint);
}
main {
min-width: 0;
padding: 22px 24px 36px;
}
.topbar {
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
margin-bottom: 20px;
}
h1 {
margin: 0;
font-size: 20px;
letter-spacing: 0;
}
.actions {
display: flex;
flex-wrap: wrap;
gap: 8px;
justify-content: flex-end;
}
.panel {
border: 1px solid var(--line);
border-radius: 8px;
background: var(--panel);
}
.panel-header {
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 13px 14px;
border-bottom: 1px solid var(--line);
}
.panel-title {
font-weight: 700;
}
.summary {
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
gap: 1px;
overflow: hidden;
margin-bottom: 16px;
background: var(--line);
}
.summary-item {
background: var(--panel);
padding: 14px;
}
.summary-item span {
display: block;
color: var(--muted);
margin-bottom: 5px;
}
.summary-item strong {
font-size: 18px;
}
.service-list {
display: grid;
gap: 1px;
background: var(--line);
}
.service-row {
display: grid;
grid-template-columns: minmax(130px, 0.8fr) minmax(220px, 1.5fr) minmax(180px, 1.2fr) 116px 88px 120px 40px;
gap: 10px;
align-items: end;
background: var(--panel);
padding: 12px 14px;
}
.field label {
display: block;
color: var(--muted);
font-size: 12px;
font-weight: 650;
margin-bottom: 5px;
}
.row-status {
align-self: center;
color: var(--muted);
font-size: 12px;
line-height: 1.35;
}
.state {
display: inline-flex;
align-items: center;
font-weight: 700;
}
.state.ok {
color: var(--ok);
}
.state.bad {
color: var(--bad);
}
.message {
margin-top: 12px;
min-height: 20px;
color: var(--muted);
}
.message.error {
color: #fca5a5;
}
.token-screen {
max-width: 420px;
margin: 80px auto;
padding: 18px;
}
.token-screen p {
color: var(--muted);
margin: 8px 0 16px;
}
.token-screen input + input {
margin-top: 8px;
}
.token-actions {
display: flex;
gap: 8px;
margin-top: 10px;
}
@media (max-width: 1100px) {
.app {
grid-template-columns: 1fr;
}
aside {
border-right: 0;
border-bottom: 1px solid var(--line);
}
nav {
grid-template-columns: repeat(4, minmax(0, 1fr));
}
.service-row {
grid-template-columns: 1fr 1fr;
}
}
@media (max-width: 680px) {
main {
padding: 18px 14px 28px;
}
.topbar,
.panel-header {
align-items: flex-start;
flex-direction: column;
}
.summary {
grid-template-columns: 1fr 1fr;
}
nav,
.service-row {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div id="login" class="token-screen panel" hidden>
<h1>Archive Bot</h1>
<p>Sign in with the dashboard account configured on the bot.</p>
<input id="username" type="text" autocomplete="username" placeholder="Username">
<input id="password" type="password" autocomplete="current-password" placeholder="Password">
<div class="token-actions">
<button id="loginButton" class="primary" type="button">Sign in</button>
</div>
<div id="loginMessage" class="message error"></div>
</div>
<div id="app" class="app" hidden>
<aside>
<div class="brand">Archive Bot</div>
<nav aria-label="Bot modules">
<div class="nav-item active"><span>Status</span><span>Ready</span></div>
<div class="nav-item disabled"><span>Polls</span><span>Later</span></div>
<div class="nav-item disabled"><span>Webhooks</span><span>Later</span></div>
<div class="nav-item disabled"><span>Automations</span><span>Later</span></div>
</nav>
</aside>
<main>
<div class="topbar">
<h1>Status Services</h1>
<div class="actions">
<button id="refresh" type="button">Refresh</button>
<button id="checkNow" type="button">Check now</button>
<button id="addService" type="button">Add service</button>
<button id="save" class="primary" type="button">Save and update Discord</button>
<button id="logout" type="button">Logout</button>
</div>
</div>
<section class="summary panel" aria-label="Current status summary">
<div class="summary-item">
<span>Services</span>
<strong id="serviceCount">0</strong>
</div>
<div class="summary-item">
<span>Online</span>
<strong id="onlineCount">0</strong>
</div>
<div class="summary-item">
<span>Issues</span>
<strong id="issueCount">0</strong>
</div>
<div class="summary-item">
<span>Last check</span>
<strong id="lastCheck">Never</strong>
</div>
</section>
<section class="panel">
<div class="panel-header">
<div class="panel-title">Monitored Links</div>
<div id="message" class="message"></div>
</div>
<div id="services" class="service-list"></div>
</section>
</main>
</div>
<script>
const appEl = document.querySelector("#app");
const loginEl = document.querySelector("#login");
const servicesEl = document.querySelector("#services");
const messageEl = document.querySelector("#message");
const loginMessageEl = document.querySelector("#loginMessage");
let services = [];
let results = new Map();
let csrfToken = "";
function headers(method = "GET") {
const base = { "Content-Type": "application/json" };
if (method !== "GET") {
base["X-CSRF-Token"] = csrfToken;
}
return base;
}
async function api(path, options = {}) {
const method = options.method || "GET";
const response = await fetch(path, {
...options,
credentials: "same-origin",
headers: { ...headers(method), ...(options.headers || {}) }
});
const data = await response.json().catch(() => ({}));
if (!response.ok) {
const error = new Error(data.error || `Request failed: ${response.status}`);
error.status = response.status;
throw error;
}
return data;
}
function setMessage(text, isError = false) {
messageEl.textContent = text;
messageEl.classList.toggle("error", isError);
}
function showLogin(message = "") {
appEl.hidden = true;
loginEl.hidden = false;
loginMessageEl.textContent = message;
document.querySelector("#password").value = "";
}
function showApp() {
loginEl.hidden = true;
appEl.hidden = false;
}
function normalizeService(service = {}) {
return {
name: service.name || "",
url: service.url || "",
displayUrl: service.displayUrl || service.url || "",
method: service.method || "GET",
timeoutSeconds: service.timeoutSeconds || 10,
expectedStatuses: Array.isArray(service.expectedStatuses) ? service.expectedStatuses : ["200-399"],
keyword: service.keyword || ""
};
}
function statusFor(service) {
return results.get(service.name) || null;
}
function renderSummary(payload) {
const online = payload.results.filter((result) => result.ok).length;
document.querySelector("#serviceCount").textContent = payload.services.length;
document.querySelector("#onlineCount").textContent = online;
document.querySelector("#issueCount").textContent = Math.max(payload.results.length - online, 0);
document.querySelector("#lastCheck").textContent = payload.lastCheckedAt
? new Date(payload.lastCheckedAt).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" })
: "Never";
}
function serviceRow(service, index) {
const result = statusFor(service);
const row = document.createElement("div");
row.className = "service-row";
row.dataset.index = String(index);
row.innerHTML = `
<div class="field">
<label>Name</label>
<input data-key="name" value="${escapeAttr(service.name)}">
</div>
<div class="field">
<label>Check URL</label>
<input data-key="url" value="${escapeAttr(service.url)}">
</div>
<div class="field">
<label>Display URL</label>
<input data-key="displayUrl" value="${escapeAttr(service.displayUrl)}">
</div>
<div class="field">
<label>Expected</label>
<input data-key="expectedStatuses" value="${escapeAttr(service.expectedStatuses.join(", "))}">
</div>
<div class="field">
<label>Timeout</label>
<input data-key="timeoutSeconds" type="number" min="1" max="60" value="${escapeAttr(service.timeoutSeconds)}">
</div>
<div class="row-status">
${renderResult(result)}
</div>
<button class="danger" type="button" data-remove="${index}" title="Remove service">Remove</button>
`;
return row;
}
function renderResult(result) {
if (!result) {
return `<span class="state">Unchecked</span>`;
}
const stateClass = result.ok ? "ok" : "bad";
const label = result.ok ? "Online" : "Issue";
const status = result.status ? `HTTP ${result.status}` : "No response";
const latency = result.latencyMs == null ? "n/a" : `${result.latencyMs} ms`;
return `<span class="state ${stateClass}">${label}</span><br>${status}<br>${latency}`;
}
function escapeAttr(value) {
return String(value ?? "")
.replace(/&/g, "&amp;")
.replace(/"/g, "&quot;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;");
}
function renderServices() {
servicesEl.innerHTML = "";
services.forEach((service, index) => {
servicesEl.append(serviceRow(service, index));
});
}
function collectServices() {
return [...servicesEl.querySelectorAll(".service-row")].map((row) => {
const item = {};
row.querySelectorAll("[data-key]").forEach((input) => {
const key = input.dataset.key;
if (key === "expectedStatuses") {
item[key] = input.value.split(",").map((part) => part.trim()).filter(Boolean);
} else if (key === "timeoutSeconds") {
item[key] = Number(input.value || 10);
} else {
item[key] = input.value.trim();
}
});
item.method = "GET";
if (!item.displayUrl) item.displayUrl = item.url;
return item;
});
}
async function loadStatus() {
const payload = await api("/api/status");
services = payload.services.map(normalizeService);
results = new Map(payload.results.map((result) => [result.name, result]));
renderSummary(payload);
renderServices();
setMessage(payload.lastError ? `Last error: ${payload.lastError}` : "");
showApp();
}
async function loadSession() {
const session = await api("/api/session");
csrfToken = session.csrfToken || "";
return session;
}
async function login() {
const username = document.querySelector("#username").value.trim();
const password = document.querySelector("#password").value;
const response = await fetch("/api/login", {
method: "POST",
credentials: "same-origin",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ username, password })
});
const data = await response.json().catch(() => ({}));
if (!response.ok) {
const error = new Error(data.error || `Login failed: ${response.status}`);
error.status = response.status;
throw error;
}
csrfToken = data.csrfToken || "";
await loadStatus();
}
async function logout() {
await api("/api/logout", { method: "POST", body: "{}" }).catch(() => {});
csrfToken = "";
showLogin();
}
async function saveServices() {
services = collectServices().map(normalizeService);
const payload = await api("/api/services", {
method: "POST",
body: JSON.stringify({ services })
});
results = new Map(payload.results.map((result) => [result.name, result]));
renderSummary({ services, results: payload.results, lastCheckedAt: new Date().toISOString() });
renderServices();
setMessage("Saved and updated Discord.");
}
async function checkNow() {
const payload = await api("/api/check", { method: "POST", body: "{}" });
results = new Map(payload.results.map((result) => [result.name, result]));
renderSummary({ services, results: payload.results, lastCheckedAt: new Date().toISOString() });
renderServices();
setMessage("Checked services and updated Discord.");
}
document.querySelector("#loginButton").addEventListener("click", async () => {
try {
await login();
} catch (error) {
showLogin(error.message);
}
});
document.querySelector("#password").addEventListener("keydown", (event) => {
if (event.key === "Enter") {
document.querySelector("#loginButton").click();
}
});
document.querySelector("#refresh").addEventListener("click", () => {
loadStatus().catch((error) => setMessage(error.message, true));
});
document.querySelector("#save").addEventListener("click", () => {
saveServices().catch((error) => setMessage(error.message, true));
});
document.querySelector("#checkNow").addEventListener("click", () => {
checkNow().catch((error) => setMessage(error.message, true));
});
document.querySelector("#logout").addEventListener("click", () => {
logout();
});
document.querySelector("#addService").addEventListener("click", () => {
services.push(normalizeService({ name: "New Service", url: "https://example.com" }));
renderServices();
});
servicesEl.addEventListener("click", (event) => {
const button = event.target.closest("[data-remove]");
if (!button) return;
services.splice(Number(button.dataset.remove), 1);
renderServices();
});
loadSession().then(loadStatus).catch((error) => {
if (error.status === 401) {
showLogin();
} else {
showLogin(error.message);
}
});
</script>
</body>
</html>

390
preview.html Normal file
View file

@ -0,0 +1,390 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Archive Status Preview</title>
<style>
:root {
color-scheme: dark;
--page: #1e1f22;
--channel: #313338;
--message: #2b2d31;
--text: #dbdee1;
--muted: #949ba4;
--link: #00a8fc;
--border: #3f4147;
--green: #10b981;
--orange: #f59e0b;
--red: #ef4444;
--gray: #6b7280;
}
* {
box-sizing: border-box;
}
body {
margin: 0;
background: var(--page);
color: var(--text);
font: 14px/1.45 system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", sans-serif;
}
main {
width: min(980px, calc(100vw - 32px));
margin: 0 auto;
padding: 32px 0;
}
h1 {
margin: 0 0 16px;
font-size: 20px;
font-weight: 650;
letter-spacing: 0;
}
.channel {
background: var(--channel);
border: 1px solid #26272b;
border-radius: 8px;
min-height: 560px;
overflow: hidden;
}
.channel-header {
height: 48px;
display: flex;
align-items: center;
padding: 0 16px;
border-bottom: 1px solid #26272b;
color: #f2f3f5;
font-weight: 650;
}
.message {
display: grid;
grid-template-columns: 40px 1fr;
gap: 12px;
padding: 20px 18px;
}
.avatar {
width: 40px;
height: 40px;
border-radius: 50%;
background: #5865f2;
display: grid;
place-items: center;
color: #fff;
font-weight: 700;
}
.message-meta {
display: flex;
align-items: baseline;
gap: 8px;
margin-bottom: 8px;
}
.author {
color: #f2f3f5;
font-weight: 650;
}
.bot-tag {
padding: 1px 4px;
border-radius: 3px;
background: #5865f2;
color: #fff;
font-size: 10px;
font-weight: 700;
line-height: 1.3;
}
.time {
color: var(--muted);
font-size: 12px;
}
.embeds {
display: grid;
gap: 8px;
max-width: 560px;
}
.embed {
position: relative;
background: var(--message);
border: 1px solid var(--border);
border-radius: 4px;
padding: 10px 12px 10px 16px;
overflow: hidden;
}
.embed::before {
content: "";
position: absolute;
inset: 0 auto 0 0;
width: 4px;
background: var(--embed-color, var(--gray));
}
.embed-title {
margin: 0 0 6px;
color: #f2f3f5;
font-size: 14px;
font-weight: 650;
}
.embed-description {
margin: 0;
white-space: pre-line;
color: var(--text);
}
.embed-description a {
color: var(--link);
text-decoration: none;
}
.embed-description a:hover {
text-decoration: underline;
}
.embed-footer {
margin-top: 8px;
color: var(--muted);
font-size: 12px;
}
.embed-fields {
display: grid;
grid-template-columns: minmax(0, 1.3fr) minmax(0, 1fr);
gap: 12px;
margin-top: 10px;
}
.embed-field-name {
color: #f2f3f5;
font-weight: 650;
margin-bottom: 4px;
}
.embed-field-value {
color: var(--text);
white-space: pre-line;
}
.embed-field-value a {
color: var(--link);
text-decoration: none;
}
.embed-field-value a:hover {
text-decoration: underline;
}
.controls {
display: grid;
gap: 8px;
margin-top: 18px;
max-width: 560px;
}
label {
color: var(--muted);
font-weight: 600;
}
textarea {
width: 100%;
min-height: 220px;
resize: vertical;
border: 1px solid #3f4147;
border-radius: 6px;
background: #232428;
color: var(--text);
padding: 10px;
font: 12px/1.45 ui-monospace, SFMono-Regular, Menlo, Consolas, monospace;
}
button {
justify-self: start;
border: 1px solid #4e5058;
border-radius: 6px;
background: #404249;
color: #fff;
padding: 8px 12px;
font-weight: 650;
cursor: pointer;
}
button:hover {
background: #4e5058;
}
.error {
color: #fca5a5;
min-height: 20px;
}
@media (max-width: 640px) {
main {
width: 100%;
padding: 0;
}
h1 {
padding: 16px;
margin: 0;
}
.channel {
border-left: 0;
border-right: 0;
border-radius: 0;
}
.embeds,
.controls {
max-width: 100%;
}
}
</style>
</head>
<body>
<main>
<h1>Archive Status Preview</h1>
<section class="channel" aria-label="Discord status channel preview">
<div class="channel-header"># status</div>
<article class="message">
<div class="avatar">A</div>
<div>
<div class="message-meta">
<span class="author">Archive Status</span>
<span class="bot-tag">BOT</span>
<span class="time">Today at 9:03 PM</span>
</div>
<div id="embeds" class="embeds"></div>
<div class="controls">
<label for="payload">Preview payload</label>
<textarea id="payload" spellcheck="false"></textarea>
<button type="button" id="render">Render payload</button>
<div id="error" class="error" role="status"></div>
</div>
</div>
</article>
</section>
</main>
<script>
const samplePayload = {
content: "",
embeds: [
{
title: "The Mithral Archive",
description: "🟡 Degraded · 5/6 online · 1 service needs attention",
color: 16096779,
fields: [
{
name: "Service",
value: "**Portal**\n**Jellyfin**\n**Navidrome**\n**Voting Hub**\n**Forgejo**\n**Support**",
inline: true
},
{
name: "Status",
value: "🟢 Online\n🟢 Online\n🟢 Online\n🟢 Online\n🟢 Online\n🔴 Issue",
inline: true
}
],
footer: { text: "Refreshes every 60s • Last checked 2026-05-14 01:56:29 UTC" }
}
]
};
const embedsEl = document.querySelector("#embeds");
const payloadEl = document.querySelector("#payload");
const errorEl = document.querySelector("#error");
function colorToHex(value) {
const number = Number(value);
if (!Number.isFinite(number)) return "#6b7280";
return `#${number.toString(16).padStart(6, "0").slice(-6)}`;
}
function renderMarkdownLinks(text) {
const escaped = text
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;");
return escaped.replace(/\[([^\]]+)]\((https?:\/\/[^)\s]+)\)/g, '<a href="$2">$1</a>');
}
function render(payload) {
embedsEl.innerHTML = "";
const embeds = Array.isArray(payload.embeds) ? payload.embeds : [];
embeds.forEach((embed) => {
const article = document.createElement("section");
article.className = "embed";
article.style.setProperty("--embed-color", colorToHex(embed.color));
const title = document.createElement("h2");
title.className = "embed-title";
title.textContent = embed.title || "Untitled";
article.append(title);
if (embed.description) {
const description = document.createElement("p");
description.className = "embed-description";
description.innerHTML = renderMarkdownLinks(String(embed.description));
article.append(description);
}
if (Array.isArray(embed.fields) && embed.fields.length) {
const fields = document.createElement("div");
fields.className = "embed-fields";
embed.fields.forEach((field) => {
const item = document.createElement("div");
item.className = "embed-field";
const name = document.createElement("div");
name.className = "embed-field-name";
name.textContent = field.name || "\u200b";
const value = document.createElement("div");
value.className = "embed-field-value";
value.innerHTML = renderMarkdownLinks(String(field.value || "\u200b"));
item.append(name, value);
fields.append(item);
});
article.append(fields);
}
if (embed.footer?.text) {
const footer = document.createElement("div");
footer.className = "embed-footer";
footer.textContent = embed.footer.text;
article.append(footer);
}
embedsEl.append(article);
});
}
document.querySelector("#render").addEventListener("click", () => {
try {
const payload = JSON.parse(payloadEl.value);
errorEl.textContent = "";
render(payload);
} catch (error) {
errorEl.textContent = error.message;
}
});
payloadEl.value = JSON.stringify(samplePayload, null, 2);
render(samplePayload);
</script>
</body>
</html>

40
services.example.json Normal file
View file

@ -0,0 +1,40 @@
{
"services": [
{
"name": "Portal",
"url": "https://mithraic.cloud",
"displayUrl": "https://mithraic.cloud",
"expectedStatuses": ["200-399"]
},
{
"name": "Jellyfin",
"url": "https://jellyfin.mithraic.cloud/health",
"displayUrl": "https://jellyfin.mithraic.cloud",
"expectedStatuses": ["200-399"]
},
{
"name": "Navidrome",
"url": "https://listen.mithraic.cloud",
"displayUrl": "https://listen.mithraic.cloud",
"expectedStatuses": ["200-399"]
},
{
"name": "Voting Hub",
"url": "https://vote.mithraic.cloud",
"displayUrl": "https://vote.mithraic.cloud",
"expectedStatuses": ["200-399"]
},
{
"name": "Forgejo",
"url": "https://git.mithraic.cloud",
"displayUrl": "https://git.mithraic.cloud",
"expectedStatuses": ["200-399"]
},
{
"name": "Support",
"url": "https://support.mithraic.cloud",
"displayUrl": "https://support.mithraic.cloud",
"expectedStatuses": ["200-399"]
}
]
}

928
status_bot.py Normal file
View file

@ -0,0 +1,928 @@
#!/usr/bin/env python3
"""Live Discord status message for The Mithral Archive."""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import signal
import socket
import ssl
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from http.cookies import SimpleCookie
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
DISCORD_API = "https://discord.com/api/v10"
DEFAULT_INTERVAL_SECONDS = 60
DEFAULT_TIMEOUT_SECONDS = 10
MAX_DISCORD_EMBEDS = 10
MAX_REQUEST_BYTES = 1_000_000
SESSION_COOKIE = "archive_bot_session"
PBKDF2_ITERATIONS = 390_000
@dataclass(frozen=True)
class Service:
name: str
url: str
display_url: str
method: str
timeout: float
expected_statuses: set[int]
expected_min: int
expected_max: int
keyword: str | None
@dataclass(frozen=True)
class CheckResult:
service: Service
ok: bool
status: int | None
latency_ms: int | None
error: str | None
@dataclass(frozen=True)
class DashboardAuthConfig:
username: str
password_hash: str
session_ttl_seconds: int
cookie_secure: bool
@dataclass
class DashboardSession:
username: str
csrf_token: str
expires_at: float
class DashboardAuth:
def __init__(self, config: DashboardAuthConfig) -> None:
self.config = config
self.lock = threading.Lock()
self.sessions: dict[str, DashboardSession] = {}
self.failed_logins: dict[str, list[float]] = {}
def login_allowed(self, key: str) -> bool:
now = time.time()
window_start = now - 900
with self.lock:
attempts = [attempt for attempt in self.failed_logins.get(key, []) if attempt >= window_start]
self.failed_logins[key] = attempts
return len(attempts) < 10
def record_failed_login(self, key: str) -> None:
now = time.time()
with self.lock:
self.failed_logins.setdefault(key, []).append(now)
def clear_failed_login(self, key: str) -> None:
with self.lock:
self.failed_logins.pop(key, None)
def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
if not hmac.compare_digest(username, self.config.username):
return None
if not verify_password_hash(self.config.password_hash, password):
return None
session_id = secrets.token_urlsafe(32)
session = DashboardSession(
username=username,
csrf_token=secrets.token_urlsafe(32),
expires_at=time.time() + self.config.session_ttl_seconds,
)
with self.lock:
self.sessions[session_id] = session
return session_id, session
def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
if not cookie_header:
return None
cookie = SimpleCookie()
cookie.load(cookie_header)
morsel = cookie.get(SESSION_COOKIE)
if morsel is None:
return None
session_id = morsel.value
now = time.time()
with self.lock:
session = self.sessions.get(session_id)
if session is None:
return None
if session.expires_at <= now:
self.sessions.pop(session_id, None)
return None
session.expires_at = now + self.config.session_ttl_seconds
return session_id, session
def logout(self, session_id: str) -> None:
with self.lock:
self.sessions.pop(session_id, None)
class BotRuntime:
def __init__(
self,
token: str,
channel_id: str,
config_path: Path,
state_path: Path,
dry_run: bool = False,
) -> None:
self.token = token
self.channel_id = channel_id
self.config_path = config_path
self.state_path = state_path
self.dry_run = dry_run
self.lock = threading.Lock()
self.last_results: list[CheckResult] = []
self.last_error: str | None = None
self.last_message_id: str | None = None
self.last_checked_at: datetime | None = None
def env(name: str, default: str | None = None) -> str:
value = os.getenv(name, default)
if value is None or not value.strip():
raise SystemExit(f"Missing required environment variable: {name}")
return value.strip()
def load_dotenv(path: Path = Path(".env")) -> None:
if not path.exists():
return
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
def load_json(path: Path) -> dict[str, Any]:
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except FileNotFoundError as exc:
raise ValueError(f"Config file not found: {path}") from exc
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
if not isinstance(data, dict):
raise ValueError(f"Config must be a JSON object: {path}")
return data
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
if raw is None:
return set(), 200, 399
if isinstance(raw, str):
raw = [part.strip() for part in raw.split(",") if part.strip()]
if not isinstance(raw, list):
raise ValueError("expectedStatuses must be a list or comma-separated string")
exact: set[int] = set()
min_status = 999
max_status = 0
for item in raw:
if isinstance(item, int):
exact.add(item)
continue
if not isinstance(item, str):
raise ValueError("expectedStatuses entries must be integers or ranges")
if "-" in item:
left, right = item.split("-", 1)
try:
min_status = min(min_status, int(left))
max_status = max(max_status, int(right))
except ValueError as exc:
raise ValueError(f"Invalid expected status range: {item}") from exc
continue
try:
exact.add(int(item))
except ValueError as exc:
raise ValueError(f"Invalid expected status value: {item}") from exc
if min_status == 999 and max_status == 0:
min_status, max_status = 0, -1
return exact, min_status, max_status
def services_from_data(data: dict[str, Any]) -> list[Service]:
raw_services = data.get("services")
if not isinstance(raw_services, list) or not raw_services:
raise ValueError("Config must include a non-empty services array")
services: list[Service] = []
for index, item in enumerate(raw_services, start=1):
if not isinstance(item, dict):
raise ValueError(f"Service #{index} must be an object")
name = str(item.get("name", "")).strip()
url = str(item.get("url", "")).strip()
if not name or not url:
raise ValueError(f"Service #{index} must include name and url")
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError(f"Service {name} has an invalid http(s) URL")
exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
services.append(
Service(
name=name,
url=url,
display_url=str(item.get("displayUrl", url)).strip() or url,
method=str(item.get("method", "GET")).strip().upper(),
timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)),
expected_statuses=exact,
expected_min=minimum,
expected_max=maximum,
keyword=(str(item["keyword"]).strip() if item.get("keyword") else None),
)
)
return services
def load_services(path: Path) -> list[Service]:
return services_from_data(load_json(path))
def save_services_config(path: Path, data: dict[str, Any]) -> None:
services_from_data(data)
path.parent.mkdir(parents=True, exist_ok=True)
temporary = path.with_suffix(f"{path.suffix}.tmp")
with temporary.open("w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
handle.write("\n")
try:
temporary.replace(path)
except OSError:
with path.open("w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
handle.write("\n")
temporary.unlink(missing_ok=True)
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for service in services:
expected: list[int | str] = sorted(service.expected_statuses)
if service.expected_min <= service.expected_max:
expected.append(f"{service.expected_min}-{service.expected_max}")
item: dict[str, Any] = {
"name": service.name,
"url": service.url,
"displayUrl": service.display_url,
"method": service.method,
"timeoutSeconds": service.timeout,
"expectedStatuses": expected or ["200-399"],
}
if service.keyword:
item["keyword"] = service.keyword
output.append(item)
return output
def status_expected(service: Service, status: int) -> bool:
if status in service.expected_statuses:
return True
return service.expected_min <= status <= service.expected_max
def check_service(service: Service) -> CheckResult:
started = time.monotonic()
headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
request = urllib.request.Request(service.url, headers=headers, method=service.method)
try:
context = ssl.create_default_context()
with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
body = response.read(1_000_000) if service.keyword else b""
status = int(response.status)
except urllib.error.HTTPError as exc:
status = int(exc.code)
latency_ms = int((time.monotonic() - started) * 1000)
ok = status_expected(service, status)
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc:
latency_ms = int((time.monotonic() - started) * 1000)
return CheckResult(service, False, None, latency_ms, clean_error(exc))
latency_ms = int((time.monotonic() - started) * 1000)
ok = status_expected(service, status)
if ok and service.keyword:
try:
text = body.decode("utf-8", errors="ignore")
except UnicodeDecodeError:
text = ""
if service.keyword not in text:
ok = False
return CheckResult(service, False, status, latency_ms, "keyword missing")
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
def clean_error(exc: BaseException) -> str:
reason = getattr(exc, "reason", None)
if reason:
return str(reason)[:120]
return str(exc)[:120] or exc.__class__.__name__
def discord_request(
method: str,
token: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
body = None
headers = {
"Authorization": f"Bot {token}",
"User-Agent": "ArchiveStatusBot/1.0",
}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(
f"{DISCORD_API}{path}",
data=body,
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(request, timeout=20) as response:
data = response.read()
if not data:
return {}
return json.loads(data.decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
def discord_bot_identity(token: str) -> dict[str, Any]:
return discord_request("GET", token, "/users/@me")
def load_state(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except json.JSONDecodeError:
return {}
return data if isinstance(data, dict) else {}
def save_state(path: Path, state: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
temporary = path.with_suffix(f"{path.suffix}.tmp")
with temporary.open("w", encoding="utf-8") as handle:
json.dump(state, handle, indent=2, sort_keys=True)
handle.write("\n")
temporary.replace(path)
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
checked_at = datetime.now(timezone.utc)
online = sum(1 for result in results if result.ok)
total = len(results)
degraded = 0 < online < total
if online == total:
color = 0x10B981
summary = f"🟢 Operational · {online}/{total} online"
elif degraded:
color = 0xF59E0B
offline = total - online
attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
summary = f"🟡 Degraded · {online}/{total} online · {attention}"
else:
color = 0xEF4444
summary = f"🔴 Outage · {online}/{total} online"
service_lines = []
state_lines = []
for result in results:
icon = "🟢" if result.ok else "🔴"
label = "Online" if result.ok else "Issue"
service_lines.append(f"**{result.service.name}**")
state_lines.append(f"{icon} {label}")
interval = os.getenv("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
return [
{
"title": "The Mithral Archive",
"description": summary,
"color": color,
"fields": [
{
"name": "Service",
"value": "\n".join(service_lines)[:1024] or "No services configured.",
"inline": True,
},
{
"name": "Status",
"value": "\n".join(state_lines)[:1024] or "n/a",
"inline": True,
}
],
"footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"},
"timestamp": checked_at.isoformat(),
}
]
def upsert_status_message(
token: str,
channel_id: str,
state_path: Path,
results: list[CheckResult],
) -> str:
state = load_state(state_path)
message_id = str(state.get("message_id", "")).strip()
payload = {"content": "", "embeds": render_embeds(results)}
if message_id:
try:
discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
return message_id
except RuntimeError as exc:
print(f"Could not edit existing status message, creating a new one: {exc}", file=sys.stderr)
message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
new_id = str(message.get("id", "")).strip()
if not new_id:
raise RuntimeError("Discord did not return a message id")
save_state(state_path, {"message_id": new_id})
return new_id
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
results: list[CheckResult] = []
for index, service in enumerate(services):
results.append(
CheckResult(
service=service,
ok=index != len(services) - 1,
status=200 if index != len(services) - 1 else 502,
latency_ms=42 + (index * 31),
error=None if index != len(services) - 1 else "HTTP 502",
)
)
return results
def print_preview(services: list[Service]) -> None:
payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))}
print(json.dumps(payload, indent=2))
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
return {
"name": result.service.name,
"url": result.service.url,
"displayUrl": result.service.display_url,
"ok": result.ok,
"status": result.status,
"latencyMs": result.latency_ms,
"error": result.error,
}
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
services = load_services(runtime.config_path)
results = [check_service(service) for service in services]
if runtime.dry_run:
message_id = "dry-run"
else:
message_id = upsert_status_message(runtime.token, runtime.channel_id, runtime.state_path, results)
with runtime.lock:
runtime.last_results = results
runtime.last_message_id = message_id
runtime.last_checked_at = datetime.now(timezone.utc)
runtime.last_error = None
return message_id, results
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
services = load_services(runtime.config_path)
with runtime.lock:
results = list(runtime.last_results)
return {
"services": services_to_jsonable(services),
"results": [result_to_jsonable(result) for result in results],
"lastError": runtime.last_error,
"lastMessageId": runtime.last_message_id,
"lastCheckedAt": runtime.last_checked_at.isoformat() if runtime.last_checked_at else None,
"channelId": runtime.channel_id,
}
def bool_env(name: str, default: bool = False) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def password_hash(password: str) -> str:
salt = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
salt_text = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=")
digest_text = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt_text}${digest_text}"
def decode_urlsafe_base64(value: str) -> bytes:
padding = "=" * (-len(value) % 4)
return base64.urlsafe_b64decode(value + padding)
def verify_password_hash(encoded: str, password: str) -> bool:
try:
algorithm, iterations, salt_text, digest_text = encoded.split("$", 3)
if algorithm != "pbkdf2_sha256":
return False
salt = decode_urlsafe_base64(salt_text)
expected = decode_urlsafe_base64(digest_text)
actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(iterations))
except (ValueError, TypeError):
return False
return hmac.compare_digest(actual, expected)
def dashboard_auth_from_env() -> DashboardAuth | None:
if bool_env("DASHBOARD_AUTH_DISABLED", False):
return None
username = env("DASHBOARD_USERNAME")
encoded_hash = env("DASHBOARD_PASSWORD_HASH")
ttl = int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800"))
secure = bool_env("DASHBOARD_COOKIE_SECURE", False)
return DashboardAuth(
DashboardAuthConfig(
username=username,
password_hash=encoded_hash,
session_ttl_seconds=ttl,
cookie_secure=secure,
)
)
def print_password_hash() -> None:
import getpass
first = getpass.getpass("Dashboard password: ")
second = getpass.getpass("Confirm password: ")
if first != second:
raise SystemExit("Passwords did not match")
if len(first) < 12:
raise SystemExit("Use at least 12 characters")
print(password_hash(first))
def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
dashboard_path = Path(__file__).with_name("dashboard.html")
class DashboardHandler(BaseHTTPRequestHandler):
server_version = "ArchiveStatusDashboard/1.0"
def log_message(self, format: str, *args: Any) -> None:
print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)
def do_GET(self) -> None:
if self.path in {"/", "/dashboard"}:
self.send_dashboard()
return
if self.path == "/favicon.ico":
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
return
if self.path == "/api/session":
session = self.require_auth()
if session is None:
return
_session_id, data = session
self.send_json(
HTTPStatus.OK,
{
"username": data.username,
"csrfToken": data.csrf_token,
},
)
return
if self.path == "/api/status":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, runtime_status(runtime))
return
self.send_error(HTTPStatus.NOT_FOUND)
def do_POST(self) -> None:
if self.path == "/api/login":
self.handle_login()
return
session = self.require_auth(require_csrf=True)
if session is None:
return
if self.path == "/api/logout":
self.handle_logout(session[0])
return
if self.path == "/api/check":
self.handle_check()
return
if self.path == "/api/services":
self.handle_services()
return
self.send_error(HTTPStatus.NOT_FOUND)
def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
if auth is None:
return "disabled", DashboardSession("local", "disabled", time.time() + 3600)
session = auth.session_from_cookie(self.headers.get("Cookie"))
if session is None:
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
return None
if require_csrf:
csrf = self.headers.get("X-CSRF-Token", "")
if not hmac.compare_digest(csrf, session[1].csrf_token):
self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
return None
return session
def cookie_attributes(self, max_age: int) -> str:
attrs = [
"Path=/",
"HttpOnly",
"SameSite=Strict",
f"Max-Age={max_age}",
]
if auth is not None and auth.config.cookie_secure:
attrs.append("Secure")
return "; ".join(attrs)
def set_session_cookie(self, session_id: str) -> None:
ttl = auth.config.session_ttl_seconds if auth is not None else 3600
self.send_header(
"Set-Cookie",
f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}",
)
def clear_session_cookie(self) -> None:
self.send_header(
"Set-Cookie",
f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}",
)
def send_dashboard(self) -> None:
try:
body = dashboard_path.read_bytes()
except FileNotFoundError:
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
return
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def read_json(self) -> dict[str, Any]:
raw_length = self.headers.get("Content-Length", "0")
try:
length = int(raw_length)
except ValueError as exc:
raise ValueError("Invalid Content-Length") from exc
if length > MAX_REQUEST_BYTES:
raise ValueError("Request body is too large")
body = self.rfile.read(length)
try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("JSON body must be an object")
return data
def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_login(self) -> None:
if auth is None:
self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
return
try:
data = self.read_json()
except ValueError as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
username = str(data.get("username", ""))
password = str(data.get("password", ""))
throttle_key = f"{self.client_address[0]}:{username}"
if not auth.login_allowed(throttle_key):
self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."})
return
login = auth.login(username, password)
if login is None:
auth.record_failed_login(throttle_key)
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
return
auth.clear_failed_login(throttle_key)
session_id, session = login
payload = {
"username": session.username,
"csrfToken": session.csrf_token,
}
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.set_session_cookie(session_id)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_logout(self, session_id: str) -> None:
if auth is not None:
auth.logout(session_id)
body = b'{\n "ok": true\n}'
self.send_response(HTTPStatus.OK)
self.clear_session_cookie()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_check(self) -> None:
try:
message_id, results = run_check_cycle(runtime)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)})
return
self.send_json(
HTTPStatus.OK,
{
"messageId": message_id,
"results": [result_to_jsonable(result) for result in results],
},
)
def handle_services(self) -> None:
try:
data = self.read_json()
services_from_data(data)
save_services_config(runtime.config_path, data)
message_id, results = run_check_cycle(runtime)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(
HTTPStatus.OK,
{
"messageId": message_id,
"services": data.get("services", []),
"results": [result_to_jsonable(result) for result in results],
},
)
return DashboardHandler
def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
if not bool_env("DASHBOARD_ENABLED", False):
return None
host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1"
port = int(os.getenv("DASHBOARD_PORT", "8787"))
auth = dashboard_auth_from_env()
server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth))
thread = threading.Thread(target=server.serve_forever, name="dashboard", daemon=True)
thread.start()
auth_note = "without auth" if auth is None else "with password sessions"
print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
return server
def main() -> int:
load_dotenv()
if "--hash-password" in sys.argv:
print_password_hash()
return 0
if "--preview" in sys.argv:
config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
print_preview(load_services(config_path))
return 0
token = env("DISCORD_BOT_TOKEN")
channel_id = env("DISCORD_CHANNEL_ID")
config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))
interval = int(env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)))
runtime = BotRuntime(token, channel_id, config_path, state_path, dry_run=bool_env("DISCORD_DRY_RUN", False))
dashboard = maybe_start_dashboard(runtime)
if runtime.dry_run:
print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
else:
try:
identity = discord_bot_identity(token)
username = identity.get("username", "unknown")
bot_id = identity.get("id", "unknown")
print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
except Exception as exc:
print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)
stopped = False
def stop(_signum: int, _frame: Any) -> None:
nonlocal stopped
stopped = True
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
while not stopped:
try:
message_id, results = run_check_cycle(runtime)
online = sum(1 for result in results if result.ok)
print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
print(f"Status update failed: {exc}", file=sys.stderr, flush=True)
for _ in range(interval):
if stopped:
break
time.sleep(1)
if dashboard is not None:
dashboard.shutdown()
return 0
if __name__ == "__main__":
raise SystemExit(main())