This commit is contained in:
rambros 2026-02-20 11:58:50 +05:30
commit 198612d5f0
16 changed files with 912 additions and 0 deletions

37
.gitignore vendored Normal file
View file

@ -0,0 +1,37 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
ENV/
# Configuration and Secrets
config.yaml
!config.example.yaml
# Logs and State
migration.log
state.json
# Temporary Test Scripts
test_*.py

39
README.md Normal file
View file

@ -0,0 +1,39 @@
# Discord Reaper (to Fluxer)
A state-of-the-art highly resumable interactive CLI for cloning a Discord server to a Fluxer community.
## Project Architecture
The architecture separates the generic orchestration UI from the target and source logic.
- `src/discord_bot/reader.py`: Wraps `discord.py` to fetch server items, channels, users, roles, and message histories.
- `src/fluxer_bot/writer.py`: Wraps `fluxer.py` and creates mirrored items in Fluxer.
- `src/core/engine.py`: Manages reading from one end, writing to the other, and keeping state in `state.json`.
- `src/ui/app.py`: The `rich`-based interactive CLI that binds it all together in an elegant terminal app.
## Setup Instructions
1. Install requirements:
```bash
pip install -r requirements.txt
```
2. Configure your properties in `config.yaml` with the correct Discord Bot Token, Fluxer Bot Token, and IDs.
3. Run the application:
```bash
python main.py
```
## Production Suggestions
### 1. Speed & Concurrency
- **Asynchronous Batching**: The current engine reads one message and writes one message. To boost speed significantly without being rate-limited, read history aggressively using `asyncio.gather` for attachments and embeds, but queue write requests.
- **Database Backend**: Migrate `state.json` to `aiosqlite`. A JSON file becomes a bottleneck when state has hundreds of thousands of keys and gets rewritten constantly to disk. SQLite ensures safe concurrent commits.
### 2. Safety & Error Recovery
- **Rate Limit Handlers**: `discord.py` natively respects 429s for Discord. For Fluxer, implement a leaky bucket or backoff wrapper inside `fluxer_bot/writer.py` to avoid 429 HTTP bans. Make sure the TUI reports when a rate-limit sleep occurs.
- **Attachment Caching**: When copying heavy attachments, stream to a temporary local disk file (e.g. `/tmp/discord_reaper/`), upload it to Fluxer, then dynamically delete it. Holding attachments in RAM will cause OOM crashes on large media servers.
- **Resumability Constraints**: Expand `state.py` to store not just channel offsets, but also thread IDs and role mapping hashes, ensuring total state recovery on unexpected exits.
### 3. CLI UX Improvements
- **Interactive Selectors**: Replace the static "Run Migration" with a dynamic prompt menu using `rich` or `questionary` that lists Discord categories and channels. Users should be able to check/uncheck categories they want to exclude from the migration.
- **Color Coding**: Map log levels (INFO, ERROR, WARN) to Rich console colors (green, red, yellow).
- **Graceful Abort**: The tool gracefully handles exit commands, but adding a hotkey listener or `asyncio.Task.cancel()` for immediate stops during hanging requests could enhance responsiveness.

8
config.example.yaml Normal file
View file

@ -0,0 +1,8 @@
discord_bot_token: DISCORD_BOT_TOKEN
fluxer_bot_token: FLUXER_BOT_TOKEN
discord_server_id: 'DISCORD_SERVER_ID'
fluxer_community_id: 'FLUXER_COMMUNITY_ID'
migration:
batch_size: 50
rate_limit_delay_seconds: 2
log_level: INFO

38
main.py Normal file
View file

