Refactor bot and add watch party stream worker

This commit is contained in:
MiTHRAL 2026-05-26 15:06:24 -04:00
parent 81756addf8
commit 4090552e2c
25 changed files with 9888 additions and 2250 deletions

View file

@ -1,11 +1,14 @@
DISCORD_BOT_TOKEN=replace-with-your-discord-bot-token
DISCORD_GATEWAY_ENABLED=true
ARCHIVE_BOT_DB_PATH=state/archive-bot.db
ARCHIVE_STATUS_CONFIG=services.json
ARCHIVE_STATUS_STATE=state/status-message.json
MEDIA_CATALOG_STATE=state/media-catalog.json
MEDIA_LIBRARY_STATE=state/media-library.json
BOT_SETTINGS_STATE=state/bot-settings.json
PUBLIC_CATALOG_URL=
TMDB_API_READ_TOKEN=
TMDB_API_KEY=
JELLYFIN_SYNC_INTERVAL_SECONDS=900
CHECK_INTERVAL_SECONDS=60
HTTP_USER_AGENT=ArchiveStatusBot/1.0
@ -15,3 +18,14 @@ DASHBOARD_AUTH_DISABLED=true
DASHBOARD_HOST=0.0.0.0
DASHBOARD_PORT=8787
DASHBOARD_COOKIE_SECURE=true
WATCHPARTY_WORKER_URL=http://orb-stream-worker:8790
WATCHPARTY_WORKER_TOKEN=replace-with-random-worker-token
WATCHPARTY_DISCORD_USER_TOKEN=
WATCHPARTY_STREAM_WIDTH=1280
WATCHPARTY_STREAM_HEIGHT=720
WATCHPARTY_STREAM_FPS=30
WATCHPARTY_STREAM_BITRATE_KBPS=2000
WATCHPARTY_STREAM_MAX_BITRATE_KBPS=2500
WATCHPARTY_STREAM_CODEC=H264
WATCHPARTY_STREAM_PRESET=ultrafast
WATCHPARTY_STREAM_HARDWARE_ACCELERATION=false

View file

@ -1,11 +1,14 @@
DISCORD_BOT_TOKEN=replace-with-your-discord-bot-token
DISCORD_GATEWAY_ENABLED=true
ARCHIVE_BOT_DB_PATH=state/archive-bot.db
ARCHIVE_STATUS_CONFIG=services.json
ARCHIVE_STATUS_STATE=state/status-message.json
MEDIA_CATALOG_STATE=state/media-catalog.json
MEDIA_LIBRARY_STATE=state/media-library.json
BOT_SETTINGS_STATE=state/bot-settings.json
PUBLIC_CATALOG_URL=
TMDB_API_READ_TOKEN=
TMDB_API_KEY=
JELLYFIN_SYNC_INTERVAL_SECONDS=900
CHECK_INTERVAL_SECONDS=60
HTTP_USER_AGENT=ArchiveStatusBot/1.0
@ -17,3 +20,14 @@ DASHBOARD_USERNAME=admin
DASHBOARD_PASSWORD_HASH=replace-with-generated-pbkdf2-hash
DASHBOARD_SESSION_TTL_SECONDS=28800
DASHBOARD_COOKIE_SECURE=false
WATCHPARTY_WORKER_URL=http://orb-stream-worker:8790
WATCHPARTY_WORKER_TOKEN=replace-with-random-worker-token
WATCHPARTY_DISCORD_USER_TOKEN=
WATCHPARTY_STREAM_WIDTH=1280
WATCHPARTY_STREAM_HEIGHT=720
WATCHPARTY_STREAM_FPS=30
WATCHPARTY_STREAM_BITRATE_KBPS=2000
WATCHPARTY_STREAM_MAX_BITRATE_KBPS=2500
WATCHPARTY_STREAM_CODEC=H264
WATCHPARTY_STREAM_PRESET=ultrafast
WATCHPARTY_STREAM_HARDWARE_ACCELERATION=false

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ __pycache__/
*.py[cod]
.agents/
.codex/
node_modules/

View file

@ -3,11 +3,15 @@ FROM python:3.12-alpine
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
RUN adduser -D -u 1001 -g "" -h /app archive-status
RUN mkdir -p /app/state && chown -R 1001:1001 /app
COPY --chown=1001:1001 archive_bot /app/archive_bot
COPY --chown=1001:1001 status_bot.py /app/status_bot.py
COPY --chown=1001:1001 dashboard.html /app/dashboard.html
COPY --chown=1001:1001 services.example.json /app/services.json
RUN pip install --no-cache-dir -r /app/requirements.txt
RUN adduser -D -u 1001 -g "" -h /app archive-status
USER 1001:1001
CMD ["python", "/app/status_bot.py"]

View file

