From 3a6a138c895a4d1da5a2afe34ecad5ca96e4894e Mon Sep 17 00:00:00 2001 From: rambros Date: Wed, 4 Mar 2026 13:13:38 +0530 Subject: [PATCH] prepare for backup_reader refactor --- src/core/backup_reader.py | 0 src/core/discord_reader.py | 8 + src/ui/backup_ops.py | 15 +- src/ui/shuttle_app.py | 1617 ------------------------------------ src/ui/shuttle_ops.py | 69 +- src/ui/unifier_tui.py | 1013 ---------------------- 6 files changed, 66 insertions(+), 2656 deletions(-) create mode 100644 src/core/backup_reader.py delete mode 100644 src/ui/shuttle_app.py delete mode 100644 src/ui/unifier_tui.py diff --git a/src/core/backup_reader.py b/src/core/backup_reader.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/discord_reader.py b/src/core/discord_reader.py index 7a296c8..811eea4 100644 --- a/src/core/discord_reader.py +++ b/src/core/discord_reader.py @@ -9,6 +9,14 @@ class DiscordReader: MESSAGE_TYPE_DEFAULT = discord.MessageType.default MESSAGE_TYPE_REPLY = discord.MessageType.reply MESSAGE_TYPE_THREAD_STARTER = discord.MessageType.thread_starter_message + + # Exceptions + Forbidden = discord.Forbidden + + # Channel Types + CHANNEL_TYPE_TEXT = discord.ChannelType.text + CHANNEL_TYPE_NEWS = discord.ChannelType.news + CHANNEL_TYPE_FORUM = discord.ChannelType.forum @staticmethod def find_item(iterable, **attrs): diff --git a/src/ui/backup_ops.py b/src/ui/backup_ops.py index 5ac0439..f6d3def 100644 --- a/src/ui/backup_ops.py +++ b/src/ui/backup_ops.py @@ -4,7 +4,6 @@ Embedded inside ModeScreen's "Backup" tab. """ import asyncio -import discord import json import re from pathlib import Path @@ -147,7 +146,7 @@ class BackupPane(Container): modal.write(f"- {e_count} emojis, {s_count} stickers.") modal.phase_report("Profile Backup") - except discord.Forbidden as e: + except self.engine.discord_reader.Forbidden as e: modal.write(f"[bold red]Backup failed: {e}[/bold red]") modal.phase_report("Profile Backup", "error") except Exception as e: @@ -174,7 +173,11 @@ class BackupPane(Container): eligible_channels = [ c for c in all_channels - if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.forum] + if c.type in [ + self.engine.discord_reader.CHANNEL_TYPE_TEXT, + self.engine.discord_reader.CHANNEL_TYPE_NEWS, + self.engine.discord_reader.CHANNEL_TYPE_FORUM + ] ] if not eligible_channels: @@ -284,7 +287,11 @@ class BackupPane(Container): all_channels = await self.engine.discord_reader.get_channels() eligible_channels = [ c for c in all_channels - if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.forum] + if c.type in [ + self.engine.discord_reader.CHANNEL_TYPE_TEXT, + self.engine.discord_reader.CHANNEL_TYPE_NEWS, + self.engine.discord_reader.CHANNEL_TYPE_FORUM + ] ] selected_channels = [ diff --git a/src/ui/shuttle_app.py b/src/ui/shuttle_app.py deleted file mode 100644 index e49b8a0..0000000 --- a/src/ui/shuttle_app.py +++ /dev/null @@ -1,1617 +0,0 @@ -import sys -import asyncio -import discord -import logging -import re -import time -import aiohttp -from rich.console import Console -from rich.prompt import Prompt, Confirm -from rich.table import Table -from rich.panel import Panel -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, ProgressColumn -from rich.text import Text -from src.core.configuration import load_config, save_config -from src.core.base import MigrationContext - -import src.fluxer.roles_permissions as fluxer_roles -import src.stoat.roles_permissions as stoat_roles -import src.fluxer.emoji_stickers as fluxer_emoji_stickers -import src.stoat.emoji_stickers as stoat_emoji_stickers -import src.fluxer.server_metadata as fluxer_metadata -import src.stoat.server_metadata as stoat_metadata -import src.fluxer.migrate_message as fluxer_migrate -import src.stoat.migrate_message as stoat_migrate - -from src.core.audit import log_audit_event - -global_rate_limit_msg = "" -global_rate_limit_expires = 0.0 - -class RateLimitColumn(ProgressColumn): - """Renders the current dynamic rate limit wait.""" - def render(self, task) -> Text: - global global_rate_limit_msg, global_rate_limit_expires - if time.time() < global_rate_limit_expires: - return Text.from_markup(f"[dim]\\[wait: {global_rate_limit_msg}][/dim]") - return Text("") - -class RateLimitHandler(logging.Handler): - """Intersects library logs to capture clean dynamic rate limit messages.""" - def __init__(self): - super().__init__() - self._last_print = "" - - def emit(self, record): - try: - msg = record.getMessage() - # Detect rate limit messages from discord.py, fluxer.py, stoat, etc. - if "retry" in msg.lower() and ("rate limit" in msg.lower() or "429" in msg): - # Extract seconds using regex: supports "retry in 5.50s" and "Retrying in 5.50 seconds" - match = re.search(r"in ([\d.]+)\s*(?:seconds?|s)", msg, re.IGNORECASE) - if match: - seconds = match.group(1) - if "discord" in record.name.lower(): - platform = "Discord" - elif "fluxer" in record.name.lower(): - platform = "Fluxer" - elif "stoat" in record.name.lower(): - platform = "Stoat" - else: - platform = "API" - - # Update the global dynamic rate limit info - global global_rate_limit_msg, global_rate_limit_expires - global_rate_limit_msg = f"{platform} rate limit {seconds}s" - try: - global_rate_limit_expires = time.time() + float(seconds) - except ValueError: - pass - except Exception: - pass - -console = Console() - -class MigrationCLI: - """Standard CLI app to manage the Discord to Fluxer migration.""" - - def __init__(self, target_platform="fluxer", config_path="config.yaml"): - self.config_path = config_path - self.target_platform = target_platform - try: - self.config = load_config(self.config_path) - self.engine = MigrationContext(self.config, self.target_platform) - except Exception as e: - console.print(f"[bold red]Failed to load config: {e}[/bold red]") - sys.exit(1) - - self.progress_callback_task = None - self.tokens_valid = False - - # Register rate limit interceptor for both libraries - rl_handler = RateLimitHandler() - logging.getLogger("discord").addHandler(rl_handler) - logging.getLogger("fluxer").addHandler(rl_handler) - logging.getLogger("stoat").addHandler(rl_handler) - - async def validate_config(self): - self.validation_results = { - "discord_token": False, "discord_bot_name": None, - "discord_server": False, "discord_server_name": None, - "discord_intents": {}, "discord_permissions": {}, - "fluxer_token": False, "fluxer_bot_name": None, - "fluxer_community": False, "fluxer_community_name": None, - "fluxer_permissions": {}, - "stoat_token": False, "stoat_bot_name": None, - "stoat_server": False, "stoat_server_name": None, - "stoat_permissions": {}, - "discord_timeout": False, "fluxer_timeout": False, "stoat_timeout": False - } - self.tokens_valid = False - - d_token = self.config.discord_bot_token - f_token = self.config.fluxer_bot_token - s_token = self.config.stoat_bot_token - - fillers = ["DISCORD_BOT_TOKEN", "FLUXER_BOT_TOKEN", "STOAT_BOT_TOKEN", - "000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID", - "", None - ] - discord_dummy = d_token in fillers or self.config.discord_server_id in fillers - fluxer_dummy = f_token in fillers or self.config.fluxer_community_id in fillers - stoat_dummy = s_token in fillers or self.config.stoat_server_id in fillers - - discord_task = None - if not discord_dummy: - discord_task = asyncio.create_task(self.engine.discord_reader.validate()) - - fluxer_task = None - if not fluxer_dummy and self.target_platform == "fluxer": - fluxer_task = asyncio.create_task(self.engine.writer.validate()) - - stoat_task = None - if not stoat_dummy and self.target_platform == "stoat": - stoat_task = asyncio.create_task(self.engine.writer.validate()) - - tasks_to_wait = [t for t in [discord_task, fluxer_task, stoat_task] if t is not None] - - try: - with console.status("[yellow]Validating tokens...[/yellow]"): - start_time = asyncio.get_event_loop().time() - done = set() - pending = set() - if tasks_to_wait: - done, pending = await asyncio.wait( - tasks_to_wait, - timeout=10.0, - return_when=asyncio.ALL_COMPLETED - ) - - # Process Discord Result - if discord_dummy: - console.print("[bold yellow]Discord setup incomplete.[/bold yellow]") - elif discord_task in done: - try: - res = discord_task.result() - self.validation_results["discord_token"] = res.get("token", False) - self.validation_results["discord_bot_name"] = res.get("bot_name") - self.validation_results["discord_server"] = res.get("server", False) - self.validation_results["discord_server_name"] = res.get("server_name") - if not res.get("token"): - console.print("[bold red]Discord Token validation failed (Invalid Token).[/bold red]") - elif not res.get("server"): - console.print("[bold red]Discord Server ID validation failed (Invalid/Inaccessible ID).[/bold red]") - else: - # Store intents & permissions for later UI display - self.validation_results["discord_intents"] = res.get("intents", {}) - self.validation_results["discord_permissions"] = res.get("permissions", {}) - - except Exception as e: - console.print(f"[bold red]Discord validation failed with error: {e}[/bold red]") - else: - console.print("[bold red]Discord bot token validation timed out after 10 seconds.[/bold red]") - self.validation_results["discord_timeout"] = True - discord_task.cancel() - - # Process Fluxer Result - if self.target_platform == "fluxer": - if fluxer_dummy: - console.print("[dim yellow]Fluxer platform setup incomplete.[/dim yellow]") - elif fluxer_task in done: - try: - res = fluxer_task.result() - self.validation_results["fluxer_token"] = res.get("token", False) - self.validation_results["fluxer_bot_name"] = res.get("bot_name") - self.validation_results["fluxer_community"] = res.get("community", False) - self.validation_results["fluxer_community_name"] = res.get("community_name") - if not res.get("token"): - console.print("[bold red]Fluxer Token validation failed (Invalid Token).[/bold red]") - elif not res.get("community"): - console.print("[bold red]Fluxer Community ID validation failed (Invalid/Inaccessible ID).[/bold red]") - else: - self.validation_results["fluxer_permissions"] = res.get("permissions", {}) - except Exception as e: - console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]") - else: - console.print("[bold red]Fluxer bot token validation timed out after 10 seconds.[/bold red]") - self.validation_results["fluxer_timeout"] = True - fluxer_task.cancel() - - # Process Stoat Result - if self.target_platform == "stoat": - if stoat_dummy: - console.print("[dim yellow]Stoat platform setup incomplete.[/dim yellow]") - elif stoat_task in done: - try: - res = stoat_task.result() - self.validation_results["stoat_token"] = res.get("token", False) - self.validation_results["stoat_bot_name"] = res.get("bot_name") - self.validation_results["stoat_server"] = res.get("community", False) - self.validation_results["stoat_server_name"] = res.get("community_name") - if not res.get("token"): - console.print("[bold red]Stoat Token validation failed (Invalid Token).[/bold red]") - elif not res.get("community"): - console.print("[bold red]Stoat Server ID validation failed (Invalid/Inaccessible ID).[/bold red]") - else: - self.validation_results["stoat_permissions"] = res.get("permissions", {}) - except Exception as e: - console.print(f"[bold red]Stoat validation failed with error: {e}[/bold red]") - else: - console.print("[bold red]Stoat bot token validation timed out after 10 seconds.[/bold red]") - self.validation_results["stoat_timeout"] = True - stoat_task.cancel() - - # Only tokens and server/community existence are strictly required for 'tokens_valid' - discord_valid = self.validation_results.get("discord_token") and self.validation_results.get("discord_server") - - if self.target_platform == "fluxer": - target_valid = self.validation_results.get("fluxer_token") and self.validation_results.get("fluxer_community") - else: - target_valid = self.validation_results.get("stoat_token") and self.validation_results.get("stoat_server") - - self.tokens_valid = discord_valid and target_valid - - if self.tokens_valid: - if self.target_platform == "fluxer": - srv_id = self.config.fluxer_community_id - srv_name = self.validation_results.get("fluxer_community_name", "unknown") - else: - srv_id = self.config.stoat_server_id - srv_name = self.validation_results.get("stoat_server_name", "unknown") - - if srv_id and srv_name and srv_id not in ["000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID", "unconfigured", ""]: - import re - safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', srv_name) - self.engine.state.set_folder(str(srv_id), safe_name) - - - # Check if all permissions are actually granted - self.permissions_complete = True - if self.tokens_valid: - # Discord - d_intents = self.validation_results.get("discord_intents", {}) - d_perms = self.validation_results.get("discord_permissions", {}) - if not all([d_intents.get("message_content"), d_perms.get("view_channel"), d_perms.get("read_message_history")]): - self.permissions_complete = False - - # Fluxer or Stoat - if self.target_platform == "fluxer": - f_perms = self.validation_results.get("fluxer_permissions", {}) - if not all(f_perms.values()) if f_perms else True: - self.permissions_complete = False - else: - s_perms = self.validation_results.get("stoat_permissions", {}) - if not all(s_perms.values()) if s_perms else True: - self.permissions_complete = False - - except Exception as e: - console.print(f"[bold red]Validation system failure: {e}[/bold red]") - finally: - # Ensure tasks are cleaned up - for t in [discord_task, fluxer_task, stoat_task]: - if t is not None and not t.done(): t.cancel() - - async def run(self): - if self.config.discord_bot_token in ["YOUR_DISCORD_TOKEN", "DISCORD_BOT_TOKEN"]: - console.print("\n[bold yellow]First time setup detected. Redirecting to configuration...[/bold yellow]") - self.validation_results = { - "discord_token": False, "discord_bot_name": None, - "discord_server": False, "discord_server_name": None, - "discord_intents": {}, "discord_permissions": {}, - "fluxer_token": False, "fluxer_bot_name": None, - "fluxer_community": False, "fluxer_community_name": None, - "fluxer_permissions": {}, - "stoat_token": False, "stoat_bot_name": None, - "stoat_server": False, "stoat_server_name": None, - "stoat_permissions": {}, - "discord_timeout": False, "fluxer_timeout": False, "stoat_timeout": False - } - self.tokens_valid = False - self.permissions_complete = False - await self.edit_configuration(skip_validation=True) - else: - await self.validate_config() - - while True: - console.print("") - console.print(Panel.fit(f"{self.target_platform.capitalize()} Reaper", style="bold blue")) - d_name = self.validation_results.get("discord_server_name") - d_display = f"[bold green]\"{d_name}\"[/bold green]" if d_name else ("[bold yellow]TIMEOUT ERROR[/bold yellow]" if self.validation_results.get("discord_timeout") else "[bold red]NOT SET UP[/bold red]") - - f_name = self.validation_results.get("fluxer_community_name") - f_display = f"[bold green]\"{f_name}\"[/bold green]" if f_name else ("[bold yellow]TIMEOUT ERROR[/bold yellow]" if self.validation_results.get("fluxer_timeout") else "[bold red]NOT SET UP[/bold red]") - - s_name = self.validation_results.get("stoat_server_name") - s_display = f"[bold green]\"{s_name}\"[/bold green]" if s_name else ("[bold yellow]TIMEOUT ERROR[/bold yellow]" if self.validation_results.get("stoat_timeout") else "[bold red]NOT SET UP[/bold red]") - - console.print(f"[bold cyan]Discord Server:[/bold cyan] {d_display}") - - if self.target_platform == "fluxer": - console.print(f"[bold #4641D9]Fluxer Community:[/bold #4641D9] {f_display}") - else: - console.print(f"[bold #FF8C00]Stoat Server:[/bold #FF8C00] {s_display}") - console.print("[bold]Main Menu[/bold]") - console.print("(1) Clone Server Template (Channels & Categories)") - console.print("(2) Copy Roles & Permissions") - console.print("(3) Copy Emojis & Stickers") - console.print("(4) Sync Server Name, Logo and Banner") - console.print("(5) Migrate message history") - - if not self.tokens_valid: - val_status = "[bold red][INVALID][/bold red]" - elif not self.permissions_complete: - val_status = "[bold yellow][PERMISSION MISSING][/bold yellow]" - else: - val_status = "[bold green][VALID][/bold green]" - - console.print(f"(6) Configuration {val_status}") - console.print("(7) [red]Danger Zone ⚠[/red]") - - console.print("(Q) Exit") - - choice = Prompt.ask("\nSelect an option [1/2/3/4/5/6/7/Q]", choices=["1", "2", "3", "4", "5", "6", "7", "Q", "q"], default="Q", show_choices=False).upper() - - if choice in ("1", "2", "3", "4", "5", "7") and self.tokens_valid and not self.permissions_complete: - console.print("[bold red]Warning:[/bold red][bold yellow] Permission Missing - Check[/bold yellow] [bold blue](6) Configuration[/bold blue] [bold yellow]for more info[/bold yellow]") - - if choice == "1": - await self.clone_server_template() - elif choice == "2": - await self.copy_roles() - elif choice == "3": - await self.copy_emojis() - elif choice == "4": - await self.sync_server_metadata() - elif choice == "5": - await self.migrate_message_history() - elif choice == "6": - await self.edit_configuration() - elif choice == "7": - await self.danger_zone() - elif choice == "Q": - console.print("[yellow]Exiting tool...[/yellow]") - await self.engine.close_connections() - break - - async def edit_configuration(self, skip_validation=False): - if not skip_validation: - await self.validate_config() - - while True: - console.clear() - console.print(Panel(f"[bold blue]Configuration: {self.target_platform.capitalize()}[/bold blue]", expand=False)) - - # Print permissions summary - v = self.validation_results - s_perms = v.get(f"{self.target_platform}_permissions", {}) - d_perms = v.get("discord_permissions", {}) - d_intents = v.get("discord_intents", {}) - - def fmt(name, has): - if has is None: return f"[dim]{name}[/dim]" - color = "green" if has else "red" - symbol = "✔" if has else "✘" - return f"[{color}]{symbol} {name}[/{color}]" - - perm_table = Table(show_header=True, header_style="bold magenta", box=None, padding=(0, 2)) - perm_table.add_column("Bot Type") - perm_table.add_column("Required Permissions & Intents") - - perm_table.add_row( - "[bold #5865F2]Discord Bot[/bold #5865F2]", - f"• [bold]Intents:[/bold] Server Members, {fmt('Message Content', d_intents.get('message_content'))}\n" - f"• [bold]Permissions:[/bold] {fmt('View Channels', d_perms.get('view_channel'))}, {fmt('Read Message History', d_perms.get('read_message_history'))}" - ) - - if self.target_platform == "fluxer": - perm_table.add_row( - "[bold #4641D9]Fluxer Bot[/bold #4641D9]", - f"• [bold]Permissions:[/bold] {fmt('Manage Channels', s_perms.get('manage_channels'))}, {fmt('Manage Messages', s_perms.get('manage_messages'))},\n" - f" {fmt('Manage Roles', s_perms.get('manage_roles'))}, {fmt('Manage Emojis/Stickers', s_perms.get('manage_emojis_stickers'))}, {fmt('Manage Webhooks', s_perms.get('manage_webhooks'))}" - ) - else: - perm_table.add_row( - "[bold #FF8C00]Stoat Bot[/bold #FF8C00]", - f"• [bold]Permissions:[/bold] {fmt('Manage Channel', s_perms.get('manage_channels'))}, {fmt('Manage Server', s_perms.get('manage_server'))},\n" - f" {fmt('Manage Permissions', s_perms.get('manage_permissions'))}, {fmt('Manage Roles', s_perms.get('manage_roles'))}, {fmt('Manage Customization', s_perms.get('manage_customization'))},\n" - f" {fmt('Manage Messages', s_perms.get('manage_messages'))}, {fmt('Send Messages', s_perms.get('send_messages'))}, {fmt('Masquerade', s_perms.get('masquerade'))},\n" - f" {fmt('Upload Files', s_perms.get('upload_files'))}, {fmt('React', s_perms.get('react'))}, {fmt('Mention Everyone', s_perms.get('mention_everyone'))}, {fmt('Mention Roles', s_perms.get('mention_roles'))}" - ) - - console.print("\n") # Add spacing before panel - console.print(Panel(perm_table, title=f"[bold]Required Bot Permissions (Target: {self.target_platform.capitalize()})[/bold]", expand=False, border_style="dim")) - - console.print("\n[bold]Configuration Status:[/bold]") - - def get_status_str(is_valid, name=None): - status = "[bold green][VALID][/bold green]" if is_valid else "[bold red][INVALID][/bold red]" - if is_valid and name: - return f"{status} \"{name}\"" - return status - - console.print(f"Discord Bot Token {get_status_str(v.get('discord_token', False), v.get('discord_bot_name'))}") - console.print(f"Discord Server ID {get_status_str(v.get('discord_server', False), v.get('discord_server_name'))}") - - if self.target_platform == "fluxer": - console.print(f"Fluxer Bot Token {get_status_str(v.get('fluxer_token', False), v.get('fluxer_bot_name'))}") - console.print(f"Fluxer Community ID {get_status_str(v.get('fluxer_community', False), v.get('fluxer_community_name'))}") - console.print(f"Fluxer API URL [dim]({self.config.fluxer_api_url})[/dim]") - else: - console.print(f"Stoat Bot Token {get_status_str(v.get('stoat_token', False), v.get('stoat_bot_name'))}") - console.print(f"Stoat Server ID {get_status_str(v.get('stoat_server', False), v.get('stoat_server_name'))}") - console.print(f"Stoat API URL [dim]({self.config.stoat_api_url})[/dim]") - - console.print("\n(1) Edit Bot tokens & Community IDs") - console.print("(2) Edit API url (for self hosted instances)") - console.print("(B) Back") - - menu_choice = Prompt.ask("\nSelect an option [1/2/B]", choices=["1", "2", "B", "b"], default="B", show_choices=False).upper() - - if menu_choice == "B": - return - - if menu_choice == "2": - console.print("\n[bold]API URL Editor[/bold] (leave blank to keep current)") - - if self.target_platform == "fluxer": - f_url = Prompt.ask("Fluxer API URL", default=self.config.fluxer_api_url) - s_url = self.config.stoat_api_url - changed = f_url != self.config.fluxer_api_url - else: - s_url = Prompt.ask("Stoat API URL", default=self.config.stoat_api_url) - f_url = self.config.fluxer_api_url - changed = s_url != self.config.stoat_api_url - - if not changed: - console.print("[yellow]No changes made.[/yellow]") - time.sleep(1) - continue - - # Validation logic - all_valid = True - with console.status("[bold yellow]Validating API endpoint..."): - async def check_api(url, name): - if url == "default": - return True - try: - # Use a 5 second timeout for validation - async with aiohttp.ClientSession() as session: - async with session.get(url.rstrip("/") + "/", timeout=5.0) as resp: - if resp.status >= 500: - console.print(f"[red]Error: {name} API returned server error {resp.status}[/red]") - return False - return True - except Exception as e: - console.print(f"[red]Error connecting to {name} API ({url}): {e}[/red]") - return False - - if self.target_platform == "fluxer": - if not await check_api(f_url, "Fluxer"): all_valid = False - else: - if not await check_api(s_url, "Stoat"): all_valid = False - - if all_valid: - console.print("[bold green]API endpoint valid[/bold green]") - else: - console.print("[bold red]API validation failed![/bold red]") - - if Confirm.ask("Save config?", default=all_valid): - self.config.fluxer_api_url = f_url - self.config.stoat_api_url = s_url - save_config(self.config, self.config_path) - # Recreate engine with new config - self.engine = MigrationContext(self.config, self.target_platform) - console.print(f"\n[bold green]API URL updated and saved to {self.config_path}![/bold green]") - await self.update_validation_status() - else: - console.print("[yellow]Changes discarded.[/yellow]") - - time.sleep(1) - continue - - console.print("\n[bold]Configuration Editor[/bold] (leave blank to keep current)") - - d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token) - d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id) - - if self.target_platform == "fluxer": - f_token = Prompt.ask("Fluxer Bot Token", default=self.config.fluxer_bot_token) - f_comm = Prompt.ask("Fluxer Community ID", default=self.config.fluxer_community_id) - s_token = self.config.stoat_bot_token - s_comm = self.config.stoat_server_id - else: - s_token = Prompt.ask("Stoat Bot Token", default=self.config.stoat_bot_token) - s_comm = Prompt.ask("Stoat Server ID", default=self.config.stoat_server_id) - f_token = self.config.fluxer_bot_token - f_comm = self.config.fluxer_community_id - - if (d_token != self.config.discord_bot_token or - f_token != self.config.fluxer_bot_token or - d_server != self.config.discord_server_id or - f_comm != self.config.fluxer_community_id or - s_token != self.config.stoat_bot_token or - s_comm != self.config.stoat_server_id): - - # Treat empty string as None to unconfigure - self.config.discord_bot_token = d_token if d_token != "" else None - self.config.discord_server_id = d_server if d_server != "" else None - self.config.fluxer_bot_token = f_token if f_token != "" else None - self.config.fluxer_community_id = f_comm if f_comm != "" else None - self.config.stoat_bot_token = s_token if s_token != "" else None - self.config.stoat_server_id = s_comm if s_comm != "" else None - - save_config(self.config, self.config_path) - # Recreate engine with new config - self.engine = MigrationContext(self.config, self.target_platform) - - # Re-validate - console.print("[yellow]Validating new configuration...[/yellow]") - await self.update_validation_status() - - console.print(f"\nDiscord Bot Token {get_status_str(self.validation_results.get('discord_token', False))}") - console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}") - - if self.target_platform == "fluxer": - console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}") - console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}") - else: - console.print(f"Stoat Bot Token {get_status_str(self.validation_results.get('stoat_token', False))}") - console.print(f"Stoat Server ID {get_status_str(self.validation_results.get('stoat_server', False))}") - - console.print(f"[bold green]Configuration updated and saved to {self.config_path}![/bold green]") - else: - console.print("[yellow]No changes made.[/yellow]") - - time.sleep(1) # Small delay to see status before refresh - - async def update_validation_status(self): - await self.validate_config() - - async def clone_server_template(self): - if self.target_platform == "fluxer": - from src.fluxer.clone_server import sync_channel_state, migrate_channels - else: - from src.stoat.clone_server import sync_channel_state, migrate_channels - - console.print("\n[yellow]Fetching server structure...[/yellow]") - categories = [] - channels = [] - try: - await self.engine.start_connections() - with console.status(f"[yellow]Syncing {self.target_platform.capitalize()} channel state...[/yellow]"): - await sync_channel_state(self.engine) - categories = await self.engine.discord_reader.get_categories() - channels = await self.engine.discord_reader.get_channels() - except Exception as e: - console.print(f"[bold red]Failed to fetch server structure: {e}[/bold red]") - await self.engine.close_connections() - return - - table = Table(show_header=True, header_style="bold #4641D9") - table.add_column("Type", width=12) - table.add_column("Discord Name") - table.add_column("Status", justify="right") - - # Group channels by category for status check - missing_by_cat = {} - missing_uncategorized = [] - for ch in channels: - cat_id = str(ch.category_id) if ch.category_id else None - if not self.engine.state.get_fluxer_channel_id(str(ch.id)): - if cat_id: - if cat_id not in missing_by_cat: missing_by_cat[cat_id] = [] - missing_by_cat[cat_id].append(ch) - else: - missing_uncategorized.append(ch) - - # Build the unified preview table - for cat in categories: - cat_id_str = str(cat.id) - fluxer_cat_id = self.engine.state.get_fluxer_category_id(cat_id_str) - - status = f"[cyan]{fluxer_cat_id}[/cyan]" if fluxer_cat_id else "[bold red]Missing[/bold red]" - table.add_row("[bold yellow]Category[/bold yellow]", f"[bold yellow]{cat.name}[/bold yellow]", status) - - # Show ALL channels under this category - cat_channels = [ch for ch in channels if str(ch.category_id) == cat_id_str] - for ch in cat_channels: - fluxer_ch_id = self.engine.state.get_fluxer_channel_id(str(ch.id)) - ch_status = f"[cyan]{fluxer_ch_id}[/cyan]" if fluxer_ch_id else "[red]Missing[/red]" - table.add_row(" Channel", ch.name, ch_status) - - # Show Uncategorized - uncategorized = [ch for ch in channels if not ch.category_id] - if uncategorized: - table.add_row("Uncategorized", "", "") - for ch in uncategorized: - fluxer_ch_id = self.engine.state.get_fluxer_channel_id(str(ch.id)) - ch_status = f"[cyan]{fluxer_ch_id}[/cyan]" if fluxer_ch_id else "[red]Missing[/red]" - table.add_row(" Channel", ch.name, ch_status) - - console.print(table) - console.print("") - - # Check for existing mappings to determine if we should suggest a force re-copy - cached_count = sum(1 for cat in categories if self.engine.state.get_fluxer_category_id(str(cat.id))) - cached_count += sum(1 for ch in channels if self.engine.state.get_fluxer_channel_id(str(ch.id))) - all_ids_len = len(categories) + len(channels) - - force = False - if cached_count > 0: - console.print(f"[yellow]\u26a0 {cached_count}/{all_ids_len} item(s) already in state cache.[/yellow]") - # End of table consolidated section, back to standard flow logic. - - console.print("[bold green](Y) Continue with only missing items[/bold green]") - console.print("[bold red](F) Force re-clone, creates duplicate channels![/bold red]") - console.print("[bold yellow](B) Back[/bold yellow]") - - choice = Prompt.ask("Select an option [Y/F/B]", choices=["Y", "y", "F", "f", "B", "b"], default="Y", show_choices=False).upper() - - if choice == "B": - await self.engine.close_connections() - return - elif choice == "F": - force = True - # if 'Y', force remains False and we continue - else: - if not Confirm.ask("Clone channels and categories?", default=True): - await self.engine.close_connections() - return - if not Confirm.ask("Are you sure?", default=True): - await self.engine.close_connections() - return - - console.print("\n[bold green]Starting Channel Cloning...[/bold green]") - try: - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - RateLimitColumn(), - console=console - ) as progress: - - channel_task = progress.add_task("[cyan]Copying Channels...", total=100) - - async def update_progress(item_name: str, status: str, current: int, total: int): - color = "cyan" if status == "Copying" else "yellow" - progress.update(channel_task, total=total, completed=current, description=f"[{color}]{status} Channel: {item_name}") - - self.engine.is_running = True - cloned_info = await migrate_channels(self.engine, progress_callback=update_progress, force=force) - - console.print("[bold green]Server Template cloned![/bold green]") - - if cloned_info and cloned_info.get("structure"): - lines = ["Successfully cloned channels and categories from Discord:"] - # Sort categories but keep "No Category" at the bottom if it exists - cats = sorted(cloned_info["structure"].keys(), key=lambda x: (x == "No Category", x)) - for cat_name in cats: - channel_names = cloned_info["structure"][cat_name] - # Only print if the category itself was created OR it has channels created under it - if cat_name in cloned_info["categories_created"] or channel_names: - lines.append(f"- **{cat_name}**") - for ch_name in sorted(channel_names): - lines.append(f" - {ch_name}") - - audit_desc = "\n".join(lines) - await log_audit_event(self.engine, "Server Template Cloned", audit_desc) - else: - await log_audit_event(self.engine, "Server Template Cloned", "No new channels or categories were cloned.") - - except Exception as e: - console.print(f"[bold red]Error during channel clone: {str(e)}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - - async def copy_roles(self): - console.print("\n[bold]Role & Permission Options[/bold]") - console.print("(1) Clone Roles & Role Permissions") - console.print("(2) Sync Category Permissions & Channel Permissions") - console.print("(B) Back") - - # Select appropriate role module - roles_mod = fluxer_roles if self.engine.target_platform == "fluxer" else stoat_roles - - choice = Prompt.ask("Select an option [1/2/B]", choices=["1", "2", "B", "b"], default="B", show_choices=False).upper() - - if choice == "B": - return - - if choice == "1": - try: - await self.engine.start_connections() - - with console.status(f"[yellow]Checking {self.engine.target_platform.capitalize()} for existing roles...[/yellow]"): - await roles_mod.sync_roles_state(self.engine) - - roles = await self.engine.discord_reader.get_roles() - - table = Table(show_header=True, header_style="bold #4641D9") - table.add_column("Discord Role") - table.add_column("Status", justify="center") - table.add_column(f"{self.engine.target_platform.capitalize()} ID", justify="right") - - cached_count = 0 - for r in roles: - target_id = self.engine.state.get_target_role_id(str(r.id)) - status = "[bold green]NEW[/bold green]" - tid_str = "[dim]N/A[/dim]" - if target_id: - status = "[dim]already copied[/dim]" - tid_str = f"[cyan]{target_id}[/cyan]" - cached_count += 1 - table.add_row(r.name, status, tid_str) - - console.print(table) - - force = False - if cached_count > 0: - console.print(f"\n[yellow]{cached_count} role(s) already in state cache.[/yellow]") - console.print("[bold green](Y) Sync missing roles only[/bold green]") - console.print("[bold red](F) Force Overwrite[/bold red]") - console.print("[bold yellow](B) Back[/bold yellow]") - - sub_choice = Prompt.ask("Select an option [Y/F/B]", choices=["Y", "y", "F", "f", "B", "b"], default="Y", show_choices=False).upper() - - if sub_choice == "B": - return - elif sub_choice == "F": - force = True - else: - if not Confirm.ask("Clone roles?", default=True): - await self.engine.close_connections() - return - if not Confirm.ask("Are you sure?", default=True): - await self.engine.close_connections() - return - - console.print("\n[bold green]Starting Role Migration...[/bold green]") - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - RateLimitColumn(), - console=console - ) as progress: - - role_task = progress.add_task("[cyan]Syncing Roles...", total=100) - - async def update_progress(item_name: str, current: int, total: int): - progress.update(role_task, total=total, completed=current, description=f"[cyan]Copying Role: {item_name}") - - self.engine.is_running = True - cloned_roles = await roles_mod.migrate_roles(self.engine, progress_callback=update_progress, force=force) - - console.print("[bold green]Role migration complete![/bold green]") - if cloned_roles: - audit_desc = "Successfully cloned these roles and their baseline permissions:\n" + "\n".join(f"- {r}" for r in cloned_roles) - await log_audit_event(self.engine, "Roles Cloned", audit_desc) - else: - await log_audit_event(self.engine, "Roles Cloned", "No new roles were cloned.") - - except Exception as e: - console.print(f"[bold red]Error during role migration: {str(e)}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - - elif choice == "2": - try: - await self.engine.start_connections() - - with console.status("[yellow]Analyzing categories and channels...[/yellow]"): - categories = await self.engine.discord_reader.get_categories() - channels = await self.engine.discord_reader.get_channels() - - mapped_cats = sum(1 for c in categories if self.engine.state.get_target_category_id(str(c.id))) - mapped_chs = sum(1 for c in channels if self.engine.state.get_target_channel_id(str(c.id))) - total_mapped = mapped_cats + mapped_chs - - console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]") - console.print("[bold green](Y) Proceed with Permission sync[/bold green]") - console.print("[bold yellow](B) Back[/bold yellow]") - - sub_choice = Prompt.ask("Select an option [Y/B]", choices=["Y", "y", "B", "b"], default="Y", show_choices=False).upper() - - if sub_choice == "B": - return - - console.print("\n[bold green]Syncing Category & Channel Permissions...[/bold green]") - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - RateLimitColumn(), - console=console - ) as progress: - - perm_task = progress.add_task("[cyan]Syncing Permissions...", total=100) - - async def update_progress(item_name: str, current: int, total: int): - progress.update(perm_task, total=total, completed=current, description=f"[cyan]Syncing: {item_name}") - - self.engine.is_running = True - synced_info = await roles_mod.sync_permissions(self.engine, progress_callback=update_progress) - - console.print("[bold green]Permission synchronization complete![/bold green]") - - if synced_info and synced_info.get("structure"): - lines = ["Successfully synchronized channel and category permission overrides:"] - # Sort categories but keep "No Category" at the bottom - cats = sorted(synced_info["structure"].keys(), key=lambda x: (x == "No Category", x)) - for cat_name in cats: - channel_names = synced_info["structure"][cat_name] - # For permissions, we show everything that was successfully synced - if cat_name in synced_info["categories_synced"] or channel_names: - lines.append(f"- **{cat_name}**") - for ch_name in sorted(channel_names): - lines.append(f" - {ch_name}") - - audit_desc = "\n".join(lines) - await log_audit_event(self.engine, "Permissions Synced", audit_desc) - else: - await log_audit_event(self.engine, "Permissions Synced", "No permissions were synchronized.") - - except Exception as e: - console.print(f"[bold red]Error during permission sync: {str(e)}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - - async def copy_emojis(self): - console.print("\n[yellow]Fetching emojis and stickers...[/yellow]") - - # Select module based on platform - asset_mod = stoat_emoji_stickers if self.target_platform == "stoat" else fluxer_emoji_stickers - - try: - await self.engine.start_connections() - - with console.status(f"[yellow]Checking {self.target_platform.capitalize()} for existing emojis and stickers...[/yellow]"): - await asset_mod.sync_assets_state(self.engine) - - emojis = await self.engine.discord_reader.get_emojis() - stickers = await self.engine.discord_reader.get_stickers() - - table = Table(show_header=True, header_style="bold #4641D9") - table.add_column("Type", width=10) - table.add_column("Name") - table.add_column("Status", justify="right") - - cached_emojis = 0 - for e in emojis: - already = self.engine.state.get_target_emoji_id(str(e.id)) - status = "[dim]already copied[/dim]" if already else "[bold green]NEW[/bold green]" - if already: cached_emojis += 1 - table.add_row("Emoji", e.name, status) - - cached_stickers = 0 - for s in stickers: - already = self.engine.state.get_target_sticker_id(str(s.id)) - status = "[dim]already copied[/dim]" if already else "[bold green]NEW[/bold green]" - if already: cached_stickers += 1 - table.add_row("Sticker", s.name, status) - - console.print(table) - - # Warn if everything is already in the state cache - force = False - total_items = len(emojis) + len(stickers) - cached_count = cached_emojis + cached_stickers - - - - console.print("\n(1) Sync Emojis only") - console.print("(2) Sync Stickers only") - console.print("(3) Sync Emojis and Stickers") - console.print("(B) Back") - - choice = Prompt.ask("Select an option [1/2/3/B]", choices=["1", "2", "3", "B", "b"], default="B", show_choices=False).upper() - - if choice == "B": - return - - types_to_include = [] - if choice == "1": - types_to_include = ["Emoji"] - elif choice == "2": - types_to_include = ["Sticker"] - elif choice == "3": - types_to_include = ["Emoji", "Sticker"] - - # Ask about force re-copy only if there are cached items in scope - cached_in_scope = 0 - if "Emoji" in types_to_include: - cached_in_scope += sum(1 for e in emojis if self.engine.state.get_target_emoji_id(str(e.id))) - if "Sticker" in types_to_include: - cached_in_scope += sum(1 for s in stickers if self.engine.state.get_target_sticker_id(str(s.id))) - - force = False - if cached_in_scope > 0: - console.print(f"\n[yellow]{cached_in_scope} item(s) already in state cache.[/yellow]") - console.print("[bold green](Y) Copy missing items only[/bold green]") - console.print("[bold red](F) Force Overwrite[/bold red]") - console.print("[bold yellow](B) Back[/bold yellow]") - - choice = Prompt.ask("Select an option [Y/F/B]", choices=["Y", "y", "F", "f", "B", "b"], default="Y", show_choices=False).upper() - - if choice == "B": - return - elif choice == "F": - force = True - # if 'Y', force remains False and we continue - - console.print("\n[bold green]Starting Migration...[/bold green]") - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - RateLimitColumn(), - console=console - ) as progress: - - emoji_task = progress.add_task("[cyan]Copying Assets...", total=100) - - async def update_progress(item_name: str, item_type: str, current: int, total: int): - progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying {item_type}: {item_name}") - - self.engine.is_running = True - cloned_assets = await asset_mod.migrate_emojis( - self.engine, - progress_callback=update_progress, - types_to_include=types_to_include, - force=force - ) - - console.print("[bold green]Migration complete![/bold green]") - if cloned_assets and (cloned_assets.get("Emoji") or cloned_assets.get("Sticker")): - lines = [] - if cloned_assets.get("Emoji"): - lines.append("Successfully cloned these emojis:") - for name, e_id in cloned_assets["Emoji"].items(): - lines.append(f"- {name} <:{name}:{e_id}>") - if cloned_assets.get("Sticker"): - if lines: lines.append("") - lines.append("Successfully cloned these stickers:") - for name, s_id in cloned_assets["Sticker"].items(): - lines.append(f"- {name}") - - audit_desc = "\n".join(lines) - await log_audit_event(self.engine, "Emojis & Stickers Cloned", audit_desc) - else: - await log_audit_event(self.engine, "Emojis & Stickers Cloned", "No new emojis or stickers were cloned.") - - except Exception as e: - console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]") - finally: - await self.engine.close_connections() - - self.engine.is_running = False - - async def sync_server_metadata(self): - console.print("\n[yellow]Fetching server metadata...[/yellow]") - try: - await self.engine.start_connections() - metadata = await self.engine.discord_reader.get_server_metadata() - - name = metadata.get("name") - icon_status = "[bold green]FOUND[/bold green]" if metadata.get("icon_url") else "[yellow]NOT FOUND[/yellow]" - banner_status = "[bold green]FOUND[/bold green]" if metadata.get("banner_url") else "[yellow]NOT FOUND[/yellow]" - - banner_status = "[bold green]FOUND[/bold green]" if metadata.get("banner_url") else "[yellow]NOT FOUND[/yellow]" - - table = Table(show_header=False, box=None) - table.add_column("Property", style="bold") - table.add_column("Value") - - table.add_row("Server Name:", name) - table.add_row("Server Icon:", icon_status) - table.add_row("Server Banner:", banner_status) - - console.print(Panel(table, title="[bold]Discord Server Metadata[/bold]", expand=False)) - - console.print("\n(1) Sync Name only") - console.print("(2) Sync Icon only") - console.print("(3) Sync Banner only") - console.print("(4) Sync Everything") - console.print("(B) Back") - - choice = Prompt.ask("Select an option [1/2/3/4/B]", choices=["1", "2", "3", "4", "B", "b"], default="B", show_choices=False).upper() - - if choice == "B": - return - - components = [] - if choice == "1": - components = ["name"] - elif choice == "2": - components = ["icon"] - elif choice == "3": - components = ["banner"] - elif choice == "4": - components = ["name", "icon", "banner"] - - console.print("\n[bold green]Syncing Server Metadata...[/bold green]") - - async def progress_callback(item: str, status: str): - color = "green" if status == "DONE" else "red" if status == "ERROR" else "yellow" - console.print(f"{item} [[bold {color}]{status}[/bold {color}]]") - - # Select appropriate metadata module - metadata_mod = fluxer_metadata if self.engine.target_platform == "fluxer" else stoat_metadata - - cloned_data = await metadata_mod.sync_server_metadata(self.engine, progress_callback, components=components) - console.print("[bold green]Server profile sync finished![/bold green]") - - audit_lines = ["Successfully synchronized Community profile:"] - if "name" in cloned_data: - audit_lines.append(f"- **Name**: {cloned_data['name']}") - if "icon" in cloned_data: - audit_lines.append(f"- **Icon**") - if "banner" in cloned_data: - audit_lines.append(f"- **Banner**") - - files = [] - if "icon" in cloned_data: - icon_ext = "gif" if cloned_data["icon"].startswith(b"GIF") else "png" - files.append({ - "filename": f"icon.{icon_ext}", - "data": cloned_data["icon"] - }) - if "banner" in cloned_data: - banner_ext = "gif" if cloned_data["banner"].startswith(b"GIF") else "png" - files.append({ - "filename": f"banner.{banner_ext}", - "data": cloned_data["banner"] - }) - - audit_desc = "\n".join(audit_lines) - if cloned_data: - await log_audit_event(self.engine, "Server Profile Synced", audit_desc, files=files) - else: - await log_audit_event(self.engine, "Server Profile Synced", "No profile information was synchronized.") - except Exception as e: - console.print(f"[bold red]Error during metadata sync: {str(e)}[/bold red]") - finally: - await self.engine.close_connections() - - async def migrate_message_history(self): - console.print("\n[bold]Message History Migration[/bold]") - - if not self.tokens_valid: - console.print("[bold red]Error: You must have a valid configuration (Option 6) to migrate messages.[/bold red]") - return - - try: - with console.status("[yellow]Fetching Discord channels & categories...[/yellow]"): - await self.engine.start_connections() - full_d_channels = await self.engine.discord_reader.get_channels() - d_channels = [c for c in full_d_channels if c.type in [discord.ChannelType.text, discord.ChannelType.news]] - d_categories = await self.engine.discord_reader.get_categories() - d_cat_map = {c.id: c.name for c in d_categories} - - if not d_channels: - console.print("[yellow]No text channels found in Discord server.[/yellow]") - return - - console.print("\n[bold]Select Source Discord Channel:[/bold]") - - # Identify duplicate names to show categories for them - d_name_counts = {} - for ch in d_channels: - d_name_counts[ch.name] = d_name_counts.get(ch.name, 0) + 1 - - for i, ch in enumerate(d_channels): - display_name = ch.name - if d_name_counts[ch.name] > 1: - cat_name = d_cat_map.get(ch.category_id) - if cat_name: - display_name = f"{ch.name} [{cat_name}]" - console.print(f"({i+1}) {display_name}") - - console.print("(B) Back") - d_choices = [str(i+1) for i in range(len(d_channels))] + ["B", "b"] - - prompt_msg = f"Select Discord Channel [[bold cyan](1-{len(d_channels)})/B[/bold cyan]]" - d_choice = Prompt.ask(prompt_msg, choices=d_choices, show_choices=False).upper() - - if d_choice == "B": - return - - source_channel = d_channels[int(d_choice) - 1] - - # Select appropriate migration module - migrate_mod = fluxer_migrate if self.engine.target_platform == "fluxer" else stoat_migrate - platform_name = self.engine.target_platform.capitalize() - - # 2. Select Target Channel - with console.status(f"[yellow]Fetching {platform_name} channels...[/yellow]"): - full_f_channels = await self.engine.writer.get_channels() - f_channels = [ - c for c in full_f_channels - if c.get('name') not in ["reaper_logs", "reaper-logs"] and c.get('type') not in [2, 4] - ] - if not f_channels: - console.print(f"[yellow]No channels found in {platform_name} community.[/yellow]") - return - - # Determine recommended channel - recommended_channel = None - # Priority 1: Check state.json mapping - mapped_id = self.engine.state.get_fluxer_channel_id(str(source_channel.id)) - if mapped_id: - recommended_channel = next((c for c in f_channels if str(c.get('id', '')) == mapped_id), None) - - # Priority 2: Check name match - if not recommended_channel: - recommended_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None) - - console.print(f"\n[bold]Select Target {platform_name} Channel:[/bold]") - - # Identify duplicate names to show categories for them - f_name_counts = {} - for ch in f_channels: - name = ch.get('name', 'Unnamed Channel') - f_name_counts[name] = f_name_counts.get(name, 0) + 1 - - # Get category map for target platform - # For Fluxer/Stoat, the channel object in f_channels contains 'parent_id' - # We need to resolve these IDs to names. - target_cat_names = {} - for ch in full_f_channels: # use full list including categories (type 4) - if ch.get('type') == 4: - target_cat_names[str(ch.get('id'))] = ch.get('name') - - for i, ch in enumerate(f_channels): - name = ch.get('name', 'Unnamed Channel') - display_name = name - if f_name_counts[name] > 1: - parent_id = ch.get('parent_id') - if parent_id and str(parent_id) in target_cat_names: - display_name = f"{name} [{target_cat_names[str(parent_id)]}]" - console.print(f"({i+1}) {display_name}") - - f_choices = [str(i+1) for i in range(len(f_channels))] + ["B", "b", "N", "n"] - - if recommended_channel: - console.print(f"(Y) [bold green]{recommended_channel.get('name')}[/bold green] (auto-matched)") - f_choices += ["Y", "y"] - - console.print("(N) Create new channel") - console.print("(B) Back") - - # Build simplified prompt string - prompt_parts = [f"(1-{len(f_channels)})", "B", "N"] - if recommended_channel: - prompt_parts.append("Y") - - prompt_msg = f"Select {platform_name} Channel [[bold cyan]{'/'.join(prompt_parts)}[/bold cyan]]" - - f_choice = Prompt.ask(prompt_msg, choices=f_choices, show_choices=False, default="Y" if recommended_channel else None).upper() - - if f_choice == "B": - return - - if f_choice == "Y" and recommended_channel: - target_channel = recommended_channel - elif f_choice == "N": - # Check if a channel with this name already exists - target_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None) - if not target_channel: - with console.status(f"[yellow]Creating {platform_name} channel #{source_channel.name}...[/yellow]"): - parent_id = None - if source_channel.category_id: - parent_id = self.engine.state.get_fluxer_category_id(str(source_channel.category_id)) - - topic = getattr(source_channel, 'topic', "") or "" - nsfw = getattr(source_channel, 'nsfw', False) - slowmode = getattr(source_channel, 'slowmode_delay', 0) - - new_id = await self.engine.writer.create_channel( - name=source_channel.name, - topic=topic, - type=0, - parent_id=parent_id, - nsfw=nsfw, - slowmode_delay=slowmode - ) - if new_id: - self.engine.state.set_channel_mapping(str(source_channel.id), new_id) - # Refresh list to get the channel object - f_channels = await self.engine.writer.get_channels() - target_channel = next((c for c in f_channels if str(c.get('id')) == new_id), None) - else: - console.print("[bold red]Failed to create channel.[/bold red]") - return - else: - target_channel = f_channels[int(f_choice) - 1] - - # 3. Handle Starting Message - with console.status("[yellow]Fetching first message...[/yellow]"): - first_msg = await self.engine.discord_reader.get_first_message(source_channel.id) - after_id = None - - if first_msg: - # Link format: https://discord.com/channels/GUILD_ID/CHANNEL_ID/MESSAGE_ID - first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}" - server_name = self.validation_results.get("discord_server_name", "server") - - # Check for existing migration - last_migrated_id = self.engine.state.get_last_message_id(str(target_channel.get('id'))) - next_msg = None - if last_migrated_id: - try: - # Try to fetch the immediate next message - async for msg in self.engine.discord_reader.fetch_message_history(source_channel.id, limit=1, after_id=int(last_migrated_id)): - next_msg = msg - break - except Exception as e: - # Assuming logger is defined elsewhere, if not, this would be an error. - # For this context, I'll assume it's available or a print is acceptable. - # If not, a simple print(f"Warning: Could not fetch next message after {last_migrated_id}: {e}") could be used. - # As per instructions, I'll use logger.warning. - import logging - logger = logging.getLogger(__name__) - logger.warning(f"Could not fetch next message after {last_migrated_id}: {e}") - - while True: - console.print(f"\n[bold green]Channel Found![/bold green]") - console.print(f"Oldest Message") - console.print(f"Link: {first_msg_link}") - console.print(f"Author: [blue]{first_msg.author.name}[/blue]") - console.print(f"Content Preview: {first_msg.content[:100]}") - - choices = ["M", "m", "B", "b"] - prompt_str = "Start migration" - - if next_msg: - next_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{next_msg.id}" - console.print(f"\n[bold yellow]Continue Next message[/bold yellow]") - console.print(f"Link: {next_msg_link}") - console.print(f"Author: [blue]{next_msg.author.name}[/blue]") - console.print(f"Content Preview: {next_msg.content[:100]}") - - console.print("\n(F) [red]Force start from oldest message[/red]") - console.print("(M) Provide Specific message link or ID") - console.print("(Y) [green]Continue Migration[/green]") - console.print("(B) Back") - - choices.extend(["Y", "y", "F", "f"]) - prompt_str = "Select an option [Y/F/M/B]" - default_choice = "Y" - else: - console.print("\n(Y) [green]Yes, start from oldest message[/green]") - console.print("(M) Provide Specific message link or ID") - console.print("(B) Back") - - choices.extend(["Y", "y"]) - prompt_str = "Start migration [Y/M/B]" - default_choice = "Y" - - start_mode = Prompt.ask(prompt_str, choices=choices, default=default_choice, show_choices=False).upper() - - if start_mode == "B": - return - elif start_mode == "Y": - if next_msg: - after_id = int(last_migrated_id) - break - elif start_mode == "F": - # User wants to ignore the previous migration and start from beginning - after_id = None - break - elif start_mode == "M": - prompt_msg = "Enter Discord Message Link or Message ID (or 'B' to go back)" - valid_message_found = False - while True: - custom_link = Prompt.ask(prompt_msg) - if str(custom_link).upper() in ["B", "BACK"]: - break - try: - # Extract message ID from end of link - custom_id = int(str(custom_link).strip().split("/")[-1]) - # Check if message exists to give feedback - msg = await self.engine.discord_reader.get_message(source_channel.id, custom_id) - if msg: - # To include this message, we start 'after' the ID before it - after_id = custom_id - 1 - console.print(f"\n[green]Confirmed: Starting from {msg.author.name}'s message.[/green]") - valid_message_found = True - break - else: - console.print("[red]Could not find message. Ensure it exists in this channel.[/red]") - except ValueError: - console.print("[red]Invalid ID or Link. Please provide a valid numeric ID or Discord Message URL.[/red]") - except Exception as e: - console.print(f"[red]Error fetching message: {str(e)}[/red]") - - if valid_message_found: - break - else: - console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]") - return - - # 4. Analysis and Confirmation - console.print("\n[yellow]Analyzing channel content...[/yellow]") - self.engine.is_running = True - stats = {"messages": 0, "threads": 0, "attachments": 0} - try: - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - console=console - ) as progress: - task = progress.add_task("[cyan]Scanning history...", total=None) - async def update_scan_progress(count: int): - progress.update(task, description=f"[cyan]Scanned {count} items...") - - stats = await migrate_mod.analyze_migration( - self.engine, - source_channel_id=source_channel.id, - after_message_id=after_id, - progress_callback=update_scan_progress - ) - finally: - self.engine.is_running = False - - table = Table(show_header=True, header_style="bold #4641D9", title="Migration Summary & Estimates") - table.add_column("Item", width=15) - table.add_column("Count", justify="right", width=10) - table.add_column("Overhead/Details") - - msg_time = stats['messages'] * self.config.migration.rate_limit_delay_seconds - table.add_row( - "Messages", - f"[bold cyan]{stats['messages']}[/bold cyan]", - f"~{msg_time}s delay (rate limiting), {stats['messages']} API writes" - ) - table.add_row( - "Threads", - f"[bold cyan]{stats['threads']}[/bold cyan]", - f"{stats['threads'] * 2} extra marker messages, {stats['threads']} extra history fetches" - ) - table.add_row( - "Attachments", - f"[bold cyan]{stats['attachments']}[/bold cyan]", - f"{stats['attachments']} downloads and uploads (bandwidth & API calls)" - ) - - console.print("") - console.print(table) - - if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to {platform_name} [#4641D9]#{target_channel.get('name')}[/#4641D9]?"): - return - - # 5. Migration Execution - console.print("\n[bold green]Starting Migration...[/bold green]") - self.engine.is_running = True - - total_messages = stats['messages'] - - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - RateLimitColumn(), - console=console - ) as progress: - task = progress.add_task(f"[cyan]Migrating 0/{total_messages} messages...", total=total_messages) - - async def update_msg_progress(count: int): - progress.update(task, completed=count, description=f"[cyan]Migrated {count}/{total_messages} messages...") - - result_stats = await migrate_mod.migrate_messages( - self.engine, - source_channel_id=source_channel.id, - target_channel_id=target_channel.get("id"), - after_message_id=after_id, - progress_callback=update_msg_progress - ) - - if not self.engine.is_running: - console.print(f"\n[bold yellow]Migration Interrupted! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold yellow]") - event_title = "Message History Migration Interrupted" - else: - console.print(f"\n[bold green]Success! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold green]") - event_title = "Message History Migrated" - - lines = [f"Migrated messages from Discord #{source_channel.name} to {platform_name} #{target_channel.get('name')}:"] - - if result_stats.get('first_message_url') or result_stats.get('last_message_url'): - lines.append(f"**Message Info:**") - if result_stats.get('first_message_url'): - lines.append(f"First message: <{result_stats['first_message_url']}>") - if result_stats.get('last_message_url'): - lines.append(f"Last message: <{result_stats['last_message_url']}>") - - lines.append(f"**Stats:**") - lines.append(f"{result_stats['messages']} messages") - lines.append(f"{result_stats['attachments']} attachments") - lines.append(f"{result_stats['threads']} threads") - - audit_desc = "\n".join(lines) - await log_audit_event(self.engine, event_title, audit_desc) - - except Exception as e: - err_str = str(e) - if "MissingPermission" in err_str and "Masquerade" in err_str: - console.print(f"\n[bold red]Migration stopped: Bot is missing the 'Masquerade' permission.[/bold red]") - console.print(f"[yellow]Grant the 'Masquerade' permission to the bot's role in your Stoat server settings, then retry.[/yellow]") - elif "MissingPermission" in err_str: - console.print(f"\n[bold red]Migration stopped: Bot is missing a required permission.[/bold red]") - console.print(f"[yellow]Error: {err_str}[/yellow]") - console.print(f"[yellow]Grant the required permission to the bot's role in your Stoat server settings, then retry.[/yellow]") - else: - console.print(f"[bold red]Migration encountered an error: {err_str}[/bold red]") - finally: - self.engine.is_running = False - await self.engine.close_connections() - - async def danger_zone(self): - """Danger Zone – irreversible destructive operations on the target community.""" - if self.target_platform == "fluxer": - from src.fluxer.danger_zone import danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers - else: - from src.stoat.danger_zone import danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers - - console.print("") - console.print(Panel.fit( - "[bold red]\u26a0 DANGER ZONE \u26a0[/bold red]\n" - f"[yellow]These actions are PERMANENT and IRREVERSIBLE on your {self.target_platform.capitalize()} community.[/yellow]\n" - "[yellow]Always double-check before confirming.[/yellow]", - style="bold red" - )) - console.print("(1) Delete all Channels & Categories") - console.print("(2) Reset Channel & Category Permissions") - console.print("(3) Delete all Roles [dim](bot role is protected)[/dim]") - console.print("(4) Delete all custom Emojis & Stickers") - console.print("(B) Back") - - choice = Prompt.ask( - "Select a Danger Zone option [1/2/3/4/B]", - choices=["1", "2", "3", "4", "B", "b"], - default="B", - show_choices=False - ).upper() - - if choice == "B": - return - - # ---- (1) Delete all Channels & Categories ---- - if choice == "1": - console.print("") - if self.target_platform == "fluxer": - community_name = self.validation_results.get("fluxer_community_name", "Unknown") - else: - community_name = self.validation_results.get("stoat_server_name", "Unknown") - console.print(f"[bold red]This will DELETE every channel and category in the {self.target_platform.capitalize()} community: \"{community_name}\"[/bold red]") - if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): - return - if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"): - return - try: - await self.engine.start_target_only() - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - console=console - ) as progress: - del_task = progress.add_task("[red]Deleting channels...", total=100) - - async def on_channel_deleted(name: str, current: int, total: int): - progress.update(del_task, total=total, completed=current, - description=f"[red]Deleting: {name}") - - count = await danger_delete_all_channels(self.engine, progress_callback=on_channel_deleted) - console.print(f"[bold green]{count} channels/categories deleted.[/bold green]") - await log_audit_event(self.engine, "Danger Zone: Channels Wiped", f"Deleted {count} channels and categories from the community.") - console.print("[bold green]Done.[/bold green]") - except Exception as e: - console.print(f"[bold red]Error: {e}[/bold red]") - finally: - await self.engine.close_target_only() - - # ---- (2) Reset Channel & Category Permissions ---- - elif choice == "2": - console.print("") - if self.target_platform == "fluxer": - community_name = self.validation_results.get("fluxer_community_name", "Unknown") - else: - community_name = self.validation_results.get("stoat_server_name", "Unknown") - console.print(f"[bold red]This will RESET all permission overwrites on every channel and category in the {self.target_platform.capitalize()} community: \"{community_name}\"[/bold red]") - if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): - return - if not Confirm.ask("[bold red]Last chance \u2013 all custom permission overrides will be wiped. Continue?[/bold red]"): - return - try: - await self.engine.start_target_only() - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - console=console - ) as progress: - perm_task = progress.add_task("[red]Resetting permissions...", total=100) - - async def on_perm_reset(name: str, current: int, total: int): - progress.update(perm_task, total=total, completed=current, - description=f"[red]Resetting: {name}") - - count = await danger_reset_channel_permissions(self.engine, progress_callback=on_perm_reset) - console.print(f"[bold green]Permissions reset on {count} channels/categories.[/bold green]") - await log_audit_event(self.engine, "Danger Zone: Permissions Wiped", f"Administrators reset permissions on {count} channels and categories.") - console.print("[bold green]Done.[/bold green]") - except Exception as e: - console.print(f"[bold red]Error: {e}[/bold red]") - finally: - await self.engine.close_target_only() - - # ---- (3) Delete all Roles ---- - elif choice == "3": - console.print("") - if self.target_platform == "fluxer": - community_name = self.validation_results.get("fluxer_community_name", "Unknown") - else: - community_name = self.validation_results.get("stoat_server_name", "Unknown") - console.print(f"[bold red]This will DELETE all roles in the {self.target_platform.capitalize()} community: \"{community_name}\"[/bold red]") - console.print("[dim]Managed roles (including the bot's own role) and @everyone are automatically protected.[/dim]") - if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): - return - if not Confirm.ask("[bold red]Last chance \u2013 confirm permanent role deletion?[/bold red]"): - return - try: - await self.engine.start_target_only() - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - console=console - ) as progress: - role_task = progress.add_task("[red]Deleting roles...", total=100) - - async def on_role_deleted(name: str, current: int, total: int): - progress.update(role_task, total=total, completed=current, - description=f"[red]Deleting role: {name}") - - count = await danger_delete_all_roles(self.engine, progress_callback=on_role_deleted) - console.print(f"[bold green]{count} roles deleted.[/bold green]") - await log_audit_event(self.engine, "Danger Zone: Roles Wiped", f"Deleted {count} roles from the community.") - console.print("[bold green]Done.[/bold green]") - except Exception as e: - console.print(f"[bold red]Error: {e}[/bold red]") - finally: - await self.engine.close_target_only() - - # ---- (4) Delete all Emojis & Stickers ---- - elif choice == "4": - console.print("") - if self.target_platform == "fluxer": - community_name = self.validation_results.get("fluxer_community_name", "Unknown") - else: - community_name = self.validation_results.get("stoat_server_name", "Unknown") - console.print(f"[bold red]This will DELETE all custom emojis and stickers in the {self.target_platform.capitalize()} community: \"{community_name}\"[/bold red]") - if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): - return - if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"): - return - try: - await self.engine.start_target_only() - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TaskProgressColumn(), - console=console - ) as progress: - asset_task = progress.add_task("[red]Deleting assets...", total=100) - - async def on_asset_deleted(name: str, asset_type: str, current: int, total: int): - progress.update(asset_task, total=total, completed=current, - description=f"[red]Deleting {asset_type}: {name}") - - counts = await danger_delete_all_emojis_and_stickers(self.engine, progress_callback=on_asset_deleted) - console.print(f"[bold green]{counts.get('emojis', 0)} emojis deleted.[/bold green]") - console.print(f"[bold green]{counts.get('stickers', 0)} stickers deleted.[/bold green]") - await log_audit_event(self.engine, "Danger Zone: Assets Wiped", f"Deleted {counts.get('emojis', 0)} emojis and {counts.get('stickers', 0)} stickers.") - console.print("[bold green]Done.[/bold green]") - except Exception as e: - console.print(f"[bold red]Error: {e}[/bold red]") - finally: - await self.engine.close_target_only() - - - -async def run_cli(target_platform="fluxer", config_path="config.yaml"): - cli = MigrationCLI(target_platform, config_path) - await cli.run() diff --git a/src/ui/shuttle_ops.py b/src/ui/shuttle_ops.py index 2b24546..de7043a 100644 --- a/src/ui/shuttle_ops.py +++ b/src/ui/shuttle_ops.py @@ -4,7 +4,6 @@ Embedded inside ModeScreen's "Migrate" tab. """ import asyncio -import discord import logging import re import time @@ -13,7 +12,7 @@ import traceback from pathlib import Path from textual.app import ComposeResult -from textual.containers import Container, Vertical, VerticalScroll +from textual.containers import Container, Vertical, Horizontal, VerticalScroll from textual.widgets import Button, Label, Rule from textual import work @@ -86,6 +85,11 @@ class ShuttlePane(Container): ShuttlePane #sp_info { height: auto; border: tall cyan; padding: 1; margin-bottom: 1; } + #sp_info_split { height: auto; layout: horizontal; } + .info_pane { width: 1fr; height: auto; } + .info_pane Label { width: 100%; } + .pane_header { text-style: bold; color: $accent; margin-bottom: 1; } + ShuttlePane #sp_actions { height: auto; } ShuttlePane #sp_actions Button { width: 100%; margin-bottom: 1; } """ @@ -104,8 +108,18 @@ class ShuttlePane(Container): def compose(self) -> ComposeResult: with VerticalScroll(): with Vertical(id="sp_info"): - yield Label("Discord: [yellow]Loading...[/yellow]", id="sp_lbl_discord") - yield Label("Target: [yellow]Loading...[/yellow]", id="sp_lbl_target") + with Horizontal(id="sp_info_split"): + with Vertical(classes="info_pane"): + yield Label("Discord", classes="pane_header") + yield Label("Server: [yellow]Loading...[/yellow]", id="sp_lbl_d_server") + yield Label("Bot: [yellow]Loading...[/yellow]", id="sp_lbl_d_bot") + + with Vertical(classes="info_pane"): + yield Label("Target", id="sp_lbl_t_header", classes="pane_header") + yield Label("Community: [yellow]Loading...[/yellow]", id="sp_lbl_t_comm") + yield Label("Bot: [yellow]Loading...[/yellow]", id="sp_lbl_t_bot") + + yield Rule() yield Label("Status: [yellow]Validating...[/yellow]", id="sp_lbl_status") with Vertical(id="sp_actions"): yield Button("Clone Server Template", id="sp_clone", disabled=True) @@ -138,28 +152,39 @@ class ShuttlePane(Container): # Discord d_name = v.get("discord_server_name") d_bot = v.get("discord_bot_name") + if v.get("discord_timeout"): - d_disp = "[red]TIMEOUT[/red]" - elif d_name and v.get("discord_token") and v.get("discord_server"): - d_disp = f'[green]"{d_name}"[/green] Bot: [green]{d_bot}[/green]' + s_disp, b_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]" + elif v.get("discord_token") and v.get("discord_server"): + s_disp = f'[green]"{d_name}"[/green]' + b_disp = f'[green]{d_bot}[/green]' elif v.get("discord_token") is False: - d_disp = "[red]INVALID TOKEN[/red]" + s_disp, b_disp = "[red]INVALID TOKEN[/red]", "[red]INVALID TOKEN[/red]" else: - d_disp = "[red]NOT SET UP[/red]" - self.query_one("#sp_lbl_discord", Label).update(f"Discord: {d_disp}") + s_disp, b_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]" + + self.query_one("#sp_lbl_d_server", Label).update(f"Server: {s_disp}") + self.query_one("#sp_lbl_d_bot", Label).update(f"Bot: {b_disp}") # Target plat = "Fluxer" if self.target_platform == "fluxer" else "Stoat" t_name = v.get("target_community_name") + t_bot = v.get("target_bot_name") + + self.query_one("#sp_lbl_t_header", Label).update(plat) + if v.get("target_timeout"): - t_disp = "[red]TIMEOUT[/red]" - elif t_name and v.get("target_token") and v.get("target_community"): - t_disp = f'[green]"{t_name}"[/green]' + c_disp, tb_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]" + elif v.get("target_token") and v.get("target_community"): + c_disp = f'[green]"{t_name}"[/green]' + tb_disp = f'[green]{t_bot}[/green]' elif v.get("target_token") is False: - t_disp = "[red]INVALID TOKEN[/red]" + c_disp, tb_disp = "[red]INVALID TOKEN[/red]", "[red]INVALID TOKEN[/red]" else: - t_disp = "[red]NOT SET UP[/red]" - self.query_one("#sp_lbl_target", Label).update(f"{plat}: {t_disp}") + c_disp, tb_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]" + + self.query_one("#sp_lbl_t_comm", Label).update(f"Community: {c_disp}") + self.query_one("#sp_lbl_t_bot", Label).update(f"Bot: {tb_disp}") # Status if not self.tokens_valid: @@ -299,11 +324,11 @@ class ShuttlePane(Container): def _open_sync_menu(self): options = [ - ("sub_emoji", "Sync Emojis"), - ("sub_sticker", "Sync Stickers"), - ("sub_name", "Sync Server Name"), - ("sub_icon", "Sync Server Icon"), - ("sub_banner", "Sync Server Banner"), + ("sub_emoji", "Custom Emojis"), + ("sub_sticker", "Custom Stickers"), + ("sub_name", "Server Name"), + ("sub_icon", "Server Icon"), + ("sub_banner", "Server Banner"), ] def on_result(choices): if choices: @@ -664,7 +689,7 @@ class ShuttlePane(Container): await self.engine.start_connections() full_d = await self.engine.discord_reader.get_channels() - d_channels = [c for c in full_d if c.type in [discord.ChannelType.text, discord.ChannelType.news]] + d_channels = [c for c in full_d if c.type in [self.engine.discord_reader.CHANNEL_TYPE_TEXT, self.engine.discord_reader.CHANNEL_TYPE_NEWS]] d_cats = await self.engine.discord_reader.get_categories() d_cat_map = {c.id: c.name for c in d_cats} diff --git a/src/ui/unifier_tui.py b/src/ui/unifier_tui.py deleted file mode 100644 index d0ec923..0000000 --- a/src/ui/unifier_tui.py +++ /dev/null @@ -1,1013 +0,0 @@ -import sys -import asyncio -import discord -import logging -import json -import re -from pathlib import Path -from datetime import datetime - -from textual.app import App, ComposeResult -from textual.containers import Container, Horizontal, Vertical, VerticalScroll, Grid -from textual.widgets import Header, Footer, Button, Static, Label, Input, Checkbox, RadioButton, ProgressBar, RichLog, Rule -from textual.screen import Screen, ModalScreen -from textual import work - -from src.core.configuration import load_config, save_config -from src.core.base import MigrationContext -from src.disco_reaper.exporter import DiscordExporter - -import src.fluxer.roles_permissions as fluxer_roles -import src.stoat.roles_permissions as stoat_roles -import src.fluxer.emoji_stickers as fluxer_emoji_stickers -import src.stoat.emoji_stickers as stoat_emoji_stickers -import src.fluxer.server_metadata as fluxer_metadata -import src.stoat.server_metadata as stoat_metadata -import src.fluxer.migrate_message as fluxer_migrate -import src.stoat.migrate_message as stoat_migrate -from src.core.audit import log_audit_event - -# ------------------------------------------------------------------------- -# Shared Modals -# ------------------------------------------------------------------------- - -class ConfigModal(ModalScreen[dict]): - """Modal for configuring token and server ID (Reaper focus but usable everywhere).""" - - def __init__(self, current_token: str, current_server: str): - super().__init__() - self.current_token = current_token - self.current_server = current_server - - def compose(self) -> ComposeResult: - with Vertical(id="config_dialog"): - yield Label("Discord Configuration", id="config_title") - yield Label("Discord Bot Token:") - yield Input(value=self.current_token, id="token_input") - yield Label("Discord Server ID:") - yield Input(value=self.current_server, id="server_input") - with Horizontal(id="config_buttons"): - yield Button("Save", variant="success", id="btn_save") - yield Button("Cancel", variant="primary", id="btn_cancel") - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_save": - token = self.query_one("#token_input", Input).value - server = self.query_one("#server_input", Input).value - self.dismiss({"token": token, "server": server}) - elif event.button.id == "btn_cancel": - self.dismiss(None) - -class ChannelSelectModal(ModalScreen[dict]): - """Modal for selecting channels using a simple checkbox list.""" - - def __init__(self, channels: list, categories: dict, backed_up_ids: set, any_found: bool): - super().__init__() - self.channels_by_category = {} - for c in channels: - cat_id = getattr(c, 'category_id', None) - if cat_id not in self.channels_by_category: - self.channels_by_category[cat_id] = [] - self.channels_by_category[cat_id].append(c) - - self.categories = categories - self.backed_up_ids = backed_up_ids - self.any_found = any_found - - def compose(self) -> ComposeResult: - with Vertical(id="channel_dialog"): - yield Label(f"Select Channels to Backup", id="chan_title") - - with VerticalScroll(id="channel_list_scroll"): - cat_ids = sorted([k for k in self.channels_by_category.keys() if k is not None], - key=lambda k: self.categories.get(k, "")) - - if None in self.channels_by_category: - for c in sorted(self.channels_by_category[None], key=lambda x: x.position if hasattr(x, 'position') else 0): - label = f"{c.name}" - color = "green" if c.id in self.backed_up_ids else "white" - yield RadioButton(f"[{color}]{label}[/]", value=False, id=f"chan_{c.id}") - - for cat_id in cat_ids: - cat_name = self.categories.get(cat_id, "Unknown Category") - yield Label(f"[cyan]{cat_name}[/cyan]", classes="category_header") - for c in sorted(self.channels_by_category[cat_id], key=lambda x: x.position if hasattr(x, 'position') else 0): - label = f"{c.name}" - color = "green" if c.id in self.backed_up_ids else "white" - yield RadioButton(f"[{color}]{label}[/]", value=False, id=f"chan_{c.id}") - - with Horizontal(id="select_all_buttons"): - yield Button("Select All", id="btn_all") - yield Button("Deselect All", id="btn_none") - - with Horizontal(id="confirm_buttons"): - if self.any_found: - yield Label("Existing backups found:", classes="label_warning") - yield Button("Sync", variant="success", id="btn_sync") - yield Button("Force Overwrite", variant="error", id="btn_force") - else: - yield Button("Backup", variant="success", id="btn_backup") - yield Button("Cancel", id="btn_cancel_chan") - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_all": - for cb in self.query(RadioButton): - cb.value = True - elif event.button.id == "btn_none": - for cb in self.query(RadioButton): - cb.value = False - elif event.button.id in ["btn_sync", "btn_force", "btn_backup"]: - selected = [] - for cb in self.query(RadioButton): - if cb.value and cb.id and cb.id.startswith("chan_"): - chan_id = int(cb.id.split("_")[1]) - selected.append(chan_id) - if not selected: - return - - force = event.button.id == "btn_force" - self.dismiss({"channels": selected, "force": force}) - elif event.button.id == "btn_cancel_chan": - self.dismiss(None) - -class ProgressModal(ModalScreen[None]): - """Modal to show progress of operations.""" - - def compose(self) -> ComposeResult: - with Vertical(id="progress_dialog"): - yield Label("Operation Status", id="progress_status") - yield ProgressBar(total=None, show_eta=False, id="progress_bar") - yield RichLog(id="progress_log", highlight=True, markup=True) - yield Button("Close", id="btn_close_progress", disabled=True) - - def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "btn_close_progress": - self.dismiss(None) - - def write_to_log(self, message: str): - self.query_one("#progress_log", RichLog).write(message) - - def set_status(self, status: str): - self.query_one("#progress_status", Label).update(status) - - def allow_close(self): - btn = self.query_one("#btn_close_progress", Button) - btn.disabled = False - btn.variant = "success" - pb = self.query_one("#progress_bar", ProgressBar) - if pb.total is None: pb.total = 100 - pb.progress = pb.total - -# ------------------------------------------------------------------------- -# Reaper Screen -# ------------------------------------------------------------------------- - -class ReaperScreen(Screen): - BINDINGS = [ - ("q", "app.exit", "Quit"), - ("c", "config", "Config"), - ("escape", "app.pop_screen", "Back"), - ] - - def __init__(self): - super().__init__() - self.validation_results = {} - self.tokens_valid = False - - def compose(self) -> ComposeResult: - yield Header(show_clock=True) - with Container(id="main_container"): - yield Label("Disco Reaper - Server Backup Tool", id="title_label") - with Vertical(id="info_container"): - yield Label("Loading...", id="lbl_server") - yield Label("", id="lbl_bot") - yield Label("", id="lbl_backup") - with Vertical(id="actions_container"): - yield Button("Backup Server Profile", id="btn_backup_profile", disabled=True) - yield Button("Backup Channel Messages", id="btn_backup_msgs", disabled=True) - yield Button("Update & Sync Backup", id="btn_backup_sync", disabled=True) - yield Rule() - yield Button("Configuration", id="btn_config") - yield Button("Back to Menu", id="btn_back") - yield Footer() - - def on_mount(self) -> None: - self.validate_config() - - @property - def config(self): return self.app.config - @property - def engine(self): return self.app.reaper_engine - @property - def exporter(self): return self.app.exporter - - @work(exclusive=True) - async def validate_config(self) -> None: - def update_ui(server_text, bot_text, backup_text, buttons_enabled): - self.query_one("#lbl_server", Label).update(f"Source Server: {server_text}") - self.query_one("#lbl_bot", Label).update(f"Bot name: {bot_text}") - self.query_one("#lbl_backup", Label).update(backup_text) - for btn_id in ["#btn_backup_profile", "#btn_backup_msgs", "#btn_backup_sync"]: - self.query_one(btn_id, Button).disabled = not buttons_enabled - - d_token = self.config.discord_bot_token - fillers = ["DISCORD_BOT_TOKEN", "000000000000000000", "DISCORD_SERVER_ID", "", None] - if d_token in fillers or self.config.discord_server_id in fillers: - update_ui("[red]NOT CONNECTED[/red]", "[red]UNKNOWN[/red]", "", False) - self.tokens_valid = False - return - - update_ui("[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "", False) - - try: - res = await self.engine.discord_reader.validate() - except Exception as e: - res = {} - update_ui(f"[red]Validation Error: {e}[/red]", "", "", False) - self.tokens_valid = False - return - - self.validation_results["discord_token"] = res.get("token", False) - self.validation_results["discord_bot_name"] = res.get("bot_name") - self.validation_results["discord_server"] = res.get("server", False) - self.validation_results["discord_server_name"] = res.get("server_name") - - self.tokens_valid = res.get("token") and res.get("server") - - d_name = self.validation_results.get("discord_server_name") - server_display = f"[green]\"{d_name}\"[/green]" if d_name else "[red]NOT CONNECTED[/red]" - - b_name = self.validation_results.get("discord_bot_name") - bot_display = f"[green]\"{b_name}\"[/green]" if b_name else "[red]UNKNOWN[/red]" - - backup_text = "" - if self.tokens_valid: - backup_ts = self.get_backup_info() - if backup_ts: - backup_text = f"Backup Found: [yellow]{backup_ts}[/yellow]" - - update_ui(server_display, bot_display, backup_text, self.tokens_valid) - - def get_backup_info(self): - d_name = self.validation_results.get("discord_server_name") - d_id = self.config.discord_server_id - if not d_name or not d_id: - return None - - safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', d_name) - export_path = Path(".") / f"EXPORT-{safe_name}-{d_id}" - profile_file = export_path / "server_profile.json" - - if profile_file.exists(): - try: - with open(profile_file, "r", encoding="utf-8") as f: - data = json.load(f) - ts_str = data.get("last_backup") - if ts_str: - dt = datetime.fromisoformat(ts_str) - return dt.strftime("%d-%b-%Y %H:%M") - except Exception: - pass - return None - - def action_config(self) -> None: - self.open_config() - - def open_config(self) -> None: - def check_reply(reply: dict | None) -> None: - if reply is not None: - self.config.discord_bot_token = reply["token"] - self.config.discord_server_id = reply["server"] - save_config(self.config, self.app.config_path) - self.app.reinit_engines() - self.validate_config() - - self.app.push_screen( - ConfigModal(self.config.discord_bot_token, self.config.discord_server_id), - check_reply - ) - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_back": - self.app.pop_screen() - elif event.button.id == "btn_config": - self.open_config() - elif event.button.id == "btn_backup_profile": - self.run_backup_profile() - elif event.button.id == "btn_backup_msgs": - self.run_backup_messages() - elif event.button.id == "btn_backup_sync": - self.run_backup_sync() - - @work(exclusive=True) - async def run_backup_profile(self) -> None: - modal = ProgressModal() - self.app.push_screen(modal) - await asyncio.sleep(0.1) - - try: - modal.set_status("Starting readers...") - await self.engine.discord_reader.start() - meta = await self.exporter.setup() - - modal.write_to_log("[yellow]Backing up server profile & skeleton...[/yellow]") - await self.exporter.export_metadata() - await self.exporter.download_server_assets() - - modal.write_to_log("Exporting structure...") - _, cat_count, chan_count = await self.exporter.export_channels_structure() - - modal.write_to_log("Exporting assets...") - e_count, s_count = await self.exporter.export_assets() - - modal.write_to_log(f"[bold green]Server Profile backed up to: {self.exporter.export_path}[/bold green]") - modal.write_to_log(f"- {cat_count} categories & {chan_count} channels") - modal.write_to_log(f"- {e_count} emojis, {s_count} stickers.") - - except discord.Forbidden as e: - modal.write_to_log(f"[bold red]Backup failed: {e}[/bold red]") - except Exception as e: - modal.write_to_log(f"[bold red]Error: {e}[/bold red]") - finally: - await self.engine.close_connections() - modal.set_status("Finished.") - modal.allow_close() - - @work(exclusive=True) - async def run_backup_messages(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - try: - modal_prog.set_status("Fetching channels...") - await self.engine.discord_reader.start() - await self.exporter.setup() - - await self.exporter.export_channels_structure() - all_channels = await self.engine.discord_reader.get_channels() - all_categories = await self.engine.discord_reader.get_categories() - cat_map = {c.id: c.name for c in all_categories} - - eligible_channels = [ - c for c in all_channels - if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.forum] - ] - - if not eligible_channels: - modal_prog.write_to_log("[yellow]No text/news channels found to backup.[/yellow]") - modal_prog.allow_close() - return - - any_found = False - backed_up_ids = set() - for chan in eligible_channels: - if (self.exporter.export_path / "message_backup" / f"{chan.id}.json").exists(): - any_found = True - backed_up_ids.add(chan.id) - - self.app.pop_screen() - - loop = asyncio.get_running_loop() - future = loop.create_future() - - def check_channels(reply: dict | None) -> None: - if not future.done(): - future.set_result(reply) - - self.app.push_screen( - ChannelSelectModal(eligible_channels, cat_map, backed_up_ids, any_found), - check_channels - ) - - reply = await future - if not reply: - return - - selected_ids = reply["channels"] - force_overwrite = reply["force"] - - selected_channels = [c for c in eligible_channels if c.id in selected_ids] - - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - total_chans = len(selected_channels) - modal_prog.set_status("Backing up messages...") - modal_prog.write_to_log(f"[yellow]Starting backup for {total_chans} channels...[/yellow]") - - for chan in selected_channels: - backup_exists = (self.exporter.export_path / "message_backup" / f"{chan.id}.json").exists() - is_sync = backup_exists and not force_overwrite - - label = "Syncing Backup" if is_sync else "Backing up" - modal_prog.write_to_log(f"[cyan]{label}: {chan.name}[/cyan]") - - async def update_msg_count(name, count): - modal_prog.set_status(f"{name}: {count} messages") - - await self.exporter.export_channel_messages(chan.id, progress_callback=update_msg_count, force=force_overwrite) - await self.exporter.export_threads(chan.id, progress_callback=update_msg_count, force=force_overwrite) - - modal_prog.write_to_log(f"[green]Completed: {chan.name}[/green]") - - await self.exporter.export_metadata() - modal_prog.write_to_log("[bold green]Message backup complete![/bold green]") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Message backup failed: {e}[/bold red]") - finally: - await self.engine.close_connections() - modal_prog.set_status("Finished.") - modal_prog.allow_close() - - @work(exclusive=True) - async def run_backup_sync(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - try: - modal_prog.set_status("Starting sync...") - await self.engine.discord_reader.start() - await self.exporter.setup() - - modal_prog.write_to_log("Updating structure...") - await self.exporter.export_metadata() - await self.exporter.download_server_assets() - await self.exporter.export_channels_structure() - await self.exporter.export_assets() - - all_channels = await self.engine.discord_reader.get_channels() - eligible_channels = [ - c for c in all_channels - if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.forum] - ] - - selected_channels = [ - c for c in eligible_channels - if (self.exporter.export_path / "message_backup" / f"{c.id}.json").exists() - ] - - if not selected_channels: - modal_prog.write_to_log("[yellow]No existing backups found to sync.[/yellow]") - else: - modal_prog.write_to_log(f"[yellow]Syncing {len(selected_channels)} channels...[/yellow]") - for chan in selected_channels: - modal_prog.write_to_log(f"[cyan]Syncing: {chan.name}[/cyan]") - - async def update_msg_count(name, count): - modal_prog.set_status(f"{name}: {count} messages") - - await self.exporter.export_channel_messages(chan.id, progress_callback=update_msg_count, force=False) - await self.exporter.export_threads(chan.id, progress_callback=update_msg_count, force=False) - modal_prog.write_to_log(f"[green]Synced: {chan.name}[/green]") - - await self.exporter.export_metadata() - modal_prog.write_to_log("[bold green]Sync operation complete![/bold green]") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Sync failed: {e}[/bold red]") - finally: - await self.engine.close_connections() - modal_prog.set_status("Finished.") - modal_prog.allow_close() - -# ------------------------------------------------------------------------- -# Shuttle Screen -# ------------------------------------------------------------------------- - -class ShuttleConfigModal(ModalScreen[dict]): - """Modal for configuring Shuttle targeted platform and tokens.""" - - def __init__(self, config): - super().__init__() - self.config = config - - def compose(self) -> ComposeResult: - with Vertical(id="config_dialog"): - yield Label("Shuttle Target Configuration", id="config_title") - - yield Label("Platform:") - with Horizontal(id="platform_radios"): - yield RadioButton("Fluxer", id="opt_fluxer", value=not self.config.use_stoat) - yield RadioButton("Stoat", id="opt_stoat", value=self.config.use_stoat) - - yield Label("Fluxer Bot Token:") - yield Input(value=self.config.fluxer_bot_token or "", id="fluxer_token") - yield Label("Fluxer Community ID:") - yield Input(value=self.config.fluxer_community_id or "", id="fluxer_community") - yield Label("Stoat Bot Token:") - yield Input(value=self.config.stoat_bot_token or "", id="stoat_token") - yield Label("Stoat Server ID:") - yield Input(value=self.config.stoat_server_id or "", id="stoat_server") - - with Horizontal(id="config_buttons"): - yield Button("Save", variant="success", id="btn_save") - yield Button("Cancel", variant="primary", id="btn_cancel") - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_save": - is_stoat = self.query_one("#opt_stoat", RadioButton).value - - f_token = self.query_one("#fluxer_token", Input).value - f_comm = self.query_one("#fluxer_community", Input).value - s_token = self.query_one("#stoat_token", Input).value - s_server = self.query_one("#stoat_server", Input).value - - # Save dummy if empty - if f_token == "": f_token = None - if f_comm == "": f_comm = None - if s_token == "": s_token = None - if s_server == "": s_server = None - - self.dismiss({ - "use_stoat": is_stoat, - "fluxer_token": f_token, - "fluxer_community": f_comm, - "stoat_token": s_token, - "stoat_server": s_server - }) - elif event.button.id == "btn_cancel": - self.dismiss(None) - - -class ShuttleScreen(Screen): - BINDINGS = [ - ("q", "app.exit", "Quit"), - ("c", "config", "Config"), - ("escape", "app.pop_screen", "Back"), - ] - - def __init__(self): - super().__init__() - self.validation_results = {} - self.tokens_valid = False - - def compose(self) -> ComposeResult: - yield Header(show_clock=True) - with Container(id="main_container"): - yield Label("", id="shuttle_title_label") - with Vertical(id="info_container"): - yield Label("Loading...", id="lbl_target") - yield Label("Loading...", id="lbl_discord") - yield Label("", id="lbl_perms") - with Vertical(id="actions_container"): - yield Button("Clone Server Template (Channels)", id="btn_clone", disabled=True) - yield Button("Copy Roles & Permissions", id="btn_roles", disabled=True) - yield Button("Copy Emojis & Stickers", id="btn_emojis", disabled=True) - yield Button("Sync Content & Server Meta", id="btn_meta", disabled=True) - yield Button("Migrate message history", id="btn_history", disabled=True) - yield Rule() - yield Button("Shuttle & Platform Configuration", id="btn_config") - yield Button("Danger Zone \u26a0", id="btn_danger", variant="error", disabled=True) - yield Button("Back to Menu", id="btn_back") - yield Footer() - - def on_mount(self) -> None: - self.validate_config() - - @property - def config(self): return self.app.config - - @property - def engine(self): - is_stoat = getattr(self.config, 'use_stoat', False) - return self.app.stoat_engine if is_stoat else self.app.fluxer_engine - - @work(exclusive=True) - async def validate_config(self) -> None: - is_stoat = getattr(self.config, 'use_stoat', False) - platform_name = "Stoat" if is_stoat else "Fluxer" - - def update_ui(target_text, discord_text, perms_text, buttons_enabled): - self.query_one("#shuttle_title_label", Label).update(f"Server Shuttle Tools ({platform_name})") - self.query_one("#lbl_target", Label).update(f"Target {platform_name}: {target_text}") - self.query_one("#lbl_discord", Label).update(f"Source Discord: {discord_text}") - self.query_one("#lbl_perms", Label).update(f"Permission Status: {perms_text}") - for btn_id in ["#btn_clone", "#btn_roles", "#btn_emojis", "#btn_meta", "#btn_history", "#btn_danger"]: - self.query_one(btn_id, Button).disabled = not buttons_enabled - - update_ui("[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", False) - - engine = self.engine - - d_token = self.config.discord_bot_token - fillers = ["DISCORD_BOT_TOKEN", "FLUXER_BOT_TOKEN", "STOAT_BOT_TOKEN", "000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID", "", None] - - discord_dummy = d_token in fillers or self.config.discord_server_id in fillers - if is_stoat: - target_dummy = self.config.stoat_bot_token in fillers or self.config.stoat_server_id in fillers - else: - target_dummy = self.config.fluxer_bot_token in fillers or self.config.fluxer_community_id in fillers - - discord_task = None - if not discord_dummy: - discord_task = asyncio.create_task(engine.discord_reader.validate()) - - target_task = None - if not target_dummy: - target_task = asyncio.create_task(engine.writer.validate()) - - tasks_to_wait = [t for t in [discord_task, target_task] if t is not None] - - try: - done = set() - if tasks_to_wait: - done, _ = await asyncio.wait(tasks_to_wait, timeout=10.0, return_when=asyncio.ALL_COMPLETED) - - d_res = discord_task.result() if discord_task in done else {} - t_res = target_task.result() if target_task in done else {} - - self.validation_results["discord_token"] = d_res.get("token", False) - self.validation_results["discord_server"] = d_res.get("server", False) - self.validation_results["discord_server_name"] = d_res.get("server_name") - self.validation_results["discord_intents"] = d_res.get("intents", {}) - self.validation_results["discord_permissions"] = d_res.get("permissions", {}) - - self.validation_results["target_token"] = t_res.get("token", False) - self.validation_results["target_server"] = t_res.get("community", False) - self.validation_results["target_server_name"] = t_res.get("community_name") - self.validation_results["target_permissions"] = t_res.get("permissions", {}) - - # Formatting - d_name = self.validation_results.get("discord_server_name") - d_display = f"[green]\"{d_name}\"[/green]" if d_name else "[red]NOT CONNECTED[/red]" - - t_name = self.validation_results.get("target_server_name") - t_display = f"[green]\"{t_name}\"[/green]" if t_name else "[red]NOT CONNECTED[/red]" - - discord_valid = self.validation_results.get("discord_token") and self.validation_results.get("discord_server") - target_valid = self.validation_results.get("target_token") and self.validation_results.get("target_server") - self.tokens_valid = discord_valid and target_valid - - # Set up folder cache - if self.tokens_valid and t_name: - t_id = self.config.stoat_server_id if is_stoat else self.config.fluxer_community_id - safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', t_name) - engine.state.set_folder(str(t_id), safe_name) - - # Check permissions - self.permissions_complete = True - if self.tokens_valid: - d_intents = self.validation_results.get("discord_intents", {}) - d_perms = self.validation_results.get("discord_permissions", {}) - if not all([d_intents.get("message_content"), d_perms.get("view_channel"), d_perms.get("read_message_history")]): - self.permissions_complete = False - - t_perms = self.validation_results.get("target_permissions", {}) - if not all(t_perms.values()) if t_perms else True: - self.permissions_complete = False - - if not self.tokens_valid: - perms_display = "[red]INVALID TOKENS[/red]" - elif not self.permissions_complete: - perms_display = "[yellow]MISSING REQUIRED PERMISSIONS[/yellow]" - else: - perms_display = "[green]VALID[/green]" - - update_ui(t_display, d_display, perms_display, self.tokens_valid) - - except Exception as e: - update_ui(f"[red]Validation Error: {e}[/red]", "", "", False) - self.tokens_valid = False - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_back": - self.app.pop_screen() - elif event.button.id == "btn_config": - def check_reply(reply: dict | None) -> None: - if reply is not None: - self.config.use_stoat = reply["use_stoat"] - self.config.fluxer_bot_token = reply["fluxer_token"] - self.config.fluxer_community_id = reply["fluxer_community"] - self.config.stoat_bot_token = reply["stoat_token"] - self.config.stoat_server_id = reply["stoat_server"] - save_config(self.config, self.app.config_path) - self.app.reinit_engines() - self.validate_config() - self.app.push_screen(ShuttleConfigModal(self.config), check_reply) - elif event.button.id == "btn_clone": - self.run_clone() - elif event.button.id == "btn_roles": - self.run_roles() - elif event.button.id == "btn_emojis": - self.run_emojis() - elif event.button.id == "btn_meta": - self.run_meta() - elif event.button.id == "btn_history": - pass # Implement later - - @work(exclusive=True) - async def run_clone(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - is_stoat = getattr(self.config, 'use_stoat', False) - if not is_stoat: - from src.fluxer.clone_server import sync_channel_state, migrate_channels - else: - from src.stoat.clone_server import sync_channel_state, migrate_channels - - try: - modal_prog.set_status("Fetching structure...") - await self.engine.start_connections() - modal_prog.write_to_log("Syncing channel state...") - await sync_channel_state(self.engine) - - categories = await self.engine.discord_reader.get_categories() - channels = await self.engine.discord_reader.get_channels() - - async def update_progress(item_name: str, status: str, current: int, total: int): - modal_prog.set_status(f"{status} Channel: {item_name} ({current}/{total})") - - modal_prog.write_to_log(f"[yellow]Starting Channel Cloning...[/yellow]") - self.engine.is_running = True - - cloned_info = await migrate_channels(self.engine, progress_callback=update_progress, force=False) - - modal_prog.write_to_log("[bold green]Server Template cloned![/bold green]") - - if cloned_info and cloned_info.get("structure"): - await log_audit_event(self.engine, "Server Template Cloned", "Successfully cloned target structure.") - else: - await log_audit_event(self.engine, "Server Template Cloned", "No new channels were cloned.") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Error during channel clone: {e}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - modal_prog.set_status("Finished.") - modal_prog.allow_close() - - @work(exclusive=True) - async def run_roles(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - is_stoat = getattr(self.config, 'use_stoat', False) - roles_mod = stoat_roles if is_stoat else fluxer_roles - - try: - modal_prog.set_status("Fetching roles...") - await self.engine.start_connections() - modal_prog.write_to_log("Syncing role state...") - await roles_mod.sync_roles_state(self.engine) - - async def update_progress(item_name: str, current: int, total: int): - modal_prog.set_status(f"Copying Role: {item_name} ({current}/{total})") - - modal_prog.write_to_log(f"[yellow]Starting Role Migration...[/yellow]") - self.engine.is_running = True - - cloned_roles = await roles_mod.migrate_roles(self.engine, progress_callback=update_progress, force=False) - - modal_prog.write_to_log("[bold green]Role migration complete![/bold green]") - - if cloned_roles: - await log_audit_event(self.engine, "Roles Cloned", "Successfully cloned roles.") - else: - await log_audit_event(self.engine, "Roles Cloned", "No new roles were cloned.") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Error during role clone: {e}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - modal_prog.set_status("Finished.") - modal_prog.allow_close() - - @work(exclusive=True) - async def run_emojis(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - is_stoat = getattr(self.config, 'use_stoat', False) - emo_mod = stoat_emoji_stickers if is_stoat else fluxer_emoji_stickers - - try: - modal_prog.set_status("Fetching emojis...") - await self.engine.start_connections() - modal_prog.write_to_log("Syncing emoji & sticker state...") - await emo_mod.sync_emojis_state(self.engine) - await emo_mod.sync_stickers_state(self.engine) - - async def update_progress(item_name: str, item_type: str, current: int, total: int): - modal_prog.set_status(f"Copying {item_type}: {item_name} ({current}/{total})") - - modal_prog.write_to_log(f"[yellow]Starting Asset Migration...[/yellow]") - self.engine.is_running = True - - # Using force=False - res_e, res_s = await emo_mod.migrate_emojis(self.engine, progress_callback=update_progress, force=False) - - modal_prog.write_to_log("[bold green]Asset migration complete![/bold green]") - await log_audit_event(self.engine, "Assets Cloned", f"E: {res_e} S: {res_s}") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Error during asset migrate: {e}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - modal_prog.set_status("Finished.") - modal_prog.allow_close() - - @work(exclusive=True) - async def run_meta(self) -> None: - modal_prog = ProgressModal() - self.app.push_screen(modal_prog) - await asyncio.sleep(0.1) - - is_stoat = getattr(self.config, 'use_stoat', False) - meta_mod = stoat_metadata if is_stoat else fluxer_metadata - - try: - modal_prog.set_status("Connecting to platforms...") - await self.engine.start_connections() - - def cb(item, status): - modal_prog.write_to_log(f"{item}: {status}") - - modal_prog.write_to_log(f"[yellow]Starting Meta Migration...[/yellow]") - self.engine.is_running = True - - await meta_mod.migrate_server_metadata(self.engine, progress_callback=cb) - - modal_prog.write_to_log("[bold green]Meta migration complete![/bold green]") - - except Exception as e: - modal_prog.write_to_log(f"[bold red]Error during meta migrate: {e}[/bold red]") - finally: - await self.engine.close_connections() - self.engine.is_running = False - modal_prog.set_status("Finished.") - modal_prog.allow_close() - - -# ------------------------------------------------------------------------- -# Main Home Screen -# ------------------------------------------------------------------------- - -class HomeScreen(Screen): - BINDINGS = [ - ("q", "app.exit", "Quit"), - ] - - def compose(self) -> ComposeResult: - yield Header(show_clock=True) - with Container(id="home_container"): - yield Label("Unified Discord Toolkit", id="home_title") - with Grid(id="home_grid"): - yield Button("Reaper Mode\n(Export from Discord)", id="btn_reaper", variant="success") - yield Button("Shuttle Mode\n(Restore to Fluxer/Stoat)", id="btn_shuttle", variant="primary") - yield Rule() - yield Button("Exit", id="btn_exit", variant="error") - yield Footer() - - def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "btn_reaper": - self.app.push_screen(ReaperScreen()) - elif event.button.id == "btn_shuttle": - self.app.push_screen(ShuttleScreen()) - elif event.button.id == "btn_exit": - self.app.exit() - -# ------------------------------------------------------------------------- -# App Definition -# ------------------------------------------------------------------------- - -class UnifierApp(App): - CSS = """ - Screen { align: center middle; } - - #title_label, #shuttle_title_label, #home_title { - text-style: bold; - color: green; - margin-bottom: 1; - content-align: center middle; - width: 100%; - } - - #home_title { color: magenta; margin-bottom: 2; text-style: bold italic; } - - #main_container, #home_container { - width: 80%; - height: 80%; - border: solid green; - padding: 1 2; - } - #home_container { border: double magenta; } - - #home_grid { - grid-size: 2 1; - grid-rows: 1fr; - grid-columns: 1fr; - grid-gutter: 2; - padding: 1; - height: 12; - } - #home_grid Button { - height: 100%; - width: 100%; - } - - #info_container { - height: auto; - margin-bottom: 2; - border: tall cyan; - padding: 1; - } - - #actions_container { - height: auto; - layout: vertical; - align: center top; - margin-top: 1; - } - - Button { - width: 100%; - margin-bottom: 1; - } - - #config_dialog { - width: 70%; - height: 80%; - border: thick $background 80%; - background: $surface; - padding: 1 2; - } - #config_title { text-style: bold; margin-bottom: 1; } - #config_buttons { height: auto; margin-top: 1; } - #config_buttons Button { width: 1fr; margin: 0 1; } - - #channel_dialog { - width: 80%; - height: 80%; - border: thick $background 80%; - background: $surface; - padding: 1 2; - } - #chan_title { text-style: bold; margin-bottom: 1; } - - .category_header { - margin-top: 1; - background: $primary 10%; - text-style: bold; - padding-left: 1; - } - - #channel_list_scroll { - height: 1fr; - border: solid $primary; - margin-bottom: 1; - padding: 0 1; - } - - #select_all_buttons { height: auto; margin-bottom: 1; } - #select_all_buttons Button { width: auto; margin-right: 1; } - - #confirm_buttons { height: auto; margin-top: 1; } - #confirm_buttons Button { width: auto; margin: 0 1; } - - #progress_dialog { - width: 80%; - height: 80%; - border: thick $background 80%; - background: $surface; - padding: 1 2; - } - #progress_status { text-style: bold; margin-bottom: 1; } - #progress_bar { margin-bottom: 1; } - #progress_log { height: 1fr; margin-bottom: 1; border: solid $primary; } - .label_warning { color: yellow; margin-right: 1; content-align: center middle;} - - #platform_radios { height: auto; margin-bottom: 1; } - #platform_radios RadioButton { width: auto; margin-right: 2; } - """ - - def __init__(self, config_path="config.yaml"): - super().__init__() - self.config_path = config_path - self.config = load_config(self.config_path) - self.reinit_engines() - - def reinit_engines(self): - self.reaper_engine = MigrationContext(self.config, target_platform="fluxer") - self.exporter = DiscordExporter(self.reaper_engine.discord_reader) - - # Determine active shuttle engine - is_stoat = getattr(self.config, 'use_stoat', False) - self.fluxer_engine = MigrationContext(self.config, "fluxer") - self.stoat_engine = MigrationContext(self.config, "stoat") - - def on_mount(self) -> None: - self.push_screen(HomeScreen()) - -async def run_unifier_tui(config_path="config.yaml"): - app = UnifierApp(config_path) - await app.run_async()