@ -0,0 +1,38 @@
import sys
import asyncio
import logging
from src.ui.app import run_cli
from src.config import load_config
def setup_logging():
try:
config = load_config()
log_level_str = config.migration.log_level.upper()
level = getattr(logging, log_level_str, logging.INFO)
except Exception:
level = logging.INFO
handlers = [logging.FileHandler('migration.log', mode='a')]
if level == logging.DEBUG:
handlers.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def main():
setup_logging()
try:
asyncio.run(run_cli())
except KeyboardInterrupt:
print("\nOperation interrupted by user.")
sys.exit(0)
except Exception as e:
print(f"Failed to start tool: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View file

@ -0,0 +1,5 @@
discord.py>=2.3.2
git+https://github.com/akarealemil/fluxer.py
rich>=13.7.0
PyYAML>=6.0.1
pydantic>=2.5.3 # Good for configuration validation and mapping

0
src/__init__.py Normal file
View file

35
src/config.py Normal file
View file

@ -0,0 +1,35 @@
import yaml
from pathlib import Path
from pydantic import BaseModel, Field
class MigrationSettings(BaseModel):
batch_size: int = Field(default=50)
rate_limit_delay_seconds: int = Field(default=2)
log_level: str = Field(default="INFO")
class AppConfig(BaseModel):
discord_bot_token: str
fluxer_bot_token: str
discord_server_id: str
fluxer_community_id: str
migration: MigrationSettings = Field(default_factory=MigrationSettings)
def load_config(config_path: str | Path = "config.yaml") -> AppConfig:
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if not data:
raise ValueError("Configuration file is empty or invalid YAML.")
return AppConfig(**data)
def save_config(config: AppConfig, config_path: str | Path = "config.yaml"):
path = Path(config_path)
# Dump model to dictionary
data = config.model_dump()
with open(path, "w", encoding="utf-8") as f:
yaml.safe_dump(data, f, default_flow_style=False, sort_keys=False)

0
src/core/__init__.py Normal file
View file

197
src/core/engine.py Normal file
View file

@ -0,0 +1,197 @@
import asyncio
import logging
from typing import Callable, Awaitable
from src.config import AppConfig
from src.core.state import MigrationState
from src.discord_bot.reader import DiscordReader
from src.fluxer_bot.writer import FluxerWriter
logger = logging.getLogger(__name__)
class MigrationEngine:
"""Orchestrates reading from Discord and writing to Fluxer."""
def __init__(self, config: AppConfig):
self.config = config
self.state = MigrationState()
self.discord_reader = DiscordReader(
token=config.discord_bot_token,
server_id=config.discord_server_id
)
self.fluxer_writer = FluxerWriter(
token=config.fluxer_bot_token,
community_id=config.fluxer_community_id
)
self.is_running = False
async def validate_all(self) -> Dict[str, bool]:
"""Returns True if both connections are valid."""
try:
# Note: in real scenarios we might want concurrent validation
d_valid = await self.discord_reader.validate()
f_valid = await self.fluxer_writer.validate()
return {
"discord_token": d_valid.get("token", False),
"discord_server": d_valid.get("server", False),
"fluxer_token": f_valid.get("token", False),
"fluxer_community": f_valid.get("community", False)
}
except Exception as e:
logger.error(f"Validation failed with exception: {e}")
return {
"discord_token": False,
"discord_server": False,
"fluxer_token": False,
"fluxer_community": False
}
async def start_connections(self):
await self.discord_reader.start()
await self.fluxer_writer.start()
async def close_connections(self):
await self.discord_reader.close()
await self.fluxer_writer.close()
async def migrate_channels(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None):
"""Clones categories and text channels."""
categories = await self.discord_reader.get_categories()
channels = await self.discord_reader.get_channels()
total = len(categories) + len(channels)
current_idx = 0
# Migrate Categories first
for cat in categories:
if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(str(cat.id))
if not fluxer_id:
# 4 corresponds to Category type in Discord/Fluxer typically
fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4)
self.state.set_channel_mapping(str(cat.id), fluxer_id)
current_idx += 1
if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
# Migrate Text Channels
for channel in channels:
if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(str(channel.id))
if not fluxer_id:
topic = channel.topic if channel.topic else ""
parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None
fluxer_id = await self.fluxer_writer.create_channel(
name=channel.name,
topic=topic,
type=0,
parent_id=parent_id
)
self.state.set_channel_mapping(str(channel.id), fluxer_id)
current_idx += 1
if progress_callback: await progress_callback(channel.name, current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def migrate_messages(self, channel_id: int):
"""Migrate messages for a specific channel."""
fluxer_channel_id = self.state.get_fluxer_channel_id(str(channel_id))
if not fluxer_channel_id:
logger.error(f"Cannot migrate messages: channel {channel_id} not mapped.")
return
message_count = 0
async for msg in self.discord_reader.fetch_message_history(channel_id):
if not self.is_running:
break
# Process attachments
files = []
for att in msg.attachments:
att_data = await self.discord_reader.download_attachment(att)
files.append({"filename": att.filename, "data": att_data})
await self.fluxer_writer.send_message(
channel_id=fluxer_channel_id,
author_name=msg.author.name,
content=msg.content,
timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"),
files=files if files else None
)
self.state.update_last_message_timestamp(str(channel_id), str(msg.created_at))
message_count += 1
# Delay for rate limit safety
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
return message_count
async def migrate_roles(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None):
"""Copies roles and their baseline permissions."""
roles = await self.discord_reader.get_roles()
total = len(roles)
for idx, role in enumerate(roles):
if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(f"role_{role.id}") # reusing mapping method
if not fluxer_id:
fluxer_id = await self.fluxer_writer.create_role(
name=role.name,
color=role.color.value,
hoist=role.hoist,
mentionable=role.mentionable
)
if fluxer_id:
self.state.set_channel_mapping(f"role_{role.id}", fluxer_id)
if progress_callback: await progress_callback(role.name, idx + 1, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def migrate_emojis(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None):
"""Copies custom emojis and stickers."""
emojis = await self.discord_reader.get_emojis()
total = len(emojis)
for idx, emoji in enumerate(emojis):
if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(f"emoji_{emoji.id}")
if not fluxer_id:
try:
img_data = await self.discord_reader.download_emoji(emoji)
fluxer_id = await self.fluxer_writer.create_emoji(
name=emoji.name,
image_bytes=img_data
)
if fluxer_id:
self.state.set_channel_mapping(f"emoji_{emoji.id}", fluxer_id)
except Exception as e:
logger.error(f"Error downloading/uploading emoji {emoji.name}: {e}")
if progress_callback: await progress_callback(emoji.name, idx + 1, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def run_full_migration(self):
self.is_running = True
try:
await self.start_connections()
await self.migrate_channels()
# Example: just migrate one channel's messages to test
channels = await self.discord_reader.get_channels()
if channels:
await self.migrate_messages(channels[0].id)
finally:
await self.close_connections()
self.is_running = False
def stop(self):
self.is_running = False

48
src/core/state.py Normal file
View file

@ -0,0 +1,48 @@
import json
from pathlib import Path
from typing import Dict, Any
class MigrationState:
"""Manages persistence of the migration state to allow resumability."""
def __init__(self, state_file: str | Path = "state.json"):
self.state_file = Path(state_file)
# mappings: discord_id -> fluxer_id
self.channel_map: Dict[str, str] = {}
self.role_map: Dict[str, str] = {}
self.user_map: Dict[str, str] = {}
# tracking last message timestamp per channel to resume
self.last_message_timestamps: Dict[str, str] = {}
self.load()
def load(self):
if self.state_file.exists():
with open(self.state_file, "r", encoding="utf-8") as f:
data = json.load(f)
self.channel_map = data.get("channels", {})
self.role_map = data.get("roles", {})
self.user_map = data.get("users", {})
self.last_message_timestamps = data.get("last_message_timestamps", {})
def save(self):
data = {
"channels": self.channel_map,
"roles": self.role_map,
"users": self.user_map,
"last_message_timestamps": self.last_message_timestamps
}
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
def set_channel_mapping(self, discord_id: str, fluxer_id: str):
self.channel_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_channel_id(self, discord_id: str) -> str | None:
return self.channel_map.get(str(discord_id))
def update_last_message_timestamp(self, channel_id: str, timestamp: str):
self.last_message_timestamps[str(channel_id)] = timestamp
self.save()

View file

102
src/discord_bot/reader.py Normal file
View file

@ -0,0 +1,102 @@
import discord
from typing import AsyncGenerator, Dict, Any
class DiscordReader:
def __init__(self, token: str, server_id: str):
self.token = token
self.server_id = int(server_id)
self.guild: discord.Guild | None = None
self.client: discord.Client | None = None
def _create_client(self):
intents = discord.Intents.default()
intents.members = True
intents.message_content = True
intents.guilds = True
return discord.Client(intents=intents)
async def start(self):
"""Starts the Discord client to fetch metadata."""
if not self.client or self.client.is_closed():
self.client = self._create_client()
await self.client.login(self.token)
# In a real app we'd wait until ready, here we simulate fetching the guild
self.guild = await self.client.fetch_guild(self.server_id)
async def validate(self) -> Dict[str, bool]:
"""Validates the token and server ID."""
results = {"token": False, "server": False}
temp_client = self._create_client()
try:
await temp_client.login(self.token)
results["token"] = True
guild = await temp_client.fetch_guild(self.server_id)
if guild is not None:
results["server"] = True
except Exception:
pass
finally:
if not temp_client.is_closed():
await temp_client.close()
return results
async def get_server_metadata(self) -> Dict[str, Any]:
"""Returns name, icon, and other metadata."""
if not self.guild:
return {}
return {
"name": self.guild.name,
"id": str(self.guild.id),
"icon_url": self.guild.icon.url if self.guild.icon else None
}
async def get_categories(self):
if not self.guild:
return []
categories = await self.guild.fetch_channels()
return [c for c in categories if isinstance(c, discord.CategoryChannel)]
async def get_roles(self):
"""Returns all roles in the server (excluding @everyone)."""
if not self.guild:
return []
roles = await self.guild.fetch_roles()
# Filter out default @everyone role which cannot typically be created
return [r for r in roles if not r.is_default()]
async def get_emojis(self):
"""Returns all custom emojis in the server."""
if not self.guild:
return []
return await self.guild.fetch_emojis()
async def get_channels(self, category_id: int | None = None):
"""Yields all non-category channels."""
if not self.guild:
return []
channels = await self.guild.fetch_channels()
all_channels = [c for c in channels if not isinstance(c, discord.CategoryChannel)]
if category_id:
all_channels = [c for c in all_channels if c.category_id == category_id]
return all_channels
async def fetch_message_history(self, channel_id: int, limit: int = None) -> AsyncGenerator[discord.Message, None]:
"""Yields messages from a given channel, optionally handling pagination."""
channel = await self.client.fetch_channel(channel_id)
if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread):
# To avoid exploding RAM, we yield items one by one
async for message in channel.history(limit=limit, oldest_first=True):
yield message
async def download_emoji(self, emoji: discord.Emoji) -> bytes:
"""Downloads a Discord emoji into memory."""
return await emoji.read()
async def download_attachment(self, attachment: discord.Attachment) -> bytes:
"""Downloads a Discord attachment into memory."""
return await attachment.read()
async def close(self):
await self.client.close()

View file

121
src/fluxer_bot/writer.py Normal file
View file

@ -0,0 +1,121 @@
# Supposing fluxer.py has an API similar to discord.py or requests based
# Since we don't have the exact library reference, we create a conceptual skeleton.
from typing import Optional, List, Dict, Any
from fluxer.http import HTTPClient
class FluxerWriter:
def __init__(self, token: str, community_id: str):
self.token = token
self.community_id = str(community_id)
self.client: Optional[HTTPClient] = None
async def start(self):
"""Authenticate with Fluxer."""
self.client = HTTPClient(token=self.token, is_bot=True)
async def validate(self) -> dict:
"""Validates the token and community ID."""
if not self.client:
await self.start()
is_token_valid = False
is_community_valid = False
try:
# Check token by fetching me
await self.client.get_current_user()
is_token_valid = True
# Check community
guild = await self.client.get_guild(self.community_id)
if guild:
is_community_valid = True
except Exception:
pass
return {
"token": is_token_valid,
"community": is_community_valid
}
async def create_channel(self, name: str, topic: str = "", type: int = 0, parent_id: Optional[str] = None) -> str:
"""
Creates a new channel in the target Fluxer community.
Returns the new Fluxer channel ID.
"""
assert self.client is not None
payload = {
"name": name,
"type": type,
}
if topic:
payload["topic"] = topic
if parent_id:
payload["parent_id"] = parent_id
guild_channel = await self.client.request(
self.client._route("POST", "/guilds/{guild_id}/channels", guild_id=self.community_id),
json=payload
)
return str(guild_channel["id"])
async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, files: Optional[List[Dict[str, Any]]] = None) -> None:
"""
Sends a message to the target channel.
"""
assert self.client is not None
prefix = f"**[{timestamp}] {author_name}**:\n"
final_content = prefix + content if content else prefix
try:
await self.client.send_message(
channel_id=channel_id,
content=final_content,
files=files
)
except Exception as e:
# Handle empty messages if an attachment is the only content
print(f"Failed to copy message: {e}")
async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str:
"""
Creates a new role in the Fluxer community.
Returns the new Fluxer role ID.
"""
assert self.client is not None
try:
role = await self.client.create_guild_role(
guild_id=self.community_id,
name=name,
color=color,
hoist=hoist,
mentionable=mentionable
)
return str(role["id"])
except Exception as e:
print(f"Failed to copy role {name}: {e}")
return ""
async def create_emoji(self, name: str, image_bytes: bytes) -> str:
"""
Creates a custom emoji in the Fluxer community.
"""
assert self.client is not None
try:
emoji = await self.client.create_guild_emoji(
guild_id=self.community_id,
name=name,
image=image_bytes
)
return str(emoji["id"])
except Exception as e:
print(f"Failed to copy emoji {name}: {e}")
return ""
async def close(self):
"""Cleanly close connection."""
if self.client:
await self.client.close()

0
src/ui/__init__.py Normal file
View file

282
src/ui/app.py Normal file
View file

@ -0,0 +1,282 @@
import sys
import asyncio
from rich.console import Console
from rich.prompt import Prompt, Confirm
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from src.config import load_config, save_config
from src.core.engine import MigrationEngine
console = Console()
class MigrationCLI:
"""Standard CLI app to manage the Discord to Fluxer migration."""
def __init__(self):
try:
self.config = load_config()
except Exception as e:
console.print(f"[bold red]Failed to load config: {e}[/bold red]")
sys.exit(1)
self.engine = MigrationEngine(self.config)
self.progress_callback_task = None
self.tokens_valid = False
async def validate_config(self):
with console.status("[yellow]Validating tokens...[/yellow]"):
self.validation_results = await self.engine.validate_all()
self.tokens_valid = all(self.validation_results.values())
async def run(self):
console.print(Panel.fit("Discord Reaper", style="bold blue"))
await self.validate_config()
while True:
console.print("\n[bold]Main Menu[/bold]")
console.print("(1) Clone Server Template (Channels & Categories)")
console.print("(2) Copy Roles & Permissions")
console.print("(3) Copy Emojis & Stickers")
console.print("(4) Migrate message history")
val_status = "[bold green][VALID][/bold green]" if self.tokens_valid else "[bold red][INVALID][/bold red]"
console.print(f"(5) Configuration {val_status}")
console.print("(Q) Exit")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "4", "5", "Q", "q"], default="1").upper()
if choice == "1":
await self.clone_server_template()
elif choice == "2":
await self.copy_roles()
elif choice == "3":
await self.copy_emojis()
elif choice == "4":
await self.migrate_message_history()
elif choice == "5":
await self.edit_configuration()
elif choice == "Q":
console.print("[yellow]Exiting tool...[/yellow]")
break
async def edit_configuration(self):
console.print("\n[bold]Configuration Status:[/bold]")
def get_status_str(is_valid):
return "[bold green][VALID][/bold green]" if is_valid else "[bold red][INVALID][/bold red]"
console.print(f"Discord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}")
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}")
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}")
if not Confirm.ask("Edit now?"):
return
console.print("\n[bold]Configuration Editor[/bold] (leave blank to keep current)")
d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token)
f_token = Prompt.ask("Fluxer Bot Token", default=self.config.fluxer_bot_token)
d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id)
f_comm = Prompt.ask("Fluxer Community ID", default=self.config.fluxer_community_id)
# Only rewrite if changed
if (d_token != self.config.discord_bot_token or
f_token != self.config.fluxer_bot_token or
d_server != self.config.discord_server_id or
f_comm != self.config.fluxer_community_id):
self.config.discord_bot_token = d_token
self.config.fluxer_bot_token = f_token
self.config.discord_server_id = d_server
self.config.fluxer_community_id = f_comm
save_config(self.config)
# Recreate engine with new config
self.engine = MigrationEngine(self.config)
# Re-validate
console.print("[yellow]Validating new configuration...[/yellow]")
await self.update_validation_status()
console.print(f"\nDiscord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}")
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}")
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}")
console.print("[bold green]Configuration updated and saved to config.yaml![/bold green]")
else:
console.print("[yellow]No changes made.[/yellow]")
async def update_validation_status(self):
self.validation_results = await self.engine.validate_all()
self.tokens_valid = all(self.validation_results.values())
async def clone_server_template(self):
console.print("\n[yellow]Fetching server structure...[/yellow]")
categories = []
channels = []
try:
await self.engine.start_connections()
categories = await self.engine.discord_reader.get_categories()
channels = await self.engine.discord_reader.get_channels()
except Exception as e:
console.print(f"[bold red]Failed to fetch server structure: {e}[/bold red]")
await self.engine.close_connections()
return
console.print("\n[bold]Server Template Preview:[/bold]")
# Group channels by category
import discord
channels_by_cat = {}
uncategorized = []
for ch in channels:
if ch.category_id:
if ch.category_id not in channels_by_cat:
channels_by_cat[ch.category_id] = []
channels_by_cat[ch.category_id].append(ch)
else:
uncategorized.append(ch)
def print_channel(ch):
if isinstance(ch, discord.TextChannel):
color = "cyan"
elif isinstance(ch, discord.VoiceChannel):
color = "green"
elif isinstance(ch, discord.ForumChannel):
color = "magenta"
else:
color = "white"
console.print(f" [{color}]- {ch.name}[/{color}]")
for cat in categories:
console.print(f"[bold yellow]{cat.name}[/bold yellow]")
cat_channels = channels_by_cat.get(cat.id, [])
for ch in cat_channels:
print_channel(ch)
if uncategorized:
console.print(f"[bold yellow]Uncategorized[/bold yellow]")
for ch in uncategorized:
print_channel(ch)
console.print("")
if not Confirm.ask("Are you sure you want to clone channels and categories?"):
await self.engine.close_connections()
return
console.print("\n[bold green]Starting Channel Cloning...[/bold green]")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
channel_task = progress.add_task("[cyan]Copying Channels...", total=100)
async def update_progress(item_name: str, current: int, total: int):
progress.update(channel_task, total=total, completed=current, description=f"[cyan]Copying Channel: {item_name}")
self.engine.is_running = True
await self.engine.migrate_channels(progress_callback=update_progress)
console.print("[bold green]Server Template cloned![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during channel clone: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def copy_roles(self):
if not Confirm.ask("Are you sure you want to copy roles?"):
return
console.print("\n[bold green]Starting Role Migration...[/bold green]")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
role_task = progress.add_task("[cyan]Copying Roles...", total=100)
async def update_progress(item_name: str, current: int, total: int):
progress.update(role_task, total=total, completed=current, description=f"[cyan]Copying Role: {item_name}")
await self.engine.start_connections()
self.engine.is_running = True
await self.engine.migrate_roles(progress_callback=update_progress)
console.print("[bold green]Role migration complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during role migration: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def copy_emojis(self):
if not Confirm.ask("Are you sure you want to copy emojis and stickers?"):
return
console.print("\n[bold green]Starting Emoji Migration...[/bold green]")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
emoji_task = progress.add_task("[cyan]Copying Emojis...", total=100)
async def update_progress(item_name: str, current: int, total: int):
progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying Emoji: {item_name}")
await self.engine.start_connections()
self.engine.is_running = True
await self.engine.migrate_emojis(progress_callback=update_progress)
console.print("[bold green]Emoji migration complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def migrate_message_history(self):
if not Confirm.ask("Are you sure you want to migrate message history?"):
return
console.print("\n[bold green]Starting Message History Migration...[/bold green]")
try:
await self.engine.start_connections()
# Mock example of passing message progress.
console.print("[cyan]Migrating messages for the first channel (Demo)...[/cyan]")
channels = await self.engine.discord_reader.get_channels()
if channels:
self.engine.is_running = True
await self.engine.migrate_messages(channels[0].id)
console.print("[bold green]Message history migration complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during message migration: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def run_cli():
cli = MigrationCLI()
await cli.run()