@ -26,7 +26,7 @@ Required channel permissions for each dashboard-selected channel:
- Attach Files
- Read Message History
Channel IDs are configured from the dashboard and stored in `state/bot-settings.json`.
Channel IDs are configured from the dashboard and stored in the bot's SQLite database.
## Local Setup
@ -42,6 +42,23 @@ Edit `.env` and set:
- `DASHBOARD_USERNAME`
- `DASHBOARD_PASSWORD_HASH`
The bot now stores its editable runtime data in SQLite at:
```env
ARCHIVE_BOT_DB_PATH=state/archive-bot.db
```
`services.json` remains a seed/import-export file for monitored services. Existing JSON state files are migrated into SQLite automatically on startup when present.
For TMDB poster fallback in the public catalog, set either:
```env
TMDB_API_READ_TOKEN=your-tmdb-read-token
TMDB_API_KEY=your-tmdb-v3-api-key
```
The read token is preferred. If neither is set, the catalog only uses Jellyfin poster images.
Use the raw Discord bot token value. Do not include a `Bot ` prefix.
Generate the dashboard password hash:
@ -116,7 +133,7 @@ mkdir -p state
chmod 755 state
```
The container runs as `1001:1001` inside the image. If the mounted `services.json` or `state/` were created by another user, fix ownership once:
The container runs as `1001:1001` inside the image. SQLite lives in `state/archive-bot.db`. If the mounted `services.json` or `state/` were created by another user, fix ownership once:
```sh
sudo chown -R 1001:1001 services.json state
@ -128,7 +145,7 @@ If `state/` or `services.json` were created by a previous container as another u
sudo chown -R "$(id -u):$(id -g)" services.json state
```
The bot stores channel settings in `state/bot-settings.json` and Discord message IDs in `state/status-message.json` and `state/media-catalog.json`. Keep `state/` mounted so the bot edits the same messages after restarts.
The bot stores runtime state, settings, media library data, and Discord message IDs in SQLite. Keep `state/` mounted so the bot uses the same database after restarts. Legacy JSON files named by `ARCHIVE_STATUS_STATE`, `MEDIA_CATALOG_STATE`, `MEDIA_LIBRARY_STATE`, and `BOT_SETTINGS_STATE` are treated as one-time migration sources.
The deploy compose joins your existing reverse-proxy network:
@ -185,7 +202,7 @@ The dashboard currently supports:
- selecting the Discord channel used by the status updater
- adding/removing service rows
- editing check URL, display URL, expected statuses, timeout, and keyword
- saving `services.json`
- saving services into SQLite and exporting the current config to `services.json`
- forcing an immediate check and Discord message update
- syncing movies and shows from Jellyfin
- tracking additions and removals between Jellyfin syncs
@ -206,21 +223,17 @@ The dashboard also serves a read-only catalog page at `/catalog`. Set the public
In Jellyfin, create an API key from the admin dashboard, then enter the Jellyfin URL and key in the `Media` tab. If you have duplicate free/premium libraries, enter only the premium Jellyfin library names in the `Libraries` field, separated by commas. `Reset library` clears the saved media list and sync fingerprints. `Sync now` replaces the editable library with the current Jellyfin movies and shows while comparing against the previously saved library. `Auto-sync changes` checks Jellyfin periodically and republishes only when the catalog fingerprint changes. Jellyfin results are deduplicated across included libraries using provider IDs first, then normalized title and year.
Channel selections are stored in:
The main database path is:
```env
ARCHIVE_BOT_DB_PATH=state/archive-bot.db
```
Legacy migration/import paths remain configurable:
```env
BOT_SETTINGS_STATE=state/bot-settings.json
```
The bot stores media catalog message IDs in:
```env
MEDIA_CATALOG_STATE=state/media-catalog.json
```
The editable media library is stored in:
```env
MEDIA_LIBRARY_STATE=state/media-library.json
```

View file

@ -0,0 +1,240 @@
# Jellyfin VC Watch Party Plan
## Goal
Add Discord voice-channel watch parties to The Orb Bot where users choose media from the Jellyfin-backed catalog and a separate streaming worker pushes the selected video into a Discord VC.
The existing Python bot remains the control plane. The streaming runtime is a separate worker service.
## Why This Split Exists
The current bot is responsible for:
- dashboard and API
- Jellyfin library sync
- persistent state in SQLite
- Discord message/status publishing
It is not a video-streaming runtime. Discord VC video streaming needs a dedicated worker process with:
- Discord streaming session handling
- FFmpeg process control
- queue playback
- reconnect and health reporting
## Target Architecture
### 1. Orb Bot (`archive_bot`)
Responsibilities:
- create/manage watch-party sessions
- browse Jellyfin media
- queue items
- persist session state
- expose API/dashboard controls
- send commands to the stream worker
- read worker state
### 2. Stream Worker (`orb-stream-worker`)
Responsibilities:
- join a Discord VC
- start and stop the stream
- pull the selected item from Jellyfin
- transcode or normalize media as needed
- report current playback state
- handle pause/resume/seek/skip
### 3. Jellyfin
Responsibilities:
- source media metadata
- source playback URLs/files
- optional transcoding source if needed
## Current Foundation Implemented In This Repo
This repo now contains the control-plane foundation:
- SQLite-backed watch-party tables
- session, queue, event, and worker-state models
- HTTP APIs for watch-party orchestration
The stream worker itself is intentionally not implemented inside the Python bot.
## Data Model
### `watch_party_sessions`
- `id`
- `guild_id`
- `voice_channel_id`
- `text_channel_id`
- `owner_user_id`
- `title`
- `status` (`draft`, `queued`, `connecting`, `playing`, `paused`, `stopped`, `error`)
- `worker_session_id`
- `current_queue_entry_id`
- `created_at`
- `updated_at`
### `watch_party_queue`
- `id`
- `session_id`
- `position`
- `media_type`
- `jellyfin_source_id`
- `title`
- `year`
- `runtime`
- `summary`
- `poster_url`
- `status` (`queued`, `playing`, `played`, `skipped`, `failed`)
- `created_at`
### `watch_party_events`
- `id`
- `session_id`
- `event_type`
- `payload_json`
- `created_at`
### `watch_party_worker_state`
- `session_id`
- `worker_status`
- `playback_state`
- `current_title`
- `position_seconds`
- `duration_seconds`
- `last_error`
- `updated_at`
## Worker Contract
The Python bot should speak to the worker through a private HTTP API on the Docker network.
### `POST /sessions`
Create/connect a worker session.
Request:
```json
{
"sessionId": "local-session-id",
"guildId": "123",
"voiceChannelId": "456",
"textChannelId": "789"
}
```
### `POST /sessions/{sessionId}/play`
Start one Jellyfin item.
Request:
```json
{
"queueEntryId": 12,
"jellyfinSourceId": "abc123",
"title": "Movie Title",
"mediaType": "movie",
"playback": {
"itemId": "abc123",
"sourceId": "abc123",
"title": "Movie Title",
"mediaType": "movie",
"streamUrl": "http://jellyfin:8096/Videos/abc123/stream?...",
"downloadUrl": "http://jellyfin:8096/Items/abc123/Download?...",
"durationSeconds": 5420,
"posterUrl": "https://...",
"year": "2024",
"tmdbId": "1234",
"imdbId": "tt1234567",
"episode": null
}
}
```
Notes:
- Movies resolve directly to their Jellyfin item IDs.
- Shows currently resolve to the first playable episode so the worker receives a real video target instead of a series ID.
- The worker should treat `playback.streamUrl` as the primary source and `downloadUrl` as a fallback.
### `POST /sessions/{sessionId}/control`
Supported actions:
- `pause`
- `resume`
- `seek`
- `skip`
- `stop`
### `GET /sessions/{sessionId}`
Worker status:
```json
{
"workerStatus": "connected",
"playbackState": "playing",
"currentTitle": "Movie Title",
"positionSeconds": 381,
"durationSeconds": 5420,
"lastError": null
}
```
## Milestones
### Milestone 1: Control Plane
- DB schema
- watch-party service layer
- dashboard/API endpoints
- queue and state transitions
### Milestone 2: Worker Stub
- second service in Docker Compose
- authenticated private API
- no real VC streaming yet
### Milestone 3: One-Item Playback
- connect to VC
- start one playable Jellyfin item from the control-plane playback contract
- stop/pause/resume
### Milestone 4: Real Watch Parties
- queue autoadvance
- seek/skip
- reconnect handling
- worker heartbeats
- dashboard controls
## Deployment Shape
Recommended services:
- `archive-status-bot`
- `orb-stream-worker`
The worker should not be publicly exposed. It should only be reachable on the internal Docker network.
## Known Risks
- Discord streaming requires a non-standard worker model and operational care.
- Some media will need transcoding.
- Playback control is substantially harder than catalog sync.
- Watch-party commands and Discord UX still need to be added after the worker exists.

2
archive_bot/__init__.py Normal file
View file

@ -0,0 +1,2 @@
"""Archive Bot package."""

159
archive_bot/auth.py Normal file
View file

@ -0,0 +1,159 @@
from __future__ import annotations
import base64
import hashlib
import hmac
import os
import secrets
import threading
import time
from dataclasses import dataclass
from .core import PBKDF2_ITERATIONS, bool_env
@dataclass(frozen=True)
class DashboardAuthConfig:
username: str
password_hash: str
session_ttl_seconds: int
cookie_secure: bool
@dataclass
class DashboardSession:
username: str
csrf_token: str
expires_at: float
class DashboardAuth:
def __init__(self, config: DashboardAuthConfig) -> None:
self.config = config
self.lock = threading.Lock()
self.sessions: dict[str, DashboardSession] = {}
self.failed_logins: dict[str, list[float]] = {}
def login_allowed(self, key: str) -> bool:
now = time.time()
window_start = now - 900
with self.lock:
attempts = [attempt for attempt in self.failed_logins.get(key, []) if attempt >= window_start]
self.failed_logins[key] = attempts
return len(attempts) < 10
def record_failed_login(self, key: str) -> None:
now = time.time()
with self.lock:
self.failed_logins.setdefault(key, []).append(now)
def clear_failed_login(self, key: str) -> None:
with self.lock:
self.failed_logins.pop(key, None)
def login(self, username: str, password: str) -> tuple[str, DashboardSession] | None:
if not hmac.compare_digest(username, self.config.username):
return None
if not verify_password_hash(self.config.password_hash, password):
return None
session_id = secrets.token_urlsafe(32)
session = DashboardSession(
username=username,
csrf_token=secrets.token_urlsafe(32),
expires_at=time.time() + self.config.session_ttl_seconds,
)
with self.lock:
self.sessions[session_id] = session
return session_id, session
def session_from_cookie(self, cookie_header: str | None) -> tuple[str, DashboardSession] | None:
from http.cookies import SimpleCookie
from .core import SESSION_COOKIE
if not cookie_header:
return None
cookie = SimpleCookie()
cookie.load(cookie_header)
morsel = cookie.get(SESSION_COOKIE)
if morsel is None:
return None
session_id = morsel.value
now = time.time()
with self.lock:
session = self.sessions.get(session_id)
if session is None:
return None
if session.expires_at <= now:
self.sessions.pop(session_id, None)
return None
session.expires_at = now + self.config.session_ttl_seconds
return session_id, session
def logout(self, session_id: str) -> None:
with self.lock:
self.sessions.pop(session_id, None)
def password_hash(password: str) -> str:
salt = secrets.token_bytes(16)
digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS)
salt_text = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=")
digest_text = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
return f"pbkdf2_sha256${PBKDF2_ITERATIONS}${salt_text}${digest_text}"
def decode_urlsafe_base64(value: str) -> bytes:
padding = "=" * (-len(value) % 4)
return base64.urlsafe_b64decode(value + padding)
def verify_password_hash(encoded: str, password: str) -> bool:
try:
algorithm, iterations, salt_text, digest_text = encoded.split("$", 3)
if algorithm != "pbkdf2_sha256":
return False
salt = decode_urlsafe_base64(salt_text)
expected = decode_urlsafe_base64(digest_text)
actual = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, int(iterations))
except (ValueError, TypeError):
return False
return hmac.compare_digest(actual, expected)
def dashboard_auth_from_env() -> DashboardAuth | None:
if bool_env("DASHBOARD_AUTH_DISABLED", False):
return None
username = os.getenv("DASHBOARD_USERNAME", "").strip()
encoded_hash = os.getenv("DASHBOARD_PASSWORD_HASH", "").strip()
if not username or not encoded_hash:
raise SystemExit(
"Dashboard auth is enabled but DASHBOARD_USERNAME or DASHBOARD_PASSWORD_HASH is missing. "
"Set credentials or explicitly set DASHBOARD_AUTH_DISABLED=true."
)
ttl = int(os.getenv("DASHBOARD_SESSION_TTL_SECONDS", "28800"))
secure = bool_env("DASHBOARD_COOKIE_SECURE", False)
return DashboardAuth(
DashboardAuthConfig(
username=username,
password_hash=encoded_hash,
session_ttl_seconds=ttl,
cookie_secure=secure,
)
)
def print_password_hash() -> None:
import getpass
first = getpass.getpass("Dashboard password: ")
second = getpass.getpass("Confirm password: ")
if first != second:
raise SystemExit("Passwords did not match")
if len(first) < 12:
raise SystemExit("Use at least 12 characters")
print(password_hash(first))

147
archive_bot/core.py Normal file
View file

@ -0,0 +1,147 @@
from __future__ import annotations
import json
import os
import socket
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
DISCORD_API = "https://discord.com/api/v10"
DEFAULT_INTERVAL_SECONDS = 60
DEFAULT_TIMEOUT_SECONDS = 10
MAX_DISCORD_EMBEDS = 10
MAX_REQUEST_BYTES = 8_000_000
SESSION_COOKIE = "archive_bot_session"
PBKDF2_ITERATIONS = 390_000
MEDIA_ITEMS_PER_EMBED = 10
DEFAULT_JELLYFIN_SYNC_INTERVAL_SECONDS = 900
@dataclass(frozen=True)
class Service:
name: str
group: str
url: str
display_url: str
method: str
timeout: float
expected_statuses: set[int]
expected_min: int
expected_max: int
keyword: str | None
@dataclass(frozen=True)
class CheckResult:
service: Service
ok: bool
status: int | None
latency_ms: int | None
error: str | None
@dataclass(frozen=True)
class MediaItem:
title: str
media_type: str
source_id: str | None = None
tmdb_id: str | None = None
imdb_id: str | None = None
year: str | None = None
genres: str | None = None
rating: str | None = None
runtime: str | None = None
summary: str | None = None
seasons: int | None = None
episodes: int | None = None
class BotRuntime:
def __init__(
self,
token: str,
channel_id: str,
config_path: Path,
state_path: Path,
media_state_path: Path,
media_library_path: Path,
settings_path: Path,
dry_run: bool = False,
) -> None:
import threading
self.token = token
self.default_channel_id = channel_id
self.config_path = config_path
self.state_path = state_path
self.media_state_path = media_state_path
self.media_library_path = media_library_path
self.settings_path = settings_path
self.dry_run = dry_run
self.lock = threading.Lock()
self.last_results: list[CheckResult] = []
self.last_error: str | None = None
self.last_message_id: str | None = None
self.last_checked_at: datetime | None = None
def env(name: str, default: str | None = None) -> str:
value = os.getenv(name, default)
if value is None or not value.strip():
raise SystemExit(f"Missing required environment variable: {name}")
return value.strip()
def bool_env(name: str, default: bool = False) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def normalize_discord_token(token: str) -> str:
cleaned = token.strip().strip("\"'")
if cleaned.lower().startswith("bot "):
cleaned = cleaned[4:].strip()
return cleaned
def load_dotenv(path: Path = Path(".env")) -> None:
if not path.exists():
return
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
def load_json(path: Path) -> dict[str, Any]:
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except FileNotFoundError as exc:
raise ValueError(f"Config file not found: {path}") from exc
except PermissionError as exc:
raise ValueError(f"Config file is not readable: {path}") from exc
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
if not isinstance(data, dict):
raise ValueError(f"Config must be a JSON object: {path}")
return data
def clean_error(exc: BaseException) -> str:
reason = getattr(exc, "reason", None)
if reason:
return str(reason)[:120]
return str(exc)[:120] or exc.__class__.__name__ or socket.__name__

791
archive_bot/dashboard.py Normal file
View file

@ -0,0 +1,791 @@
from __future__ import annotations
import hmac
import json
import os
import time
import urllib.parse
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from .auth import DashboardAuth, DashboardSession, dashboard_auth_from_env
from .core import BotRuntime, MAX_REQUEST_BYTES, SESSION_COOKIE, bool_env
from .media import (
available_media_genres,
catalog_item_for_source_id,
catalog_known_source_ids,
fetch_jellyfin_image,
media_catalog_status,
load_media_library,
media_items_from_data,
publish_media_items,
recommend_media_items,
resolve_jellyfin_playback_source,
reset_media_library,
save_media_library,
sync_jellyfin_library,
update_jellyfin_sync_state,
)
from .status import result_to_jsonable, run_check_cycle, runtime_status, save_services_config, services_from_data
from .storage import (
channel_settings,
jellyfin_settings,
save_catalog_url_setting,
save_channel_settings,
save_jellyfin_settings,
)
from .watchparty import (
add_watch_party_queue_item,
create_watch_party_session,
first_queued_entry,
get_watch_party_session,
list_media_candidates,
list_watch_party_sessions,
update_queue_entry_status,
update_watch_party_status,
update_worker_state,
worker_command_payload,
)
from .worker_client import (
worker_control,
worker_create_session,
worker_enabled,
worker_health,
worker_play,
worker_status,
)
def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]:
dashboard_path = Path(__file__).resolve().parent.parent / "dashboard.html"
class DashboardHandler(BaseHTTPRequestHandler):
server_version = "ArchiveStatusDashboard/1.0"
def log_message(self, format: str, *args: Any) -> None:
print(f"[dashboard] {self.address_string()} - {format % args}", flush=True)
def do_GET(self) -> None:
parsed_request = urllib.parse.urlparse(self.path)
path = parsed_request.path
query = urllib.parse.parse_qs(parsed_request.query)
if path in {"/", "/dashboard"}:
self.send_dashboard()
return
if path == "/catalog":
from .media import render_catalog_html
self.send_catalog(render_catalog_html(runtime))
return
if path.startswith("/catalog/poster/"):
item_id = urllib.parse.unquote(path.removeprefix("/catalog/poster/")).strip()
self.send_catalog_poster(item_id)
return
if path == "/favicon.ico":
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
return
if path == "/api/session":
session = self.require_auth()
if session is None:
return
_session_id, data = session
self.send_json(
HTTPStatus.OK,
{
"username": data.username,
"csrfToken": data.csrf_token,
},
)
return
if path == "/api/status":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, runtime_status(runtime))
return
if path == "/api/media":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, media_catalog_status(runtime))
return
if path == "/api/media/recommendations":
if self.require_auth() is None:
return
movies, shows = load_media_library(runtime)
media_type = str(query.get("type", ["all"])[0] or "all")
genre = str(query.get("genre", ["all"])[0] or "all")
mode = str(query.get("mode", ["balanced"])[0] or "balanced")
search = str(query.get("search", [""])[0] or "").strip()
try:
limit = int(str(query.get("limit", ["6"])[0] or "6"))
except ValueError:
limit = 6
self.send_json(
HTTPStatus.OK,
{
"genres": available_media_genres(movies, shows),
**recommend_media_items(
movies,
shows,
media_type=media_type,
genre=genre,
mode=mode,
search=search,
limit=limit,
),
},
)
return
if path == "/api/settings":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, {"channels": channel_settings(runtime)})
return
if path == "/api/watchparty/sessions":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, {"sessions": list_watch_party_sessions()})
return
if path.startswith("/api/watchparty/sessions/"):
if self.require_auth() is None:
return
suffix = path.removeprefix("/api/watchparty/sessions/")
if suffix.isdigit():
self.send_json(HTTPStatus.OK, get_watch_party_session(int(suffix)))
return
if path == "/api/watchparty/media":
if self.require_auth() is None:
return
media_type = str(query.get("type", ["all"])[0] or "all")
search = str(query.get("search", [""])[0] or "").strip()
try:
limit = int(str(query.get("limit", ["25"])[0] or "25"))
except ValueError:
limit = 25
self.send_json(
HTTPStatus.OK,
{"items": list_media_candidates(runtime, media_type=media_type, search=search, limit=limit)},
)
return
if path == "/api/jellyfin":
if self.require_auth() is None:
return
self.send_json(HTTPStatus.OK, {"jellyfin": jellyfin_settings(runtime)})
return
if path == "/api/watchparty/worker-health":
if self.require_auth() is None:
return
if not worker_enabled():
self.send_json(HTTPStatus.OK, {"configured": False})
return
self.send_json(HTTPStatus.OK, {"configured": True, "worker": worker_health()})
return
self.send_error(HTTPStatus.NOT_FOUND)
def do_POST(self) -> None:
path = urllib.parse.urlparse(self.path).path
if path == "/api/login":
self.handle_login()
return
session = self.require_auth(require_csrf=True)
if session is None:
return
if path == "/api/logout":
self.handle_logout(session[0])
return
if path == "/api/check":
self.handle_check()
return
if path == "/api/services":
self.handle_services()
return
if path == "/api/media":
self.handle_media_catalog()
return
if path == "/api/media/library":
self.handle_media_library()
return
if path == "/api/media/reset":
self.handle_media_reset()
return
if path == "/api/settings":
self.handle_settings()
return
if path == "/api/jellyfin/settings":
self.handle_jellyfin_settings()
return
if path == "/api/jellyfin/sync":
self.handle_jellyfin_sync()
return
if path == "/api/watchparty/sessions":
self.handle_watchparty_sessions()
return
if path == "/api/watchparty/queue":
self.handle_watchparty_queue()
return
if path == "/api/watchparty/session-status":
self.handle_watchparty_session_status()
return
if path == "/api/watchparty/queue-status":
self.handle_watchparty_queue_status()
return
if path == "/api/watchparty/worker-state":
self.handle_watchparty_worker_state()
return
if path == "/api/watchparty/worker-command":
self.handle_watchparty_worker_command()
return
if path == "/api/watchparty/worker-start":
self.handle_watchparty_worker_start()
return
if path == "/api/watchparty/worker-play-next":
self.handle_watchparty_worker_play_next()
return
if path == "/api/watchparty/worker-control":
self.handle_watchparty_worker_control()
return
if path == "/api/watchparty/worker-status":
self.handle_watchparty_worker_status()
return
self.send_error(HTTPStatus.NOT_FOUND)
def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None:
if auth is None:
return "disabled", DashboardSession("local", "disabled", time.time() + 3600)
session = auth.session_from_cookie(self.headers.get("Cookie"))
if session is None:
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"})
return None
if require_csrf:
csrf = self.headers.get("X-CSRF-Token", "")
if not hmac.compare_digest(csrf, session[1].csrf_token):
self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"})
return None
return session
def cookie_attributes(self, max_age: int) -> str:
attrs = [
"Path=/",
"HttpOnly",
"SameSite=Strict",
f"Max-Age={max_age}",
]
if auth is not None and auth.config.cookie_secure:
attrs.append("Secure")
return "; ".join(attrs)
def set_session_cookie(self, session_id: str) -> None:
ttl = auth.config.session_ttl_seconds if auth is not None else 3600
self.send_header(
"Set-Cookie",
f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}",
)
def clear_session_cookie(self) -> None:
self.send_header(
"Set-Cookie",
f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}",
)
def send_dashboard(self) -> None:
try:
body = dashboard_path.read_bytes()
except FileNotFoundError:
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing")
return
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def send_catalog(self, body: bytes) -> None:
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def send_catalog_poster(self, item_id: str) -> None:
if item_id not in catalog_known_source_ids(runtime):
self.send_error(HTTPStatus.NOT_FOUND)
return
try:
item = catalog_item_for_source_id(runtime, item_id)
image = fetch_jellyfin_image(runtime, item_id)
except Exception as exc:
self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)})
return
if image is None and item is not None:
from .media import tmdb_fallback_poster_url
fallback_url = tmdb_fallback_poster_url(item)
if fallback_url:
self.send_response(HTTPStatus.FOUND)
self.send_header("Location", fallback_url)
self.send_header("Cache-Control", "public, max-age=3600")
self.end_headers()
return
if image is None:
self.send_error(HTTPStatus.NOT_FOUND)
return
body, content_type = image
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", content_type)
self.send_header("Cache-Control", "public, max-age=3600")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def read_json(self) -> dict[str, Any]:
raw_length = self.headers.get("Content-Length", "0")
try:
length = int(raw_length)
except ValueError as exc:
raise ValueError("Invalid Content-Length") from exc
if length > MAX_REQUEST_BYTES:
raise ValueError("Request body is too large")
body = self.rfile.read(length)
try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("JSON body must be an object")
return data
def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_login(self) -> None:
if auth is None:
self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"})
return
try:
data = self.read_json()
except ValueError as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
username = str(data.get("username", ""))
password = str(data.get("password", ""))
throttle_key = f"{self.client_address[0]}:{username}"
if not auth.login_allowed(throttle_key):
self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."})
return
login = auth.login(username, password)
if login is None:
auth.record_failed_login(throttle_key)
self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"})
return
auth.clear_failed_login(throttle_key)
session_id, session = login
payload = {
"username": session.username,
"csrfToken": session.csrf_token,
}
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.set_session_cookie(session_id)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_logout(self, session_id: str) -> None:
if auth is not None:
auth.logout(session_id)
body = b'{\n "ok": true\n}'
self.send_response(HTTPStatus.OK)
self.clear_session_cookie()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def handle_check(self) -> None:
try:
message_id, results = run_check_cycle(runtime)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)})
return
self.send_json(
HTTPStatus.OK,
{
"messageId": message_id,
"results": [result_to_jsonable(result) for result in results],
},
)
def handle_services(self) -> None:
try:
data = self.read_json()
services_from_data(data)
save_services_config(runtime.config_path, data)
message_id, results = run_check_cycle(runtime)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(
HTTPStatus.OK,
{
"messageId": message_id,
"services": data.get("services", []),
"results": [result_to_jsonable(result) for result in results],
},
)
def handle_media_catalog(self) -> None:
try:
data = self.read_json()
if "movies" in data or "shows" in data:
if "catalogUrl" in data:
save_catalog_url_setting(runtime, str(data.get("catalogUrl", "")))
movies = media_items_from_data(data.get("movies", []), "movie")
shows = media_items_from_data(data.get("shows", []), "show")
save_media_library(runtime, movies, shows)
result = publish_media_items(
runtime=runtime,
channel_id=str(data.get("channelId", "")),
movies_all=movies,
shows_all=shows,
)
self.send_json(HTTPStatus.OK, result)
return
raise ValueError("Publish requires the saved Jellyfin media library")
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
def handle_media_library(self) -> None:
try:
data = self.read_json()
movies = media_items_from_data(data.get("movies", []), "movie")
shows = media_items_from_data(data.get("shows", []), "show")
saved = save_media_library(runtime, movies, shows)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(
HTTPStatus.OK,
{
"library": {
"movies": saved.get("movies", []),
"shows": saved.get("shows", []),
},
"movieCount": len(movies),
"showCount": len(shows),
},
)
def handle_media_reset(self) -> None:
try:
result = reset_media_library(runtime)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_settings(self) -> None:
try:
data = self.read_json()
channels = data.get("channels", data)
if not isinstance(channels, dict):
raise ValueError("Settings payload must include a channels object")
result = save_channel_settings(
runtime,
str(channels.get("statusChannelId", "")),
str(channels.get("mediaChannelId", "")),
str(channels.get("catalogUrl", "")),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"channels": result})
def handle_jellyfin_settings(self) -> None:
try:
data = self.read_json()
result = save_jellyfin_settings(runtime, data)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"jellyfin": result})
def handle_jellyfin_sync(self) -> None:
try:
data = self.read_json()
if data:
save_jellyfin_settings(runtime, data)
result = sync_jellyfin_library(runtime, force_publish=bool(data.get("forcePublish", False)))
except Exception as exc:
update_jellyfin_sync_state(runtime, error=str(exc)[:240])
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc), "jellyfin": jellyfin_settings(runtime)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_sessions(self) -> None:
try:
data = self.read_json()
result = create_watch_party_session(
runtime,
guild_id=str(data.get("guildId", "")).strip(),
voice_channel_id=str(data.get("voiceChannelId", "")).strip(),
text_channel_id=str(data.get("textChannelId", "")).strip(),
owner_user_id=str(data.get("ownerUserId", "")).strip(),
title=str(data.get("title", "Watch Party")).strip() or "Watch Party",
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_queue(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
source_id = str(data.get("jellyfinSourceId", "")).strip()
result = add_watch_party_queue_item(runtime, session_id, source_id)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_session_status(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
status = str(data.get("status", "")).strip()
worker_session_id = data.get("workerSessionId")
current_queue_entry_id = data.get("currentQueueEntryId")
result = update_watch_party_status(
session_id,
status,
worker_session_id=str(worker_session_id).strip() if worker_session_id is not None else None,
current_queue_entry_id=int(current_queue_entry_id) if current_queue_entry_id not in {None, ""} else None,
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_queue_status(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
queue_entry_id = int(str(data.get("queueEntryId", "")).strip())
status = str(data.get("status", "")).strip()
result = update_queue_entry_status(session_id, queue_entry_id, status)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_worker_state(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
result = update_worker_state(
session_id,
worker_status=str(data.get("workerStatus", "idle")).strip() or "idle",
playback_state=str(data.get("playbackState", "idle")).strip() or "idle",
current_title=str(data.get("currentTitle", "")).strip(),
position_seconds=int(data.get("positionSeconds", 0) or 0),
duration_seconds=int(data.get("durationSeconds", 0) or 0),
last_error=str(data.get("lastError", "")).strip(),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_worker_command(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
action = str(data.get("action", "")).strip()
command_data = data.get("data", {})
if not isinstance(command_data, dict):
raise ValueError("Worker command data must be an object")
result = worker_command_payload(session_id, action, command_data)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, result)
def handle_watchparty_worker_start(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
session_payload = get_watch_party_session(session_id)
session = session_payload["session"]
worker = worker_create_session(
session_id=session_id,
guild_id=str(session["guildId"]),
voice_channel_id=str(session["voiceChannelId"]),
text_channel_id=str(session["textChannelId"]),
)
result = update_watch_party_status(
session_id,
"connecting",
worker_session_id=str(worker.get("workerSessionId", session_id)),
current_queue_entry_id=session["currentQueueEntryId"],
)
result = update_worker_state(
session_id,
worker_status=str(worker.get("workerStatus", "idle")),
playback_state=str(worker.get("playbackState", "idle")),
current_title=str(worker.get("currentTitle", "")),
position_seconds=int(worker.get("positionSeconds", 0) or 0),
duration_seconds=int(worker.get("durationSeconds", 0) or 0),
last_error=str(worker.get("lastError", "")),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"worker": worker, **result})
def handle_watchparty_worker_play_next(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
queue_entry = first_queued_entry(session_id)
if queue_entry is None:
raise ValueError("No queued items available")
media_item = catalog_item_for_source_id(runtime, str(queue_entry["jellyfinSourceId"]))
if media_item is None:
raise ValueError(f"Queued media item no longer exists in the saved library: {queue_entry['jellyfinSourceId']}")
playback = resolve_jellyfin_playback_source(runtime, media_item)
worker = worker_play(
session_id=session_id,
queue_entry_id=int(queue_entry["id"]),
jellyfin_source_id=str(queue_entry["jellyfinSourceId"]),
title=str(queue_entry["title"]),
media_type=str(queue_entry["mediaType"]),
playback=playback,
)
update_queue_entry_status(session_id, int(queue_entry["id"]), "playing")
update_watch_party_status(
session_id,
"playing",
current_queue_entry_id=int(queue_entry["id"]),
)
result = update_worker_state(
session_id,
worker_status=str(worker.get("workerStatus", "idle")),
playback_state=str(worker.get("playbackState", "idle")),
current_title=str(worker.get("currentTitle", "")),
position_seconds=int(worker.get("positionSeconds", 0) or 0),
duration_seconds=int(worker.get("durationSeconds", 0) or 0),
last_error=str(worker.get("lastError", "")),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"worker": worker, "playback": playback, **result})
def handle_watchparty_worker_control(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
action = str(data.get("action", "")).strip()
command_data = data.get("data", {})
if not isinstance(command_data, dict):
raise ValueError("Worker control data must be an object")
worker = worker_control(session_id=session_id, action=action, data=command_data)
if action == "pause":
update_watch_party_status(session_id, "paused")
elif action == "resume":
update_watch_party_status(session_id, "playing")
elif action == "stop":
update_watch_party_status(session_id, "stopped")
result = update_worker_state(
session_id,
worker_status=str(worker.get("workerStatus", "idle")),
playback_state=str(worker.get("playbackState", "idle")),
current_title=str(worker.get("currentTitle", "")),
position_seconds=int(worker.get("positionSeconds", 0) or 0),
duration_seconds=int(worker.get("durationSeconds", 0) or 0),
last_error=str(worker.get("lastError", "")),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"worker": worker, **result})
def handle_watchparty_worker_status(self) -> None:
try:
data = self.read_json()
session_id = int(str(data.get("sessionId", "")).strip())
worker = worker_status(session_id)
result = update_worker_state(
session_id,
worker_status=str(worker.get("workerStatus", "idle")),
playback_state=str(worker.get("playbackState", "idle")),
current_title=str(worker.get("currentTitle", "")),
position_seconds=int(worker.get("positionSeconds", 0) or 0),
duration_seconds=int(worker.get("durationSeconds", 0) or 0),
last_error=str(worker.get("lastError", "")),
)
except Exception as exc:
self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)})
return
self.send_json(HTTPStatus.OK, {"worker": worker, **result})
return DashboardHandler
def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None:
if not bool_env("DASHBOARD_ENABLED", False):
return None
host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1"
port = int(os.getenv("DASHBOARD_PORT", "8787"))
auth = dashboard_auth_from_env()
server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth))
thread = __import__("threading").Thread(target=server.serve_forever, name="dashboard", daemon=True)
thread.start()
auth_note = "without auth" if auth is None else "with password sessions"
print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True)
return server

111
archive_bot/db.py Normal file
View file

@ -0,0 +1,111 @@
from __future__ import annotations
import json
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
DEFAULT_DB_PATH = Path("state/archive-bot.db")
def database_path() -> Path:
raw = os.getenv("ARCHIVE_BOT_DB_PATH", "").strip()
return Path(raw) if raw else DEFAULT_DB_PATH
def document_name(path: Path) -> str:
return path.as_posix()
def connect_db() -> sqlite3.Connection:
db_path = database_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
return connection
def initialize_database() -> None:
with connect_db() as connection:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS documents (
name TEXT PRIMARY KEY,
content TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
connection.execute(
"""
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
connection.execute(
"""
INSERT INTO metadata(key, value)
VALUES ('schema_version', '1')
ON CONFLICT(key) DO UPDATE SET value=excluded.value
"""
)
def load_document(path: Path) -> dict[str, Any] | None:
initialize_database()
with connect_db() as connection:
row = connection.execute(
"SELECT content FROM documents WHERE name = ?",
(document_name(path),),
).fetchone()
if row is None:
return None
try:
data = json.loads(str(row["content"]))
except json.JSONDecodeError:
return {}
return data if isinstance(data, dict) else {}
def save_document(path: Path, data: dict[str, Any]) -> None:
initialize_database()
payload = json.dumps(data, indent=2, sort_keys=True)
now = datetime.now(timezone.utc).isoformat()
with connect_db() as connection:
connection.execute(
"""
INSERT INTO documents(name, content, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(name) DO UPDATE
SET content = excluded.content,
updated_at = excluded.updated_at
""",
(document_name(path), payload, now),
)
connection.commit()
def migrate_json_document(path: Path) -> None:
existing = load_document(path)
if existing is not None:
return
if not path.exists():
return
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except (OSError, json.JSONDecodeError):
return
if isinstance(data, dict):
save_document(path, data)
def migrate_runtime_documents(paths: list[Path]) -> None:
initialize_database()
for path in paths:
migrate_json_document(path)

178
archive_bot/discord_api.py Normal file
View file

@ -0,0 +1,178 @@
from __future__ import annotations
import asyncio
import json
import secrets
import sys
import threading
import urllib.error
import urllib.request
from typing import Any
from .core import DISCORD_API
class DiscordGatewayManager:
def __init__(self, token: str) -> None:
self.token = token
self.thread: threading.Thread | None = None
self.loop: asyncio.AbstractEventLoop | None = None
self.client: Any = None
self.ready = threading.Event()
self._disconnecting = threading.Event()
def start(self) -> None:
if self.thread is not None:
return
self.thread = threading.Thread(target=self._run, name="discord-gateway", daemon=True)
self.thread.start()
def stop(self) -> None:
self._disconnecting.set()
if self.loop is not None and self.client is not None:
asyncio.run_coroutine_threadsafe(self.client.close(), self.loop)
if self.thread is not None:
self.thread.join(timeout=15)
def _run(self) -> None:
try:
import discord
except ImportError:
print(
"discord.py is not installed. Install requirements.txt to keep the bot online.",
file=sys.stderr,
flush=True,
)
self.ready.set()
return
class GatewayClient(discord.Client):
def __init__(self, manager: DiscordGatewayManager) -> None:
intents = discord.Intents.default()
intents.guilds = True
super().__init__(intents=intents)
self.manager = manager
async def on_ready(self) -> None:
await self.change_presence(status=discord.Status.online)
user = self.user
name = user.name if user is not None else "unknown"
bot_id = user.id if user is not None else "unknown"
print(f"Discord gateway connected as {name} ({bot_id})", flush=True)
self.manager.ready.set()
async def on_disconnect(self) -> None:
self.manager.ready.clear()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.client = GatewayClient(self)
try:
self.loop.run_until_complete(self.client.start(self.token))
except Exception as exc:
if not self._disconnecting.is_set():
print(f"Discord gateway stopped: {exc}", file=sys.stderr, flush=True)
finally:
self.ready.set()
try:
pending = asyncio.all_tasks(self.loop)
for task in pending:
task.cancel()
if pending:
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
finally:
self.loop.close()
def discord_request(
method: str,
token: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
body = None
headers = {
"Authorization": f"Bot {token}",
"User-Agent": "ArchiveStatusBot/1.0",
}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(
f"{DISCORD_API}{path}",
data=body,
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(request, timeout=20) as response:
data = response.read()
if not data:
return {}
return json.loads(data.decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
def discord_multipart_request(
method: str,
token: str,
path: str,
payload: dict[str, Any],
files: list[tuple[str, str, str, bytes]],
) -> dict[str, Any]:
boundary = f"----ArchiveBot{secrets.token_hex(16)}"
body = bytearray()
def add_part(name: str, content: bytes, content_type: str, filename: str | None = None) -> None:
body.extend(f"--{boundary}\r\n".encode("ascii"))
disposition = f'Content-Disposition: form-data; name="{name}"'
if filename is not None:
disposition += f'; filename="{filename}"'
body.extend(f"{disposition}\r\n".encode("utf-8"))
body.extend(f"Content-Type: {content_type}\r\n\r\n".encode("ascii"))
body.extend(content)
body.extend(b"\r\n")
add_part("payload_json", json.dumps(payload).encode("utf-8"), "application/json")
for field_name, filename, content_type, content in files:
add_part(field_name, content, content_type, filename)
body.extend(f"--{boundary}--\r\n".encode("ascii"))
request = urllib.request.Request(
f"{DISCORD_API}{path}",
data=bytes(body),
headers={
"Authorization": f"Bot {token}",
"User-Agent": "ArchiveStatusBot/1.0",
"Content-Type": f"multipart/form-data; boundary={boundary}",
},
method=method,
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
data = response.read()
if not data:
return {}
return json.loads(data.decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Discord API {method} {path} failed: {exc.code} {detail}") from exc
def discord_delete_message(token: str, channel_id: str, message_id: str) -> None:
try:
discord_request("DELETE", token, f"/channels/{channel_id}/messages/{message_id}")
except RuntimeError as exc:
print(f"Could not delete old media catalog message {message_id}: {exc}", file=sys.stderr)
def discord_bot_identity(token: str) -> dict[str, Any]:
return discord_request("GET", token, "/users/@me")

116
archive_bot/main.py Normal file
View file

@ -0,0 +1,116 @@
from __future__ import annotations
import os
import signal
import sys
import time
from pathlib import Path
from typing import Any
from .auth import print_password_hash
from .core import BotRuntime, bool_env, env, load_dotenv, normalize_discord_token
from .db import initialize_database, migrate_runtime_documents
from .dashboard import maybe_start_dashboard
from .discord_api import DiscordGatewayManager, discord_bot_identity
from .media import maybe_run_jellyfin_sync
from .status import load_services, print_preview, run_check_cycle
from .watchparty import initialize_watchparty_schema
def main() -> int:
load_dotenv()
if "--hash-password" in sys.argv:
print_password_hash()
return 0
if "--preview" in sys.argv:
config_path = Path(os.getenv("ARCHIVE_STATUS_CONFIG", "services.json"))
initialize_database()
migrate_runtime_documents([config_path])
print_preview(load_services(config_path))
return 0
token = normalize_discord_token(env("DISCORD_BOT_TOKEN"))
channel_id = os.getenv("DISCORD_CHANNEL_ID", "").strip()
config_path = Path(env("ARCHIVE_STATUS_CONFIG", "services.json"))
state_path = Path(env("ARCHIVE_STATUS_STATE", "state/status-message.json"))
media_state_path = Path(env("MEDIA_CATALOG_STATE", "state/media-catalog.json"))
media_library_path = Path(env("MEDIA_LIBRARY_STATE", "state/media-library.json"))
settings_path = Path(env("BOT_SETTINGS_STATE", "state/bot-settings.json"))
initialize_database()
initialize_watchparty_schema()
migrate_runtime_documents(
[
config_path,
state_path,
media_state_path,
media_library_path,
settings_path,
]
)
interval = int(env("CHECK_INTERVAL_SECONDS", "60"))
runtime = BotRuntime(
token,
channel_id,
config_path,
state_path,
media_state_path,
media_library_path,
settings_path,
dry_run=bool_env("DISCORD_DRY_RUN", False),
)
gateway = None
if bool_env("DISCORD_GATEWAY_ENABLED", True) and not runtime.dry_run:
gateway = DiscordGatewayManager(token)
gateway.start()
dashboard = maybe_start_dashboard(runtime)
if runtime.dry_run:
print("Discord dry run is enabled; no Discord messages will be sent or edited.", flush=True)
else:
try:
identity = discord_bot_identity(token)
username = identity.get("username", "unknown")
bot_id = identity.get("id", "unknown")
print(f"Discord token authenticated as {username} ({bot_id})", flush=True)
except Exception as exc:
print(f"Could not verify Discord bot identity: {exc}", file=sys.stderr, flush=True)
stopped = False
def stop(_signum: int, _frame: Any) -> None:
nonlocal stopped
stopped = True
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
while not stopped:
try:
message_id, results = run_check_cycle(runtime)
online = sum(1 for result in results if result.ok)
print(f"Updated Discord status message {message_id}: {online}/{len(results)} online", flush=True)
sync_result = maybe_run_jellyfin_sync(runtime)
if sync_result is not None:
action = "published" if sync_result["published"] else "checked"
print(
f"Jellyfin sync {action}: {sync_result['movieCount']} movies, {sync_result['showCount']} shows",
flush=True,
)
except Exception as exc:
with runtime.lock:
runtime.last_error = str(exc)
print(f"Status update failed: {exc}", file=sys.stderr, flush=True)
for _ in range(interval):
if stopped:
break
time.sleep(1)
if dashboard is not None:
dashboard.shutdown()
if gateway is not None:
gateway.stop()
return 0

1517
archive_bot/media.py Normal file

File diff suppressed because it is too large Load diff

352
archive_bot/status.py Normal file
View file

@ -0,0 +1,352 @@
from __future__ import annotations
import json
import socket
import ssl
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .core import (
BotRuntime,
CheckResult,
DEFAULT_INTERVAL_SECONDS,
DEFAULT_TIMEOUT_SECONDS,
Service,
clean_error,
env,
load_json,
)
from .db import load_document, save_document
from .discord_api import discord_request
from .storage import channel_settings, load_state, save_state, validate_channel_id
def parse_expected_statuses(raw: Any) -> tuple[set[int], int, int]:
if raw is None:
return set(), 200, 399
if isinstance(raw, str):
raw = [part.strip() for part in raw.split(",") if part.strip()]
if not isinstance(raw, list):
raise ValueError("expectedStatuses must be a list or comma-separated string")
exact: set[int] = set()
min_status = 999
max_status = 0
for item in raw:
if isinstance(item, int):
exact.add(item)
continue
if not isinstance(item, str):
raise ValueError("expectedStatuses entries must be integers or ranges")
if "-" in item:
left, right = item.split("-", 1)
try:
min_status = min(min_status, int(left))
max_status = max(max_status, int(right))
except ValueError as exc:
raise ValueError(f"Invalid expected status range: {item}") from exc
continue
try:
exact.add(int(item))
except ValueError as exc:
raise ValueError(f"Invalid expected status value: {item}") from exc
if min_status == 999 and max_status == 0:
min_status, max_status = 0, -1
return exact, min_status, max_status
def services_from_data(data: dict[str, Any]) -> list[Service]:
raw_services = data.get("services")
if not isinstance(raw_services, list) or not raw_services:
raise ValueError("Config must include a non-empty services array")
services: list[Service] = []
for index, item in enumerate(raw_services, start=1):
if not isinstance(item, dict):
raise ValueError(f"Service #{index} must be an object")
name = str(item.get("name", "")).strip()
url = str(item.get("url", "")).strip()
if not name or not url:
raise ValueError(f"Service #{index} must include name and url")
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError(f"Service {name} has an invalid http(s) URL")
exact, minimum, maximum = parse_expected_statuses(item.get("expectedStatuses"))
services.append(
Service(
name=name,
group=str(item.get("group", "Main Services")).strip() or "Main Services",
url=url,
display_url=str(item.get("displayUrl", url)).strip() or url,
method=str(item.get("method", "GET")).strip().upper(),
timeout=float(item.get("timeoutSeconds", DEFAULT_TIMEOUT_SECONDS)),
expected_statuses=exact,
expected_min=minimum,
expected_max=maximum,
keyword=(str(item["keyword"]).strip() if item.get("keyword") else None),
)
)
return services
def load_services(path: Path) -> list[Service]:
data = load_document(path)
if data is None:
data = load_json(path)
return services_from_data(data)
def save_services_config(path: Path, data: dict[str, Any]) -> None:
services_from_data(data)
save_document(path, data)
path.parent.mkdir(parents=True, exist_ok=True)
temporary = path.with_suffix(f"{path.suffix}.tmp")
with temporary.open("w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
handle.write("\n")
try:
temporary.replace(path)
except OSError:
with path.open("w", encoding="utf-8") as handle:
json.dump(data, handle, indent=2)
handle.write("\n")
temporary.unlink(missing_ok=True)
def services_to_jsonable(services: list[Service]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for service in services:
expected: list[int | str] = sorted(service.expected_statuses)
if service.expected_min <= service.expected_max:
expected.append(f"{service.expected_min}-{service.expected_max}")
item: dict[str, Any] = {
"name": service.name,
"group": service.group,
"url": service.url,
"displayUrl": service.display_url,
"method": service.method,
"timeoutSeconds": service.timeout,
"expectedStatuses": expected or ["200-399"],
}
if service.keyword:
item["keyword"] = service.keyword
output.append(item)
return output
def status_expected(service: Service, status: int) -> bool:
if status in service.expected_statuses:
return True
return service.expected_min <= status <= service.expected_max
def check_service(service: Service) -> CheckResult:
started = time.monotonic()
headers = {"User-Agent": env("HTTP_USER_AGENT", "ArchiveStatusBot/1.0")}
request = urllib.request.Request(service.url, headers=headers, method=service.method)
try:
context = ssl.create_default_context()
with urllib.request.urlopen(request, timeout=service.timeout, context=context) as response:
body = response.read(1_000_000) if service.keyword else b""
status = int(response.status)
except urllib.error.HTTPError as exc:
status = int(exc.code)
latency_ms = int((time.monotonic() - started) * 1000)
ok = status_expected(service, status)
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
except (urllib.error.URLError, TimeoutError, socket.timeout, ssl.SSLError) as exc:
latency_ms = int((time.monotonic() - started) * 1000)
return CheckResult(service, False, None, latency_ms, clean_error(exc))
latency_ms = int((time.monotonic() - started) * 1000)
ok = status_expected(service, status)
if ok and service.keyword:
try:
text = body.decode("utf-8", errors="ignore")
except UnicodeDecodeError:
text = ""
if service.keyword not in text:
return CheckResult(service, False, status, latency_ms, "keyword missing")
return CheckResult(service, ok, status, latency_ms, None if ok else f"HTTP {status}")
def render_embeds(results: list[CheckResult]) -> list[dict[str, Any]]:
checked_at = datetime.now(timezone.utc)
online = sum(1 for result in results if result.ok)
total = len(results)
degraded = 0 < online < total
if total == 0:
color = 0x6B7280
summary = "No services configured."
elif online == total:
color = 0x10B981
summary = f"🟢 Operational · {online}/{total} online"
elif degraded:
color = 0xF59E0B
offline = total - online
attention = "1 service needs attention" if offline == 1 else f"{offline} services need attention"
summary = f"🟡 Degraded · {online}/{total} online · {attention}"
else:
color = 0xEF4444
summary = f"🔴 Outage · {online}/{total} online"
groups: dict[str, list[CheckResult]] = {}
for result in results:
groups.setdefault(result.service.group, []).append(result)
fields = []
for group_name, group_results in groups.items():
service_lines = []
state_lines = []
for result in group_results:
icon = "🟢" if result.ok else "🔴"
label = "Online" if result.ok else "Issue"
service_lines.append(f"**{result.service.name}**")
state_lines.append(f"{icon} {label}")
fields.append(
{
"name": group_name,
"value": "\n".join(service_lines)[:1024] or "None",
"inline": True,
}
)
fields.append(
{
"name": "Status",
"value": "\n".join(state_lines)[:1024] or "None",
"inline": True,
}
)
fields.append(
{
"name": "\u200b",
"value": "\u200b",
"inline": False,
}
)
if fields and fields[-1]["name"] == "\u200b":
fields.pop()
interval = env("CHECK_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)).strip()
return [
{
"title": "The Mithral Archive",
"description": summary,
"color": color,
"fields": fields[:25],
"footer": {"text": f"Refreshes every {interval}s • Last checked {checked_at.strftime('%Y-%m-%d %H:%M:%S')} UTC"},
"timestamp": checked_at.isoformat(),
}
]
def upsert_status_message(
token: str,
channel_id: str,
state_path: Path,
results: list[CheckResult],
) -> str:
state = load_state(state_path)
message_id = str(state.get("message_id", "")).strip()
payload = {"content": "", "embeds": render_embeds(results)}
if message_id:
try:
discord_request("PATCH", token, f"/channels/{channel_id}/messages/{message_id}", payload)
return message_id
except RuntimeError as exc:
print(f"Could not edit existing status message, creating a new one: {exc}", flush=True)
message = discord_request("POST", token, f"/channels/{channel_id}/messages", payload)
new_id = str(message.get("id", "")).strip()
if not new_id:
raise RuntimeError("Discord did not return a message id")
save_state(state_path, {"message_id": new_id})
return new_id
def fake_preview_results(services: list[Service]) -> list[CheckResult]:
results: list[CheckResult] = []
for index, service in enumerate(services):
results.append(
CheckResult(
service=service,
ok=index != len(services) - 1,
status=200 if index != len(services) - 1 else 502,
latency_ms=42 + (index * 31),
error=None if index != len(services) - 1 else "HTTP 502",
)
)
return results
def print_preview(services: list[Service]) -> None:
payload = {"content": "", "embeds": render_embeds(fake_preview_results(services))}
print(json.dumps(payload, indent=2))
def result_to_jsonable(result: CheckResult) -> dict[str, Any]:
return {
"name": result.service.name,
"group": result.service.group,
"url": result.service.url,
"displayUrl": result.service.display_url,
"ok": result.ok,
"status": result.status,
"latencyMs": result.latency_ms,
"error": result.error,
}
def run_check_cycle(runtime: BotRuntime) -> tuple[str, list[CheckResult]]:
services = load_services(runtime.config_path)
results = [check_service(service) for service in services]
if runtime.dry_run:
message_id = "dry-run"
else:
channel_id = validate_channel_id(channel_settings(runtime)["statusChannelId"], "Status")
message_id = upsert_status_message(runtime.token, channel_id, runtime.state_path, results)
with runtime.lock:
runtime.last_results = results
runtime.last_message_id = message_id
runtime.last_checked_at = datetime.now(timezone.utc)
runtime.last_error = None
return message_id, results
def runtime_status(runtime: BotRuntime) -> dict[str, Any]:
services = load_services(runtime.config_path)
settings = channel_settings(runtime)
with runtime.lock:
results = list(runtime.last_results)
return {
"services": services_to_jsonable(services),
"results": [result_to_jsonable(result) for result in results],
"lastError": runtime.last_error,
"lastMessageId": runtime.last_message_id,
"lastCheckedAt": runtime.last_checked_at.isoformat() if runtime.last_checked_at else None,
"channelId": settings["statusChannelId"],
"channels": settings,
}

139
archive_bot/storage.py Normal file
View file

@ -0,0 +1,139 @@
from __future__ import annotations
import json
import os
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .core import BotRuntime
from .db import load_document, save_document
def load_state(path: Path) -> dict[str, Any]:
data = load_document(path)
if data is None:
return {}
return data if isinstance(data, dict) else {}
def save_state(path: Path, state: dict[str, Any]) -> None:
save_document(path, state)
def validate_channel_id(value: str, label: str) -> str:
channel_id = value.strip()
if not channel_id:
raise ValueError(f"{label} channel ID is required")
if not channel_id.isdigit():
raise ValueError(f"{label} channel ID must be a Discord numeric channel ID")
return channel_id
def channel_settings(runtime: BotRuntime) -> dict[str, str]:
data = load_state(runtime.settings_path)
status_channel = str(data.get("status_channel_id", "")).strip() or runtime.default_channel_id
media_channel = str(data.get("media_channel_id", "")).strip() or status_channel
catalog_url = str(data.get("catalog_url", "")).strip() or os.getenv("PUBLIC_CATALOG_URL", "").strip()
return {
"statusChannelId": status_channel,
"mediaChannelId": media_channel,
"catalogUrl": catalog_url,
}
def validate_catalog_url(value: str) -> str:
catalog_url = value.strip()
if not catalog_url:
return ""
parsed = urllib.parse.urlparse(catalog_url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("Catalog URL must be a valid http(s) URL")
return catalog_url
def save_channel_settings(
runtime: BotRuntime,
status_channel_id: str,
media_channel_id: str,
catalog_url: str = "",
) -> dict[str, str]:
status_channel = validate_channel_id(status_channel_id, "Status")
media_channel = validate_channel_id(media_channel_id or status_channel, "Media")
state = load_state(runtime.settings_path)
state.update(
{
"status_channel_id": status_channel,
"media_channel_id": media_channel,
"catalog_url": validate_catalog_url(catalog_url),
"updated_at": datetime.now(timezone.utc).isoformat(),
}
)
save_state(runtime.settings_path, state)
return channel_settings(runtime)
def save_media_channel_setting(runtime: BotRuntime, media_channel_id: str) -> dict[str, str]:
media_channel = validate_channel_id(media_channel_id, "Media")
state = load_state(runtime.settings_path)
state["media_channel_id"] = media_channel
state["updated_at"] = datetime.now(timezone.utc).isoformat()
save_state(runtime.settings_path, state)
return channel_settings(runtime)
def jellyfin_settings(runtime: BotRuntime) -> dict[str, Any]:
data = load_state(runtime.settings_path)
api_key = str(data.get("jellyfin_api_key", "")).strip()
library_names = data.get("jellyfin_library_names", [])
if not isinstance(library_names, list):
library_names = []
return {
"url": str(data.get("jellyfin_url", "")).strip(),
"configured": bool(api_key),
"libraryNames": [str(name) for name in library_names if str(name).strip()],
"autoSync": bool(data.get("jellyfin_auto_sync", False)),
"lastSyncAt": data.get("jellyfin_last_sync_at"),
"lastSyncError": data.get("jellyfin_last_sync_error"),
"lastPublishedAt": data.get("jellyfin_last_published_at"),
"lastFingerprint": data.get("jellyfin_last_fingerprint"),
"lastPublishedFingerprint": data.get("jellyfin_last_published_fingerprint"),
"lastChanges": data.get("jellyfin_last_changes"),
}
def save_jellyfin_settings(runtime: BotRuntime, data: dict[str, Any]) -> dict[str, Any]:
state = load_state(runtime.settings_path)
url = str(data.get("url", "")).strip().rstrip("/")
if url:
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("Jellyfin URL must be a valid http(s) URL")
api_key = str(data.get("apiKey", "")).strip()
state["jellyfin_url"] = url
if api_key:
state["jellyfin_api_key"] = api_key
elif data.get("clearApiKey"):
state["jellyfin_api_key"] = ""
raw_library_names = data.get("libraryNames", [])
if isinstance(raw_library_names, str):
library_names = [name.strip() for name in raw_library_names.split(",") if name.strip()]
elif isinstance(raw_library_names, list):
library_names = [str(name).strip() for name in raw_library_names if str(name).strip()]
else:
raise ValueError("Jellyfin libraries must be a list or comma-separated string")
state["jellyfin_library_names"] = library_names
state["jellyfin_auto_sync"] = bool(data.get("autoSync", False))
state["updated_at"] = datetime.now(timezone.utc).isoformat()
save_state(runtime.settings_path, state)
return jellyfin_settings(runtime)
def save_catalog_url_setting(runtime: BotRuntime, catalog_url: str) -> dict[str, str]:
state = load_state(runtime.settings_path)
state["catalog_url"] = validate_catalog_url(catalog_url)
state["updated_at"] = datetime.now(timezone.utc).isoformat()
save_state(runtime.settings_path, state)
return channel_settings(runtime)

508
archive_bot/watchparty.py Normal file
View file

@ -0,0 +1,508 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from .core import BotRuntime, MediaItem
from .db import connect_db, initialize_database
from .media import load_media_library, media_item_card_payload
WATCH_PARTY_STATUSES = {
"draft",
"queued",
"connecting",
"playing",
"paused",
"stopped",
"error",
}
QUEUE_STATUSES = {"queued", "playing", "played", "skipped", "failed"}
@dataclass(frozen=True)
class WatchPartySession:
id: int
guild_id: str
voice_channel_id: str
text_channel_id: str
owner_user_id: str
title: str
status: str
worker_session_id: str
current_queue_entry_id: int | None
created_at: str
updated_at: str
def initialize_watchparty_schema() -> None:
initialize_database()
with connect_db() as connection:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS watch_party_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
guild_id TEXT NOT NULL,
voice_channel_id TEXT NOT NULL,
text_channel_id TEXT NOT NULL,
owner_user_id TEXT NOT NULL,
title TEXT NOT NULL,
status TEXT NOT NULL,
worker_session_id TEXT NOT NULL DEFAULT '',
current_queue_entry_id INTEGER,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
connection.execute(
"""
CREATE TABLE IF NOT EXISTS watch_party_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
position INTEGER NOT NULL,
media_type TEXT NOT NULL,
jellyfin_source_id TEXT NOT NULL,
title TEXT NOT NULL,
year TEXT NOT NULL DEFAULT '',
runtime TEXT NOT NULL DEFAULT '',
summary TEXT NOT NULL DEFAULT '',
poster_url TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES watch_party_sessions(id) ON DELETE CASCADE
)
"""
)
connection.execute(
"""
CREATE TABLE IF NOT EXISTS watch_party_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
event_type TEXT NOT NULL,
payload_json TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES watch_party_sessions(id) ON DELETE CASCADE
)
"""
)
connection.execute(
"""
CREATE TABLE IF NOT EXISTS watch_party_worker_state (
session_id INTEGER PRIMARY KEY,
worker_status TEXT NOT NULL DEFAULT 'idle',
playback_state TEXT NOT NULL DEFAULT 'idle',
current_title TEXT NOT NULL DEFAULT '',
position_seconds INTEGER NOT NULL DEFAULT 0,
duration_seconds INTEGER NOT NULL DEFAULT 0,
last_error TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES watch_party_sessions(id) ON DELETE CASCADE
)
"""
)
connection.commit()
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
def normalize_watch_party_status(status: str) -> str:
cleaned = status.strip().lower()
if cleaned not in WATCH_PARTY_STATUSES:
raise ValueError(f"Invalid watch party status: {status}")
return cleaned
def normalize_queue_status(status: str) -> str:
cleaned = status.strip().lower()
if cleaned not in QUEUE_STATUSES:
raise ValueError(f"Invalid queue status: {status}")
return cleaned
def watch_party_session_from_row(row: Any) -> WatchPartySession:
return WatchPartySession(
id=int(row["id"]),
guild_id=str(row["guild_id"]),
voice_channel_id=str(row["voice_channel_id"]),
text_channel_id=str(row["text_channel_id"]),
owner_user_id=str(row["owner_user_id"]),
title=str(row["title"]),
status=str(row["status"]),
worker_session_id=str(row["worker_session_id"] or ""),
current_queue_entry_id=int(row["current_queue_entry_id"]) if row["current_queue_entry_id"] is not None else None,
created_at=str(row["created_at"]),
updated_at=str(row["updated_at"]),
)
def session_to_jsonable(session: WatchPartySession) -> dict[str, Any]:
return {
"id": session.id,
"guildId": session.guild_id,
"voiceChannelId": session.voice_channel_id,
"textChannelId": session.text_channel_id,
"ownerUserId": session.owner_user_id,
"title": session.title,
"status": session.status,
"workerSessionId": session.worker_session_id,
"currentQueueEntryId": session.current_queue_entry_id,
"createdAt": session.created_at,
"updatedAt": session.updated_at,
}
def queue_entry_to_jsonable(row: Any) -> dict[str, Any]:
return {
"id": int(row["id"]),
"sessionId": int(row["session_id"]),
"position": int(row["position"]),
"mediaType": str(row["media_type"]),
"jellyfinSourceId": str(row["jellyfin_source_id"]),
"title": str(row["title"]),
"year": str(row["year"]),
"runtime": str(row["runtime"]),
"summary": str(row["summary"]),
"posterUrl": str(row["poster_url"]),
"status": str(row["status"]),
"createdAt": str(row["created_at"]),
}
def worker_state_to_jsonable(row: Any) -> dict[str, Any]:
return {
"sessionId": int(row["session_id"]),
"workerStatus": str(row["worker_status"]),
"playbackState": str(row["playback_state"]),
"currentTitle": str(row["current_title"]),
"positionSeconds": int(row["position_seconds"]),
"durationSeconds": int(row["duration_seconds"]),
"lastError": str(row["last_error"]) or "",
"updatedAt": str(row["updated_at"]),
}
def log_watch_party_event(connection: Any, session_id: int, event_type: str, payload: dict[str, Any]) -> None:
connection.execute(
"""
INSERT INTO watch_party_events(session_id, event_type, payload_json, created_at)
VALUES (?, ?, ?, ?)
""",
(session_id, event_type, json.dumps(payload, sort_keys=True), utc_now()),
)
def list_media_candidates(runtime: BotRuntime, *, media_type: str = "all", search: str = "", limit: int = 25) -> list[dict[str, Any]]:
movies, shows = load_media_library(runtime)
pools = {
"movie": movies,
"show": shows,
"all": [*movies, *shows],
}
selected = pools.get(media_type, pools["all"])
query = search.strip().casefold()
if query:
selected = [
item
for item in selected
if query in " ".join([item.title, item.year or "", item.genres or "", item.summary or ""]).casefold()
]
selected = selected[: max(1, min(limit, 100))]
return [media_item_card_payload(item) for item in selected]
def find_media_item_by_source_id(runtime: BotRuntime, source_id: str) -> MediaItem | None:
movies, shows = load_media_library(runtime)
for item in [*movies, *shows]:
if item.source_id and item.source_id == source_id:
return item
return None
def create_watch_party_session(
runtime: BotRuntime,
*,
guild_id: str,
voice_channel_id: str,
text_channel_id: str,
owner_user_id: str,
title: str,
) -> dict[str, Any]:
del runtime
initialize_watchparty_schema()
if not guild_id.strip():
raise ValueError("guildId is required")
if not voice_channel_id.strip():
raise ValueError("voiceChannelId is required")
if not text_channel_id.strip():
raise ValueError("textChannelId is required")
if not owner_user_id.strip():
raise ValueError("ownerUserId is required")
now = utc_now()
with connect_db() as connection:
cursor = connection.execute(
"""
INSERT INTO watch_party_sessions(
guild_id, voice_channel_id, text_channel_id, owner_user_id,
title, status, worker_session_id, current_queue_entry_id, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, 'draft', '', NULL, ?, ?)
""",
(guild_id.strip(), voice_channel_id.strip(), text_channel_id.strip(), owner_user_id.strip(), title.strip(), now, now),
)
session_id = int(cursor.lastrowid)
connection.execute(
"""
INSERT INTO watch_party_worker_state(
session_id, worker_status, playback_state, current_title, position_seconds,
duration_seconds, last_error, updated_at
)
VALUES (?, 'idle', 'idle', '', 0, 0, '', ?)
""",
(session_id, now),
)
log_watch_party_event(connection, session_id, "session.created", {"title": title.strip(), "ownerUserId": owner_user_id.strip()})
connection.commit()
return get_watch_party_session(session_id)
def list_watch_party_sessions() -> list[dict[str, Any]]:
initialize_watchparty_schema()
with connect_db() as connection:
rows = connection.execute(
"SELECT * FROM watch_party_sessions ORDER BY updated_at DESC, id DESC"
).fetchall()
return [session_to_jsonable(watch_party_session_from_row(row)) for row in rows]
def get_watch_party_session(session_id: int) -> dict[str, Any]:
initialize_watchparty_schema()
with connect_db() as connection:
row = connection.execute(
"SELECT * FROM watch_party_sessions WHERE id = ?",
(session_id,),
).fetchone()
if row is None:
raise ValueError(f"Watch party session not found: {session_id}")
queue_rows = connection.execute(
"SELECT * FROM watch_party_queue WHERE session_id = ? ORDER BY position ASC, id ASC",
(session_id,),
).fetchall()
worker_row = connection.execute(
"SELECT * FROM watch_party_worker_state WHERE session_id = ?",
(session_id,),
).fetchone()
event_rows = connection.execute(
"SELECT id, event_type, payload_json, created_at FROM watch_party_events WHERE session_id = ? ORDER BY id DESC LIMIT 25",
(session_id,),
).fetchall()
return {
"session": session_to_jsonable(watch_party_session_from_row(row)),
"queue": [queue_entry_to_jsonable(entry) for entry in queue_rows],
"worker": worker_state_to_jsonable(worker_row) if worker_row is not None else None,
"events": [
{
"id": int(event["id"]),
"eventType": str(event["event_type"]),
"payload": json.loads(str(event["payload_json"])),
"createdAt": str(event["created_at"]),
}
for event in event_rows
],
}
def next_queue_position(connection: Any, session_id: int) -> int:
row = connection.execute(
"SELECT COALESCE(MAX(position), 0) AS position FROM watch_party_queue WHERE session_id = ?",
(session_id,),
).fetchone()
return int(row["position"]) + 1
def add_watch_party_queue_item(runtime: BotRuntime, session_id: int, jellyfin_source_id: str) -> dict[str, Any]:
initialize_watchparty_schema()
if not jellyfin_source_id.strip():
raise ValueError("jellyfinSourceId is required")
item = find_media_item_by_source_id(runtime, jellyfin_source_id.strip())
if item is None:
raise ValueError(f"Media item not found in saved library: {jellyfin_source_id}")
with connect_db() as connection:
session_row = connection.execute(
"SELECT * FROM watch_party_sessions WHERE id = ?",
(session_id,),
).fetchone()
if session_row is None:
raise ValueError(f"Watch party session not found: {session_id}")
position = next_queue_position(connection, session_id)
cursor = connection.execute(
"""
INSERT INTO watch_party_queue(
session_id, position, media_type, jellyfin_source_id, title,
year, runtime, summary, poster_url, status, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?)
""",
(
session_id,
position,
item.media_type,
item.source_id or "",
item.title,
item.year or "",
item.runtime or "",
item.summary or "",
media_item_card_payload(item).get("posterUrl", ""),
utc_now(),
),
)
queue_id = int(cursor.lastrowid)
connection.execute(
"UPDATE watch_party_sessions SET status = ?, updated_at = ? WHERE id = ?",
("queued", utc_now(), session_id),
)
log_watch_party_event(
connection,
session_id,
"queue.added",
{"queueEntryId": queue_id, "jellyfinSourceId": jellyfin_source_id.strip(), "title": item.title},
)
connection.commit()
return get_watch_party_session(session_id)
def update_watch_party_status(session_id: int, status: str, *, worker_session_id: str | None = None, current_queue_entry_id: int | None = None) -> dict[str, Any]:
initialize_watchparty_schema()
normalized_status = normalize_watch_party_status(status)
updates = ["status = ?", "updated_at = ?"]
values: list[Any] = [normalized_status, utc_now()]
if worker_session_id is not None:
updates.append("worker_session_id = ?")
values.append(worker_session_id.strip())
if current_queue_entry_id is not None:
updates.append("current_queue_entry_id = ?")
values.append(current_queue_entry_id)
values.append(session_id)
with connect_db() as connection:
if connection.execute("SELECT id FROM watch_party_sessions WHERE id = ?", (session_id,)).fetchone() is None:
raise ValueError(f"Watch party session not found: {session_id}")
connection.execute(
f"UPDATE watch_party_sessions SET {', '.join(updates)} WHERE id = ?",
tuple(values),
)
log_watch_party_event(
connection,
session_id,
"session.status",
{"status": normalized_status, "workerSessionId": worker_session_id or "", "currentQueueEntryId": current_queue_entry_id},
)
connection.commit()
return get_watch_party_session(session_id)
def update_queue_entry_status(session_id: int, queue_entry_id: int, status: str) -> dict[str, Any]:
initialize_watchparty_schema()
normalized_status = normalize_queue_status(status)
with connect_db() as connection:
row = connection.execute(
"SELECT id FROM watch_party_queue WHERE id = ? AND session_id = ?",
(queue_entry_id, session_id),
).fetchone()
if row is None:
raise ValueError(f"Queue entry not found: {queue_entry_id}")
connection.execute(
"UPDATE watch_party_queue SET status = ? WHERE id = ?",
(normalized_status, queue_entry_id),
)
log_watch_party_event(connection, session_id, "queue.status", {"queueEntryId": queue_entry_id, "status": normalized_status})
connection.commit()
return get_watch_party_session(session_id)
def update_worker_state(
session_id: int,
*,
worker_status: str,
playback_state: str,
current_title: str = "",
position_seconds: int = 0,
duration_seconds: int = 0,
last_error: str = "",
) -> dict[str, Any]:
initialize_watchparty_schema()
with connect_db() as connection:
if connection.execute("SELECT id FROM watch_party_sessions WHERE id = ?", (session_id,)).fetchone() is None:
raise ValueError(f"Watch party session not found: {session_id}")
connection.execute(
"""
INSERT INTO watch_party_worker_state(
session_id, worker_status, playback_state, current_title, position_seconds,
duration_seconds, last_error, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
worker_status = excluded.worker_status,
playback_state = excluded.playback_state,
current_title = excluded.current_title,
position_seconds = excluded.position_seconds,
duration_seconds = excluded.duration_seconds,
last_error = excluded.last_error,
updated_at = excluded.updated_at
""",
(
session_id,
worker_status.strip(),
playback_state.strip(),
current_title.strip(),
max(0, position_seconds),
max(0, duration_seconds),
last_error.strip(),
utc_now(),
),
)
log_watch_party_event(
connection,
session_id,
"worker.state",
{
"workerStatus": worker_status.strip(),
"playbackState": playback_state.strip(),
"currentTitle": current_title.strip(),
"positionSeconds": max(0, position_seconds),
"durationSeconds": max(0, duration_seconds),
"lastError": last_error.strip(),
},
)
connection.commit()
return get_watch_party_session(session_id)
def worker_command_payload(session_id: int, action: str, data: dict[str, Any]) -> dict[str, Any]:
payload = get_watch_party_session(session_id)
payload["workerCommand"] = {
"action": action,
"sessionId": session_id,
"data": data,
}
return payload
def first_queued_entry(session_id: int) -> dict[str, Any] | None:
initialize_watchparty_schema()
with connect_db() as connection:
row = connection.execute(
"""
SELECT * FROM watch_party_queue
WHERE session_id = ? AND status = 'queued'
ORDER BY position ASC, id ASC
LIMIT 1
""",
(session_id,),
).fetchone()
return queue_entry_to_jsonable(row) if row is not None else None

View file

@ -0,0 +1,109 @@
from __future__ import annotations
import json
import os
import urllib.error
import urllib.request
from typing import Any
from .core import clean_error
def worker_base_url() -> str:
return os.getenv("WATCHPARTY_WORKER_URL", "").strip().rstrip("/")
def worker_token() -> str:
return os.getenv("WATCHPARTY_WORKER_TOKEN", "").strip()
def worker_enabled() -> bool:
return bool(worker_base_url() and worker_token())
def worker_request(method: str, path: str, payload: dict[str, Any] | None = None) -> dict[str, Any]:
base_url = worker_base_url()
token = worker_token()
if not base_url or not token:
raise RuntimeError("Watch-party worker is not configured")
body = None
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(
f"{base_url}{path}",
data=body,
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(request, timeout=20) as response:
data = response.read()
if not data:
return {}
return json.loads(data.decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Worker API failed: HTTP {exc.code} {detail}") from exc
except (urllib.error.URLError, TimeoutError) as exc:
raise RuntimeError(f"Worker API failed: {clean_error(exc)}") from exc
def worker_health() -> dict[str, Any]:
return worker_request("GET", "/health")
def worker_create_session(*, session_id: int, guild_id: str, voice_channel_id: str, text_channel_id: str) -> dict[str, Any]:
return worker_request(
"POST",
"/sessions",
{
"sessionId": str(session_id),
"guildId": guild_id,
"voiceChannelId": voice_channel_id,
"textChannelId": text_channel_id,
},
)
def worker_play(
*,
session_id: int,
queue_entry_id: int,
jellyfin_source_id: str,
title: str,
media_type: str,
playback: dict[str, Any],
) -> dict[str, Any]:
return worker_request(
"POST",
f"/sessions/{session_id}/play",
{
"queueEntryId": queue_entry_id,
"jellyfinSourceId": jellyfin_source_id,
"title": title,
"mediaType": media_type,
"playback": playback,
},
)
def worker_control(*, session_id: int, action: str, data: dict[str, Any] | None = None) -> dict[str, Any]:
return worker_request(
"POST",
f"/sessions/{session_id}/control",
{
"action": action,
"data": data or {},
},
)
def worker_status(session_id: int) -> dict[str, Any]:
return worker_request("GET", f"/sessions/{session_id}")

View file

@ -5,6 +5,8 @@ services:
restart: unless-stopped
env_file:
- .env
environment:
ARCHIVE_BOT_DB_PATH: /app/state/archive-bot.db
expose:
- "8787"
volumes:
@ -12,6 +14,16 @@ services:
- ./state:/app/state
networks:
- mediaserver_default
orb-stream-worker:
build: ./orb_stream_worker
container_name: orb-stream-worker
restart: unless-stopped
env_file:
- .env
expose:
- "8790"
networks:
- mediaserver_default
networks:
mediaserver_default:

View file

@ -350,6 +350,10 @@
margin-top: 16px;
}
.recommend-panel {
margin-top: 16px;
}
.media-toolbar {
display: flex;
flex-wrap: wrap;
@ -400,6 +404,113 @@
padding: 18px 14px;
}
.recommend-controls {
display: grid;
grid-template-columns: minmax(160px, 0.8fr) minmax(160px, 0.8fr) minmax(160px, 0.8fr) minmax(220px, 1.4fr) auto;
gap: 8px;
padding: 12px 14px;
border-bottom: 1px solid var(--line);
}
.recommend-results {
display: grid;
gap: 1px;
background: var(--line);
}
.recommend-card {
display: grid;
grid-template-columns: 64px minmax(0, 1fr);
gap: 12px;
background: var(--panel);
padding: 12px 14px;
}
.recommend-card strong {
display: block;
font-size: 15px;
margin-bottom: 4px;
}
.recommend-card .reason {
margin-top: 8px;
color: var(--muted);
}
.recommendation-poster {
width: 64px;
aspect-ratio: 2 / 3;
border: 1px solid var(--line);
background: #14161a;
overflow: hidden;
display: flex;
align-items: center;
justify-content: center;
color: var(--muted);
font-size: 11px;
text-align: center;
}
.recommendation-poster img {
width: 100%;
height: 100%;
object-fit: cover;
display: block;
}
.watch-layout {
display: grid;
gap: 16px;
}
.watch-grid {
display: grid;
grid-template-columns: minmax(260px, 0.8fr) minmax(0, 1.2fr);
gap: 16px;
}
.watch-form {
display: grid;
grid-template-columns: repeat(3, minmax(180px, 1fr));
gap: 12px;
padding: 14px;
border-bottom: 1px solid var(--line);
}
.watch-list,
.watch-results,
.watch-queue,
.watch-events {
display: grid;
gap: 1px;
background: var(--line);
}
.watch-item,
.watch-result,
.watch-queue-item,
.watch-event {
background: var(--panel);
padding: 12px 14px;
}
.watch-item.active {
outline: 1px solid var(--line-strong);
outline-offset: -1px;
}
.watch-toolbar,
.watch-controls {
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.watch-toolbar {
padding: 12px 14px;
border-bottom: 1px solid var(--line);
}
.token-screen {
max-width: 420px;
margin: 80px auto;
@ -439,7 +550,10 @@
.channel-form,
.library-row,
.library-row.movies,
.media-form {
.media-form,
.recommend-controls,
.watch-form,
.watch-grid {
grid-template-columns: 1fr 1fr;
}
}
@ -465,7 +579,10 @@
.library-row,
.library-row.movies,
.media-form,
.media-status {
.media-status,
.recommend-controls,
.watch-form,
.watch-grid {
grid-template-columns: 1fr;
}
}
@ -489,7 +606,7 @@
<nav aria-label="Bot modules">
<button class="nav-item active" type="button" data-view="status"><span>Status</span><span>Ready</span></button>
<button class="nav-item" type="button" data-view="media"><span>Media</span><span>Jellyfin</span></button>
<div class="nav-item disabled"><span>Polls</span><span>Later</span></div>
<button class="nav-item" type="button" data-view="watch"><span>Watch Party</span><span>Worker</span></button>
<div class="nav-item disabled"><span>Automations</span><span>Later</span></div>
</nav>
</aside>
@ -637,6 +754,144 @@
</div>
<div id="mediaLibrary" class="library-list"></div>
</section>
<section class="panel recommend-panel">
<div class="panel-header">
<div class="panel-title">Recommender</div>
<div id="recommendMessage" class="message"></div>
</div>
<div class="recommend-controls">
<div class="media-field">
<label for="recommendType">Type</label>
<select id="recommendType">
<option value="all">Movies and shows</option>
<option value="movie">Movies</option>
<option value="show">Shows</option>
</select>
</div>
<div class="media-field">
<label for="recommendGenre">Genre</label>
<select id="recommendGenre">
<option value="all">All genres</option>
</select>
</div>
<div class="media-field">
<label for="recommendMode">Mode</label>
<select id="recommendMode">
<option value="balanced">Balanced</option>
<option value="quick">Quick watch</option>
<option value="deep">Deep dive</option>
<option value="recent">Newer picks</option>
<option value="comfort">Comfort picks</option>
</select>
</div>
<div class="media-field">
<label for="recommendSearch">Search cue</label>
<input id="recommendSearch" placeholder="Optional title, genre, or vibe">
</div>
<div class="media-field">
<label>&nbsp;</label>
<button id="recommendRun" type="button">Pick for me</button>
</div>
</div>
<div id="recommendResults" class="recommend-results"></div>
</section>
</section>
<section id="watchView" class="view" hidden>
<div class="topbar">
<h1>Watch Parties</h1>
<div class="actions">
<button id="watchRefresh" type="button">Refresh</button>
<button id="watchCreate" class="primary" type="button">Create session</button>
</div>
</div>
<div class="watch-layout">
<section class="panel">
<div class="panel-header">
<div class="panel-title">Session Draft</div>
<div id="watchWorkerState" class="message"></div>
</div>
<div class="watch-form">
<div class="media-field">
<label for="watchGuildId">Guild ID</label>
<input id="watchGuildId" placeholder="Discord guild ID">
</div>
<div class="media-field">
<label for="watchVoiceChannelId">Voice Channel ID</label>
<input id="watchVoiceChannelId" placeholder="Voice channel ID">
</div>
<div class="media-field">
<label for="watchTextChannelId">Text Channel ID</label>
<input id="watchTextChannelId" placeholder="Text channel ID">
</div>
<div class="media-field">
<label for="watchOwnerUserId">Owner User ID</label>
<input id="watchOwnerUserId" placeholder="Discord user ID">
</div>
<div class="media-field">
<label for="watchTitle">Session Title</label>
<input id="watchTitle" placeholder="Friday Night Movie">
</div>
</div>
</section>
<div class="watch-grid">
<section class="panel">
<div class="panel-header">
<div class="panel-title">Sessions</div>
<div id="watchMessage" class="message"></div>
</div>
<div id="watchSessions" class="watch-list"></div>
</section>
<section class="panel">
<div class="panel-header">
<div class="panel-title">Selected Session</div>
<div id="watchSessionState" class="message"></div>
</div>
<div class="watch-toolbar">
<div class="watch-controls">
<button id="watchWorkerStart" type="button">Connect Worker</button>
<button id="watchPlayNext" type="button">Play Next</button>
<button id="watchPause" type="button">Pause</button>
<button id="watchResume" type="button">Resume</button>
<button id="watchStop" type="button">Stop</button>
<button id="watchWorkerRefresh" type="button">Refresh Worker</button>
</div>
</div>
<div id="watchQueue" class="watch-queue"></div>
<div id="watchEvents" class="watch-events"></div>
</section>
</div>
<section class="panel">
<div class="panel-header">
<div class="panel-title">Add Media To Queue</div>
<div id="watchSearchMessage" class="message"></div>
</div>
<div class="watch-form">
<div class="media-field">
<label for="watchSearchType">Type</label>
<select id="watchSearchType">
<option value="all">Movies and shows</option>
<option value="movie">Movies</option>
<option value="show">Shows</option>
</select>
</div>
<div class="media-field">
<label for="watchSearchText">Search</label>
<input id="watchSearchText" placeholder="Movie, show, or keyword">
</div>
<div class="media-field">
<label>&nbsp;</label>
<button id="watchSearchRun" type="button">Search Library</button>
</div>
</div>
<div id="watchResults" class="watch-results"></div>
</section>
</div>
</section>
</main>
</div>
@ -646,6 +901,7 @@
const loginEl = document.querySelector("#login");
const statusViewEl = document.querySelector("#statusView");
const mediaViewEl = document.querySelector("#mediaView");
const watchViewEl = document.querySelector("#watchView");
const servicesEl = document.querySelector("#services");
const mediaLibraryEl = document.querySelector("#mediaLibrary");
const libraryCountEl = document.querySelector("#libraryCount");
@ -653,6 +909,16 @@
const mediaMessageEl = document.querySelector("#mediaMessage");
const jellyfinStateEl = document.querySelector("#jellyfinState");
const loginMessageEl = document.querySelector("#loginMessage");
const recommendMessageEl = document.querySelector("#recommendMessage");
const recommendResultsEl = document.querySelector("#recommendResults");
const watchMessageEl = document.querySelector("#watchMessage");
const watchWorkerStateEl = document.querySelector("#watchWorkerState");
const watchSessionStateEl = document.querySelector("#watchSessionState");
const watchSearchMessageEl = document.querySelector("#watchSearchMessage");
const watchSessionsEl = document.querySelector("#watchSessions");
const watchResultsEl = document.querySelector("#watchResults");
const watchQueueEl = document.querySelector("#watchQueue");
const watchEventsEl = document.querySelector("#watchEvents");
let services = [];
let results = new Map();
@ -661,6 +927,11 @@
let mediaLibrary = { movies: [], shows: [] };
let activeMediaTab = "movies";
let jellyfin = { url: "", configured: false, libraryNames: [], autoSync: false };
let mediaGenres = [];
let recommendationCache = {};
let watchSessions = [];
let watchResults = [];
let selectedWatchSessionId = null;
function headers(method = "GET") {
const base = { "Content-Type": "application/json" };
@ -702,9 +973,35 @@
jellyfinStateEl.classList.toggle("error", isError);
}
function setRecommendMessage(text, isError = false) {
recommendMessageEl.textContent = text;
recommendMessageEl.classList.toggle("error", isError);
}
function setWatchMessage(text, isError = false) {
watchMessageEl.textContent = text;
watchMessageEl.classList.toggle("error", isError);
}
function setWatchWorkerMessage(text, isError = false) {
watchWorkerStateEl.textContent = text;
watchWorkerStateEl.classList.toggle("error", isError);
}
function setWatchSessionMessage(text, isError = false) {
watchSessionStateEl.textContent = text;
watchSessionStateEl.classList.toggle("error", isError);
}
function setWatchSearchMessage(text, isError = false) {
watchSearchMessageEl.textContent = text;
watchSearchMessageEl.classList.toggle("error", isError);
}
function showView(name) {
statusViewEl.hidden = name !== "status";
mediaViewEl.hidden = name !== "media";
watchViewEl.hidden = name !== "watch";
document.querySelectorAll("[data-view]").forEach((item) => {
item.classList.toggle("active", item.dataset.view === name);
});
@ -864,6 +1161,13 @@
mediaLibrary = normalizeMediaLibrary(payload.library);
renderMediaLibrary();
}
mediaGenres = Array.isArray(payload.genres) ? payload.genres : mediaGenres;
renderRecommendationGenres();
if (payload.recommendations) {
recommendationCache = payload.recommendations;
const initialMode = document.querySelector("#recommendMode").value;
renderRecommendations(recommendationCache[initialMode] || recommendationCache.balanced || { items: [], summary: "" });
}
document.querySelector("#mediaMovieCount").textContent = payload.movieCount == null ? "Not published" : payload.movieCount;
document.querySelector("#mediaShowCount").textContent = payload.showCount == null ? "Not published" : payload.showCount;
document.querySelector("#mediaMessageCount").textContent = Array.isArray(payload.messageIds) ? payload.messageIds.length : 0;
@ -971,10 +1275,288 @@
};
}
function renderRecommendationGenres() {
const select = document.querySelector("#recommendGenre");
const current = select.value || "all";
select.innerHTML = '<option value="all">All genres</option>';
mediaGenres.forEach((genre) => {
const option = document.createElement("option");
option.value = genre;
option.textContent = genre;
select.append(option);
});
select.value = mediaGenres.includes(current) ? current : "all";
}
function renderRecommendations(payload = {}) {
recommendResultsEl.innerHTML = "";
const items = Array.isArray(payload.items) ? payload.items : [];
setRecommendMessage(payload.summary || "");
if (!items.length) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "No recommendations available for the current filters.";
recommendResultsEl.append(empty);
return;
}
items.forEach((item) => {
const card = document.createElement("article");
card.className = "recommend-card";
const counts = item.mediaType === "show"
? [item.seasons ? `${item.seasons} season${item.seasons === 1 ? "" : "s"}` : "", item.episodes ? `${item.episodes} episodes` : ""].filter(Boolean).join(" · ")
: item.runtime;
const meta = [item.year, item.genres, item.rating, counts].filter(Boolean).join(" · ");
const poster = item.posterUrl
? `<div class="recommendation-poster"><img src="${escapeAttr(item.posterUrl)}" alt="${escapeAttr(item.title)} poster" loading="lazy"></div>`
: `<div class="recommendation-poster">No poster</div>`;
card.innerHTML = `
${poster}
<div>
<strong>${escapeAttr(item.title)}</strong>
<div class="meta">${escapeAttr(meta)}</div>
${item.summary ? `<p>${escapeAttr(item.summary)}</p>` : ""}
${item.reason ? `<div class="reason">${escapeAttr(item.reason)}</div>` : ""}
</div>
`;
recommendResultsEl.append(card);
});
}
async function loadRecommendations() {
const params = new URLSearchParams({
type: document.querySelector("#recommendType").value,
genre: document.querySelector("#recommendGenre").value,
mode: document.querySelector("#recommendMode").value,
search: document.querySelector("#recommendSearch").value.trim(),
limit: "6"
});
const payload = await api(`/api/media/recommendations?${params.toString()}`);
mediaGenres = Array.isArray(payload.genres) ? payload.genres : mediaGenres;
renderRecommendationGenres();
renderRecommendations(payload);
return payload;
}
function renderWatchSessions() {
watchSessionsEl.innerHTML = "";
if (!watchSessions.length) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "No watch-party sessions created yet.";
watchSessionsEl.append(empty);
return;
}
watchSessions.forEach((session) => {
const item = document.createElement("button");
item.type = "button";
item.className = "watch-item";
if (selectedWatchSessionId === session.id) item.classList.add("active");
item.dataset.watchSessionId = String(session.id);
item.innerHTML = `<strong>${escapeAttr(session.title)}</strong><div class="meta">${escapeAttr(session.status)} · VC ${escapeAttr(session.voiceChannelId)}</div>`;
watchSessionsEl.append(item);
});
}
function renderWatchDetail(payload = {}) {
const session = payload.session;
watchQueueEl.innerHTML = "";
watchEventsEl.innerHTML = "";
if (!session) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "Select a watch-party session.";
watchQueueEl.append(empty);
return;
}
const worker = payload.worker || {};
setWatchSessionMessage(`Status: ${session.status} · Worker: ${worker.workerStatus || "idle"} · Playback: ${worker.playbackState || "idle"}`);
const queue = Array.isArray(payload.queue) ? payload.queue : [];
if (!queue.length) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "Queue is empty.";
watchQueueEl.append(empty);
} else {
queue.forEach((entry) => {
const item = document.createElement("div");
item.className = "watch-queue-item";
item.innerHTML = `<strong>${escapeAttr(entry.title)}</strong><div class="meta">${escapeAttr(entry.status)} · #${entry.position}</div>`;
watchQueueEl.append(item);
});
}
const events = Array.isArray(payload.events) ? payload.events : [];
if (!events.length) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "No recent events.";
watchEventsEl.append(empty);
} else {
events.forEach((event) => {
const item = document.createElement("div");
item.className = "watch-event";
item.innerHTML = `<strong>${escapeAttr(event.eventType)}</strong><div class="meta">${escapeAttr(new Date(event.createdAt).toLocaleString())}</div>`;
watchEventsEl.append(item);
});
}
}
function renderWatchResults() {
watchResultsEl.innerHTML = "";
if (!watchResults.length) {
const empty = document.createElement("div");
empty.className = "empty-library";
empty.textContent = "Search the saved Jellyfin library to add titles.";
watchResultsEl.append(empty);
return;
}
watchResults.forEach((item) => {
const card = document.createElement("div");
card.className = "watch-result";
const counts = item.mediaType === "show"
? [item.seasons ? `${item.seasons} seasons` : "", item.episodes ? `${item.episodes} episodes` : ""].filter(Boolean).join(" · ")
: item.runtime;
const meta = [item.year, item.genres, counts].filter(Boolean).join(" · ");
card.innerHTML = `
<strong>${escapeAttr(item.title)}</strong>
<div class="meta">${escapeAttr(meta)}</div>
${item.summary ? `<p>${escapeAttr(item.summary)}</p>` : ""}
<div class="watch-controls">
<button type="button" data-watch-add="${escapeAttr(item.sourceId || "")}">Add to queue</button>
</div>
`;
watchResultsEl.append(card);
});
}
async function loadWatchWorkerHealth() {
const payload = await api("/api/watchparty/worker-health");
if (!payload.configured) {
setWatchWorkerMessage("Worker not configured.", true);
return payload;
}
const worker = payload.worker || {};
const statusBits = [];
statusBits.push(worker.streamingImplemented ? "stream runtime ready" : `stream runtime unavailable: ${worker.streamingError || "unknown error"}`);
statusBits.push(worker.discordConfigured ? (worker.discordReady ? "Discord ready" : "Discord login pending") : "missing Discord user token");
statusBits.push(`sessions ${worker.sessions ?? "0"}`);
setWatchWorkerMessage(statusBits.join(" · "), !worker.streamingImplemented || !worker.discordConfigured);
return payload;
}
async function loadWatchSessions() {
const payload = await api("/api/watchparty/sessions");
watchSessions = Array.isArray(payload.sessions) ? payload.sessions : [];
if (!selectedWatchSessionId && watchSessions.length) selectedWatchSessionId = watchSessions[0].id;
renderWatchSessions();
if (selectedWatchSessionId) {
await loadWatchSessionDetail(selectedWatchSessionId);
} else {
renderWatchDetail({});
}
return payload;
}
async function loadWatchSessionDetail(sessionId) {
const payload = await api(`/api/watchparty/sessions/${sessionId}`);
selectedWatchSessionId = payload.session.id;
renderWatchSessions();
renderWatchDetail(payload);
return payload;
}
async function createWatchSession() {
const payload = await api("/api/watchparty/sessions", {
method: "POST",
body: JSON.stringify({
guildId: document.querySelector("#watchGuildId").value.trim(),
voiceChannelId: document.querySelector("#watchVoiceChannelId").value.trim(),
textChannelId: document.querySelector("#watchTextChannelId").value.trim(),
ownerUserId: document.querySelector("#watchOwnerUserId").value.trim(),
title: document.querySelector("#watchTitle").value.trim() || "Watch Party"
})
});
setWatchMessage("Created watch-party session.");
selectedWatchSessionId = payload.session.id;
await loadWatchSessions();
return payload;
}
async function searchWatchMedia() {
const params = new URLSearchParams({
type: document.querySelector("#watchSearchType").value,
search: document.querySelector("#watchSearchText").value.trim(),
limit: "25"
});
const payload = await api(`/api/watchparty/media?${params.toString()}`);
watchResults = Array.isArray(payload.items) ? payload.items : [];
renderWatchResults();
setWatchSearchMessage(`Found ${watchResults.length} library matches.`);
return payload;
}
async function addWatchQueue(sourceId) {
if (!selectedWatchSessionId) throw new Error("Select a watch-party session first.");
const payload = await api("/api/watchparty/queue", {
method: "POST",
body: JSON.stringify({ sessionId: selectedWatchSessionId, jellyfinSourceId: sourceId })
});
renderWatchDetail(payload);
setWatchMessage("Added item to watch-party queue.");
return payload;
}
async function watchWorkerStart() {
if (!selectedWatchSessionId) throw new Error("Select a watch-party session first.");
const payload = await api("/api/watchparty/worker-start", {
method: "POST",
body: JSON.stringify({ sessionId: selectedWatchSessionId })
});
renderWatchDetail(payload);
setWatchWorkerMessage("Worker session connected.");
return payload;
}
async function watchWorkerPlayNext() {
if (!selectedWatchSessionId) throw new Error("Select a watch-party session first.");
const payload = await api("/api/watchparty/worker-play-next", {
method: "POST",
body: JSON.stringify({ sessionId: selectedWatchSessionId })
});
renderWatchDetail(payload);
setWatchSessionMessage("Requested next queue item.");
return payload;
}
async function watchWorkerControl(action) {
if (!selectedWatchSessionId) throw new Error("Select a watch-party session first.");
const payload = await api("/api/watchparty/worker-control", {
method: "POST",
body: JSON.stringify({ sessionId: selectedWatchSessionId, action, data: {} })
});
renderWatchDetail(payload);
setWatchSessionMessage(`Sent ${action} to worker.`);
return payload;
}
async function watchWorkerRefresh() {
if (!selectedWatchSessionId) throw new Error("Select a watch-party session first.");
const payload = await api("/api/watchparty/worker-status", {
method: "POST",
body: JSON.stringify({ sessionId: selectedWatchSessionId })
});
renderWatchDetail(payload);
setWatchSessionMessage("Refreshed worker state.");
return payload;
}
function normalizeMediaItem(item = {}, mediaType = "movie") {
return {
title: item.title || "",
mediaType,
sourceId: item.sourceId || "",
tmdbId: item.tmdbId || "",
imdbId: item.imdbId || "",
posterUrl: item.posterUrl || "",
year: item.year || "",
genres: item.genres || "",
rating: item.rating || "",
@ -991,6 +1573,10 @@
row.querySelectorAll("[data-media-key]").forEach((input) => {
item[input.dataset.mediaKey] = input.value.trim();
});
item.sourceId = row.dataset.sourceId || "";
item.tmdbId = row.dataset.tmdbId || "";
item.imdbId = row.dataset.imdbId || "";
item.posterUrl = row.dataset.posterUrl || "";
return normalizeMediaItem(item, activeMediaTab === "movies" ? "movie" : "show");
});
mediaLibrary[activeMediaTab] = currentItems;
@ -1002,6 +1588,10 @@
const row = document.createElement("div");
row.className = `library-row ${activeMediaTab}`;
row.dataset.index = String(index);
row.dataset.sourceId = item.sourceId || "";
row.dataset.tmdbId = item.tmdbId || "";
row.dataset.imdbId = item.imdbId || "";
row.dataset.posterUrl = item.posterUrl || "";
const countFields = type === "show"
? `
<div class="field">
@ -1232,6 +1822,47 @@
publishMedia().catch((error) => setMediaMessage(error.message, true));
});
document.querySelector("#recommendRun").addEventListener("click", () => {
loadRecommendations().catch((error) => setRecommendMessage(error.message, true));
});
document.querySelector("#watchRefresh").addEventListener("click", () => {
Promise.all([loadWatchWorkerHealth(), loadWatchSessions()])
.catch((error) => setWatchMessage(error.message, true));
});
document.querySelector("#watchCreate").addEventListener("click", () => {
createWatchSession().catch((error) => setWatchMessage(error.message, true));
});
document.querySelector("#watchSearchRun").addEventListener("click", () => {
searchWatchMedia().catch((error) => setWatchSearchMessage(error.message, true));
});
document.querySelector("#watchWorkerStart").addEventListener("click", () => {
watchWorkerStart().catch((error) => setWatchWorkerMessage(error.message, true));
});
document.querySelector("#watchPlayNext").addEventListener("click", () => {
watchWorkerPlayNext().catch((error) => setWatchSessionMessage(error.message, true));
});
document.querySelector("#watchPause").addEventListener("click", () => {
watchWorkerControl("pause").catch((error) => setWatchSessionMessage(error.message, true));
});
document.querySelector("#watchResume").addEventListener("click", () => {
watchWorkerControl("resume").catch((error) => setWatchSessionMessage(error.message, true));
});
document.querySelector("#watchStop").addEventListener("click", () => {
watchWorkerControl("stop").catch((error) => setWatchSessionMessage(error.message, true));
});
document.querySelector("#watchWorkerRefresh").addEventListener("click", () => {
watchWorkerRefresh().catch((error) => setWatchSessionMessage(error.message, true));
});
document.querySelector("#logout").addEventListener("click", () => {
logout();
});
@ -1242,6 +1873,9 @@
if (item.dataset.view === "media") {
Promise.all([loadMediaStatus(), loadJellyfinStatus()])
.catch((error) => setMediaMessage(error.message, true));
} else if (item.dataset.view === "watch") {
Promise.all([loadWatchWorkerHealth(), loadWatchSessions()])
.catch((error) => setWatchMessage(error.message, true));
}
});
});
@ -1288,6 +1922,18 @@
renderMediaLibrary();
});
watchSessionsEl.addEventListener("click", (event) => {
const button = event.target.closest("[data-watch-session-id]");
if (!button) return;
loadWatchSessionDetail(Number(button.dataset.watchSessionId)).catch((error) => setWatchMessage(error.message, true));
});
watchResultsEl.addEventListener("click", (event) => {
const button = event.target.closest("[data-watch-add]");
if (!button) return;
addWatchQueue(button.dataset.watchAdd).catch((error) => setWatchSearchMessage(error.message, true));
});
let draggedRow = null;
servicesEl.addEventListener("dragstart", (e) => {

View file

@ -0,0 +1,13 @@
FROM node:20-alpine
WORKDIR /app
RUN apk add --no-cache ffmpeg python3 make g++
COPY package.json /app/package.json
RUN npm install --omit=dev
COPY server.js /app/server.js
EXPOSE 8790
CMD ["node", "/app/server.js"]

4240
orb_stream_worker/package-lock.json generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,16 @@
{
"name": "orb-stream-worker",
"version": "0.1.0",
"private": true,
"description": "Stream worker for Jellyfin-backed Discord VC watch parties",
"main": "server.js",
"scripts": {
"start": "node server.js"
},
"dependencies": {
"@dank074/discord-video-stream": "6.0.0",
"discord.js-selfbot-v13": "^3.7.1",
"dotenv": "^17.3.1",
"node-gyp": "^11.2.0"
}
}

525
orb_stream_worker/server.js Normal file
View file

@ -0,0 +1,525 @@
const http = require("http");
const { URL } = require("url");
try {
require("dotenv").config();
} catch (_) {
// Optional in local dev when dependencies are not installed yet.
}
const PORT = Number(process.env.PORT || "8790");
const TOKEN = String(process.env.WATCHPARTY_WORKER_TOKEN || "").trim();
const DISCORD_USER_TOKEN = String(
process.env.WATCHPARTY_DISCORD_USER_TOKEN ||
process.env.DISCORD_USER_TOKEN ||
process.env.TOKEN ||
""
).trim();
const STREAM_WIDTH = Number(process.env.WATCHPARTY_STREAM_WIDTH || "1280");
const STREAM_HEIGHT = Number(process.env.WATCHPARTY_STREAM_HEIGHT || "720");
const STREAM_FPS = Number(process.env.WATCHPARTY_STREAM_FPS || "30");
const STREAM_BITRATE_KBPS = Number(process.env.WATCHPARTY_STREAM_BITRATE_KBPS || "2000");
const STREAM_MAX_BITRATE_KBPS = Number(process.env.WATCHPARTY_STREAM_MAX_BITRATE_KBPS || "2500");
const STREAM_CODEC = String(process.env.WATCHPARTY_STREAM_CODEC || "H264").trim().toUpperCase();
const STREAM_PRESET = String(process.env.WATCHPARTY_STREAM_PRESET || "ultrafast").trim().toLowerCase();
const STREAM_HARDWARE_ACCELERATION = ["1", "true", "yes", "on"].includes(
String(process.env.WATCHPARTY_STREAM_HARDWARE_ACCELERATION || "").trim().toLowerCase()
);
if (!TOKEN) {
console.error("Missing WATCHPARTY_WORKER_TOKEN");
process.exit(1);
}
function loadStreamingStack() {
try {
const { Client } = require("discord.js-selfbot-v13");
const streamLib = require("@dank074/discord-video-stream");
return {
available: true,
Client,
Streamer: streamLib.Streamer,
Utils: streamLib.Utils,
prepareStream: streamLib.prepareStream,
playStream: streamLib.playStream,
error: "",
};
} catch (error) {
return {
available: false,
error: error && error.message ? String(error.message) : "Streaming dependencies are unavailable",
};
}
}
const streamingStack = loadStreamingStack();
const sessions = new Map();
const runtime = {
client: null,
streamer: null,
ready: false,
readyPromise: null,
activeSessionId: null,
};
function json(response, statusCode, payload) {
const body = Buffer.from(JSON.stringify(payload, null, 2));
response.writeHead(statusCode, {
"Content-Type": "application/json; charset=utf-8",
"Cache-Control": "no-store",
"Content-Length": String(body.length),
});
response.end(body);
}
function unauthorized(response) {
json(response, 401, { error: "Unauthorized" });
}
function notFound(response) {
json(response, 404, { error: "Not found" });
}
function badRequest(response, error) {
json(response, 400, { error: error instanceof Error ? error.message : String(error) });
}
function requireAuth(request, response) {
const auth = String(request.headers.authorization || "");
return auth === `Bearer ${TOKEN}` ? true : (unauthorized(response), false);
}
function readJson(request) {
return new Promise((resolve, reject) => {
const chunks = [];
request.on("data", (chunk) => chunks.push(chunk));
request.on("end", () => {
const body = Buffer.concat(chunks).toString("utf-8");
if (!body) {
resolve({});
return;
}
try {
const parsed = JSON.parse(body);
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
reject(new Error("JSON body must be an object"));
return;
}
resolve(parsed);
} catch (error) {
reject(new Error(`Invalid JSON: ${error.message}`));
}
});
request.on("error", reject);
});
}
function nowSeconds() {
return Math.floor(Date.now() / 1000);
}
function ensureSession(sessionId) {
const session = sessions.get(String(sessionId));
if (!session) {
throw new Error(`Unknown worker session: ${sessionId}`);
}
return session;
}
function streamOptions() {
const options = {
width: STREAM_WIDTH,
height: STREAM_HEIGHT,
frameRate: STREAM_FPS,
bitrateVideo: STREAM_BITRATE_KBPS,
bitrateVideoMax: STREAM_MAX_BITRATE_KBPS,
hardwareAcceleratedDecoding: STREAM_HARDWARE_ACCELERATION,
minimizeLatency: false,
h26xPreset: STREAM_PRESET,
};
if (streamingStack.available && streamingStack.Utils && typeof streamingStack.Utils.normalizeVideoCodec === "function") {
options.videoCodec = streamingStack.Utils.normalizeVideoCodec(STREAM_CODEC);
} else {
options.videoCodec = STREAM_CODEC;
}
return options;
}
function streamUrlAtOffset(url, startSeconds) {
const parsed = new URL(url);
parsed.searchParams.set("StartTimeTicks", String(Math.max(0, Math.floor(startSeconds)) * 10000000));
return parsed.toString();
}
function sessionPlaybackPosition(session) {
if (session.playbackState !== "playing") {
return session.positionSeconds;
}
const startedAtSeconds = Number(session.startedAtSeconds || 0);
if (!startedAtSeconds) {
return session.positionSeconds;
}
const elapsed = Math.max(0, nowSeconds() - startedAtSeconds);
const total = Math.max(0, Number(session.positionSeconds || 0) + elapsed);
if (session.durationSeconds > 0) {
return Math.min(total, session.durationSeconds);
}
return total;
}
function sessionPayload(session) {
return {
workerStatus: session.workerStatus,
playbackState: session.playbackState,
currentTitle: session.currentTitle,
positionSeconds: sessionPlaybackPosition(session),
durationSeconds: session.durationSeconds,
lastError: session.lastError,
workerSessionId: session.workerSessionId,
guildId: session.guildId,
voiceChannelId: session.voiceChannelId,
textChannelId: session.textChannelId,
queueEntryId: session.queueEntryId,
jellyfinSourceId: session.jellyfinSourceId,
mediaType: session.mediaType,
playback: session.playback,
streamingAvailable: streamingStack.available,
discordReady: runtime.ready,
};
}
async function ensureStreamingRuntime() {
if (!streamingStack.available) {
throw new Error(`Streaming runtime unavailable: ${streamingStack.error}`);
}
if (!DISCORD_USER_TOKEN) {
throw new Error("WATCHPARTY_DISCORD_USER_TOKEN is required for Discord VC streaming");
}
if (runtime.ready) {
return runtime;
}
if (runtime.readyPromise) {
await runtime.readyPromise;
return runtime;
}
runtime.readyPromise = (async () => {
const client = new streamingStack.Client();
client.on("error", (error) => {
console.error("[worker] discord client error:", error);
});
runtime.client = client;
runtime.streamer = new streamingStack.Streamer(client);
await client.login(DISCORD_USER_TOKEN);
if (client.readyAt) {
runtime.ready = true;
return;
}
await new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error("Discord client did not become ready in time")), 30000);
client.once("ready", () => {
clearTimeout(timeout);
runtime.ready = true;
resolve();
});
client.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
})();
try {
await runtime.readyPromise;
} catch (error) {
runtime.readyPromise = null;
runtime.ready = false;
runtime.client = null;
runtime.streamer = null;
throw error;
}
return runtime;
}
async function ensureVoiceConnection(session) {
await ensureStreamingRuntime();
if (!runtime.streamer || !runtime.client) {
throw new Error("Worker runtime is not ready");
}
if (runtime.activeSessionId && runtime.activeSessionId !== session.workerSessionId) {
await stopSessionPlayback(ensureSession(runtime.activeSessionId), false);
}
await runtime.streamer.joinVoice(session.guildId, session.voiceChannelId);
runtime.activeSessionId = session.workerSessionId;
session.workerStatus = "connected";
session.lastError = "";
if (runtime.client.user && typeof runtime.client.user.setActivity === "function") {
runtime.client.user.setActivity(`Watch Party: ${session.currentTitle || session.title || "idle"}`).catch(() => {});
}
await new Promise((resolve) => setTimeout(resolve, 2000));
}
function canPauseCommand(command) {
return Boolean(command && command.ffmpegProc && typeof command.ffmpegProc.kill === "function");
}
async function stopSessionPlayback(session, leaveVoice = true) {
session.manualStop = true;
if (session.abortController) {
try {
session.abortController.abort();
} catch (_) {}
}
if (runtime.streamer) {
try {
runtime.streamer.stopStream();
} catch (_) {}
if (leaveVoice) {
try {
runtime.streamer.leaveVoice();
} catch (_) {}
}
}
session.workerStatus = leaveVoice ? "connected" : session.workerStatus;
session.playbackState = leaveVoice ? "stopped" : "idle";
session.abortController = null;
session.ffmpegCommand = null;
session.startedAtSeconds = 0;
session.positionSeconds = 0;
if (leaveVoice && runtime.activeSessionId === session.workerSessionId) {
runtime.activeSessionId = null;
}
}
async function startPlayback(session, playback, startSeconds = 0) {
if (!playback || typeof playback !== "object") {
throw new Error("playback payload is required");
}
const input = String(playback.streamUrl || playback.downloadUrl || "").trim();
if (!input) {
throw new Error("Playback payload is missing a Jellyfin media URL");
}
await ensureVoiceConnection(session);
if (!runtime.streamer || !streamingStack.prepareStream || !streamingStack.playStream) {
throw new Error("Streaming runtime is not available");
}
if (session.abortController) {
await stopSessionPlayback(session, false);
}
const abortController = new AbortController();
session.abortController = abortController;
session.playback = playback;
session.currentTitle = String(playback.title || session.currentTitle || "").trim();
session.durationSeconds = Number(playback.durationSeconds || 0) || 0;
session.positionSeconds = Math.max(0, Number(startSeconds || 0));
session.startedAtSeconds = nowSeconds();
session.playbackState = "playing";
session.workerStatus = "streaming";
session.manualStop = false;
session.lastError = "";
const options = streamOptions();
const prepared = streamingStack.prepareStream(
session.positionSeconds > 0 ? streamUrlAtOffset(input, session.positionSeconds) : input,
options,
abortController.signal,
);
const command = prepared && prepared.command ? prepared.command : null;
const output = prepared && prepared.output ? prepared.output : null;
if (!command || !output) {
throw new Error("Failed to prepare FFmpeg stream");
}
session.ffmpegCommand = command;
command.on("error", (error) => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.lastError = error && error.message ? String(error.message) : "FFmpeg stream failed";
session.playbackState = "error";
session.workerStatus = "error";
});
streamingStack.playStream(output, runtime.streamer, undefined, abortController.signal)
.then(() => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.positionSeconds = session.durationSeconds > 0 ? session.durationSeconds : sessionPlaybackPosition(session);
session.startedAtSeconds = 0;
session.playbackState = "idle";
session.workerStatus = "connected";
session.ffmpegCommand = null;
session.abortController = null;
})
.catch((error) => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.lastError = error && error.message ? String(error.message) : "Streaming failed";
session.startedAtSeconds = 0;
session.playbackState = "error";
session.workerStatus = "error";
session.ffmpegCommand = null;
session.abortController = null;
});
}
function createSessionRecord(body) {
const sessionId = String(body.sessionId || "").trim();
if (!sessionId) {
throw new Error("sessionId is required");
}
return {
workerSessionId: sessionId,
guildId: String(body.guildId || "").trim(),
voiceChannelId: String(body.voiceChannelId || "").trim(),
textChannelId: String(body.textChannelId || "").trim(),
title: String(body.title || "").trim(),
workerStatus: "idle",
playbackState: "idle",
currentTitle: "",
positionSeconds: 0,
durationSeconds: 0,
lastError: "",
queueEntryId: null,
jellyfinSourceId: "",
mediaType: "",
playback: null,
startedAtSeconds: 0,
manualStop: false,
abortController: null,
ffmpegCommand: null,
};
}
const server = http.createServer(async (request, response) => {
try {
const url = new URL(request.url, `http://127.0.0.1:${PORT}`);
if (request.method === "GET" && url.pathname === "/health") {
if (!requireAuth(request, response)) return;
json(response, 200, {
ok: true,
service: "orb-stream-worker",
streamingImplemented: streamingStack.available,
streamingError: streamingStack.available ? "" : streamingStack.error,
discordConfigured: Boolean(DISCORD_USER_TOKEN),
discordReady: runtime.ready,
sessions: sessions.size,
activeSessionId: runtime.activeSessionId,
});
return;
}
if (request.method === "POST" && url.pathname === "/sessions") {
if (!requireAuth(request, response)) return;
const body = await readJson(request);
const session = createSessionRecord(body);
sessions.set(session.workerSessionId, session);
try {
await ensureStreamingRuntime();
await ensureVoiceConnection(session);
} catch (error) {
session.workerStatus = "error";
session.lastError = error instanceof Error ? error.message : String(error);
}
json(response, 200, sessionPayload(session));
return;
}
const playMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/play$/);
if (playMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(playMatch[1]));
const body = await readJson(request);
session.queueEntryId = Number(body.queueEntryId || 0) || null;
session.jellyfinSourceId = String(body.jellyfinSourceId || "").trim();
session.currentTitle = String(body.title || "").trim();
session.mediaType = String(body.mediaType || "").trim();
try {
await startPlayback(session, body.playback);
} catch (error) {
session.lastError = error instanceof Error ? error.message : String(error);
session.workerStatus = "error";
session.playbackState = "error";
}
json(response, 200, sessionPayload(session));
return;
}
const controlMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/control$/);
if (controlMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(controlMatch[1]));
const body = await readJson(request);
const action = String(body.action || "").trim();
if (action === "pause") {
if (!canPauseCommand(session.ffmpegCommand)) {
throw new Error("Pause is not available because FFmpeg is not active");
}
session.positionSeconds = sessionPlaybackPosition(session);
session.startedAtSeconds = 0;
session.ffmpegCommand.ffmpegProc.kill("SIGSTOP");
session.playbackState = "paused";
session.workerStatus = "connected";
} else if (action === "resume") {
if (!canPauseCommand(session.ffmpegCommand)) {
throw new Error("Resume is not available because FFmpeg is not active");
}
session.startedAtSeconds = nowSeconds();
session.ffmpegCommand.ffmpegProc.kill("SIGCONT");
session.playbackState = "playing";
session.workerStatus = "streaming";
} else if (action === "stop" || action === "skip") {
await stopSessionPlayback(session, true);
} else if (action === "seek") {
if (!session.playback) {
throw new Error("Seek is not available because nothing has been loaded");
}
const requestedPosition = Math.max(0, Number((body.data && body.data.positionSeconds) || 0) || 0);
await stopSessionPlayback(session, false);
await startPlayback(session, session.playback, requestedPosition);
} else {
throw new Error(`Unsupported control action: ${action}`);
}
json(response, 200, sessionPayload(session));
return;
}
const sessionMatch = request.method === "GET" && url.pathname.match(/^\/sessions\/([^/]+)$/);
if (sessionMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(sessionMatch[1]));
json(response, 200, sessionPayload(session));
return;
}
notFound(response);
} catch (error) {
if (error instanceof Error && /Unknown worker session/.test(error.message)) {
notFound(response);
return;
}
if (error instanceof Error && /required|invalid|Unsupported|not implemented|not available|missing/i.test(error.message)) {
badRequest(response, error);
return;
}
json(response, 500, { error: error && error.message ? error.message : "Internal error" });
}
});
server.listen(PORT, "0.0.0.0", () => {
console.log(`orb-stream-worker listening on ${PORT}`);
});

File diff suppressed because it is too large Load diff