from __future__ import annotations import hmac import json import os import time import urllib.parse from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from .auth import DashboardAuth, DashboardSession, dashboard_auth_from_env from .core import BotRuntime, MAX_REQUEST_BYTES, SESSION_COOKIE, bool_env from .media import ( available_media_genres, catalog_item_for_source_id, catalog_known_source_ids, fetch_jellyfin_image, media_catalog_status, load_media_library, media_items_from_data, publish_media_items, recommend_media_items, resolve_jellyfin_playback_source, reset_media_library, save_media_library, sync_jellyfin_library, update_jellyfin_sync_state, ) from .status import result_to_jsonable, run_check_cycle, runtime_status, save_services_config, services_from_data from .storage import ( channel_settings, jellyfin_settings, save_catalog_url_setting, save_channel_settings, save_jellyfin_settings, ) from .watchparty import ( add_watch_party_queue_item, create_watch_party_session, first_queued_entry, get_watch_party_session, list_media_candidates, list_watch_party_sessions, update_queue_entry_status, update_watch_party_status, update_worker_state, worker_command_payload, ) from .worker_client import ( worker_control, worker_create_session, worker_enabled, worker_health, worker_play, worker_status, ) def make_dashboard_handler(runtime: BotRuntime, auth: DashboardAuth | None) -> type[BaseHTTPRequestHandler]: dashboard_path = Path(__file__).resolve().parent.parent / "dashboard.html" class DashboardHandler(BaseHTTPRequestHandler): server_version = "ArchiveStatusDashboard/1.0" def log_message(self, format: str, *args: Any) -> None: print(f"[dashboard] {self.address_string()} - {format % args}", flush=True) def do_GET(self) -> None: parsed_request = urllib.parse.urlparse(self.path) path = parsed_request.path query = urllib.parse.parse_qs(parsed_request.query) if path in {"/", "/dashboard"}: self.send_dashboard() return if path == "/catalog": from .media import render_catalog_html self.send_catalog(render_catalog_html(runtime)) return if path.startswith("/catalog/poster/"): item_id = urllib.parse.unquote(path.removeprefix("/catalog/poster/")).strip() self.send_catalog_poster(item_id) return if path == "/favicon.ico": self.send_response(HTTPStatus.NO_CONTENT) self.end_headers() return if path == "/api/session": session = self.require_auth() if session is None: return _session_id, data = session self.send_json( HTTPStatus.OK, { "username": data.username, "csrfToken": data.csrf_token, }, ) return if path == "/api/status": if self.require_auth() is None: return self.send_json(HTTPStatus.OK, runtime_status(runtime)) return if path == "/api/media": if self.require_auth() is None: return self.send_json(HTTPStatus.OK, media_catalog_status(runtime)) return if path == "/api/media/recommendations": if self.require_auth() is None: return movies, shows = load_media_library(runtime) media_type = str(query.get("type", ["all"])[0] or "all") genre = str(query.get("genre", ["all"])[0] or "all") mode = str(query.get("mode", ["balanced"])[0] or "balanced") search = str(query.get("search", [""])[0] or "").strip() try: limit = int(str(query.get("limit", ["6"])[0] or "6")) except ValueError: limit = 6 self.send_json( HTTPStatus.OK, { "genres": available_media_genres(movies, shows), **recommend_media_items( movies, shows, media_type=media_type, genre=genre, mode=mode, search=search, limit=limit, ), }, ) return if path == "/api/settings": if self.require_auth() is None: return self.send_json(HTTPStatus.OK, {"channels": channel_settings(runtime)}) return if path == "/api/watchparty/sessions": if self.require_auth() is None: return self.send_json(HTTPStatus.OK, {"sessions": list_watch_party_sessions()}) return if path.startswith("/api/watchparty/sessions/"): if self.require_auth() is None: return suffix = path.removeprefix("/api/watchparty/sessions/") if suffix.isdigit(): self.send_json(HTTPStatus.OK, get_watch_party_session(int(suffix))) return if path == "/api/watchparty/media": if self.require_auth() is None: return media_type = str(query.get("type", ["all"])[0] or "all") search = str(query.get("search", [""])[0] or "").strip() try: limit = int(str(query.get("limit", ["25"])[0] or "25")) except ValueError: limit = 25 self.send_json( HTTPStatus.OK, {"items": list_media_candidates(runtime, media_type=media_type, search=search, limit=limit)}, ) return if path == "/api/jellyfin": if self.require_auth() is None: return self.send_json(HTTPStatus.OK, {"jellyfin": jellyfin_settings(runtime)}) return if path == "/api/watchparty/worker-health": if self.require_auth() is None: return if not worker_enabled(): self.send_json(HTTPStatus.OK, {"configured": False}) return self.send_json(HTTPStatus.OK, {"configured": True, "worker": worker_health()}) return self.send_error(HTTPStatus.NOT_FOUND) def do_POST(self) -> None: path = urllib.parse.urlparse(self.path).path if path == "/api/login": self.handle_login() return session = self.require_auth(require_csrf=True) if session is None: return if path == "/api/logout": self.handle_logout(session[0]) return if path == "/api/check": self.handle_check() return if path == "/api/services": self.handle_services() return if path == "/api/media": self.handle_media_catalog() return if path == "/api/media/library": self.handle_media_library() return if path == "/api/media/reset": self.handle_media_reset() return if path == "/api/settings": self.handle_settings() return if path == "/api/jellyfin/settings": self.handle_jellyfin_settings() return if path == "/api/jellyfin/sync": self.handle_jellyfin_sync() return if path == "/api/watchparty/sessions": self.handle_watchparty_sessions() return if path == "/api/watchparty/queue": self.handle_watchparty_queue() return if path == "/api/watchparty/session-status": self.handle_watchparty_session_status() return if path == "/api/watchparty/queue-status": self.handle_watchparty_queue_status() return if path == "/api/watchparty/worker-state": self.handle_watchparty_worker_state() return if path == "/api/watchparty/worker-command": self.handle_watchparty_worker_command() return if path == "/api/watchparty/worker-start": self.handle_watchparty_worker_start() return if path == "/api/watchparty/worker-play-next": self.handle_watchparty_worker_play_next() return if path == "/api/watchparty/worker-control": self.handle_watchparty_worker_control() return if path == "/api/watchparty/worker-status": self.handle_watchparty_worker_status() return self.send_error(HTTPStatus.NOT_FOUND) def require_auth(self, require_csrf: bool = False) -> tuple[str, DashboardSession] | None: if auth is None: return "disabled", DashboardSession("local", "disabled", time.time() + 3600) session = auth.session_from_cookie(self.headers.get("Cookie")) if session is None: self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Login required"}) return None if require_csrf: csrf = self.headers.get("X-CSRF-Token", "") if not hmac.compare_digest(csrf, session[1].csrf_token): self.send_json(HTTPStatus.FORBIDDEN, {"error": "CSRF token mismatch"}) return None return session def cookie_attributes(self, max_age: int) -> str: attrs = [ "Path=/", "HttpOnly", "SameSite=Strict", f"Max-Age={max_age}", ] if auth is not None and auth.config.cookie_secure: attrs.append("Secure") return "; ".join(attrs) def set_session_cookie(self, session_id: str) -> None: ttl = auth.config.session_ttl_seconds if auth is not None else 3600 self.send_header( "Set-Cookie", f"{SESSION_COOKIE}={session_id}; {self.cookie_attributes(ttl)}", ) def clear_session_cookie(self) -> None: self.send_header( "Set-Cookie", f"{SESSION_COOKIE}=; {self.cookie_attributes(0)}", ) def send_dashboard(self) -> None: try: body = dashboard_path.read_bytes() except FileNotFoundError: self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "dashboard.html missing") return self.send_response(HTTPStatus.OK) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def send_catalog(self, body: bytes) -> None: self.send_response(HTTPStatus.OK) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def send_catalog_poster(self, item_id: str) -> None: if item_id not in catalog_known_source_ids(runtime): self.send_error(HTTPStatus.NOT_FOUND) return try: item = catalog_item_for_source_id(runtime, item_id) image = fetch_jellyfin_image(runtime, item_id) except Exception as exc: self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)}) return if image is None and item is not None: from .media import tmdb_fallback_poster_url fallback_url = tmdb_fallback_poster_url(item) if fallback_url: self.send_response(HTTPStatus.FOUND) self.send_header("Location", fallback_url) self.send_header("Cache-Control", "public, max-age=3600") self.end_headers() return if image is None: self.send_error(HTTPStatus.NOT_FOUND) return body, content_type = image self.send_response(HTTPStatus.OK) self.send_header("Content-Type", content_type) self.send_header("Cache-Control", "public, max-age=3600") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def read_json(self) -> dict[str, Any]: raw_length = self.headers.get("Content-Length", "0") try: length = int(raw_length) except ValueError as exc: raise ValueError("Invalid Content-Length") from exc if length > MAX_REQUEST_BYTES: raise ValueError("Request body is too large") body = self.rfile.read(length) try: data = json.loads(body.decode("utf-8")) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON: {exc}") from exc if not isinstance(data, dict): raise ValueError("JSON body must be an object") return data def send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None: body = json.dumps(payload, indent=2).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def handle_login(self) -> None: if auth is None: self.send_json(HTTPStatus.OK, {"username": "local", "csrfToken": "disabled"}) return try: data = self.read_json() except ValueError as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return username = str(data.get("username", "")) password = str(data.get("password", "")) throttle_key = f"{self.client_address[0]}:{username}" if not auth.login_allowed(throttle_key): self.send_json(HTTPStatus.TOO_MANY_REQUESTS, {"error": "Too many login attempts. Try again later."}) return login = auth.login(username, password) if login is None: auth.record_failed_login(throttle_key) self.send_json(HTTPStatus.UNAUTHORIZED, {"error": "Invalid username or password"}) return auth.clear_failed_login(throttle_key) session_id, session = login payload = { "username": session.username, "csrfToken": session.csrf_token, } body = json.dumps(payload, indent=2).encode("utf-8") self.send_response(HTTPStatus.OK) self.set_session_cookie(session_id) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def handle_logout(self, session_id: str) -> None: if auth is not None: auth.logout(session_id) body = b'{\n "ok": true\n}' self.send_response(HTTPStatus.OK) self.clear_session_cookie() self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Cache-Control", "no-store") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def handle_check(self) -> None: try: message_id, results = run_check_cycle(runtime) except Exception as exc: with runtime.lock: runtime.last_error = str(exc) self.send_json(HTTPStatus.BAD_GATEWAY, {"error": str(exc)}) return self.send_json( HTTPStatus.OK, { "messageId": message_id, "results": [result_to_jsonable(result) for result in results], }, ) def handle_services(self) -> None: try: data = self.read_json() services_from_data(data) save_services_config(runtime.config_path, data) message_id, results = run_check_cycle(runtime) except Exception as exc: with runtime.lock: runtime.last_error = str(exc) self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json( HTTPStatus.OK, { "messageId": message_id, "services": data.get("services", []), "results": [result_to_jsonable(result) for result in results], }, ) def handle_media_catalog(self) -> None: try: data = self.read_json() if "movies" in data or "shows" in data: if "catalogUrl" in data: save_catalog_url_setting(runtime, str(data.get("catalogUrl", ""))) movies = media_items_from_data(data.get("movies", []), "movie") shows = media_items_from_data(data.get("shows", []), "show") save_media_library(runtime, movies, shows) result = publish_media_items( runtime=runtime, channel_id=str(data.get("channelId", "")), movies_all=movies, shows_all=shows, ) self.send_json(HTTPStatus.OK, result) return raise ValueError("Publish requires the saved Jellyfin media library") except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return def handle_media_library(self) -> None: try: data = self.read_json() movies = media_items_from_data(data.get("movies", []), "movie") shows = media_items_from_data(data.get("shows", []), "show") saved = save_media_library(runtime, movies, shows) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json( HTTPStatus.OK, { "library": { "movies": saved.get("movies", []), "shows": saved.get("shows", []), }, "movieCount": len(movies), "showCount": len(shows), }, ) def handle_media_reset(self) -> None: try: result = reset_media_library(runtime) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_settings(self) -> None: try: data = self.read_json() channels = data.get("channels", data) if not isinstance(channels, dict): raise ValueError("Settings payload must include a channels object") result = save_channel_settings( runtime, str(channels.get("statusChannelId", "")), str(channels.get("mediaChannelId", "")), str(channels.get("catalogUrl", "")), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"channels": result}) def handle_jellyfin_settings(self) -> None: try: data = self.read_json() result = save_jellyfin_settings(runtime, data) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"jellyfin": result}) def handle_jellyfin_sync(self) -> None: try: data = self.read_json() if data: save_jellyfin_settings(runtime, data) result = sync_jellyfin_library(runtime, force_publish=bool(data.get("forcePublish", False))) except Exception as exc: update_jellyfin_sync_state(runtime, error=str(exc)[:240]) self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc), "jellyfin": jellyfin_settings(runtime)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_sessions(self) -> None: try: data = self.read_json() result = create_watch_party_session( runtime, guild_id=str(data.get("guildId", "")).strip(), voice_channel_id=str(data.get("voiceChannelId", "")).strip(), text_channel_id=str(data.get("textChannelId", "")).strip(), owner_user_id=str(data.get("ownerUserId", "")).strip(), title=str(data.get("title", "Watch Party")).strip() or "Watch Party", ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_queue(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) source_id = str(data.get("jellyfinSourceId", "")).strip() result = add_watch_party_queue_item(runtime, session_id, source_id) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_session_status(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) status = str(data.get("status", "")).strip() worker_session_id = data.get("workerSessionId") current_queue_entry_id = data.get("currentQueueEntryId") result = update_watch_party_status( session_id, status, worker_session_id=str(worker_session_id).strip() if worker_session_id is not None else None, current_queue_entry_id=int(current_queue_entry_id) if current_queue_entry_id not in {None, ""} else None, ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_queue_status(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) queue_entry_id = int(str(data.get("queueEntryId", "")).strip()) status = str(data.get("status", "")).strip() result = update_queue_entry_status(session_id, queue_entry_id, status) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_worker_state(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) result = update_worker_state( session_id, worker_status=str(data.get("workerStatus", "idle")).strip() or "idle", playback_state=str(data.get("playbackState", "idle")).strip() or "idle", current_title=str(data.get("currentTitle", "")).strip(), position_seconds=int(data.get("positionSeconds", 0) or 0), duration_seconds=int(data.get("durationSeconds", 0) or 0), last_error=str(data.get("lastError", "")).strip(), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_worker_command(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) action = str(data.get("action", "")).strip() command_data = data.get("data", {}) if not isinstance(command_data, dict): raise ValueError("Worker command data must be an object") result = worker_command_payload(session_id, action, command_data) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, result) def handle_watchparty_worker_start(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) session_payload = get_watch_party_session(session_id) session = session_payload["session"] worker = worker_create_session( session_id=session_id, guild_id=str(session["guildId"]), voice_channel_id=str(session["voiceChannelId"]), text_channel_id=str(session["textChannelId"]), ) result = update_watch_party_status( session_id, "connecting", worker_session_id=str(worker.get("workerSessionId", session_id)), current_queue_entry_id=session["currentQueueEntryId"], ) result = update_worker_state( session_id, worker_status=str(worker.get("workerStatus", "idle")), playback_state=str(worker.get("playbackState", "idle")), current_title=str(worker.get("currentTitle", "")), position_seconds=int(worker.get("positionSeconds", 0) or 0), duration_seconds=int(worker.get("durationSeconds", 0) or 0), last_error=str(worker.get("lastError", "")), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"worker": worker, **result}) def handle_watchparty_worker_play_next(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) queue_entry = first_queued_entry(session_id) if queue_entry is None: raise ValueError("No queued items available") media_item = catalog_item_for_source_id(runtime, str(queue_entry["jellyfinSourceId"])) if media_item is None: raise ValueError(f"Queued media item no longer exists in the saved library: {queue_entry['jellyfinSourceId']}") playback = resolve_jellyfin_playback_source(runtime, media_item) worker = worker_play( session_id=session_id, queue_entry_id=int(queue_entry["id"]), jellyfin_source_id=str(queue_entry["jellyfinSourceId"]), title=str(queue_entry["title"]), media_type=str(queue_entry["mediaType"]), playback=playback, ) update_queue_entry_status(session_id, int(queue_entry["id"]), "playing") update_watch_party_status( session_id, "playing", current_queue_entry_id=int(queue_entry["id"]), ) result = update_worker_state( session_id, worker_status=str(worker.get("workerStatus", "idle")), playback_state=str(worker.get("playbackState", "idle")), current_title=str(worker.get("currentTitle", "")), position_seconds=int(worker.get("positionSeconds", 0) or 0), duration_seconds=int(worker.get("durationSeconds", 0) or 0), last_error=str(worker.get("lastError", "")), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"worker": worker, "playback": playback, **result}) def handle_watchparty_worker_control(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) action = str(data.get("action", "")).strip() command_data = data.get("data", {}) if not isinstance(command_data, dict): raise ValueError("Worker control data must be an object") worker = worker_control(session_id=session_id, action=action, data=command_data) if action == "pause": update_watch_party_status(session_id, "paused") elif action == "resume": update_watch_party_status(session_id, "playing") elif action == "stop": update_watch_party_status(session_id, "stopped") result = update_worker_state( session_id, worker_status=str(worker.get("workerStatus", "idle")), playback_state=str(worker.get("playbackState", "idle")), current_title=str(worker.get("currentTitle", "")), position_seconds=int(worker.get("positionSeconds", 0) or 0), duration_seconds=int(worker.get("durationSeconds", 0) or 0), last_error=str(worker.get("lastError", "")), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"worker": worker, **result}) def handle_watchparty_worker_status(self) -> None: try: data = self.read_json() session_id = int(str(data.get("sessionId", "")).strip()) worker = worker_status(session_id) result = update_worker_state( session_id, worker_status=str(worker.get("workerStatus", "idle")), playback_state=str(worker.get("playbackState", "idle")), current_title=str(worker.get("currentTitle", "")), position_seconds=int(worker.get("positionSeconds", 0) or 0), duration_seconds=int(worker.get("durationSeconds", 0) or 0), last_error=str(worker.get("lastError", "")), ) except Exception as exc: self.send_json(HTTPStatus.BAD_REQUEST, {"error": str(exc)}) return self.send_json(HTTPStatus.OK, {"worker": worker, **result}) return DashboardHandler def maybe_start_dashboard(runtime: BotRuntime) -> ThreadingHTTPServer | None: if not bool_env("DASHBOARD_ENABLED", False): return None host = os.getenv("DASHBOARD_HOST", "127.0.0.1").strip() or "127.0.0.1" port = int(os.getenv("DASHBOARD_PORT", "8787")) auth = dashboard_auth_from_env() server = ThreadingHTTPServer((host, port), make_dashboard_handler(runtime, auth)) thread = __import__("threading").Thread(target=server.serve_forever, name="dashboard", daemon=True) thread.start() auth_note = "without auth" if auth is None else "with password sessions" print(f"Dashboard running at http://{host}:{port} ({auth_note})", flush=True) return server