const crypto = require("crypto");
const http = require("http");
const { URL } = require("url");
const WebSocket = require("ws");

// Expose the `ws` implementation as the global WebSocket when the runtime has none.
if (typeof globalThis.WebSocket === "undefined") {
  globalThis.WebSocket = WebSocket;
}

// Minimal polyfill for runtimes that do not ship Promise.withResolvers.
if (typeof Promise.withResolvers === "undefined") {
  Promise.withResolvers = function () {
    let resolve, reject;
    const promise = new Promise((res, rej) => {
      resolve = res;
      reject = rej;
    });
    return { promise, resolve, reject };
  };
}

try {
  require("dotenv").config();
} catch (_) {
  // Optional in local dev when dependencies are not installed yet.
}

// Best-effort startup patch: make the installed LibavDemuxer.js print the full
// error stack before its own log line. The marker check keeps it idempotent.
try {
  const fs = require("fs");
  const path = require("path");
  const targetPath = path.resolve(__dirname, "node_modules/@dank074/discord-video-stream/dist/media/LibavDemuxer.js");
  if (fs.existsSync(targetPath)) {
    const original = fs.readFileSync(targetPath, "utf8");
    if (!original.includes("DEMUX ERROR STACK")) {
      const patched = original.replace(
        `loggerFrameCommon.info({ error: e }, "Received an error during frame extraction. Stopping");`,
        `console.error("[DEMUX ERROR STACK]:", e && e.stack ? e.stack : e); loggerFrameCommon.info({ error: e }, "Received an error during frame extraction. Stopping");`
      );
      if (patched !== original) {
        // Only touch the file (and claim success) when the target statement was actually found.
        fs.writeFileSync(targetPath, patched, "utf8");
        console.log("[worker] Patched LibavDemuxer.js successfully to print demux error stack.");
      } else {
        console.warn("[worker] LibavDemuxer.js patch target not found; leaving file unchanged.");
      }
    }
  }
} catch (err) {
  console.error("[worker] Failed to patch LibavDemuxer.js:", err);
}

/**
 * Read a numeric setting from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, blank, or not a finite number.
 * @returns {number} The parsed value or the fallback.
 */
function numberFromEnv(name, fallback) {
  const raw = String(process.env[name] || "").trim();
  if (!raw) {
    return fallback;
  }
  const value = Number(raw);
  if (!Number.isFinite(value)) {
    // A typo must not silently turn the port or encoder settings into NaN.
    console.warn(`[worker] Ignoring non-numeric ${name}; using ${fallback}`);
    return fallback;
  }
  return value;
}

const PORT = numberFromEnv("PORT", 8790);
const TOKEN = String(process.env.WATCHPARTY_WORKER_TOKEN || "").trim();
const DISCORD_USER_TOKEN = String(
  process.env.WATCHPARTY_DISCORD_USER_TOKEN || process.env.DISCORD_USER_TOKEN || process.env.TOKEN || ""
).trim();
const STREAM_WIDTH = numberFromEnv("WATCHPARTY_STREAM_WIDTH", 1280);
const STREAM_HEIGHT = numberFromEnv("WATCHPARTY_STREAM_HEIGHT", 720);
const STREAM_FPS = numberFromEnv("WATCHPARTY_STREAM_FPS", 30);
const STREAM_BITRATE_KBPS = numberFromEnv("WATCHPARTY_STREAM_BITRATE_KBPS", 2000);
const STREAM_MAX_BITRATE_KBPS = numberFromEnv("WATCHPARTY_STREAM_MAX_BITRATE_KBPS", 2500);
const STREAM_CODEC = String(process.env.WATCHPARTY_STREAM_CODEC || "H264").trim().toUpperCase();
const STREAM_PRESET = String(process.env.WATCHPARTY_STREAM_PRESET || "ultrafast").trim().toLowerCase();
const STREAM_TYPE = String(process.env.WATCHPARTY_STREAM_TYPE || "camera").trim().toLowerCase();
const STREAM_HARDWARE_ACCELERATION = ["1", "true", "yes", "on"].includes(
  String(process.env.WATCHPARTY_STREAM_HARDWARE_ACCELERATION || "").trim().toLowerCase()
);

// Every endpoint is bearer-protected, so the worker refuses to start without a token.
if (!TOKEN) {
  console.error("Missing WATCHPARTY_WORKER_TOKEN");
  process.exit(1);
}

