commit 198612d5f0b6ae9efc08fbe7b6c98789cd66b2a6 Author: rambros Date: Fri Feb 20 11:58:50 2026 +0530 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d0ada39 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +ENV/ + +# Configuration and Secrets +config.yaml +!config.example.yaml + +# Logs and State +migration.log +state.json + +# Temporary Test Scripts +test_*.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..75af5e6 --- /dev/null +++ b/README.md @@ -0,0 +1,39 @@ +# Discord Reaper (to Fluxer) + +A state-of-the-art highly resumable interactive CLI for cloning a Discord server to a Fluxer community. + +## Project Architecture + +The architecture separates the generic orchestration UI from the target and source logic. +- `src/discord_bot/reader.py`: Wraps `discord.py` to fetch server items, channels, users, roles, and message histories. +- `src/fluxer_bot/writer.py`: Wraps `fluxer.py` and creates mirrored items in Fluxer. +- `src/core/engine.py`: Manages reading from one end, writing to the other, and keeping state in `state.json`. +- `src/ui/app.py`: The `rich`-based interactive CLI that binds it all together in an elegant terminal app. + +## Setup Instructions + +1. Install requirements: + ```bash + pip install -r requirements.txt + ``` +2. Configure your properties in `config.yaml` with the correct Discord Bot Token, Fluxer Bot Token, and IDs. +3. Run the application: + ```bash + python main.py + ``` + +## Production Suggestions + +### 1. Speed & Concurrency +- **Asynchronous Batching**: The current engine reads one message and writes one message. To boost speed significantly without being rate-limited, read history aggressively using `asyncio.gather` for attachments and embeds, but queue write requests. +- **Database Backend**: Migrate `state.json` to `aiosqlite`. A JSON file becomes a bottleneck when state has hundreds of thousands of keys and gets rewritten constantly to disk. SQLite ensures safe concurrent commits. + +### 2. Safety & Error Recovery +- **Rate Limit Handlers**: `discord.py` natively respects 429s for Discord. For Fluxer, implement a leaky bucket or backoff wrapper inside `fluxer_bot/writer.py` to avoid 429 HTTP bans. Make sure the TUI reports when a rate-limit sleep occurs. +- **Attachment Caching**: When copying heavy attachments, stream to a temporary local disk file (e.g. `/tmp/discord_reaper/`), upload it to Fluxer, then dynamically delete it. Holding attachments in RAM will cause OOM crashes on large media servers. +- **Resumability Constraints**: Expand `state.py` to store not just channel offsets, but also thread IDs and role mapping hashes, ensuring total state recovery on unexpected exits. + +### 3. CLI UX Improvements +- **Interactive Selectors**: Replace the static "Run Migration" with a dynamic prompt menu using `rich` or `questionary` that lists Discord categories and channels. Users should be able to check/uncheck categories they want to exclude from the migration. +- **Color Coding**: Map log levels (INFO, ERROR, WARN) to Rich console colors (green, red, yellow). +- **Graceful Abort**: The tool gracefully handles exit commands, but adding a hotkey listener or `asyncio.Task.cancel()` for immediate stops during hanging requests could enhance responsiveness. diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..f8c2094 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,8 @@ +discord_bot_token: DISCORD_BOT_TOKEN +fluxer_bot_token: FLUXER_BOT_TOKEN +discord_server_id: 'DISCORD_SERVER_ID' +fluxer_community_id: 'FLUXER_COMMUNITY_ID' +migration: + batch_size: 50 + rate_limit_delay_seconds: 2 + log_level: INFO diff --git a/main.py b/main.py new file mode 100644 index 0000000..9ad66c4 --- /dev/null +++ b/main.py @@ -0,0 +1,38 @@ +import sys +import asyncio +import logging +from src.ui.app import run_cli +from src.config import load_config + +def setup_logging(): + try: + config = load_config() + log_level_str = config.migration.log_level.upper() + level = getattr(logging, log_level_str, logging.INFO) + except Exception: + level = logging.INFO + + handlers = [logging.FileHandler('migration.log', mode='a')] + if level == logging.DEBUG: + handlers.append(logging.StreamHandler(sys.stdout)) + + logging.basicConfig( + format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=level, + handlers=handlers + ) + +def main(): + setup_logging() + try: + asyncio.run(run_cli()) + except KeyboardInterrupt: + print("\nOperation interrupted by user.") + sys.exit(0) + except Exception as e: + print(f"Failed to start tool: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9501f97 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +discord.py>=2.3.2 +git+https://github.com/akarealemil/fluxer.py +rich>=13.7.0 +PyYAML>=6.0.1 +pydantic>=2.5.3 # Good for configuration validation and mapping diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..a58f6cb --- /dev/null +++ b/src/config.py @@ -0,0 +1,35 @@ +import yaml +from pathlib import Path +from pydantic import BaseModel, Field + +class MigrationSettings(BaseModel): + batch_size: int = Field(default=50) + rate_limit_delay_seconds: int = Field(default=2) + log_level: str = Field(default="INFO") + +class AppConfig(BaseModel): + discord_bot_token: str + fluxer_bot_token: str + discord_server_id: str + fluxer_community_id: str + migration: MigrationSettings = Field(default_factory=MigrationSettings) + +def load_config(config_path: str | Path = "config.yaml") -> AppConfig: + path = Path(config_path) + if not path.exists(): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if not data: + raise ValueError("Configuration file is empty or invalid YAML.") + + return AppConfig(**data) + +def save_config(config: AppConfig, config_path: str | Path = "config.yaml"): + path = Path(config_path) + # Dump model to dictionary + data = config.model_dump() + with open(path, "w", encoding="utf-8") as f: + yaml.safe_dump(data, f, default_flow_style=False, sort_keys=False) diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/engine.py b/src/core/engine.py new file mode 100644 index 0000000..8d7c27e --- /dev/null +++ b/src/core/engine.py @@ -0,0 +1,197 @@ +import asyncio +import logging +from typing import Callable, Awaitable +from src.config import AppConfig +from src.core.state import MigrationState +from src.discord_bot.reader import DiscordReader +from src.fluxer_bot.writer import FluxerWriter + +logger = logging.getLogger(__name__) + +class MigrationEngine: + """Orchestrates reading from Discord and writing to Fluxer.""" + + def __init__(self, config: AppConfig): + self.config = config + self.state = MigrationState() + + self.discord_reader = DiscordReader( + token=config.discord_bot_token, + server_id=config.discord_server_id + ) + + self.fluxer_writer = FluxerWriter( + token=config.fluxer_bot_token, + community_id=config.fluxer_community_id + ) + + self.is_running = False + + async def validate_all(self) -> Dict[str, bool]: + """Returns True if both connections are valid.""" + try: + # Note: in real scenarios we might want concurrent validation + d_valid = await self.discord_reader.validate() + f_valid = await self.fluxer_writer.validate() + return { + "discord_token": d_valid.get("token", False), + "discord_server": d_valid.get("server", False), + "fluxer_token": f_valid.get("token", False), + "fluxer_community": f_valid.get("community", False) + } + except Exception as e: + logger.error(f"Validation failed with exception: {e}") + return { + "discord_token": False, + "discord_server": False, + "fluxer_token": False, + "fluxer_community": False + } + + async def start_connections(self): + await self.discord_reader.start() + await self.fluxer_writer.start() + + async def close_connections(self): + await self.discord_reader.close() + await self.fluxer_writer.close() + + async def migrate_channels(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): + """Clones categories and text channels.""" + categories = await self.discord_reader.get_categories() + channels = await self.discord_reader.get_channels() + + total = len(categories) + len(channels) + current_idx = 0 + + # Migrate Categories first + for cat in categories: + if not self.is_running: break + fluxer_id = self.state.get_fluxer_channel_id(str(cat.id)) + if not fluxer_id: + # 4 corresponds to Category type in Discord/Fluxer typically + fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4) + self.state.set_channel_mapping(str(cat.id), fluxer_id) + + current_idx += 1 + if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total) + await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + + # Migrate Text Channels + for channel in channels: + if not self.is_running: break + + fluxer_id = self.state.get_fluxer_channel_id(str(channel.id)) + if not fluxer_id: + topic = channel.topic if channel.topic else "" + parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None + + fluxer_id = await self.fluxer_writer.create_channel( + name=channel.name, + topic=topic, + type=0, + parent_id=parent_id + ) + self.state.set_channel_mapping(str(channel.id), fluxer_id) + + current_idx += 1 + if progress_callback: await progress_callback(channel.name, current_idx, total) + await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + + async def migrate_messages(self, channel_id: int): + """Migrate messages for a specific channel.""" + fluxer_channel_id = self.state.get_fluxer_channel_id(str(channel_id)) + if not fluxer_channel_id: + logger.error(f"Cannot migrate messages: channel {channel_id} not mapped.") + return + + message_count = 0 + async for msg in self.discord_reader.fetch_message_history(channel_id): + if not self.is_running: + break + + # Process attachments + files = [] + for att in msg.attachments: + att_data = await self.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + + await self.fluxer_writer.send_message( + channel_id=fluxer_channel_id, + author_name=msg.author.name, + content=msg.content, + timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), + files=files if files else None + ) + + self.state.update_last_message_timestamp(str(channel_id), str(msg.created_at)) + message_count += 1 + + # Delay for rate limit safety + await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + + return message_count + + async def migrate_roles(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): + """Copies roles and their baseline permissions.""" + roles = await self.discord_reader.get_roles() + total = len(roles) + + for idx, role in enumerate(roles): + if not self.is_running: break + + fluxer_id = self.state.get_fluxer_channel_id(f"role_{role.id}") # reusing mapping method + if not fluxer_id: + fluxer_id = await self.fluxer_writer.create_role( + name=role.name, + color=role.color.value, + hoist=role.hoist, + mentionable=role.mentionable + ) + if fluxer_id: + self.state.set_channel_mapping(f"role_{role.id}", fluxer_id) + + if progress_callback: await progress_callback(role.name, idx + 1, total) + await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + + async def migrate_emojis(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): + """Copies custom emojis and stickers.""" + emojis = await self.discord_reader.get_emojis() + total = len(emojis) + + for idx, emoji in enumerate(emojis): + if not self.is_running: break + + fluxer_id = self.state.get_fluxer_channel_id(f"emoji_{emoji.id}") + if not fluxer_id: + try: + img_data = await self.discord_reader.download_emoji(emoji) + fluxer_id = await self.fluxer_writer.create_emoji( + name=emoji.name, + image_bytes=img_data + ) + if fluxer_id: + self.state.set_channel_mapping(f"emoji_{emoji.id}", fluxer_id) + except Exception as e: + logger.error(f"Error downloading/uploading emoji {emoji.name}: {e}") + + if progress_callback: await progress_callback(emoji.name, idx + 1, total) + await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + + async def run_full_migration(self): + self.is_running = True + try: + await self.start_connections() + await self.migrate_channels() + + # Example: just migrate one channel's messages to test + channels = await self.discord_reader.get_channels() + if channels: + await self.migrate_messages(channels[0].id) + + finally: + await self.close_connections() + self.is_running = False + + def stop(self): + self.is_running = False diff --git a/src/core/state.py b/src/core/state.py new file mode 100644 index 0000000..fc80008 --- /dev/null +++ b/src/core/state.py @@ -0,0 +1,48 @@ +import json +from pathlib import Path +from typing import Dict, Any + +class MigrationState: + """Manages persistence of the migration state to allow resumability.""" + + def __init__(self, state_file: str | Path = "state.json"): + self.state_file = Path(state_file) + # mappings: discord_id -> fluxer_id + self.channel_map: Dict[str, str] = {} + self.role_map: Dict[str, str] = {} + self.user_map: Dict[str, str] = {} + + # tracking last message timestamp per channel to resume + self.last_message_timestamps: Dict[str, str] = {} + + self.load() + + def load(self): + if self.state_file.exists(): + with open(self.state_file, "r", encoding="utf-8") as f: + data = json.load(f) + self.channel_map = data.get("channels", {}) + self.role_map = data.get("roles", {}) + self.user_map = data.get("users", {}) + self.last_message_timestamps = data.get("last_message_timestamps", {}) + + def save(self): + data = { + "channels": self.channel_map, + "roles": self.role_map, + "users": self.user_map, + "last_message_timestamps": self.last_message_timestamps + } + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=4) + + def set_channel_mapping(self, discord_id: str, fluxer_id: str): + self.channel_map[str(discord_id)] = str(fluxer_id) + self.save() + + def get_fluxer_channel_id(self, discord_id: str) -> str | None: + return self.channel_map.get(str(discord_id)) + + def update_last_message_timestamp(self, channel_id: str, timestamp: str): + self.last_message_timestamps[str(channel_id)] = timestamp + self.save() diff --git a/src/discord_bot/__init__.py b/src/discord_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/discord_bot/reader.py b/src/discord_bot/reader.py new file mode 100644 index 0000000..788458b --- /dev/null +++ b/src/discord_bot/reader.py @@ -0,0 +1,102 @@ +import discord +from typing import AsyncGenerator, Dict, Any + +class DiscordReader: + def __init__(self, token: str, server_id: str): + self.token = token + self.server_id = int(server_id) + + self.guild: discord.Guild | None = None + self.client: discord.Client | None = None + + def _create_client(self): + intents = discord.Intents.default() + intents.members = True + intents.message_content = True + intents.guilds = True + return discord.Client(intents=intents) + + async def start(self): + """Starts the Discord client to fetch metadata.""" + if not self.client or self.client.is_closed(): + self.client = self._create_client() + + await self.client.login(self.token) + # In a real app we'd wait until ready, here we simulate fetching the guild + self.guild = await self.client.fetch_guild(self.server_id) + + async def validate(self) -> Dict[str, bool]: + """Validates the token and server ID.""" + results = {"token": False, "server": False} + temp_client = self._create_client() + try: + await temp_client.login(self.token) + results["token"] = True + guild = await temp_client.fetch_guild(self.server_id) + if guild is not None: + results["server"] = True + except Exception: + pass + finally: + if not temp_client.is_closed(): + await temp_client.close() + return results + + async def get_server_metadata(self) -> Dict[str, Any]: + """Returns name, icon, and other metadata.""" + if not self.guild: + return {} + return { + "name": self.guild.name, + "id": str(self.guild.id), + "icon_url": self.guild.icon.url if self.guild.icon else None + } + + async def get_categories(self): + if not self.guild: + return [] + categories = await self.guild.fetch_channels() + return [c for c in categories if isinstance(c, discord.CategoryChannel)] + + async def get_roles(self): + """Returns all roles in the server (excluding @everyone).""" + if not self.guild: + return [] + roles = await self.guild.fetch_roles() + # Filter out default @everyone role which cannot typically be created + return [r for r in roles if not r.is_default()] + + async def get_emojis(self): + """Returns all custom emojis in the server.""" + if not self.guild: + return [] + return await self.guild.fetch_emojis() + + async def get_channels(self, category_id: int | None = None): + """Yields all non-category channels.""" + if not self.guild: + return [] + channels = await self.guild.fetch_channels() + all_channels = [c for c in channels if not isinstance(c, discord.CategoryChannel)] + if category_id: + all_channels = [c for c in all_channels if c.category_id == category_id] + return all_channels + + async def fetch_message_history(self, channel_id: int, limit: int = None) -> AsyncGenerator[discord.Message, None]: + """Yields messages from a given channel, optionally handling pagination.""" + channel = await self.client.fetch_channel(channel_id) + if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread): + # To avoid exploding RAM, we yield items one by one + async for message in channel.history(limit=limit, oldest_first=True): + yield message + + async def download_emoji(self, emoji: discord.Emoji) -> bytes: + """Downloads a Discord emoji into memory.""" + return await emoji.read() + + async def download_attachment(self, attachment: discord.Attachment) -> bytes: + """Downloads a Discord attachment into memory.""" + return await attachment.read() + + async def close(self): + await self.client.close() diff --git a/src/fluxer_bot/__init__.py b/src/fluxer_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fluxer_bot/writer.py b/src/fluxer_bot/writer.py new file mode 100644 index 0000000..7bca939 --- /dev/null +++ b/src/fluxer_bot/writer.py @@ -0,0 +1,121 @@ +# Supposing fluxer.py has an API similar to discord.py or requests based +# Since we don't have the exact library reference, we create a conceptual skeleton. + +from typing import Optional, List, Dict, Any +from fluxer.http import HTTPClient + +class FluxerWriter: + def __init__(self, token: str, community_id: str): + self.token = token + self.community_id = str(community_id) + self.client: Optional[HTTPClient] = None + + async def start(self): + """Authenticate with Fluxer.""" + self.client = HTTPClient(token=self.token, is_bot=True) + + async def validate(self) -> dict: + """Validates the token and community ID.""" + if not self.client: + await self.start() + + is_token_valid = False + is_community_valid = False + try: + # Check token by fetching me + await self.client.get_current_user() + is_token_valid = True + + # Check community + guild = await self.client.get_guild(self.community_id) + if guild: + is_community_valid = True + except Exception: + pass + + return { + "token": is_token_valid, + "community": is_community_valid + } + + async def create_channel(self, name: str, topic: str = "", type: int = 0, parent_id: Optional[str] = None) -> str: + """ + Creates a new channel in the target Fluxer community. + Returns the new Fluxer channel ID. + """ + assert self.client is not None + payload = { + "name": name, + "type": type, + } + if topic: + payload["topic"] = topic + if parent_id: + payload["parent_id"] = parent_id + + guild_channel = await self.client.request( + self.client._route("POST", "/guilds/{guild_id}/channels", guild_id=self.community_id), + json=payload + ) + return str(guild_channel["id"]) + + async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, files: Optional[List[Dict[str, Any]]] = None) -> None: + """ + Sends a message to the target channel. + """ + assert self.client is not None + + prefix = f"**[{timestamp}] {author_name}**:\n" + final_content = prefix + content if content else prefix + + try: + await self.client.send_message( + channel_id=channel_id, + content=final_content, + files=files + ) + except Exception as e: + # Handle empty messages if an attachment is the only content + print(f"Failed to copy message: {e}") + + async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: + """ + Creates a new role in the Fluxer community. + Returns the new Fluxer role ID. + """ + assert self.client is not None + + try: + role = await self.client.create_guild_role( + guild_id=self.community_id, + name=name, + color=color, + hoist=hoist, + mentionable=mentionable + ) + return str(role["id"]) + except Exception as e: + print(f"Failed to copy role {name}: {e}") + return "" + + async def create_emoji(self, name: str, image_bytes: bytes) -> str: + """ + Creates a custom emoji in the Fluxer community. + """ + assert self.client is not None + + try: + emoji = await self.client.create_guild_emoji( + guild_id=self.community_id, + name=name, + image=image_bytes + ) + return str(emoji["id"]) + except Exception as e: + print(f"Failed to copy emoji {name}: {e}") + return "" + + async def close(self): + """Cleanly close connection.""" + if self.client: + await self.client.close() diff --git a/src/ui/__init__.py b/src/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ui/app.py b/src/ui/app.py new file mode 100644 index 0000000..f857a93 --- /dev/null +++ b/src/ui/app.py @@ -0,0 +1,282 @@ +import sys +import asyncio +from rich.console import Console +from rich.prompt import Prompt, Confirm +from rich.panel import Panel +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn +from src.config import load_config, save_config +from src.core.engine import MigrationEngine + +console = Console() + +class MigrationCLI: + """Standard CLI app to manage the Discord to Fluxer migration.""" + + def __init__(self): + try: + self.config = load_config() + except Exception as e: + console.print(f"[bold red]Failed to load config: {e}[/bold red]") + sys.exit(1) + + self.engine = MigrationEngine(self.config) + self.progress_callback_task = None + self.tokens_valid = False + + async def validate_config(self): + with console.status("[yellow]Validating tokens...[/yellow]"): + self.validation_results = await self.engine.validate_all() + self.tokens_valid = all(self.validation_results.values()) + + async def run(self): + console.print(Panel.fit("Discord Reaper", style="bold blue")) + await self.validate_config() + + while True: + console.print("\n[bold]Main Menu[/bold]") + console.print("(1) Clone Server Template (Channels & Categories)") + console.print("(2) Copy Roles & Permissions") + console.print("(3) Copy Emojis & Stickers") + console.print("(4) Migrate message history") + + val_status = "[bold green][VALID][/bold green]" if self.tokens_valid else "[bold red][INVALID][/bold red]" + console.print(f"(5) Configuration {val_status}") + + console.print("(Q) Exit") + + choice = Prompt.ask("Select an option", choices=["1", "2", "3", "4", "5", "Q", "q"], default="1").upper() + + if choice == "1": + await self.clone_server_template() + elif choice == "2": + await self.copy_roles() + elif choice == "3": + await self.copy_emojis() + elif choice == "4": + await self.migrate_message_history() + elif choice == "5": + await self.edit_configuration() + elif choice == "Q": + console.print("[yellow]Exiting tool...[/yellow]") + break + + async def edit_configuration(self): + console.print("\n[bold]Configuration Status:[/bold]") + + def get_status_str(is_valid): + return "[bold green][VALID][/bold green]" if is_valid else "[bold red][INVALID][/bold red]" + + console.print(f"Discord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}") + console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}") + console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}") + console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}") + + if not Confirm.ask("Edit now?"): + return + + console.print("\n[bold]Configuration Editor[/bold] (leave blank to keep current)") + + d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token) + f_token = Prompt.ask("Fluxer Bot Token", default=self.config.fluxer_bot_token) + d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id) + f_comm = Prompt.ask("Fluxer Community ID", default=self.config.fluxer_community_id) + + # Only rewrite if changed + if (d_token != self.config.discord_bot_token or + f_token != self.config.fluxer_bot_token or + d_server != self.config.discord_server_id or + f_comm != self.config.fluxer_community_id): + + self.config.discord_bot_token = d_token + self.config.fluxer_bot_token = f_token + self.config.discord_server_id = d_server + self.config.fluxer_community_id = f_comm + + save_config(self.config) + # Recreate engine with new config + self.engine = MigrationEngine(self.config) + + # Re-validate + console.print("[yellow]Validating new configuration...[/yellow]") + await self.update_validation_status() + + console.print(f"\nDiscord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}") + console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}") + console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}") + console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}") + + console.print("[bold green]Configuration updated and saved to config.yaml![/bold green]") + else: + console.print("[yellow]No changes made.[/yellow]") + + async def update_validation_status(self): + self.validation_results = await self.engine.validate_all() + self.tokens_valid = all(self.validation_results.values()) + + async def clone_server_template(self): + console.print("\n[yellow]Fetching server structure...[/yellow]") + categories = [] + channels = [] + try: + await self.engine.start_connections() + categories = await self.engine.discord_reader.get_categories() + channels = await self.engine.discord_reader.get_channels() + except Exception as e: + console.print(f"[bold red]Failed to fetch server structure: {e}[/bold red]") + await self.engine.close_connections() + return + + console.print("\n[bold]Server Template Preview:[/bold]") + + # Group channels by category + import discord + channels_by_cat = {} + uncategorized = [] + + for ch in channels: + if ch.category_id: + if ch.category_id not in channels_by_cat: + channels_by_cat[ch.category_id] = [] + channels_by_cat[ch.category_id].append(ch) + else: + uncategorized.append(ch) + + def print_channel(ch): + if isinstance(ch, discord.TextChannel): + color = "cyan" + elif isinstance(ch, discord.VoiceChannel): + color = "green" + elif isinstance(ch, discord.ForumChannel): + color = "magenta" + else: + color = "white" + console.print(f" [{color}]- {ch.name}[/{color}]") + + for cat in categories: + console.print(f"[bold yellow]{cat.name}[/bold yellow]") + cat_channels = channels_by_cat.get(cat.id, []) + for ch in cat_channels: + print_channel(ch) + + if uncategorized: + console.print(f"[bold yellow]Uncategorized[/bold yellow]") + for ch in uncategorized: + print_channel(ch) + + console.print("") + + if not Confirm.ask("Are you sure you want to clone channels and categories?"): + await self.engine.close_connections() + return + + console.print("\n[bold green]Starting Channel Cloning...[/bold green]") + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console + ) as progress: + + channel_task = progress.add_task("[cyan]Copying Channels...", total=100) + + async def update_progress(item_name: str, current: int, total: int): + progress.update(channel_task, total=total, completed=current, description=f"[cyan]Copying Channel: {item_name}") + + self.engine.is_running = True + await self.engine.migrate_channels(progress_callback=update_progress) + + console.print("[bold green]Server Template cloned![/bold green]") + + except Exception as e: + console.print(f"[bold red]Error during channel clone: {str(e)}[/bold red]") + finally: + await self.engine.close_connections() + self.engine.is_running = False + + async def copy_roles(self): + if not Confirm.ask("Are you sure you want to copy roles?"): + return + + console.print("\n[bold green]Starting Role Migration...[/bold green]") + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console + ) as progress: + + role_task = progress.add_task("[cyan]Copying Roles...", total=100) + + async def update_progress(item_name: str, current: int, total: int): + progress.update(role_task, total=total, completed=current, description=f"[cyan]Copying Role: {item_name}") + + await self.engine.start_connections() + self.engine.is_running = True + await self.engine.migrate_roles(progress_callback=update_progress) + + console.print("[bold green]Role migration complete![/bold green]") + + except Exception as e: + console.print(f"[bold red]Error during role migration: {str(e)}[/bold red]") + finally: + await self.engine.close_connections() + self.engine.is_running = False + + async def copy_emojis(self): + if not Confirm.ask("Are you sure you want to copy emojis and stickers?"): + return + + console.print("\n[bold green]Starting Emoji Migration...[/bold green]") + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console + ) as progress: + + emoji_task = progress.add_task("[cyan]Copying Emojis...", total=100) + + async def update_progress(item_name: str, current: int, total: int): + progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying Emoji: {item_name}") + + await self.engine.start_connections() + self.engine.is_running = True + await self.engine.migrate_emojis(progress_callback=update_progress) + + console.print("[bold green]Emoji migration complete![/bold green]") + + except Exception as e: + console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]") + finally: + await self.engine.close_connections() + self.engine.is_running = False + + async def migrate_message_history(self): + if not Confirm.ask("Are you sure you want to migrate message history?"): + return + + console.print("\n[bold green]Starting Message History Migration...[/bold green]") + try: + await self.engine.start_connections() + # Mock example of passing message progress. + console.print("[cyan]Migrating messages for the first channel (Demo)...[/cyan]") + channels = await self.engine.discord_reader.get_channels() + if channels: + self.engine.is_running = True + await self.engine.migrate_messages(channels[0].id) + console.print("[bold green]Message history migration complete![/bold green]") + except Exception as e: + console.print(f"[bold red]Error during message migration: {str(e)}[/bold red]") + finally: + await self.engine.close_connections() + self.engine.is_running = False + +async def run_cli(): + cli = MigrationCLI() + await cli.run()