diff --git a/archive_bot/discord_api.py b/archive_bot/discord_api.py index 3c66dd1..0aa16a4 100644 --- a/archive_bot/discord_api.py +++ b/archive_bot/discord_api.py @@ -13,8 +13,10 @@ from .core import BotRuntime, DISCORD_API from .storage import channel_settings, load_state, save_state from .watchparty import ( add_watch_party_queue_item, + clear_watch_party_queue, control_watch_party_worker, create_watch_party_session, + end_watch_party_session, find_watch_party_session, get_watch_party_session, list_media_candidates, @@ -35,6 +37,8 @@ WATCH_PARTY_PANEL_RESUME_ID = "watchparty_panel:resume" WATCH_PARTY_PANEL_STOP_ID = "watchparty_panel:stop" WATCH_PARTY_PANEL_QUEUE_ID = "watchparty_panel:queue" WATCH_PARTY_PANEL_STATUS_ID = "watchparty_panel:status" +WATCH_PARTY_PANEL_CLEAR_QUEUE_ID = "watchparty_panel:clear_queue" +WATCH_PARTY_PANEL_END_SESSION_ID = "watchparty_panel:end_session" def format_watch_party_summary(payload: dict[str, Any]) -> str: @@ -102,7 +106,9 @@ def watch_party_panel_payload() -> dict[str, Any]: "components": [ {"type": 2, "style": 2, "label": "Pause", "custom_id": WATCH_PARTY_PANEL_PAUSE_ID}, {"type": 2, "style": 2, "label": "Resume", "custom_id": WATCH_PARTY_PANEL_RESUME_ID}, - {"type": 2, "style": 4, "label": "Stop", "custom_id": WATCH_PARTY_PANEL_STOP_ID}, + {"type": 2, "style": 2, "label": "Stop", "custom_id": WATCH_PARTY_PANEL_STOP_ID}, + {"type": 2, "style": 2, "label": "Clear Queue", "custom_id": WATCH_PARTY_PANEL_CLEAR_QUEUE_ID}, + {"type": 2, "style": 4, "label": "End Session", "custom_id": WATCH_PARTY_PANEL_END_SESSION_ID}, ], }, ], @@ -273,7 +279,8 @@ class DiscordGatewayManager: voice_channel = ensure_voice_context(interaction) session = session_for_voice(interaction) if session is not None: - return session + if not create_if_missing or session.get("status") not in {"stopped", "error"}: + return session if not create_if_missing: raise ValueError("No watch-party session exists for your voice channel. Run /watchparty create first.") payload = create_watch_party_session( @@ -310,7 +317,7 @@ class DiscordGatewayManager: async def do_create(interaction: Any, title: str | None = None) -> None: voice_channel = ensure_voice_context(interaction) session = session_for_voice(interaction) - if session is None: + if session is None or session.get("status") in {"stopped", "error"}: payload = create_watch_party_session( runtime, guild_id=str(interaction.guild.id), @@ -370,6 +377,22 @@ class DiscordGatewayManager: payload = refresh_watch_party_worker(int(session["id"])) if worker_enabled() else get_watch_party_session(int(session["id"])) await respond(interaction, format_watch_party_summary(payload)) + async def do_clear_queue(interaction: Any) -> None: + session = ensure_session(interaction, create_if_missing=False) + payload = clear_watch_party_queue(int(session["id"])) + await respond( + interaction, + f"Cleared all queued items from `{payload['session']['title']}`. Queue size: {len(payload['queue'])}.", + ) + + async def do_end_session(interaction: Any) -> None: + session = ensure_session(interaction, create_if_missing=False) + await end_watch_party_session(int(session["id"])) + await respond( + interaction, + f"Ended the watch-party session `{session['title']}`.", + ) + class WatchPartyAddModal(discord.ui.Modal, title="Add To Watch Party"): query = discord.ui.TextInput(label="Title", placeholder="Alien", required=True, max_length=120) media_type = discord.ui.TextInput( @@ -436,13 +459,27 @@ class DiscordGatewayManager: except Exception as exc: await respond(interaction, str(exc)) - @discord.ui.button(label="Stop", style=discord.ButtonStyle.danger, custom_id=WATCH_PARTY_PANEL_STOP_ID, row=1) + @discord.ui.button(label="Stop", style=discord.ButtonStyle.secondary, custom_id=WATCH_PARTY_PANEL_STOP_ID, row=1) async def stop_button(self, interaction: Any, _button: Any) -> None: try: await defer_then(interaction, lambda: do_control(interaction, "stop", {})) except Exception as exc: await respond(interaction, str(exc)) + @discord.ui.button(label="Clear Queue", style=discord.ButtonStyle.secondary, custom_id=WATCH_PARTY_PANEL_CLEAR_QUEUE_ID, row=1) + async def clear_queue_button(self, interaction: Any, _button: Any) -> None: + try: + await defer_then(interaction, lambda: do_clear_queue(interaction)) + except Exception as exc: + await respond(interaction, str(exc)) + + @discord.ui.button(label="End Session", style=discord.ButtonStyle.danger, custom_id=WATCH_PARTY_PANEL_END_SESSION_ID, row=1) + async def end_session_button(self, interaction: Any, _button: Any) -> None: + try: + await defer_then(interaction, lambda: do_end_session(interaction)) + except Exception as exc: + await respond(interaction, str(exc)) + group = discord.app_commands.Group(name="watchparty", description="Control Jellyfin watch parties in Discord") @group.command(name="create", description="Create a watch-party session for your current voice channel") @@ -522,6 +559,20 @@ class DiscordGatewayManager: except Exception as exc: await respond(interaction, str(exc)) + @group.command(name="clear", description="Clear the watch-party queue") + async def clear_command(interaction: Any) -> None: + try: + await do_clear_queue(interaction) + except Exception as exc: + await respond(interaction, str(exc)) + + @group.command(name="end", description="End the current watch-party session") + async def end_command(interaction: Any) -> None: + try: + await do_end_session(interaction) + except Exception as exc: + await respond(interaction, str(exc)) + tree.add_command(group) self.client.add_view(WatchPartyPanelView()) self._commands_registered = True diff --git a/archive_bot/watchparty.py b/archive_bot/watchparty.py index 7f0d2c3..1a81c8d 100644 --- a/archive_bot/watchparty.py +++ b/archive_bot/watchparty.py @@ -656,3 +656,50 @@ def refresh_watch_party_worker(session_id: int) -> dict[str, Any]: last_error=str(worker.get("lastError", "")), ) return {"worker": worker, **result} + + +def clear_watch_party_queue(session_id: int) -> dict[str, Any]: + initialize_watchparty_schema() + with connect_db() as connection: + if connection.execute("SELECT id FROM watch_party_sessions WHERE id = ?", (session_id,)).fetchone() is None: + raise ValueError(f"Watch party session not found: {session_id}") + connection.execute( + "DELETE FROM watch_party_queue WHERE session_id = ? AND status = 'queued'", + (session_id,), + ) + session_row = connection.execute( + "SELECT status FROM watch_party_sessions WHERE id = ?", (session_id,) + ).fetchone() + current_status = session_row["status"] if session_row else "" + if current_status == "queued": + connection.execute( + "UPDATE watch_party_sessions SET status = 'draft', updated_at = ? WHERE id = ?", + (utc_now(), session_id), + ) + log_watch_party_event(connection, session_id, "queue.cleared", {}) + connection.commit() + return get_watch_party_session(session_id) + + +def end_watch_party_session(session_id: int) -> dict[str, Any]: + from .worker_client import worker_control, worker_enabled + + initialize_watchparty_schema() + if worker_enabled(): + try: + worker_control(session_id=session_id, action="stop") + except Exception as exc: + print(f"Failed to notify worker of stop during session end: {exc}", flush=True) + + update_watch_party_status(session_id, "stopped") + result = update_worker_state( + session_id, + worker_status="idle", + playback_state="idle", + current_title="", + position_seconds=0, + duration_seconds=0, + last_error="", + ) + return result + diff --git a/orb_stream_worker/server.js b/orb_stream_worker/server.js index 750e6d1..e4d9d7a 100644 --- a/orb_stream_worker/server.js +++ b/orb_stream_worker/server.js @@ -263,7 +263,20 @@ async function ensureVoiceConnection(session) { await stopSessionPlayback(ensureSession(runtime.activeSessionId), false); } - await runtime.streamer.joinVoice(session.guildId, session.voiceChannelId); + const voiceConnection = runtime.streamer.voiceConnection; + const isAlreadyConnected = voiceConnection && + voiceConnection.status.started && + voiceConnection.guildId === session.guildId && + voiceConnection.channelId === session.voiceChannelId; + + if (!isAlreadyConnected) { + console.log(`[worker] Joining voice channel ${session.voiceChannelId} in guild ${session.guildId}`); + await runtime.streamer.joinVoice(session.guildId, session.voiceChannelId); + await new Promise((resolve) => setTimeout(resolve, 2000)); + } else { + console.log(`[worker] Already connected to voice channel ${session.voiceChannelId} in guild ${session.guildId}`); + } + runtime.activeSessionId = session.workerSessionId; session.workerStatus = "connected"; session.lastError = ""; @@ -271,8 +284,6 @@ async function ensureVoiceConnection(session) { if (runtime.client.user && typeof runtime.client.user.setActivity === "function") { runtime.client.user.setActivity(`Watch Party: ${session.currentTitle || session.title || "idle"}`).catch(() => {}); } - - await new Promise((resolve) => setTimeout(resolve, 2000)); } function canPauseCommand(command) {