/**
 * Error type for problems caused by the request itself; always answered with HTTP 400.
 */
class BadRequestError extends Error {
  constructor(message) {
    super(message);
    this.name = "BadRequestError";
  }
}

/**
 * Try to load the Discord client and video streaming libraries.
 *
 * @returns {object} `{ available: true, Client, Streamer, Utils, prepareStream, playStream, error: "" }`
 *   on success, or `{ available: false, error }` describing why loading failed.
 */
function loadStreamingStack() {
  try {
    const { Client } = require("discord.js-selfbot-v13");
    const streamLib = require("@dank074/discord-video-stream");
    return {
      available: true,
      Client,
      Streamer: streamLib.Streamer,
      Utils: streamLib.Utils,
      prepareStream: streamLib.prepareStream,
      playStream: streamLib.playStream,
      error: "",
    };
  } catch (error) {
    console.error("[worker] Failed to load streaming stack:", error);
    let description = "Streaming dependencies are unavailable";
    if (error && error.stack) {
      description = String(error.stack);
    } else if (error && error.message) {
      description = String(error.message);
    }
    return { available: false, error: description };
  }
}

const streamingStack = loadStreamingStack();
console.log(`[worker] Streaming stack availability: ${streamingStack.available}`);
if (!streamingStack.available) {
  console.error(`[worker] Streaming stack load error: ${streamingStack.error}`);
}

// Session records keyed by worker session id.
const sessions = new Map();

// Shared Discord runtime state: one client/streamer pair, at most one active session.
const runtime = {
  client: null,
  streamer: null,
  ready: false,
  readyPromise: null,
  activeSessionId: null,
};

/**
 * Send a pretty-printed JSON response with caching disabled.
 *
 * @param {http.ServerResponse} response
 * @param {number} statusCode
 * @param {*} payload - Any JSON-serialisable value.
 */
function json(response, statusCode, payload) {
  const body = Buffer.from(JSON.stringify(payload, null, 2));
  response.writeHead(statusCode, {
    "Content-Type": "application/json; charset=utf-8",
    "Cache-Control": "no-store",
    "Content-Length": String(body.length),
  });
  response.end(body);
}

/** Respond with 401 and a generic error body. */
function unauthorized(response) {
  json(response, 401, { error: "Unauthorized" });
}

/** Respond with 404 and a generic error body. */
function notFound(response) {
  json(response, 404, { error: "Not found" });
}

/** Respond with 400, using the error's message (or its string form) as the body. */
function badRequest(response, error) {
  json(response, 400, { error: error instanceof Error ? error.message : String(error) });
}

/**
 * Check the request's bearer token; sends the 401 response itself on failure.
 *
 * @param {http.IncomingMessage} request
 * @param {http.ServerResponse} response
 * @returns {boolean} true when the Authorization header equals `Bearer <TOKEN>`.
 */
function requireAuth(request, response) {
  const provided = Buffer.from(String(request.headers.authorization || ""));
  const expected = Buffer.from(`Bearer ${TOKEN}`);
  // timingSafeEqual requires equal lengths; the comparison itself is constant-time
  // so the token cannot be probed byte by byte.
  const authorized = provided.length === expected.length && crypto.timingSafeEqual(provided, expected);
  if (!authorized) {
    unauthorized(response);
    return false;
  }
  return true;
}

/**
 * Read the request body and parse it as a JSON object.
 *
 * @param {http.IncomingMessage} request
 * @returns {Promise<object>} The parsed object; `{}` when the body is empty.
 *   Rejects with BadRequestError for malformed JSON or non-object payloads.
 */
function readJson(request) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    request.on("data", (chunk) => chunks.push(chunk));
    request.on("end", () => {
      const body = Buffer.concat(chunks).toString("utf-8");
      if (!body) {
        resolve({});
        return;
      }
      try {
        const parsed = JSON.parse(body);
        if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
          reject(new BadRequestError("JSON body must be an object"));
          return;
        }
        resolve(parsed);
      } catch (error) {
        reject(new BadRequestError(`Invalid JSON: ${error.message}`));
      }
    });
    request.on("error", reject);
  });
}

/** Current Unix time in whole seconds. */
function nowSeconds() {
  return Math.floor(Date.now() / 1000);
}

