disco-reaper/src/ui/app.py
2026-02-22 17:46:43 +05:30

1085 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import asyncio
import logging
import re
from rich.console import Console
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from src.config import load_config, save_config
from src.core.engine import MigrationEngine
class RateLimitHandler(logging.Handler):
"""Intersects library logs to print clean rate limit messages."""
def __init__(self):
super().__init__()
self._last_print = ""
def emit(self, record):
try:
msg = record.getMessage()
# Detect rate limit messages from discord.py or fluxer.py
if "retry" in msg.lower() and ("rate limit" in msg.lower() or "429" in msg):
# Extract seconds using regex: supports "retry in 5.50s" and "Retrying in 5.50 seconds"
match = re.search(r"in ([\d.]+)\s*(?:seconds?|s)", msg, re.IGNORECASE)
if match:
seconds = match.group(1)
platform = "discord" if "discord" in record.name.lower() else "fluxer"
# Format the message
new_msg = f"{platform} API rate limit: will retry after {seconds}"
# Avoid spamming the exact same message if nothing changed
if new_msg == self._last_print:
return
self._last_print = new_msg
# Use rich console to print on the same line.
# end="\r" works with rich's internal live-update handling.
# We add some padding to clear old text.
console.print(f"{new_msg} ", end="\r")
except Exception:
pass
console = Console()
class MigrationCLI:
"""Standard CLI app to manage the Discord to Fluxer migration."""
def __init__(self):
try:
self.config = load_config()
except Exception as e:
console.print(f"[bold red]Failed to load config: {e}[/bold red]")
sys.exit(1)
self.engine = MigrationEngine(self.config)
self.progress_callback_task = None
self.tokens_valid = False
# Register rate limit interceptor for both libraries
rl_handler = RateLimitHandler()
logging.getLogger("discord").addHandler(rl_handler)
logging.getLogger("fluxer").addHandler(rl_handler)
async def validate_config(self):
self.validation_results = {
"discord_token": False, "discord_bot_name": None,
"discord_server": False, "discord_server_name": None,
"discord_intents": {}, "discord_permissions": {},
"fluxer_token": False, "fluxer_bot_name": None,
"fluxer_community": False, "fluxer_community_name": None,
"fluxer_permissions": {}
}
self.tokens_valid = False
discord_task = asyncio.create_task(self.engine.discord_reader.validate())
fluxer_task = asyncio.create_task(self.engine.fluxer_writer.validate())
try:
with console.status("[yellow]Validating tokens...[/yellow]"):
start_time = asyncio.get_event_loop().time()
done, pending = await asyncio.wait(
[discord_task, fluxer_task],
timeout=10.0,
return_when=asyncio.ALL_COMPLETED
)
# Process Discord Result
if discord_task in done:
try:
res = discord_task.result()
self.validation_results["discord_token"] = res.get("token", False)
self.validation_results["discord_bot_name"] = res.get("bot_name")
self.validation_results["discord_server"] = res.get("server", False)
self.validation_results["discord_server_name"] = res.get("server_name")
if not res.get("token"):
console.print("[bold red]Discord Token validation failed (Invalid Token).[/bold red]")
elif not res.get("server"):
console.print("[bold red]Discord Server ID validation failed (Invalid/Inaccessible ID).[/bold red]")
else:
# Store intents & permissions for later UI display
self.validation_results["discord_intents"] = res.get("intents", {})
self.validation_results["discord_permissions"] = res.get("permissions", {})
except Exception as e:
console.print(f"[bold red]Discord validation failed with error: {e}[/bold red]")
else:
console.print("[bold red]Discord bot token validation timed out after 10 seconds.[/bold red]")
discord_task.cancel()
# Process Fluxer Result
if fluxer_task in done:
try:
res = fluxer_task.result()
self.validation_results["fluxer_token"] = res.get("token", False)
self.validation_results["fluxer_bot_name"] = res.get("bot_name")
self.validation_results["fluxer_community"] = res.get("community", False)
self.validation_results["fluxer_community_name"] = res.get("community_name")
if not res.get("token"):
console.print("[bold red]Fluxer Token validation failed (Invalid Token).[/bold red]")
elif not res.get("community"):
console.print("[bold red]Fluxer Community ID validation failed (Invalid/Inaccessible ID).[/bold red]")
else:
# Store permissions
self.validation_results["fluxer_permissions"] = res.get("permissions", {})
except Exception as e:
console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]")
else:
console.print("[bold red]Fluxer bot token validation timed out after 10 seconds.[/bold red]")
fluxer_task.cancel()
# Only tokens and server/community existence are strictly required for 'tokens_valid'
self.tokens_valid = (
self.validation_results.get("discord_token") and
self.validation_results.get("discord_server") and
self.validation_results.get("fluxer_token") and
self.validation_results.get("fluxer_community")
)
# Check if all permissions are actually granted
self.permissions_complete = True
if self.tokens_valid:
# Discord
d_intents = self.validation_results.get("discord_intents", {})
d_perms = self.validation_results.get("discord_permissions", {})
if not all([d_intents.get("message_content"), d_perms.get("view_channel"), d_perms.get("read_message_history")]):
self.permissions_complete = False
# Fluxer
f_perms = self.validation_results.get("fluxer_permissions", {})
if not all(f_perms.values()) if f_perms else True:
self.permissions_complete = False
except Exception as e:
console.print(f"[bold red]Validation system failure: {e}[/bold red]")
finally:
# Ensure tasks are cleaned up
for t in [discord_task, fluxer_task]:
if not t.done(): t.cancel()
async def run(self):
await self.validate_config()
while True:
console.print("")
console.print(Panel.fit("Fluxer Reaper", style="bold blue"))
d_name = self.validation_results.get("discord_server_name")
d_display = f"[bold green]\"{d_name}\"[/bold green]" if d_name else "[bold red]NOT SET UP[/bold red]"
f_name = self.validation_results.get("fluxer_community_name")
f_display = f"[bold green]\"{f_name}\"[/bold green]" if f_name else "[bold red]NOT SET UP[/bold red]"
console.print(f"[bold cyan]Discord Server:[/bold cyan] {d_display}")
console.print(f"[bold #4641D9]Fluxer Community:[/bold #4641D9] {f_display}")
console.print("[bold]Main Menu[/bold]")
console.print("(1) Clone Server Template (Channels & Categories)")
console.print("(2) Copy Roles & Permissions")
console.print("(3) Copy Emojis & Stickers")
console.print("(4) Sync Server Name, Logo and Banner")
console.print("(5) Migrate message history")
if not self.tokens_valid:
val_status = "[bold red][INVALID][/bold red]"
elif not self.permissions_complete:
val_status = "[bold yellow][PERMISSION MISSING][/bold yellow]"
else:
val_status = "[bold green][VALID][/bold green]"
console.print(f"(6) Configuration {val_status}")
console.print("(7) [bold red]⚠ Danger Zone[/bold red]")
console.print("(Q) Exit")
choice = Prompt.ask("\nSelect an option", choices=["1", "2", "3", "4", "5", "6", "7", "Q", "q"], default="Q").upper()
if choice == "1":
await self.clone_server_template()
elif choice == "2":
await self.copy_roles()
elif choice == "3":
await self.copy_emojis()
elif choice == "4":
await self.sync_server_metadata()
elif choice == "5":
await self.migrate_message_history()
elif choice == "6":
await self.edit_configuration()
elif choice == "7":
await self.danger_zone()
elif choice == "Q":
console.print("[yellow]Exiting tool...[/yellow]")
await self.engine.close_connections()
break
async def edit_configuration(self):
await self.validate_config()
# Display Required Permissions FIRST
def fmt(name, val):
if val is None: return f"[dim]{name}[/dim]" # Unknown
return f"[green]{name}[/green]" if val else f"[red]{name} [MISSING][/red]"
d_intents = self.validation_results.get("discord_intents", {})
d_perms = self.validation_results.get("discord_permissions", {})
f_perms = self.validation_results.get("fluxer_permissions", {})
perm_table = Table(show_header=True, header_style="bold magenta", box=None, padding=(0, 2))
perm_table.add_column("Bot Type")
perm_table.add_column("Required Permissions & Intents")
perm_table.add_row(
"[bold #5865F2]Discord Bot[/bold #5865F2]",
f"• [bold]Intents:[/bold] Server Members, {fmt('Message Content', d_intents.get('message_content'))}\n"
f"• [bold]Permissions:[/bold] {fmt('View Channels', d_perms.get('view_channel'))}, {fmt('Read Message History', d_perms.get('read_message_history'))}"
)
perm_table.add_row(
"[bold #4641D9]Fluxer Bot[/bold #4641D9]",
f"• [bold]Permissions:[/bold] {fmt('Manage Channels', f_perms.get('manage_channels'))}, {fmt('Manage Messages', f_perms.get('manage_messages'))},\n"
f" {fmt('Manage Roles', f_perms.get('manage_roles'))}, {fmt('Manage Emojis/Stickers', f_perms.get('manage_emojis_stickers'))}, {fmt('Manage Webhooks', f_perms.get('manage_webhooks'))}"
)
console.print("\n") # Add spacing before panel
console.print(Panel(perm_table, title="[bold]Required Bot Permissions[/bold]", expand=False, border_style="dim"))
console.print("\n[bold]Configuration Status:[/bold]")
def get_status_str(is_valid, name=None):
status = "[bold green][VALID][/bold green]" if is_valid else "[bold red][INVALID][/bold red]"
if is_valid and name:
return f"{status} \"{name}\""
return status
console.print(f"Discord Bot Token {get_status_str(self.validation_results.get('discord_token', False), self.validation_results.get('discord_bot_name'))}")
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False), self.validation_results.get('fluxer_bot_name'))}")
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False), self.validation_results.get('discord_server_name'))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False), self.validation_results.get('fluxer_community_name'))}")
if not Confirm.ask("\nEdit now?"):
return
console.print("\n[bold]Configuration Editor[/bold] (leave blank to keep current)")
d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token)
f_token = Prompt.ask("Fluxer Bot Token", default=self.config.fluxer_bot_token)
d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id)
f_comm = Prompt.ask("Fluxer Community ID", default=self.config.fluxer_community_id)
# Only rewrite if changed
if (d_token != self.config.discord_bot_token or
f_token != self.config.fluxer_bot_token or
d_server != self.config.discord_server_id or
f_comm != self.config.fluxer_community_id):
self.config.discord_bot_token = d_token
self.config.fluxer_bot_token = f_token
self.config.discord_server_id = d_server
self.config.fluxer_community_id = f_comm
save_config(self.config)
# Recreate engine with new config
self.engine = MigrationEngine(self.config)
# Re-validate
console.print("[yellow]Validating new configuration...[/yellow]")
await self.update_validation_status()
console.print(f"\nDiscord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}")
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}")
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}")
console.print("[bold green]Configuration updated and saved to config.yaml![/bold green]")
else:
console.print("[yellow]No changes made.[/yellow]")
async def update_validation_status(self):
await self.validate_config()
async def clone_server_template(self):
console.print("\n[yellow]Fetching server structure...[/yellow]")
categories = []
channels = []
try:
await self.engine.start_connections()
with console.status("[yellow]Syncing Fluxer channel state...[/yellow]"):
await self.engine.sync_channel_state()
categories = await self.engine.discord_reader.get_categories()
channels = await self.engine.discord_reader.get_channels()
except Exception as e:
console.print(f"[bold red]Failed to fetch server structure: {e}[/bold red]")
await self.engine.close_connections()
return
table = Table(show_header=True, header_style="bold #4641D9")
table.add_column("Type", width=12)
table.add_column("Discord Name")
table.add_column("Status", justify="right")
# Group channels by category for status check
missing_by_cat = {}
missing_uncategorized = []
for ch in channels:
cat_id = str(ch.category_id) if ch.category_id else None
if not self.engine.state.get_fluxer_channel_id(str(ch.id)):
if cat_id:
if cat_id not in missing_by_cat: missing_by_cat[cat_id] = []
missing_by_cat[cat_id].append(ch)
else:
missing_uncategorized.append(ch)
# Build the unified preview table
for cat in categories:
cat_id_str = str(cat.id)
fluxer_cat_id = self.engine.state.get_fluxer_category_id(cat_id_str)
status = f"[cyan]{fluxer_cat_id}[/cyan]" if fluxer_cat_id else "[bold red]Missing[/bold red]"
table.add_row("[bold yellow]Category[/bold yellow]", f"[bold yellow]{cat.name}[/bold yellow]", status)
# Show ALL channels under this category
cat_channels = [ch for ch in channels if str(ch.category_id) == cat_id_str]
for ch in cat_channels:
fluxer_ch_id = self.engine.state.get_fluxer_channel_id(str(ch.id))
ch_status = f"[cyan]{fluxer_ch_id}[/cyan]" if fluxer_ch_id else "[red]Missing[/red]"
table.add_row(" Channel", ch.name, ch_status)
# Show Uncategorized
uncategorized = [ch for ch in channels if not ch.category_id]
if uncategorized:
table.add_row("Uncategorized", "", "")
for ch in uncategorized:
fluxer_ch_id = self.engine.state.get_fluxer_channel_id(str(ch.id))
ch_status = f"[cyan]{fluxer_ch_id}[/cyan]" if fluxer_ch_id else "[red]Missing[/red]"
table.add_row(" Channel", ch.name, ch_status)
console.print(table)
console.print("")
# Check for existing mappings to determine if we should suggest a force re-copy
cached_count = sum(1 for cat in categories if self.engine.state.get_fluxer_category_id(str(cat.id)))
cached_count += sum(1 for ch in channels if self.engine.state.get_fluxer_channel_id(str(ch.id)))
all_ids_len = len(categories) + len(channels)
force = False
if cached_count > 0:
console.print(f"[yellow]\u26a0 {cached_count}/{all_ids_len} item(s) already in state.json cache.[/yellow]")
# End of table consolidated section, back to standard flow logic.
console.print("[bold green](Y) Continue with only missing items[/bold green]")
console.print("[bold red](F) Force re-clone, creates duplicate channels![/bold red]")
console.print("[bold yellow](B) Back[/bold yellow]")
choice = Prompt.ask("Select an option", choices=["Y", "F", "B"], default="Y").upper()
if choice == "B":
await self.engine.close_connections()
return
elif choice == "F":
force = True
# if 'Y', force remains False and we continue
else:
if not Confirm.ask("Clone channels and categories?", default=True):
await self.engine.close_connections()
return
if not Confirm.ask("Are you sure?", default=True):
await self.engine.close_connections()
return
console.print("\n[bold green]Starting Channel Cloning...[/bold green]")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
channel_task = progress.add_task("[cyan]Copying Channels...", total=100)
async def update_progress(item_name: str, status: str, current: int, total: int):
color = "cyan" if status == "Copying" else "yellow"
progress.update(channel_task, total=total, completed=current, description=f"[{color}]{status} Channel: {item_name}")
self.engine.is_running = True
await self.engine.migrate_channels(progress_callback=update_progress, force=force)
console.print("[bold green]Server Template cloned![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during channel clone: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def copy_roles(self):
console.print("\n[bold]Role & Permission Options[/bold]")
console.print("(1) Clone Roles & Role Permissions")
console.print("(2) Sync Category Permissions & Channel Permissions")
console.print("(B) Back")
choice = Prompt.ask("Select an option", choices=["1", "2", "B", "b"], default="B").upper()
if choice == "B":
return
if choice == "1":
try:
await self.engine.start_connections()
with console.status("[yellow]Checking Fluxer for existing roles...[/yellow]"):
await self.engine.sync_roles_state()
roles = await self.engine.discord_reader.get_roles()
table = Table(show_header=True, header_style="bold #4641D9")
table.add_column("Discord Role")
table.add_column("Status", justify="center")
table.add_column("Fluxer ID", justify="right")
cached_count = 0
for r in roles:
fluxer_id = self.engine.state.get_fluxer_role_id(str(r.id))
status = "[bold green]NEW[/bold green]"
fid_str = "[dim]N/A[/dim]"
if fluxer_id:
status = "[dim]already copied[/dim]"
fid_str = f"[cyan]{fluxer_id}[/cyan]"
cached_count += 1
table.add_row(r.name, status, fid_str)
console.print(table)
force = False
if cached_count > 0:
console.print(f"\n[yellow]{cached_count} role(s) already in state cache.[/yellow]")
console.print("[bold green](Y) Sync missing roles only[/bold green]")
console.print("[bold red](F) Force Overwrite[/bold red]")
console.print("[bold yellow](B) Back[/bold yellow]")
sub_choice = Prompt.ask("Select an option", choices=["Y", "F", "B"], default="Y").upper()
if sub_choice == "B":
return
elif sub_choice == "F":
force = True
console.print("\n[bold green]Starting Role Migration...[/bold green]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
role_task = progress.add_task("[cyan]Syncing Roles...", total=100)
async def update_progress(item_name: str, current: int, total: int):
progress.update(role_task, total=total, completed=current, description=f"[cyan]Syncing Role: {item_name}")
self.engine.is_running = True
await self.engine.migrate_roles(progress_callback=update_progress, force=force)
console.print("[bold green]Role migration complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during role migration: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
elif choice == "2":
try:
await self.engine.start_connections()
with console.status("[yellow]Analyzing categories and channels...[/yellow]"):
categories = await self.engine.discord_reader.get_categories()
channels = await self.engine.discord_reader.get_channels()
mapped_cats = sum(1 for c in categories if self.engine.state.get_fluxer_category_id(str(c.id)))
mapped_chs = sum(1 for c in channels if self.engine.state.get_fluxer_channel_id(str(c.id)))
total_mapped = mapped_cats + mapped_chs
console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]")
console.print("[bold green](Y) Proceed with Permission synchronization[/bold green]")
console.print("[bold yellow](B) Back[/bold yellow]")
sub_choice = Prompt.ask("Select an option", choices=["Y", "B"], default="Y").upper()
if sub_choice == "B":
return
console.print("\n[bold green]Syncing Category & Channel Permissions...[/bold green]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
perm_task = progress.add_task("[cyan]Syncing Permissions...", total=100)
async def update_progress(item_name: str, current: int, total: int):
progress.update(perm_task, total=total, completed=current, description=f"[cyan]Syncing: {item_name}")
self.engine.is_running = True
await self.engine.sync_permissions(progress_callback=update_progress)
console.print("[bold green]Permission synchronization complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during permission sync: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def copy_emojis(self):
console.print("\n[yellow]Fetching emojis and stickers...[/yellow]")
try:
await self.engine.start_connections()
with console.status("[yellow]Checking Fluxer for existing emojis and stickers...[/yellow]"):
await self.engine.sync_assets_state()
emojis = await self.engine.discord_reader.get_emojis()
stickers = await self.engine.discord_reader.get_stickers()
table = Table(show_header=True, header_style="bold #4641D9")
table.add_column("Type", width=10)
table.add_column("Name")
table.add_column("Status", justify="right")
cached_emojis = 0
for e in emojis:
already = self.engine.state.get_fluxer_emoji_id(str(e.id))
status = "[dim]already copied[/dim]" if already else "[bold green]NEW[/bold green]"
if already: cached_emojis += 1
table.add_row("Emoji", e.name, status)
cached_stickers = 0
for s in stickers:
already = self.engine.state.get_fluxer_sticker_id(str(s.id))
status = "[dim]already copied[/dim]" if already else "[bold green]NEW[/bold green]"
if already: cached_stickers += 1
table.add_row("Sticker", s.name, status)
console.print(table)
# Warn if everything is already in the state cache
force = False
total_items = len(emojis) + len(stickers)
cached_count = cached_emojis + cached_stickers
console.print("\n(1) Sync Emojis only")
console.print("(2) Sync Stickers only")
console.print("(3) Sync Emojis and Stickers")
console.print("(B) Back")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "B", "b"], default="B").upper()
if choice == "B":
return
types_to_include = []
if choice == "1":
types_to_include = ["Emoji"]
elif choice == "2":
types_to_include = ["Sticker"]
elif choice == "3":
types_to_include = ["Emoji", "Sticker"]
# Ask about force re-copy only if there are cached items in scope
cached_in_scope = 0
if "Emoji" in types_to_include:
cached_in_scope += sum(1 for e in emojis if self.engine.state.get_fluxer_emoji_id(str(e.id)))
if "Sticker" in types_to_include:
cached_in_scope += sum(1 for s in stickers if self.engine.state.get_fluxer_sticker_id(str(s.id)))
force = False
if cached_in_scope > 0:
console.print(f"\n[yellow]{cached_in_scope} item(s) already in state cache.[/yellow]")
console.print("[bold green](Y) Copy missing items only[/bold green]")
console.print("[bold red](F) Force Overwrite[/bold red]")
console.print("[bold yellow](B) Back[/bold yellow]")
choice = Prompt.ask("Select an option", choices=["Y", "F", "B"], default="Y").upper()
if choice == "B":
return
elif choice == "F":
force = True
# if 'Y', force remains False and we continue
console.print("\n[bold green]Starting Migration...[/bold green]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
emoji_task = progress.add_task("[cyan]Copying Assets...", total=100)
async def update_progress(item_name: str, item_type: str, current: int, total: int):
progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying {item_type}: {item_name}")
self.engine.is_running = True
await self.engine.migrate_emojis(
progress_callback=update_progress,
types_to_include=types_to_include,
force=force
)
console.print("[bold green]Migration complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
async def sync_server_metadata(self):
console.print("\n[yellow]Fetching server metadata...[/yellow]")
try:
await self.engine.start_connections()
metadata = await self.engine.discord_reader.get_server_metadata()
name = metadata.get("name")
icon_status = "[bold green]FOUND[/bold green]" if metadata.get("icon_url") else "[yellow]NOT FOUND[/yellow]"
banner_status = "[bold green]FOUND[/bold green]" if metadata.get("banner_url") else "[yellow]NOT FOUND[/yellow]"
banner_status = "[bold green]FOUND[/bold green]" if metadata.get("banner_url") else "[yellow]NOT FOUND[/yellow]"
table = Table(show_header=False, box=None)
table.add_column("Property", style="bold")
table.add_column("Value")
table.add_row("Server Name:", name)
table.add_row("Server Icon:", icon_status)
table.add_row("Server Banner:", banner_status)
console.print(Panel(table, title="[bold]Discord Server Metadata[/bold]", expand=False))
console.print("\n(1) Sync Name only")
console.print("(2) Sync Icon only")
console.print("(3) Sync Banner only")
console.print("(4) Sync Everything")
console.print("(B) Back")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "4", "B", "b"], default="B").upper()
if choice == "B":
return
components = []
if choice == "1":
components = ["name"]
elif choice == "2":
components = ["icon"]
elif choice == "3":
components = ["banner"]
elif choice == "4":
components = ["name", "icon", "banner"]
console.print("\n[bold green]Syncing Server Metadata...[/bold green]")
async def progress_callback(item: str, status: str):
color = "green" if status == "DONE" else "red" if status == "ERROR" else "yellow"
console.print(f"{item} [[bold {color}]{status}[/bold {color}]]")
await self.engine.sync_server_metadata(progress_callback, components=components)
console.print("[bold green]Server metadata sync finished![/bold green]")
except Exception as e:
console.print(f"[bold red]Error during metadata sync: {str(e)}[/bold red]")
finally:
await self.engine.close_connections()
async def migrate_message_history(self):
console.print("\n[bold]Message History Migration (Manual Selection)[/bold]")
if not self.tokens_valid:
console.print("[bold red]Error: You must have a valid configuration (Option 6) to migrate messages.[/bold red]")
return
try:
with console.status("[yellow]Fetching Discord channels...[/yellow]"):
await self.engine.start_connections()
d_channels = await self.engine.discord_reader.get_channels()
if not d_channels:
console.print("[yellow]No text channels found in Discord server.[/yellow]")
return
console.print("\n[bold]Select Source Discord Channel:[/bold]")
for i, ch in enumerate(d_channels):
console.print(f"({i+1}) {ch.name}")
console.print("(B) Back")
d_choices = [str(i+1) for i in range(len(d_channels))] + ["B", "b"]
prompt_msg = f"Select Discord Channel [[bold cyan](1-{len(d_channels)})/B[/bold cyan]]"
d_choice = Prompt.ask(prompt_msg, choices=d_choices, show_choices=False).upper()
if d_choice == "B":
return
source_channel = d_channels[int(d_choice) - 1]
# 2. Select Target Fluxer Channel
with console.status("[yellow]Fetching Fluxer channels...[/yellow]"):
f_channels = await self.engine.fluxer_writer.get_channels()
if not f_channels:
console.print("[yellow]No channels found in Fluxer community.[/yellow]")
return
# Determine recommended channel
recommended_channel = None
# Priority 1: Check state.json mapping
mapped_id = self.engine.state.get_fluxer_channel_id(str(source_channel.id))
if mapped_id:
recommended_channel = next((c for c in f_channels if str(c.get('id', '')) == mapped_id), None)
# Priority 2: Check name match
if not recommended_channel:
recommended_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
console.print("\n[bold]Select Target Fluxer Channel:[/bold]")
for i, ch in enumerate(f_channels):
console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}")
f_choices = [str(i+1) for i in range(len(f_channels))] + ["B", "b", "N", "n"]
if recommended_channel:
console.print(f"(Y) [bold green]{recommended_channel.get('name')}[/bold green] (auto-matched)")
f_choices += ["Y", "y"]
console.print("(N) Create new channel")
console.print("(B) Back")
# Build simplified prompt string
prompt_parts = [f"(1-{len(f_channels)})", "B", "N"]
if recommended_channel:
prompt_parts.append("Y")
prompt_msg = f"Select Fluxer Channel [[bold cyan]{'/'.join(prompt_parts)}[/bold cyan]]"
f_choice = Prompt.ask(prompt_msg, choices=f_choices, show_choices=False, default="Y" if recommended_channel else None).upper()
if f_choice == "B":
return
if f_choice == "Y" and recommended_channel:
target_channel = recommended_channel
elif f_choice == "N":
# Check if a channel with this name already exists
target_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
if not target_channel:
with console.status(f"[yellow]Creating Fluxer channel #{source_channel.name}...[/yellow]"):
parent_id = None
if source_channel.category_id:
parent_id = self.engine.state.get_fluxer_category_id(str(source_channel.category_id))
topic = getattr(source_channel, 'topic', "") or ""
new_id = await self.engine.fluxer_writer.create_channel(
name=source_channel.name,
topic=topic,
type=0,
parent_id=parent_id
)
if new_id:
self.engine.state.set_channel_mapping(str(source_channel.id), new_id)
# Refresh list to get the channel object
f_channels = await self.engine.fluxer_writer.get_channels()
target_channel = next((c for c in f_channels if str(c.get('id')) == new_id), None)
else:
console.print("[bold red]Failed to create channel.[/bold red]")
return
else:
target_channel = f_channels[int(f_choice) - 1]
# 3. Handle Starting Message
with console.status("[yellow]Fetching first message...[/yellow]"):
first_msg = await self.engine.discord_reader.get_first_message(source_channel.id)
after_id = None
if first_msg:
# Link format: https://discord.com/channels/GUILD_ID/CHANNEL_ID/MESSAGE_ID
first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}"
console.print(f"\n[bold green]Channel Found![/bold green]")
console.print(f"Oldest Message Link: {first_msg_link}")
console.print(f"Author: [blue]{first_msg.author.name}[/blue]")
console.print(f"Content Preview: {first_msg.content[:100]}...")
console.print("\n(Y) [green]Yes, start from oldest message[/green]")
console.print("(M) Provide Specific message link or ID")
console.print("(B) Back")
start_mode = Prompt.ask("Start migration", choices=["Y", "M", "B"], default="Y").upper()
if start_mode == "B":
return
elif start_mode == "M":
custom_link = Prompt.ask("Enter Discord Message Link")
try:
# Extract message ID from end of link
custom_id = int(custom_link.split("/")[-1])
# Check if message exists to give feedback
msg = await self.engine.discord_reader.get_message(source_channel.id, custom_id)
if msg:
# To include this message, we start 'after' the ID before it
after_id = custom_id - 1
console.print(f"[green]Confirmed: Starting from {msg.author.name}'s message.[/green]")
else:
console.print("[red]Warning: Message ID not found in this channel. Starting from beginning.[/red]")
except Exception as e:
console.print(f"[red]Error parsing link: {e}. Starting from beginning.[/red]")
else:
console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]")
return
# 4. Analysis and Confirmation
console.print("\n[yellow]Analyzing channel content...[/yellow]")
self.engine.is_running = True
stats = {"messages": 0, "threads": 0, "attachments": 0}
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console
) as progress:
task = progress.add_task("[cyan]Scanning history...", total=None)
async def update_scan_progress(count: int):
progress.update(task, description=f"[cyan]Scanned {count} items...")
stats = await self.engine.analyze_migration(
source_channel_id=source_channel.id,
after_message_id=after_id,
progress_callback=update_scan_progress
)
finally:
self.engine.is_running = False
table = Table(show_header=True, header_style="bold #4641D9", title="Migration Summary & Estimates")
table.add_column("Item", width=15)
table.add_column("Count", justify="right", width=10)
table.add_column("Overhead/Details")
msg_time = stats['messages'] * self.config.migration.rate_limit_delay_seconds
table.add_row(
"Messages",
f"[bold cyan]{stats['messages']}[/bold cyan]",
f"~{msg_time}s delay (rate limiting), {stats['messages']} API writes"
)
table.add_row(
"Threads",
f"[bold cyan]{stats['threads']}[/bold cyan]",
f"{stats['threads'] * 2} extra marker messages, {stats['threads']} extra history fetches"
)
table.add_row(
"Attachments",
f"[bold cyan]{stats['attachments']}[/bold cyan]",
f"{stats['attachments']} downloads and uploads (bandwidth & API calls)"
)
console.print("")
console.print(table)
if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [#4641D9]#{target_channel.get('name')}[/#4641D9]?"):
return
# 5. Migration Execution
console.print("\n[bold green]Starting Migration...[/bold green]")
self.engine.is_running = True
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
task = progress.add_task("[cyan]Migrating messages...", total=None)
async def update_msg_progress(count: int):
progress.update(task, description=f"[cyan]Migrated {count} messages...")
count = await self.engine.migrate_messages(
source_channel_id=source_channel.id,
target_channel_id=target_channel.get("id"),
after_message_id=after_id,
progress_callback=update_msg_progress
)
console.print(f"\n[bold green]Success! {count} messages migrated to {target_channel.get('name')}.[/bold green]")
except Exception as e:
console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]")
finally:
self.engine.is_running = False
await self.engine.close_connections()
async def danger_zone(self):
"""Danger Zone irreversible destructive operations on the Fluxer community."""
console.print("")
console.print(Panel.fit(
"[bold red]\u26a0 DANGER ZONE \u26a0[/bold red]\n"
"[yellow]These actions are PERMANENT and IRREVERSIBLE on your Fluxer community.[/yellow]\n"
"[yellow]Always double-check before confirming.[/yellow]",
style="bold red"
))
console.print("(1) Delete all Channels & Categories")
console.print("(2) Reset Channel & Category Permissions")
console.print("(3) Delete all Roles [dim](bot role is protected)[/dim]")
console.print("(4) Delete all custom Emojis & Stickers")
console.print("(B) Back")
choice = Prompt.ask(
"Select a Danger Zone option",
choices=["1", "2", "3", "4", "B", "b"],
default="B"
).upper()
if choice == "B":
return
# ---- (1) Delete all Channels & Categories ----
if choice == "1":
console.print("")
console.print("[bold red]This will DELETE every channel and category in the Fluxer community.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return
if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
del_task = progress.add_task("[red]Deleting channels...", total=100)
async def on_channel_deleted(name: str, current: int, total: int):
progress.update(del_task, total=total, completed=current,
description=f"[red]Deleting: {name}")
count = await self.engine.danger_delete_all_channels(progress_callback=on_channel_deleted)
console.print(f"[bold green]{count} channels/categories deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
# ---- (2) Reset Channel & Category Permissions ----
elif choice == "2":
console.print("")
console.print("[bold red]This will RESET all permission overwrites on every channel and category.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return
if not Confirm.ask("[bold red]Last chance \u2013 all custom permission overrides will be wiped. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
perm_task = progress.add_task("[red]Resetting permissions...", total=100)
async def on_perm_reset(name: str, current: int, total: int):
progress.update(perm_task, total=total, completed=current,
description=f"[red]Resetting: {name}")
count = await self.engine.danger_reset_channel_permissions(progress_callback=on_perm_reset)
console.print(f"[bold green]Permissions reset on {count} channels/categories.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
# ---- (3) Delete all Roles ----
elif choice == "3":
console.print("")
console.print("[bold red]This will DELETE all roles in the Fluxer community.[/bold red]")
console.print("[dim]Managed roles (including the bot's own role) and @everyone are automatically protected.[/dim]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return
if not Confirm.ask("[bold red]Last chance \u2013 confirm permanent role deletion?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
role_task = progress.add_task("[red]Deleting roles...", total=100)
async def on_role_deleted(name: str, current: int, total: int):
progress.update(role_task, total=total, completed=current,
description=f"[red]Deleting role: {name}")
count = await self.engine.danger_delete_all_roles(progress_callback=on_role_deleted)
console.print(f"[bold green]{count} roles deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
# ---- (4) Delete all Emojis & Stickers ----
elif choice == "4":
console.print("")
console.print("[bold red]This will DELETE all custom emojis and stickers in the Fluxer community.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return
if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
asset_task = progress.add_task("[red]Deleting assets...", total=100)
async def on_asset_deleted(name: str, asset_type: str, current: int, total: int):
progress.update(asset_task, total=total, completed=current,
description=f"[red]Deleting {asset_type}: {name}")
counts = await self.engine.danger_delete_all_emojis_and_stickers(progress_callback=on_asset_deleted)
console.print(f"[bold green]{counts.get('emojis', 0)} emojis deleted.[/bold green]")
console.print(f"[bold green]{counts.get('stickers', 0)} stickers deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
async def run_cli():
cli = MigrationCLI()
await cli.run()