QOL: auto fetching guilds

This commit is contained in:
rambros 2026-03-07 22:00:14 +05:30
parent e5758a26d9
commit 0080b76290
5 changed files with 248 additions and 31 deletions

View file

@ -33,6 +33,7 @@ The tool now features a unified, intuitive TUI (Terminal User Interface) - no mo
| - Text Messages | 🟩 | 🟩 |
| - File Attachments | 🟩 | 🟩 *some file extensions not supported*|
| - Preserve Reply Links | 🟩 | 🟩 |
| - Webhook Embeds | 🟩 | 🟩 |
| - Threads | 🟩 | 🟩 |
| - Forums | ⚠️ | ⚠️ |
@ -137,11 +138,16 @@ But now their own website states that **Persona** will be used in some countries
---
### Documentation
Detailed info about the code can be found at https://deepwiki.com/rambros3d/disco-reaper
- [![Website](https://img.shields.io/badge/Website-rambros3d.com-blue?style=flat&logo=googlechrome&logoColor=white)](https://reaper.rambros3d.com/) - view bot setup guides, tool usage guides, and backup viewer
- [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/rambros3d/disco-reaper) - incredibily good AI docs
### Vibe Code Notice
- Code is provided as is; This tool was developed with AI.
- Take it, use it, modify it, feel free to do whatever you wish.
### Star History
[![Star History Chart](https://api.star-history.com/image?repos=rambros3d/disco-reaper&type=date&legend=top-left)](https://www.star-history.com/?repos=rambros3d%2Fdisco-reaper&type=date&legend=top-left)
---

View file

@ -31,6 +31,26 @@ class DiscordReader:
"""Factory for discord.PermissionOverwrite, keeps the import centralized."""
return discord.PermissionOverwrite()
@staticmethod
async def fetch_guilds(token: str) -> list[tuple[str, str]]:
"""Fetches the list of guilds the bot is a member of. Returns list of (name, id)."""
intents = discord.Intents.default()
intents.guilds = True
client = discord.Client(intents=intents)
guilds_list = []
try:
# We use a short-lived client just to fetch the guilds
await client.login(token)
async for guild in client.fetch_guilds(limit=None):
label = f"{guild.id}-{guild.name}"
guilds_list.append((label, str(guild.id)))
except Exception as e:
logger.error(f"Failed to fetch Discord guilds: {e}")
raise
finally:
await client.close()
return guilds_list
def __init__(self, token: str, server_id: str):
self.token = token
try:

View file

@ -16,6 +16,28 @@ class FluxerWriter:
self._ready_event = asyncio.Event()
self._webhooks: Dict[str, Webhook] = {} # channel_id -> Webhook
@staticmethod
async def fetch_guilds(token: str, api_url: str = "default") -> list[tuple[str, str]]:
"""Fetches the list of Fluxer communities the bot is in. Returns list of (label, id)."""
from fluxer import HTTPClient, Guild
http_kwargs = {}
if api_url and api_url != "default":
http_kwargs["api_url"] = api_url
async with HTTPClient(token, **http_kwargs) as http:
try:
guilds_data = await http.get_current_user_guilds()
guilds_list = []
for g_data in guilds_data:
g = Guild.from_data(g_data)
label = f"{g.id}-{g.name}"
guilds_list.append((label, str(g.id)))
return guilds_list
except Exception as e:
logger.error(f"Failed to fetch Fluxer communities via HTTP: {e}")
raise
async def _get_or_create_webhook(self, channel_id: str) -> Optional[Webhook]:
"""Gets an existing webhook for the channel or creates one."""
if channel_id in self._webhooks:

View file

@ -1,3 +1,4 @@
import asyncio
import logging
import stoat
from typing import Optional, List, Dict, Any
@ -10,6 +11,61 @@ class StoatWriter:
self.community_id = str(community_id)
self.api_url = api_url
@staticmethod
async def fetch_guilds(token: str, api_url: str = "default") -> list[tuple[str, str]]:
"""Fetches the list of Stoat servers the bot is in. Returns list of (label, id)."""
client_kwargs = {"token": token, "bot": True}
if api_url and api_url != "default":
client_kwargs["http_base"] = api_url
client = stoat.Client(**client_kwargs)
ready_event = asyncio.Event()
servers_list = []
@client.on(stoat.ReadyEvent)
async def on_ready(event: stoat.ReadyEvent):
nonlocal servers_list
servers_list = event.servers
ready_event.set()
client_task = asyncio.create_task(client.start())
guilds_list = []
try:
# Wait for bot to be ready OR for the task to fail
done, pending = await asyncio.wait(
[asyncio.create_task(ready_event.wait()), client_task],
return_when=asyncio.FIRST_COMPLETED,
timeout=10.0
)
if ready_event.is_set():
for s in servers_list:
label = f"{s.id}-{s.name}"
guilds_list.append((label, str(s.id)))
else:
# If we got here, either it timed out or client_task finished early
if client_task in done:
# Check for exception in the client task
exc = client_task.exception()
if exc:
raise exc
else:
raise Exception("Client task finished early without ready event")
else:
raise asyncio.TimeoutError("Timed out waiting for Stoat to be ready")
except Exception as e:
logger.error(f"Failed to fetch Stoat servers: {e}")
raise
finally:
await client.close()
client_task.cancel()
try:
await client_task
except asyncio.CancelledError:
pass
return guilds_list
async def start(self):
client_kwargs = {"token": self.token, "bot": True}
if self.api_url and self.api_url != "default":

View file

@ -1,12 +1,15 @@
import re
import os
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.widgets import (
Header, Footer, Button, Label, Input, ListItem,
ListView, Rule, RadioButton, RadioSet,
ListView, Rule, RadioButton, RadioSet, Select,
)
from textual.screen import Screen, ModalScreen
@ -188,6 +191,10 @@ class ConfigScreen(Screen):
#target_section { height: auto; }
#cfg_actions { height: auto; margin-top: 1; margin-bottom: 0; dock: bottom; }
#cfg_actions Button { width: 1fr; margin: 0 1; }
.fetch_row { height: auto; align: left middle; margin-bottom: 1; }
.fetch_row Input { width: 1fr; }
.fetch_row Button { width: auto; margin-left: 1; }
#inp_discord_server { margin-bottom: 1; }
"""
BINDINGS = [("escape", "go_back", "Back")]
@ -206,17 +213,21 @@ class ConfigScreen(Screen):
with VerticalScroll(id="cfg_scroll"):
# ── Discord ──────────────────────────────────────────────
yield Label("Discord", classes="section_title")
yield Label("Bot Token:", classes="field_label")
yield Label("Discord Bot Token:", classes="field_label")
with Horizontal(classes="fetch_row"):
yield Input(
value=self.config.discord_bot_token or "",
id="inp_discord_token",
password=True,
placeholder="Paste Bot Token here"
)
yield Button("Fetch", id="btn_fetch_guilds", variant="primary")
yield Label("Server ID:", classes="field_label")
yield Input(
value=self.config.discord_server_id or "",
yield Select(
options=[],
id="inp_discord_server",
prompt="Select a Server"
)
# ── Reaper Mode ──────────────────────────────────────────
@ -255,16 +266,22 @@ class ConfigScreen(Screen):
value=(cur_plat == "stoat"),
)
yield Label("Bot Token:", classes="field_label")
with Horizontal(classes="fetch_row"):
yield Input(
value=self.config.target_bot_token or "",
id="inp_target_token",
password=True,
placeholder="Paste Target Bot Token"
)
yield Button("Fetch", id="btn_fetch_target_servers", variant="primary")
yield Label("Community / Server ID:", classes="field_label")
yield Input(
value=self.config.target_server_id or "",
yield Select(
options=[],
id="inp_target_server",
prompt="Select a Community/Server"
)
yield Label("Target API URL:", classes="field_label")
yield Input(
value=self.config.target_api_url or "default",
@ -279,8 +296,106 @@ class ConfigScreen(Screen):
def on_mount(self) -> None:
self._toggle_target_section()
# If we have a token, try to populate the select widget on mount
if self.config.discord_bot_token and self.config.discord_bot_token != "DISCORD_BOT_TOKEN":
self.run_worker(self._do_fetch_guilds(self.config.discord_bot_token, initial=True))
# ── show / hide the target platform section ──────────────────────────
# Also auto-fetch target servers if mode is not backup_only
if self._get_selected_mode() != "backup_only":
if self.config.target_bot_token and self.config.target_bot_token not in ("TARGET_BOT_TOKEN", ""):
platform = self.config.target_platform
if platform != "none":
self.run_worker(self._do_fetch_target_servers(
token=self.config.target_bot_token,
api_url=self.config.target_api_url,
platform=platform,
initial=True
))
async def _do_fetch_guilds(self, token: str, initial: bool = False) -> None:
"""Background worker to fetch guilds and update the Select widget."""
from src.core.discord_reader import DiscordReader
try:
guilds = await DiscordReader.fetch_guilds(token)
if not guilds:
if not initial:
self.notify("No Discord servers found or invalid token.", severity="warning")
return
options = [(name, gid) for name, gid in guilds]
select_widget = self.query_one("#inp_discord_server", Select)
select_widget.set_options(options)
# Restore saved value if it exists in the fetched list
saved_id = self.config.discord_server_id
if saved_id and any(gid == saved_id for _, gid in guilds):
select_widget.value = saved_id
except Exception as e:
if not initial:
self.notify(f"Failed to fetch Discord servers: {e}", severity="error")
async def _do_fetch_target_servers(self, token: str = None, api_url: str = None, platform: str = None, initial: bool = False) -> None:
"""Background worker to fetch target platform servers."""
if not platform:
platform = self._get_selected_platform()
if not token:
token = self.query_one("#inp_target_token", Input).value.strip()
if not api_url:
api_url = self.query_one("#inp_target_api", Input).value.strip() or "default"
if not token or token == "TARGET_BOT_TOKEN":
return
servers = []
try:
if platform == "fluxer":
from src.fluxer.writer import FluxerWriter
servers = await FluxerWriter.fetch_guilds(token, api_url)
elif platform == "stoat":
from src.stoat.writer import StoatWriter
servers = await StoatWriter.fetch_guilds(token, api_url)
else:
return
except Exception as e:
logger.error(f"Failed to fetch {platform} servers: {e}")
if not initial:
self.notify(f"Failed to fetch {platform} servers: {e}", severity="error")
return
if not servers:
if not initial:
self.notify(f"No {platform} servers found or invalid token.", severity="warning")
return
options = [(label, sid) for label, sid in servers]
select_widget = self.query_one("#inp_target_server", Select)
select_widget.set_options(options)
# Restore saved value
saved_id = self.config.target_server_id
if saved_id and any(sid == saved_id for _, sid in servers):
select_widget.value = saved_id
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_fetch_guilds":
token = self.query_one("#inp_discord_token", Input).value.strip()
if not token or token == "DISCORD_BOT_TOKEN":
self.notify("Please enter a valid Bot Token first.", severity="error")
return
self.run_worker(self._do_fetch_guilds(token))
elif event.button.id == "btn_fetch_target_servers":
token = self.query_one("#inp_target_token", Input).value.strip()
if not token:
self.notify("Please enter a valid Target Platform Token first.", severity="error")
return
self.run_worker(self._do_fetch_target_servers())
elif event.button.id == "btn_back":
self.app.pop_screen()
elif event.button.id == "btn_save":
self._collect_and_save()
self.notify("Configuration saved.", severity="information")
self.dismiss(True)
def _get_selected_mode(self) -> str:
for rb in self.query("#mode_radio RadioButton"):
@ -306,13 +421,21 @@ class ConfigScreen(Screen):
def _collect_and_save(self) -> None:
self.config.discord_bot_token = self.query_one("#inp_discord_token", Input).value.strip() or self.config.discord_bot_token
self.config.discord_server_id = self.query_one("#inp_discord_server", Input).value.strip() or self.config.discord_server_id
server_select = self.query_one("#inp_discord_server", Select)
if server_select.value != Select.BLANK:
self.config.discord_server_id = str(server_select.value)
self.config.tool_mode = self._get_selected_mode()
if self.config.tool_mode != "backup_only":
self.config.target_platform = self._get_selected_platform()
self.config.target_bot_token = self.query_one("#inp_target_token", Input).value.strip() or self.config.target_bot_token
self.config.target_server_id = self.query_one("#inp_target_server", Input).value.strip() or self.config.target_server_id
target_select = self.query_one("#inp_target_server", Select)
if target_select.value != Select.BLANK:
self.config.target_server_id = str(target_select.value)
self.config.target_api_url = self.query_one("#inp_target_api", Input).value.strip() or self.config.target_api_url
else:
self.config.target_platform = "none"
@ -322,16 +445,6 @@ class ConfigScreen(Screen):
def _launch_mode(self) -> None:
pass # No longer needed
def action_go_back(self) -> None:
self.app.pop_screen()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_back":
self.app.pop_screen()
elif event.button.id == "btn_save":
self._collect_and_save()
self.notify("Configuration saved.", severity="information")
self.dismiss(True)
# ──────────────────────────────────────────────────────────────────────────────