/**
 * Look up a session record.
 *
 * @param {string} sessionId
 * @returns {object} The stored session.
 * @throws {Error} "Unknown worker session: …" (mapped to 404 by the request handler).
 */
function ensureSession(sessionId) {
  const session = sessions.get(String(sessionId));
  if (!session) {
    throw new Error(`Unknown worker session: ${sessionId}`);
  }
  return session;
}

/**
 * Decode a session id captured from the URL path.
 *
 * @param {string} rawId - Percent-encoded path segment.
 * @returns {string} The decoded id.
 * @throws {BadRequestError} When the segment is not valid percent-encoding.
 */
function decodeSessionId(rawId) {
  try {
    return decodeURIComponent(rawId);
  } catch (_) {
    // A malformed escape is a client mistake, not an internal error.
    throw new BadRequestError("Invalid session id encoding");
  }
}

/**
 * Build the encoder options handed to prepareStream from the STREAM_* settings.
 *
 * @returns {object} Options including `videoCodec`, normalised through the
 *   library helper when that helper is available.
 */
function streamOptions() {
  const options = {
    width: STREAM_WIDTH,
    height: STREAM_HEIGHT,
    frameRate: STREAM_FPS,
    bitrateVideo: STREAM_BITRATE_KBPS,
    bitrateVideoMax: STREAM_MAX_BITRATE_KBPS,
    hardwareAcceleratedDecoding: STREAM_HARDWARE_ACCELERATION,
    minimizeLatency: false,
    h26xPreset: STREAM_PRESET,
  };
  const canNormalize =
    streamingStack.available &&
    streamingStack.Utils &&
    typeof streamingStack.Utils.normalizeVideoCodec === "function";
  options.videoCodec = canNormalize ? streamingStack.Utils.normalizeVideoCodec(STREAM_CODEC) : STREAM_CODEC;
  return options;
}

/** Options for playStream: "go-live" when configured, otherwise "camera". */
function playOptions() {
  return STREAM_TYPE === "go-live" ? { type: "go-live" } : { type: "camera" };
}

/**
 * Estimate the current playback position of a session.
 *
 * While playing, the position is the stored start offset plus wall-clock time
 * since playback started, capped at the duration when one is known. In any
 * other state the stored position is returned unchanged.
 *
 * @param {object} session
 * @returns {number} Position in seconds.
 */
function sessionPlaybackPosition(session) {
  if (session.playbackState !== "playing") {
    return session.positionSeconds;
  }
  const startedAtSeconds = Number(session.startedAtSeconds || 0);
  if (!startedAtSeconds) {
    return session.positionSeconds;
  }
  const elapsed = Math.max(0, nowSeconds() - startedAtSeconds);
  const total = Math.max(0, Number(session.positionSeconds || 0) + elapsed);
  if (session.durationSeconds > 0) {
    return Math.min(total, session.durationSeconds);
  }
  return total;
}

/**
 * Build the public JSON view of a session, plus runtime availability flags.
 *
 * @param {object} session
 * @returns {object}
 */
function sessionPayload(session) {
  return {
    workerStatus: session.workerStatus,
    playbackState: session.playbackState,
    currentTitle: session.currentTitle,
    positionSeconds: sessionPlaybackPosition(session),
    durationSeconds: session.durationSeconds,
    lastError: session.lastError,
    workerSessionId: session.workerSessionId,
    guildId: session.guildId,
    voiceChannelId: session.voiceChannelId,
    textChannelId: session.textChannelId,
    queueEntryId: session.queueEntryId,
    jellyfinSourceId: session.jellyfinSourceId,
    mediaType: session.mediaType,
    playback: session.playback,
    streamingAvailable: streamingStack.available,
    discordReady: runtime.ready,
  };
}

/**
 * Wait for the client's "ready" event, failing on "error" or after a timeout.
 * Both listeners and the timer are removed once the promise settles.
 *
 * @param {object} client - Event-emitter style Discord client.
 * @param {number} timeoutMs
 * @returns {Promise<void>}
 */
function waitForClientReady(client, timeoutMs) {
  return new Promise((resolve, reject) => {
    const cleanup = () => {
      clearTimeout(timer);
      client.removeListener("ready", onReady);
      client.removeListener("error", onError);
    };
    const onReady = () => {
      cleanup();
      resolve();
    };
    const onError = (error) => {
      cleanup();
      reject(error);
    };
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error("Discord client did not become ready in time"));
    }, timeoutMs);
    client.once("ready", onReady);
    client.once("error", onError);
  });
}

