From 0080b762906d1d41fd4df1ad615ed4972349d2cf Mon Sep 17 00:00:00 2001 From: rambros Date: Sat, 7 Mar 2026 22:00:14 +0530 Subject: [PATCH] QOL: auto fetching guilds --- README.md | 8 +- src/core/discord_reader.py | 20 +++++ src/fluxer/writer.py | 22 +++++ src/stoat/writer.py | 56 ++++++++++++ src/ui/main_app.py | 173 ++++++++++++++++++++++++++++++------- 5 files changed, 248 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 4d9df34..9f93c0e 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The tool now features a unified, intuitive TUI (Terminal User Interface) - no mo | - Text Messages | 🟩 | 🟩 | | - File Attachments | 🟩 | 🟩 *some file extensions not supported*| | - Preserve Reply Links | 🟩 | 🟩 | +| - Webhook Embeds | 🟩 | 🟩 | | - Threads | 🟩 | 🟩 | | - Forums | ⚠️ | ⚠️ | @@ -137,11 +138,16 @@ But now their own website states that **Persona** will be used in some countries --- ### Documentation -Detailed info about the code can be found at https://deepwiki.com/rambros3d/disco-reaper +- [![Website](https://img.shields.io/badge/Website-rambros3d.com-blue?style=flat&logo=googlechrome&logoColor=white)](https://reaper.rambros3d.com/) - view bot setup guides, tool usage guides, and backup viewer +- [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/rambros3d/disco-reaper) - incredibily good AI docs ### Vibe Code Notice - Code is provided as is; This tool was developed with AI. - Take it, use it, modify it, feel free to do whatever you wish. +### Star History + +[![Star History Chart](https://api.star-history.com/image?repos=rambros3d/disco-reaper&type=date&legend=top-left)](https://www.star-history.com/?repos=rambros3d%2Fdisco-reaper&type=date&legend=top-left) + --- \ No newline at end of file diff --git a/src/core/discord_reader.py b/src/core/discord_reader.py index 811eea4..b29eaff 100644 --- a/src/core/discord_reader.py +++ b/src/core/discord_reader.py @@ -31,6 +31,26 @@ class DiscordReader: """Factory for discord.PermissionOverwrite, keeps the import centralized.""" return discord.PermissionOverwrite() + @staticmethod + async def fetch_guilds(token: str) -> list[tuple[str, str]]: + """Fetches the list of guilds the bot is a member of. Returns list of (name, id).""" + intents = discord.Intents.default() + intents.guilds = True + client = discord.Client(intents=intents) + guilds_list = [] + try: + # We use a short-lived client just to fetch the guilds + await client.login(token) + async for guild in client.fetch_guilds(limit=None): + label = f"{guild.id}-{guild.name}" + guilds_list.append((label, str(guild.id))) + except Exception as e: + logger.error(f"Failed to fetch Discord guilds: {e}") + raise + finally: + await client.close() + return guilds_list + def __init__(self, token: str, server_id: str): self.token = token try: diff --git a/src/fluxer/writer.py b/src/fluxer/writer.py index 30fb3c6..532e3a6 100644 --- a/src/fluxer/writer.py +++ b/src/fluxer/writer.py @@ -16,6 +16,28 @@ class FluxerWriter: self._ready_event = asyncio.Event() self._webhooks: Dict[str, Webhook] = {} # channel_id -> Webhook + @staticmethod + async def fetch_guilds(token: str, api_url: str = "default") -> list[tuple[str, str]]: + """Fetches the list of Fluxer communities the bot is in. Returns list of (label, id).""" + from fluxer import HTTPClient, Guild + + http_kwargs = {} + if api_url and api_url != "default": + http_kwargs["api_url"] = api_url + + async with HTTPClient(token, **http_kwargs) as http: + try: + guilds_data = await http.get_current_user_guilds() + guilds_list = [] + for g_data in guilds_data: + g = Guild.from_data(g_data) + label = f"{g.id}-{g.name}" + guilds_list.append((label, str(g.id))) + return guilds_list + except Exception as e: + logger.error(f"Failed to fetch Fluxer communities via HTTP: {e}") + raise + async def _get_or_create_webhook(self, channel_id: str) -> Optional[Webhook]: """Gets an existing webhook for the channel or creates one.""" if channel_id in self._webhooks: diff --git a/src/stoat/writer.py b/src/stoat/writer.py index 806d5aa..409904f 100644 --- a/src/stoat/writer.py +++ b/src/stoat/writer.py @@ -1,3 +1,4 @@ +import asyncio import logging import stoat from typing import Optional, List, Dict, Any @@ -10,6 +11,61 @@ class StoatWriter: self.community_id = str(community_id) self.api_url = api_url + @staticmethod + async def fetch_guilds(token: str, api_url: str = "default") -> list[tuple[str, str]]: + """Fetches the list of Stoat servers the bot is in. Returns list of (label, id).""" + client_kwargs = {"token": token, "bot": True} + if api_url and api_url != "default": + client_kwargs["http_base"] = api_url + + client = stoat.Client(**client_kwargs) + ready_event = asyncio.Event() + servers_list = [] + + @client.on(stoat.ReadyEvent) + async def on_ready(event: stoat.ReadyEvent): + nonlocal servers_list + servers_list = event.servers + ready_event.set() + + client_task = asyncio.create_task(client.start()) + guilds_list = [] + try: + # Wait for bot to be ready OR for the task to fail + done, pending = await asyncio.wait( + [asyncio.create_task(ready_event.wait()), client_task], + return_when=asyncio.FIRST_COMPLETED, + timeout=10.0 + ) + + if ready_event.is_set(): + for s in servers_list: + label = f"{s.id}-{s.name}" + guilds_list.append((label, str(s.id))) + else: + # If we got here, either it timed out or client_task finished early + if client_task in done: + # Check for exception in the client task + exc = client_task.exception() + if exc: + raise exc + else: + raise Exception("Client task finished early without ready event") + else: + raise asyncio.TimeoutError("Timed out waiting for Stoat to be ready") + except Exception as e: + logger.error(f"Failed to fetch Stoat servers: {e}") + raise + finally: + await client.close() + client_task.cancel() + try: + await client_task + except asyncio.CancelledError: + pass + + return guilds_list + async def start(self): client_kwargs = {"token": self.token, "bot": True} if self.api_url and self.api_url != "default": diff --git a/src/ui/main_app.py b/src/ui/main_app.py index 66a0a75..60edf19 100644 --- a/src/ui/main_app.py +++ b/src/ui/main_app.py @@ -1,12 +1,15 @@ import re import os +import logging from pathlib import Path +logger = logging.getLogger(__name__) + from textual.app import App, ComposeResult from textual.containers import Container, Horizontal, Vertical, VerticalScroll from textual.widgets import ( Header, Footer, Button, Label, Input, ListItem, - ListView, Rule, RadioButton, RadioSet, + ListView, Rule, RadioButton, RadioSet, Select, ) from textual.screen import Screen, ModalScreen @@ -188,6 +191,10 @@ class ConfigScreen(Screen): #target_section { height: auto; } #cfg_actions { height: auto; margin-top: 1; margin-bottom: 0; dock: bottom; } #cfg_actions Button { width: 1fr; margin: 0 1; } + .fetch_row { height: auto; align: left middle; margin-bottom: 1; } + .fetch_row Input { width: 1fr; } + .fetch_row Button { width: auto; margin-left: 1; } + #inp_discord_server { margin-bottom: 1; } """ BINDINGS = [("escape", "go_back", "Back")] @@ -206,17 +213,21 @@ class ConfigScreen(Screen): with VerticalScroll(id="cfg_scroll"): # ── Discord ────────────────────────────────────────────── - yield Label("Discord", classes="section_title") - yield Label("Bot Token:", classes="field_label") - yield Input( - value=self.config.discord_bot_token or "", - id="inp_discord_token", - password=True, - ) + yield Label("Discord Bot Token:", classes="field_label") + with Horizontal(classes="fetch_row"): + yield Input( + value=self.config.discord_bot_token or "", + id="inp_discord_token", + password=True, + placeholder="Paste Bot Token here" + ) + yield Button("Fetch", id="btn_fetch_guilds", variant="primary") + yield Label("Server ID:", classes="field_label") - yield Input( - value=self.config.discord_server_id or "", + yield Select( + options=[], id="inp_discord_server", + prompt="Select a Server" ) # ── Reaper Mode ────────────────────────────────────────── @@ -255,16 +266,22 @@ class ConfigScreen(Screen): value=(cur_plat == "stoat"), ) yield Label("Bot Token:", classes="field_label") - yield Input( - value=self.config.target_bot_token or "", - id="inp_target_token", - password=True, - ) + with Horizontal(classes="fetch_row"): + yield Input( + value=self.config.target_bot_token or "", + id="inp_target_token", + password=True, + placeholder="Paste Target Bot Token" + ) + yield Button("Fetch", id="btn_fetch_target_servers", variant="primary") + yield Label("Community / Server ID:", classes="field_label") - yield Input( - value=self.config.target_server_id or "", + yield Select( + options=[], id="inp_target_server", + prompt="Select a Community/Server" ) + yield Label("Target API URL:", classes="field_label") yield Input( value=self.config.target_api_url or "default", @@ -279,8 +296,106 @@ class ConfigScreen(Screen): def on_mount(self) -> None: self._toggle_target_section() + # If we have a token, try to populate the select widget on mount + if self.config.discord_bot_token and self.config.discord_bot_token != "DISCORD_BOT_TOKEN": + self.run_worker(self._do_fetch_guilds(self.config.discord_bot_token, initial=True)) + + # Also auto-fetch target servers if mode is not backup_only + if self._get_selected_mode() != "backup_only": + if self.config.target_bot_token and self.config.target_bot_token not in ("TARGET_BOT_TOKEN", ""): + platform = self.config.target_platform + if platform != "none": + self.run_worker(self._do_fetch_target_servers( + token=self.config.target_bot_token, + api_url=self.config.target_api_url, + platform=platform, + initial=True + )) - # ── show / hide the target platform section ────────────────────────── + async def _do_fetch_guilds(self, token: str, initial: bool = False) -> None: + """Background worker to fetch guilds and update the Select widget.""" + from src.core.discord_reader import DiscordReader + try: + guilds = await DiscordReader.fetch_guilds(token) + + if not guilds: + if not initial: + self.notify("No Discord servers found or invalid token.", severity="warning") + return + + options = [(name, gid) for name, gid in guilds] + select_widget = self.query_one("#inp_discord_server", Select) + select_widget.set_options(options) + + # Restore saved value if it exists in the fetched list + saved_id = self.config.discord_server_id + if saved_id and any(gid == saved_id for _, gid in guilds): + select_widget.value = saved_id + except Exception as e: + if not initial: + self.notify(f"Failed to fetch Discord servers: {e}", severity="error") + + async def _do_fetch_target_servers(self, token: str = None, api_url: str = None, platform: str = None, initial: bool = False) -> None: + """Background worker to fetch target platform servers.""" + if not platform: + platform = self._get_selected_platform() + if not token: + token = self.query_one("#inp_target_token", Input).value.strip() + if not api_url: + api_url = self.query_one("#inp_target_api", Input).value.strip() or "default" + + if not token or token == "TARGET_BOT_TOKEN": + return + + servers = [] + try: + if platform == "fluxer": + from src.fluxer.writer import FluxerWriter + servers = await FluxerWriter.fetch_guilds(token, api_url) + elif platform == "stoat": + from src.stoat.writer import StoatWriter + servers = await StoatWriter.fetch_guilds(token, api_url) + else: + return + except Exception as e: + logger.error(f"Failed to fetch {platform} servers: {e}") + if not initial: + self.notify(f"Failed to fetch {platform} servers: {e}", severity="error") + return + + if not servers: + if not initial: + self.notify(f"No {platform} servers found or invalid token.", severity="warning") + return + + options = [(label, sid) for label, sid in servers] + select_widget = self.query_one("#inp_target_server", Select) + select_widget.set_options(options) + + # Restore saved value + saved_id = self.config.target_server_id + if saved_id and any(sid == saved_id for _, sid in servers): + select_widget.value = saved_id + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn_fetch_guilds": + token = self.query_one("#inp_discord_token", Input).value.strip() + if not token or token == "DISCORD_BOT_TOKEN": + self.notify("Please enter a valid Bot Token first.", severity="error") + return + self.run_worker(self._do_fetch_guilds(token)) + elif event.button.id == "btn_fetch_target_servers": + token = self.query_one("#inp_target_token", Input).value.strip() + if not token: + self.notify("Please enter a valid Target Platform Token first.", severity="error") + return + self.run_worker(self._do_fetch_target_servers()) + elif event.button.id == "btn_back": + self.app.pop_screen() + elif event.button.id == "btn_save": + self._collect_and_save() + self.notify("Configuration saved.", severity="information") + self.dismiss(True) def _get_selected_mode(self) -> str: for rb in self.query("#mode_radio RadioButton"): @@ -306,13 +421,21 @@ class ConfigScreen(Screen): def _collect_and_save(self) -> None: self.config.discord_bot_token = self.query_one("#inp_discord_token", Input).value.strip() or self.config.discord_bot_token - self.config.discord_server_id = self.query_one("#inp_discord_server", Input).value.strip() or self.config.discord_server_id + + server_select = self.query_one("#inp_discord_server", Select) + if server_select.value != Select.BLANK: + self.config.discord_server_id = str(server_select.value) + self.config.tool_mode = self._get_selected_mode() if self.config.tool_mode != "backup_only": self.config.target_platform = self._get_selected_platform() self.config.target_bot_token = self.query_one("#inp_target_token", Input).value.strip() or self.config.target_bot_token - self.config.target_server_id = self.query_one("#inp_target_server", Input).value.strip() or self.config.target_server_id + + target_select = self.query_one("#inp_target_server", Select) + if target_select.value != Select.BLANK: + self.config.target_server_id = str(target_select.value) + self.config.target_api_url = self.query_one("#inp_target_api", Input).value.strip() or self.config.target_api_url else: self.config.target_platform = "none" @@ -322,16 +445,6 @@ class ConfigScreen(Screen): def _launch_mode(self) -> None: pass # No longer needed - def action_go_back(self) -> None: - self.app.pop_screen() - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_back": - self.app.pop_screen() - elif event.button.id == "btn_save": - self._collect_and_save() - self.notify("Configuration saved.", severity="information") - self.dismiss(True) # ──────────────────────────────────────────────────────────────────────────────