TheOrb/orb_stream_worker/server.js
2026-05-26 15:39:05 -04:00

530 lines
17 KiB
JavaScript

const http = require("http");
const { URL } = require("url");
const WebSocket = require("ws");
if (typeof globalThis.WebSocket === "undefined") {
globalThis.WebSocket = WebSocket;
}
try {
require("dotenv").config();
} catch (_) {
// Optional in local dev when dependencies are not installed yet.
}
const PORT = Number(process.env.PORT || "8790");
const TOKEN = String(process.env.WATCHPARTY_WORKER_TOKEN || "").trim();
const DISCORD_USER_TOKEN = String(
process.env.WATCHPARTY_DISCORD_USER_TOKEN ||
process.env.DISCORD_USER_TOKEN ||
process.env.TOKEN ||
""
).trim();
const STREAM_WIDTH = Number(process.env.WATCHPARTY_STREAM_WIDTH || "1280");
const STREAM_HEIGHT = Number(process.env.WATCHPARTY_STREAM_HEIGHT || "720");
const STREAM_FPS = Number(process.env.WATCHPARTY_STREAM_FPS || "30");
const STREAM_BITRATE_KBPS = Number(process.env.WATCHPARTY_STREAM_BITRATE_KBPS || "2000");
const STREAM_MAX_BITRATE_KBPS = Number(process.env.WATCHPARTY_STREAM_MAX_BITRATE_KBPS || "2500");
const STREAM_CODEC = String(process.env.WATCHPARTY_STREAM_CODEC || "H264").trim().toUpperCase();
const STREAM_PRESET = String(process.env.WATCHPARTY_STREAM_PRESET || "ultrafast").trim().toLowerCase();
const STREAM_HARDWARE_ACCELERATION = ["1", "true", "yes", "on"].includes(
String(process.env.WATCHPARTY_STREAM_HARDWARE_ACCELERATION || "").trim().toLowerCase()
);
if (!TOKEN) {
console.error("Missing WATCHPARTY_WORKER_TOKEN");
process.exit(1);
}
function loadStreamingStack() {
try {
const { Client } = require("discord.js-selfbot-v13");
const streamLib = require("@dank074/discord-video-stream");
return {
available: true,
Client,
Streamer: streamLib.Streamer,
Utils: streamLib.Utils,
prepareStream: streamLib.prepareStream,
playStream: streamLib.playStream,
error: "",
};
} catch (error) {
return {
available: false,
error: error && error.message ? String(error.message) : "Streaming dependencies are unavailable",
};
}
}
const streamingStack = loadStreamingStack();
const sessions = new Map();
const runtime = {
client: null,
streamer: null,
ready: false,
readyPromise: null,
activeSessionId: null,
};
function json(response, statusCode, payload) {
const body = Buffer.from(JSON.stringify(payload, null, 2));
response.writeHead(statusCode, {
"Content-Type": "application/json; charset=utf-8",
"Cache-Control": "no-store",
"Content-Length": String(body.length),
});
response.end(body);
}
function unauthorized(response) {
json(response, 401, { error: "Unauthorized" });
}
function notFound(response) {
json(response, 404, { error: "Not found" });
}
function badRequest(response, error) {
json(response, 400, { error: error instanceof Error ? error.message : String(error) });
}
function requireAuth(request, response) {
const auth = String(request.headers.authorization || "");
return auth === `Bearer ${TOKEN}` ? true : (unauthorized(response), false);
}
function readJson(request) {
return new Promise((resolve, reject) => {
const chunks = [];
request.on("data", (chunk) => chunks.push(chunk));
request.on("end", () => {
const body = Buffer.concat(chunks).toString("utf-8");
if (!body) {
resolve({});
return;
}
try {
const parsed = JSON.parse(body);
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
reject(new Error("JSON body must be an object"));
return;
}
resolve(parsed);
} catch (error) {
reject(new Error(`Invalid JSON: ${error.message}`));
}
});
request.on("error", reject);
});
}
function nowSeconds() {
return Math.floor(Date.now() / 1000);
}
function ensureSession(sessionId) {
const session = sessions.get(String(sessionId));
if (!session) {
throw new Error(`Unknown worker session: ${sessionId}`);
}
return session;
}
function streamOptions() {
const options = {
width: STREAM_WIDTH,
height: STREAM_HEIGHT,
frameRate: STREAM_FPS,
bitrateVideo: STREAM_BITRATE_KBPS,
bitrateVideoMax: STREAM_MAX_BITRATE_KBPS,
hardwareAcceleratedDecoding: STREAM_HARDWARE_ACCELERATION,
minimizeLatency: false,
h26xPreset: STREAM_PRESET,
};
if (streamingStack.available && streamingStack.Utils && typeof streamingStack.Utils.normalizeVideoCodec === "function") {
options.videoCodec = streamingStack.Utils.normalizeVideoCodec(STREAM_CODEC);
} else {
options.videoCodec = STREAM_CODEC;
}
return options;
}
function streamUrlAtOffset(url, startSeconds) {
const parsed = new URL(url);
parsed.searchParams.set("StartTimeTicks", String(Math.max(0, Math.floor(startSeconds)) * 10000000));
return parsed.toString();
}
function sessionPlaybackPosition(session) {
if (session.playbackState !== "playing") {
return session.positionSeconds;
}
const startedAtSeconds = Number(session.startedAtSeconds || 0);
if (!startedAtSeconds) {
return session.positionSeconds;
}
const elapsed = Math.max(0, nowSeconds() - startedAtSeconds);
const total = Math.max(0, Number(session.positionSeconds || 0) + elapsed);
if (session.durationSeconds > 0) {
return Math.min(total, session.durationSeconds);
}
return total;
}
function sessionPayload(session) {
return {
workerStatus: session.workerStatus,
playbackState: session.playbackState,
currentTitle: session.currentTitle,
positionSeconds: sessionPlaybackPosition(session),
durationSeconds: session.durationSeconds,
lastError: session.lastError,
workerSessionId: session.workerSessionId,
guildId: session.guildId,
voiceChannelId: session.voiceChannelId,
textChannelId: session.textChannelId,
queueEntryId: session.queueEntryId,
jellyfinSourceId: session.jellyfinSourceId,
mediaType: session.mediaType,
playback: session.playback,
streamingAvailable: streamingStack.available,
discordReady: runtime.ready,
};
}
async function ensureStreamingRuntime() {
if (!streamingStack.available) {
throw new Error(`Streaming runtime unavailable: ${streamingStack.error}`);
}
if (!DISCORD_USER_TOKEN) {
throw new Error("WATCHPARTY_DISCORD_USER_TOKEN is required for Discord VC streaming");
}
if (runtime.ready) {
return runtime;
}
if (runtime.readyPromise) {
await runtime.readyPromise;
return runtime;
}
runtime.readyPromise = (async () => {
const client = new streamingStack.Client();
client.on("error", (error) => {
console.error("[worker] discord client error:", error);
});
runtime.client = client;
runtime.streamer = new streamingStack.Streamer(client);
await client.login(DISCORD_USER_TOKEN);
if (client.readyAt) {
runtime.ready = true;
return;
}
await new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error("Discord client did not become ready in time")), 30000);
client.once("ready", () => {
clearTimeout(timeout);
runtime.ready = true;
resolve();
});
client.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
})();
try {
await runtime.readyPromise;
} catch (error) {
runtime.readyPromise = null;
runtime.ready = false;
runtime.client = null;
runtime.streamer = null;
throw error;
}
return runtime;
}
async function ensureVoiceConnection(session) {
await ensureStreamingRuntime();
if (!runtime.streamer || !runtime.client) {
throw new Error("Worker runtime is not ready");
}
if (runtime.activeSessionId && runtime.activeSessionId !== session.workerSessionId) {
await stopSessionPlayback(ensureSession(runtime.activeSessionId), false);
}
await runtime.streamer.joinVoice(session.guildId, session.voiceChannelId);
runtime.activeSessionId = session.workerSessionId;
session.workerStatus = "connected";
session.lastError = "";
if (runtime.client.user && typeof runtime.client.user.setActivity === "function") {
runtime.client.user.setActivity(`Watch Party: ${session.currentTitle || session.title || "idle"}`).catch(() => {});
}
await new Promise((resolve) => setTimeout(resolve, 2000));
}
function canPauseCommand(command) {
return Boolean(command && command.ffmpegProc && typeof command.ffmpegProc.kill === "function");
}
async function stopSessionPlayback(session, leaveVoice = true) {
session.manualStop = true;
if (session.abortController) {
try {
session.abortController.abort();
} catch (_) {}
}
if (runtime.streamer) {
try {
runtime.streamer.stopStream();
} catch (_) {}
if (leaveVoice) {
try {
runtime.streamer.leaveVoice();
} catch (_) {}
}
}
session.workerStatus = leaveVoice ? "connected" : session.workerStatus;
session.playbackState = leaveVoice ? "stopped" : "idle";
session.abortController = null;
session.ffmpegCommand = null;
session.startedAtSeconds = 0;
session.positionSeconds = 0;
if (leaveVoice && runtime.activeSessionId === session.workerSessionId) {
runtime.activeSessionId = null;
}
}
async function startPlayback(session, playback, startSeconds = 0) {
if (!playback || typeof playback !== "object") {
throw new Error("playback payload is required");
}
const input = String(playback.streamUrl || playback.downloadUrl || "").trim();
if (!input) {
throw new Error("Playback payload is missing a Jellyfin media URL");
}
await ensureVoiceConnection(session);
if (!runtime.streamer || !streamingStack.prepareStream || !streamingStack.playStream) {
throw new Error("Streaming runtime is not available");
}
if (session.abortController) {
await stopSessionPlayback(session, false);
}
const abortController = new AbortController();
session.abortController = abortController;
session.playback = playback;
session.currentTitle = String(playback.title || session.currentTitle || "").trim();
session.durationSeconds = Number(playback.durationSeconds || 0) || 0;
session.positionSeconds = Math.max(0, Number(startSeconds || 0));
session.startedAtSeconds = nowSeconds();
session.playbackState = "playing";
session.workerStatus = "streaming";
session.manualStop = false;
session.lastError = "";
const options = streamOptions();
const prepared = streamingStack.prepareStream(
session.positionSeconds > 0 ? streamUrlAtOffset(input, session.positionSeconds) : input,
options,
abortController.signal,
);
const command = prepared && prepared.command ? prepared.command : null;
const output = prepared && prepared.output ? prepared.output : null;
if (!command || !output) {
throw new Error("Failed to prepare FFmpeg stream");
}
session.ffmpegCommand = command;
command.on("error", (error) => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.lastError = error && error.message ? String(error.message) : "FFmpeg stream failed";
session.playbackState = "error";
session.workerStatus = "error";
});
streamingStack.playStream(output, runtime.streamer, undefined, abortController.signal)
.then(() => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.positionSeconds = session.durationSeconds > 0 ? session.durationSeconds : sessionPlaybackPosition(session);
session.startedAtSeconds = 0;
session.playbackState = "idle";
session.workerStatus = "connected";
session.ffmpegCommand = null;
session.abortController = null;
})
.catch((error) => {
if (session.manualStop || abortController.signal.aborted) {
return;
}
session.lastError = error && error.message ? String(error.message) : "Streaming failed";
session.startedAtSeconds = 0;
session.playbackState = "error";
session.workerStatus = "error";
session.ffmpegCommand = null;
session.abortController = null;
});
}
function createSessionRecord(body) {
const sessionId = String(body.sessionId || "").trim();
if (!sessionId) {
throw new Error("sessionId is required");
}
return {
workerSessionId: sessionId,
guildId: String(body.guildId || "").trim(),
voiceChannelId: String(body.voiceChannelId || "").trim(),
textChannelId: String(body.textChannelId || "").trim(),
title: String(body.title || "").trim(),
workerStatus: "idle",
playbackState: "idle",
currentTitle: "",
positionSeconds: 0,
durationSeconds: 0,
lastError: "",
queueEntryId: null,
jellyfinSourceId: "",
mediaType: "",
playback: null,
startedAtSeconds: 0,
manualStop: false,
abortController: null,
ffmpegCommand: null,
};
}
const server = http.createServer(async (request, response) => {
try {
const url = new URL(request.url, `http://127.0.0.1:${PORT}`);
if (request.method === "GET" && url.pathname === "/health") {
if (!requireAuth(request, response)) return;
json(response, 200, {
ok: true,
service: "orb-stream-worker",
streamingImplemented: streamingStack.available,
streamingError: streamingStack.available ? "" : streamingStack.error,
discordConfigured: Boolean(DISCORD_USER_TOKEN),
discordReady: runtime.ready,
sessions: sessions.size,
activeSessionId: runtime.activeSessionId,
});
return;
}
if (request.method === "POST" && url.pathname === "/sessions") {
if (!requireAuth(request, response)) return;
const body = await readJson(request);
const session = createSessionRecord(body);
sessions.set(session.workerSessionId, session);
try {
await ensureStreamingRuntime();
await ensureVoiceConnection(session);
} catch (error) {
session.workerStatus = "error";
session.lastError = error instanceof Error ? error.message : String(error);
}
json(response, 200, sessionPayload(session));
return;
}
const playMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/play$/);
if (playMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(playMatch[1]));
const body = await readJson(request);
session.queueEntryId = Number(body.queueEntryId || 0) || null;
session.jellyfinSourceId = String(body.jellyfinSourceId || "").trim();
session.currentTitle = String(body.title || "").trim();
session.mediaType = String(body.mediaType || "").trim();
try {
await startPlayback(session, body.playback);
} catch (error) {
session.lastError = error instanceof Error ? error.message : String(error);
session.workerStatus = "error";
session.playbackState = "error";
}
json(response, 200, sessionPayload(session));
return;
}
const controlMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/control$/);
if (controlMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(controlMatch[1]));
const body = await readJson(request);
const action = String(body.action || "").trim();
if (action === "pause") {
if (!canPauseCommand(session.ffmpegCommand)) {
throw new Error("Pause is not available because FFmpeg is not active");
}
session.positionSeconds = sessionPlaybackPosition(session);
session.startedAtSeconds = 0;
session.ffmpegCommand.ffmpegProc.kill("SIGSTOP");
session.playbackState = "paused";
session.workerStatus = "connected";
} else if (action === "resume") {
if (!canPauseCommand(session.ffmpegCommand)) {
throw new Error("Resume is not available because FFmpeg is not active");
}
session.startedAtSeconds = nowSeconds();
session.ffmpegCommand.ffmpegProc.kill("SIGCONT");
session.playbackState = "playing";
session.workerStatus = "streaming";
} else if (action === "stop" || action === "skip") {
await stopSessionPlayback(session, true);
} else if (action === "seek") {
if (!session.playback) {
throw new Error("Seek is not available because nothing has been loaded");
}
const requestedPosition = Math.max(0, Number((body.data && body.data.positionSeconds) || 0) || 0);
await stopSessionPlayback(session, false);
await startPlayback(session, session.playback, requestedPosition);
} else {
throw new Error(`Unsupported control action: ${action}`);
}
json(response, 200, sessionPayload(session));
return;
}
const sessionMatch = request.method === "GET" && url.pathname.match(/^\/sessions\/([^/]+)$/);
if (sessionMatch) {
if (!requireAuth(request, response)) return;
const session = ensureSession(decodeURIComponent(sessionMatch[1]));
json(response, 200, sessionPayload(session));
return;
}
notFound(response);
} catch (error) {
if (error instanceof Error && /Unknown worker session/.test(error.message)) {
notFound(response);
return;
}
if (error instanceof Error && /required|invalid|Unsupported|not implemented|not available|missing/i.test(error.message)) {
badRequest(response, error);
return;
}
json(response, 500, { error: error && error.message ? error.message : "Internal error" });
}
});
server.listen(PORT, "0.0.0.0", () => {
console.log(`orb-stream-worker listening on ${PORT}`);
});