/**
 * Best-effort teardown of a client whose startup failed, so it does not linger.
 *
 * @param {object|null} client
 */
function destroyClientQuietly(client) {
  if (!client || typeof client.destroy !== "function") {
    return;
  }
  try {
    // destroy() may be sync or return a promise depending on the library version.
    Promise.resolve(client.destroy()).catch((error) => {
      console.error("[worker] Failed to destroy discord client:", error);
    });
  } catch (error) {
    console.error("[worker] Failed to destroy discord client:", error);
  }
}

/**
 * Make sure the shared Discord client and streamer exist and are logged in.
 * Concurrent callers share one in-flight startup via `runtime.readyPromise`.
 *
 * @returns {Promise<object>} The shared `runtime` object once ready.
 * @throws {Error} When the streaming stack or Discord token is missing, or when
 *   login / readiness fails. On failure the runtime is reset so a later call can retry.
 */
async function ensureStreamingRuntime() {
  if (!streamingStack.available) {
    throw new Error(`Streaming runtime unavailable: ${streamingStack.error}`);
  }
  if (!DISCORD_USER_TOKEN) {
    throw new Error("WATCHPARTY_DISCORD_USER_TOKEN is required for Discord VC streaming");
  }
  if (runtime.ready) {
    return runtime;
  }
  if (runtime.readyPromise) {
    await runtime.readyPromise;
    return runtime;
  }

  let client = null;
  runtime.readyPromise = (async () => {
    client = new streamingStack.Client();
    client.on("error", (error) => {
      console.error("[worker] discord client error:", error);
    });
    runtime.client = client;
    runtime.streamer = new streamingStack.Streamer(client);
    await client.login(DISCORD_USER_TOKEN);
    if (!client.readyAt) {
      // login() resolved before the client reported ready; wait for the event.
      await waitForClientReady(client, 30000);
    }
    runtime.ready = true;
  })();

  try {
    await runtime.readyPromise;
  } catch (error) {
    runtime.readyPromise = null;
    runtime.ready = false;
    runtime.client = null;
    runtime.streamer = null;
    // Release the half-initialised client instead of only dropping the reference.
    destroyClientQuietly(client);
    throw error;
  }
  return runtime;
}

/**
 * Join the session's voice channel unless the streamer is already in it, and
 * mark the session as the active one. Playback of a different active session
 * is stopped first (without leaving voice).
 *
 * @param {object} session
 * @returns {Promise<void>}
 */
async function ensureVoiceConnection(session) {
  await ensureStreamingRuntime();
  if (!runtime.streamer || !runtime.client) {
    throw new Error("Worker runtime is not ready");
  }
  if (runtime.activeSessionId && runtime.activeSessionId !== session.workerSessionId) {
    await stopSessionPlayback(ensureSession(runtime.activeSessionId), false);
  }

  const voiceConnection = runtime.streamer.voiceConnection;
  const isAlreadyConnected =
    voiceConnection &&
    voiceConnection.status.started &&
    voiceConnection.guildId === session.guildId &&
    voiceConnection.channelId === session.voiceChannelId;

  if (!isAlreadyConnected) {
    console.log(`[worker] Joining voice channel ${session.voiceChannelId} in guild ${session.guildId}`);
    await runtime.streamer.joinVoice(session.guildId, session.voiceChannelId);
    // Fixed pause after joining before anything else uses the connection.
    await new Promise((resolve) => setTimeout(resolve, 2000));
  } else {
    console.log(`[worker] Already connected to voice channel ${session.voiceChannelId} in guild ${session.guildId}`);
  }

  runtime.activeSessionId = session.workerSessionId;
  session.workerStatus = "connected";
  session.lastError = "";

  if (runtime.client.user && typeof runtime.client.user.setActivity === "function") {
    try {
      runtime.client.user.setActivity(`Watch Party: ${session.currentTitle || session.title || "idle"}`);
    } catch (_) {
      // Presence is cosmetic; a failure here must not break the voice setup.
    }
  }
}

/** True when the command exposes a killable ffmpeg process. */
function canPauseCommand(command) {
  return Boolean(command && command.ffmpegProc && typeof command.ffmpegProc.kill === "function");
}

/**
 * Abort a session's current stream and reset its playback bookkeeping.
 *
 * @param {object} session
 * @param {boolean} [leaveVoice=true] - Also leave the voice channel and clear
 *   the active-session marker when it points at this session.
 * @returns {Promise<void>}
 */
async function stopSessionPlayback(session, leaveVoice = true) {
  // Lets the ffmpeg/playStream callbacks recognise this as an intentional stop.
  session.manualStop = true;
  if (session.abortController) {
    try {
      session.abortController.abort();
    } catch (_) {
      // Best-effort: continue tearing down even if abort throws.
    }
  }
  if (runtime.streamer) {
    try {
      runtime.streamer.stopStream();
    } catch (_) {
      // Best-effort: there may be no active stream.
    }
    if (leaveVoice) {
      try {
        runtime.streamer.leaveVoice();
      } catch (_) {
        // Best-effort: there may be no voice connection.
      }
    }
  }

  // NOTE(review): leaving voice sets workerStatus to "connected" while staying in
  // voice keeps the previous status — confirm this mapping is intended.
  session.workerStatus = leaveVoice ? "connected" : session.workerStatus;
  session.playbackState = leaveVoice ? "stopped" : "idle";
  session.abortController = null;
  session.ffmpegCommand = null;
  session.startedAtSeconds = 0;
  session.positionSeconds = 0;
  if (leaveVoice && runtime.activeSessionId === session.workerSessionId) {
    runtime.activeSessionId = null;
  }
}

/**
 * Start streaming a media URL into the session's voice channel.
 *
 * Resolves once the stream has been handed to playStream; completion and
 * failure of the stream itself are recorded on the session asynchronously.
 *
 * @param {object} session
 * @param {object} playback - Payload with `streamUrl` or `downloadUrl`, and
 *   optionally `title` and `durationSeconds`.
 * @param {number} [startSeconds=0] - Offset to start from; passed to ffmpeg as `-ss`.
 * @returns {Promise<void>}
 * @throws {Error} When the payload is unusable or the stream cannot be prepared.
 */
async function startPlayback(session, playback, startSeconds = 0) {
  if (!playback || typeof playback !== "object") {
    throw new Error("playback payload is required");
  }
  const input = String(playback.streamUrl || playback.downloadUrl || "").trim();
  if (!input) {
    throw new Error("Playback payload is missing a Jellyfin media URL");
  }

  await ensureVoiceConnection(session);
  if (!runtime.streamer || !streamingStack.prepareStream || !streamingStack.playStream) {
    throw new Error("Streaming runtime is not available");
  }
  if (session.abortController) {
    await stopSessionPlayback(session, false);
  }

  const abortController = new AbortController();
  session.abortController = abortController;
  session.playback = playback;
  session.currentTitle = String(playback.title || session.currentTitle || "").trim();
  session.durationSeconds = Number(playback.durationSeconds || 0) || 0;
  session.positionSeconds = Math.max(0, Number(startSeconds || 0));
  session.startedAtSeconds = nowSeconds();
  session.playbackState = "playing";
  session.workerStatus = "streaming";
  session.manualStop = false;
  session.lastError = "";

  const playMode = playOptions();
  console.log(
    "[worker] starting playback",
    JSON.stringify(
      {
        sessionId: session.workerSessionId,
        title: session.currentTitle,
        mode: playMode.type,
        url: input,
      },
      null,
      2,
    ),
  );

  const options = streamOptions();
  if (session.positionSeconds > 0) {
    options.customInputOptions = ["-ss", String(session.positionSeconds)];
  }
  console.log(`[worker] Calling prepareStream with options:`, JSON.stringify(options, null, 2));

  const prepared = streamingStack.prepareStream(
    input,
    options,
    abortController.signal,
  );
  const command = prepared && prepared.command ? prepared.command : null;
  const output = prepared && prepared.output ? prepared.output : null;
  if (!command || !output) {
    throw new Error("Failed to prepare FFmpeg stream");
  }
  session.ffmpegCommand = command;

  command.on("start", (commandLine) => {
    console.log(`[worker] FFmpeg command line: ${commandLine}`);
  });
  command.on("codecData", (data) => {
    console.log(`[worker] FFmpeg codec data:`, JSON.stringify(data, null, 2));
  });
  command.on("progress", (progress) => {
    console.log(`[worker] FFmpeg progress: frame=${progress.frames} fps=${progress.currentFps} kbps=${progress.currentKbps} time=${progress.timemark}`);
  });
  command.on("stderr", (line) => {
    console.log(`[worker] FFmpeg stderr: ${line}`);
  });
  command.on("error", (error) => {
    // Errors caused by our own stop/abort are expected and must not flip the session to "error".
    if (session.manualStop || abortController.signal.aborted) {
      console.log("[worker] FFmpeg command ended/stopped manually.");
      return;
    }
    console.error("[worker] FFmpeg command error:", error);
    session.lastError = error && error.message ? String(error.message) : "FFmpeg stream failed";
    session.playbackState = "error";
    session.workerStatus = "error";
  });

  console.log("[worker] Initializing playStream...");
  // Deliberately not awaited: the HTTP request returns while the stream runs.
  // Both outcomes are handled below, so the promise is never left unhandled.
  streamingStack.playStream(output, runtime.streamer, playMode, abortController.signal)
    .then(() => {
      if (session.manualStop || abortController.signal.aborted) {
        console.log("[worker] playStream finished (manually stopped/aborted).");
        return;
      }
      console.log("[worker] playStream completed successfully.");
      session.positionSeconds = session.durationSeconds > 0 ? session.durationSeconds : sessionPlaybackPosition(session);
      session.startedAtSeconds = 0;
      session.playbackState = "idle";
      session.workerStatus = "connected";
      session.ffmpegCommand = null;
      session.abortController = null;
    })
    .catch((error) => {
      if (session.manualStop || abortController.signal.aborted) {
        console.log("[worker] playStream rejected (manually stopped/aborted).");
        return;
      }
      console.error("[worker] playStream failed with error:", error);
      session.lastError = error && error.message ? String(error.message) : "Streaming failed";
      session.startedAtSeconds = 0;
      session.playbackState = "error";
      session.workerStatus = "error";
      session.ffmpegCommand = null;
      session.abortController = null;
    });
}

/**
 * Build a fresh, idle session record from a POST /sessions body.
 *
 * @param {object} body - Must contain `sessionId`; may contain `guildId`,
 *   `voiceChannelId`, `textChannelId` and `title`.
 * @returns {object} The new session record.
 * @throws {Error} "sessionId is required" when the id is missing or blank.
 */
function createSessionRecord(body) {
  const sessionId = String(body.sessionId || "").trim();
  if (!sessionId) {
    throw new Error("sessionId is required");
  }
  return {
    workerSessionId: sessionId,
    guildId: String(body.guildId || "").trim(),
    voiceChannelId: String(body.voiceChannelId || "").trim(),
    textChannelId: String(body.textChannelId || "").trim(),
    title: String(body.title || "").trim(),
    workerStatus: "idle",
    playbackState: "idle",
    currentTitle: "",
    positionSeconds: 0,
    durationSeconds: 0,
    lastError: "",
    queueEntryId: null,
    jellyfinSourceId: "",
    mediaType: "",
    playback: null,
    startedAtSeconds: 0,
    manualStop: false,
    abortController: null,
    ffmpegCommand: null,
  };
}

// HTTP API (all routes require the bearer token):
//   GET  /health                 - service and Discord status
//   POST /sessions               - create a session and join its voice channel
//   POST /sessions/:id/play      - start playback for a session
//   POST /sessions/:id/control   - pause | resume | stop | skip | seek
//   GET  /sessions/:id           - current session payload
const server = http.createServer(async (request, response) => {
  try {
    const url = new URL(request.url, `http://127.0.0.1:${PORT}`);

    if (request.method === "GET" && url.pathname === "/health") {
      if (!requireAuth(request, response)) return;
      json(response, 200, {
        ok: true,
        service: "orb-stream-worker",
        streamingImplemented: streamingStack.available,
        streamingError: streamingStack.available ? "" : streamingStack.error,
        discordConfigured: Boolean(DISCORD_USER_TOKEN),
        discordReady: runtime.ready,
        discordUserId: runtime.client && runtime.client.user ? runtime.client.user.id : null,
        sessions: sessions.size,
        activeSessionId: runtime.activeSessionId,
      });
      return;
    }

    if (request.method === "POST" && url.pathname === "/sessions") {
      if (!requireAuth(request, response)) return;
      const body = await readJson(request);
      const session = createSessionRecord(body);
      sessions.set(session.workerSessionId, session);
      try {
        await ensureStreamingRuntime();
        await ensureVoiceConnection(session);
      } catch (error) {
        // Setup failures are reported inside a 200 payload rather than as an HTTP error.
        console.error("[worker] Error in session setup:", error);
        session.workerStatus = "error";
        session.lastError = error instanceof Error ? error.message : String(error);
      }
      json(response, 200, sessionPayload(session));
      return;
    }

    const playMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/play$/);
    if (playMatch) {
      if (!requireAuth(request, response)) return;
      const session = ensureSession(decodeSessionId(playMatch[1]));
      const body = await readJson(request);
      session.queueEntryId = Number(body.queueEntryId || 0) || null;
      session.jellyfinSourceId = String(body.jellyfinSourceId || "").trim();
      session.currentTitle = String(body.title || "").trim();
      session.mediaType = String(body.mediaType || "").trim();
      try {
        const startPosition = Math.max(0, Number(body.positionSeconds || 0) || 0);
        await startPlayback(session, body.playback, startPosition);
      } catch (error) {
        // Playback failures are reported inside a 200 payload rather than as an HTTP error.
        console.error("[worker] Error in play setup:", error);
        session.lastError = error instanceof Error ? error.message : String(error);
        session.workerStatus = "error";
        session.playbackState = "error";
      }
      json(response, 200, sessionPayload(session));
      return;
    }

    const controlMatch = request.method === "POST" && url.pathname.match(/^\/sessions\/([^/]+)\/control$/);
    if (controlMatch) {
      if (!requireAuth(request, response)) return;
      const session = ensureSession(decodeSessionId(controlMatch[1]));
      const body = await readJson(request);
      const action = String(body.action || "").trim();

      if (action === "pause") {
        if (session.playbackState !== "playing") {
          throw new BadRequestError("Pause is not available because session is not playing");
        }
        // Capture the position first: stopping resets it to 0.
        const pausedPos = sessionPlaybackPosition(session);
        await stopSessionPlayback(session, false);
        session.positionSeconds = pausedPos;
        session.playbackState = "paused";
        session.workerStatus = "connected";
      } else if (action === "resume") {
        if (session.playbackState !== "paused") {
          throw new BadRequestError("Resume is not available because session is not paused");
        }
        if (!session.playback) {
          throw new BadRequestError("Cannot resume because no playback metadata exists");
        }
        await startPlayback(session, session.playback, session.positionSeconds);
      } else if (action === "stop" || action === "skip") {
        await stopSessionPlayback(session, true);
      } else if (action === "seek") {
        if (!session.playback) {
          throw new BadRequestError("Seek is not available because nothing has been loaded");
        }
        const requestedPosition = Math.max(0, Number((body.data && body.data.positionSeconds) || 0) || 0);
        await stopSessionPlayback(session, false);
        await startPlayback(session, session.playback, requestedPosition);
      } else {
        throw new BadRequestError(`Unsupported control action: ${action}`);
      }
      json(response, 200, sessionPayload(session));
      return;
    }

    const sessionMatch = request.method === "GET" && url.pathname.match(/^\/sessions\/([^/]+)$/);
    if (sessionMatch) {
      if (!requireAuth(request, response)) return;
      const session = ensureSession(decodeSessionId(sessionMatch[1]));
      json(response, 200, sessionPayload(session));
      return;
    }

    notFound(response);
  } catch (error) {
    if (error instanceof Error && /Unknown worker session/.test(error.message)) {
      notFound(response);
      return;
    }
    if (error instanceof BadRequestError) {
      badRequest(response, error);
      return;
    }
    // Legacy fallback: plain errors are still classified as 400 by their message text.
    if (error instanceof Error && /required|invalid|Unsupported|not implemented|not available|missing/i.test(error.message)) {
      badRequest(response, error);
      return;
    }
    json(response, 500, { error: error && error.message ? error.message : "Internal error" });
  }
});

server.listen(PORT, "0.0.0.0", () => {
  console.log(`orb-stream-worker listening on ${PORT}`);
});