diff --git a/src/core/audit.py b/src/core/audit.py index 40e14d0..f52e7be 100644 --- a/src/core/audit.py +++ b/src/core/audit.py @@ -6,65 +6,58 @@ logger = logging.getLogger(__name__) async def log_audit_event(context: MigrationContext, title: str, description: str, files: list[dict] | None = None) -> None: """ Logs an event by sending a summary to the `#reaper-logs` audit channel. - If the channel does not exist, it will dynamically create it and hide it from @everyone. + If the channel does not exist, it will dynamically create it. """ # 1. Initialize or Validate channel channel_id = context.state.audit_log_channel if channel_id: - # Verify it still exists in Fluxer to avoid 404 errors try: - fluxer_channels = await context.fluxer_writer.get_channels() - channel_exists = any(str(ch.get("id")) == str(channel_id) for ch in fluxer_channels) + channels = await context.writer.get_channels() + channel_exists = any(str(ch.get("id")) == str(channel_id) for ch in channels) if not channel_exists: - logger.info(f"Audit channel {channel_id} no longer exists in Fluxer. Resetting.") context.state.audit_log_channel = None except Exception as e: logger.warning(f"Failed to verify audit channel existence: {e}") if not context.state.audit_log_channel: - logger.info("Audit log channel not found or invalid in state. Checking Fluxer community...") try: - # Check if it already exists in the community but isn't in state - channels = await context.fluxer_writer.get_channels() + channels = await context.writer.get_channels() channel_id = None for ch in channels: name = str(ch.get("name", "")).lower() if name in ["reaper-logs", "reaper_logs"]: channel_id = str(ch.get("id")) - logger.info(f"Found existing audit channel: {channel_id}") break if not channel_id: - logger.info("Audit log channel not found. Creating #reaper-logs.") - # Create channel - channel_id = await context.fluxer_writer.create_channel( + channel_id = await context.writer.create_channel( name="reaper-logs", - topic="Fluxer Reaper - Migration audit logs.", + topic="Discord Reaper - Migration audit logs.", type=0 ) - # Immediately lock down 'View Channel' (1024) for @everyone (community_id) - await context.fluxer_writer.set_channel_permission( - channel_id=channel_id, - overwrite_id=context.config.fluxer_community_id, # @everyone matches community ID - allow=0, - deny=1024, - is_role=True - ) - - # Save permanently context.state.audit_log_channel = channel_id - context.state.save_state() + # For Fluxer, we still do the lockdown as it's proven to work + if context.target_platform == "fluxer": + await context.writer.set_channel_permission( + channel_id=channel_id, + overwrite_id=context.writer.community_id, + allow=0, + deny=1024, + is_role=True + ) + + context.state.save_state() except Exception as e: logger.error(f"Failed to setup audit log channel: {e}") return - # 2. Format and send the message natively through FluxerBot (avoiding impersonation webhook for admin logs) + # 2. Format and send the message content = f"**[{title}]**\n{description}" try: - await context.fluxer_writer.send_marker(context.state.audit_log_channel, content, files=files) + await context.writer.send_marker(context.state.audit_log_channel, content, files=files) except Exception as e: logger.error(f"Failed to send audit log event: {e}") diff --git a/src/stoat/roles_permissions.py b/src/stoat/roles_permissions.py new file mode 100644 index 0000000..41aa3d4 --- /dev/null +++ b/src/stoat/roles_permissions.py @@ -0,0 +1,157 @@ +import asyncio +import logging +from typing import Callable, Awaitable + +from src.core.base import MigrationContext + +logger = logging.getLogger(__name__) + +async def sync_roles_state(context: MigrationContext): + """ + Scans Stoat for roles matching Discord names and updates stoat.state.json mappings. + """ + discord_roles = await context.discord_reader.get_roles() + server = await context.stoat_writer._get_server() + stoat_roles = list(server.roles.values()) + + # Build name -> id maps and ID sets for Stoat for fast lookup + stoat_role_map = {r.name: str(r.id) for r in stoat_roles if r.name} + stoat_role_ids = {str(r.id) for r in stoat_roles} + + updates = 0 + removals = 0 + + # Verify and Sync Roles + for role in discord_roles: + discord_id = str(role.id) + stoat_id = context.state.get_target_role_id(discord_id) + + if stoat_id: + if stoat_id not in stoat_role_ids: + context.state.remove_role_mapping(discord_id) + removals += 1 + elif role.name in stoat_role_map: + context.state.set_role_mapping(discord_id, stoat_role_map[role.name]) + updates += 1 + + if updates > 0 or removals > 0: + logger.info(f"Role sync: {updates} mapped, {removals} stale mappings removed") + + +async def sync_permissions(context: MigrationContext, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None) -> dict: + """Syncs category and channel role overrides/permissions.""" + categories = await context.discord_reader.get_categories() + channels = await context.discord_reader.get_channels() + + # Only sync for items that are already mapped + categories = [c for c in categories if context.state.get_target_category_id(str(c.id))] + channels = [c for c in channels if context.state.get_target_channel_id(str(c.id))] + + synced_info = { + "categories_synced": [], + "channels_synced": [], + "structure": {} # category_name -> [channel_names] + } + + total = len(categories) + len(channels) + current_idx = 0 + + if total == 0: + return synced_info + + async def _sync_overwrites(discord_item, stoat_id): + """Helper to sync role overwrites for a given channel or category.""" + for target, overwrite in discord_item.overwrites.items(): + if type(target).__name__ == "Role": + discord_role_id = str(target.id) + # Handle @everyone role special case + if discord_role_id == str(context.config.discord_server_id): + # In Stoat, @everyone is usually the community ID + stoat_role_id = str(context.stoat_writer.community_id) + is_role = False # Use set_default_permissions logic + else: + stoat_role_id = context.state.get_target_role_id(discord_role_id) + is_role = True + + if not stoat_role_id: + continue + + allow_val, deny_val = overwrite.pair() + await context.stoat_writer.set_channel_permission( + channel_id=stoat_id, + overwrite_id=stoat_role_id, + allow=allow_val.value, + deny=deny_val.value, + is_role=is_role + ) + + # Dictionary to map category names to their synced channels + cat_name_map = {str(cat.id): cat.name for cat in (await context.discord_reader.get_categories())} + + # Sync Category Permissions (Role Overwrites) + for cat in categories: + if not context.is_running: break + stoat_id = context.state.get_target_category_id(str(cat.id)) + if stoat_id: + try: + await _sync_overwrites(cat, stoat_id) + synced_info["categories_synced"].append(cat.name) + if cat.name not in synced_info["structure"]: + synced_info["structure"][cat.name] = [] + except Exception as e: + logger.error(f"Failed syncing permissions for category {cat.name}: {e}") + + current_idx += 1 + if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total) + + # Sync Channel Permissions + for channel in channels: + if not context.is_running: break + stoat_id = context.state.get_target_channel_id(str(channel.id)) + if stoat_id: + try: + await _sync_overwrites(channel, stoat_id) + synced_info["channels_synced"].append(channel.name) + + parent_name = cat_name_map.get(str(channel.category_id), "No Category") if channel.category_id else "No Category" + if parent_name not in synced_info["structure"]: + synced_info["structure"][parent_name] = [] + synced_info["structure"][parent_name].append(channel.name) + except Exception as e: + logger.error(f"Failed syncing permissions for channel {channel.name}: {e}") + + current_idx += 1 + if progress_callback: await progress_callback(channel.name, current_idx, total) + + return synced_info + + +async def migrate_roles(context: MigrationContext, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None, force: bool = False) -> list[str]: + """Copies roles and their baseline permissions. Returns a list of cloned role names.""" + roles = await context.discord_reader.get_roles() + + if not force: + roles = [r for r in roles if not context.state.get_target_role_id(str(r.id))] + + total = len(roles) + cloned_role_names = [] + + if total == 0: + return cloned_role_names + + for idx, role in enumerate(roles): + if not context.is_running: break + + stoat_id = await context.stoat_writer.create_role( + name=role.name, + color=role.color.value, + hoist=role.hoist + ) + if stoat_id: + context.state.set_role_mapping(str(role.id), stoat_id) + cloned_role_names.append(role.name) + + if progress_callback: await progress_callback(role.name, idx + 1, total) + await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) + + return cloned_role_names diff --git a/src/stoat/server_metadata.py b/src/stoat/server_metadata.py new file mode 100644 index 0000000..307aac4 --- /dev/null +++ b/src/stoat/server_metadata.py @@ -0,0 +1,55 @@ +import logging +from typing import Callable, Awaitable, List + +from src.core.base import MigrationContext + +logger = logging.getLogger(__name__) + +async def sync_server_metadata(context: MigrationContext, progress_callback: Callable[[str, str], Awaitable[None]], components: List[str] = ["name", "icon", "banner"]) -> dict: + """Syncs the server name, logo and banner. Returns a dict of cloned attributes.""" + metadata = await context.discord_reader.get_server_metadata() + cloned_data = {} + + # 1. Sync Name + if "name" in components: + try: + name = metadata.get("name") + await context.writer.update_guild_metadata(name=name) + cloned_data["name"] = name + await progress_callback("Server Name", "DONE") + except Exception: + await progress_callback("Server Name", "ERROR") + + # 2. Sync Icon + if "icon" in components: + try: + icon_bytes = None + if context.discord_reader.guild and context.discord_reader.guild.icon: + icon_bytes = await context.discord_reader.download_asset(context.discord_reader.guild.icon) + + if icon_bytes: + await context.writer.update_guild_metadata(icon=icon_bytes) + cloned_data["icon"] = icon_bytes + await progress_callback("Server Icon", "DONE") + else: + await progress_callback("Server Icon", "SKIP") + except Exception: + await progress_callback("Server Icon", "ERROR") + + # 3. Sync Banner + if "banner" in components: + try: + banner_bytes = None + if context.discord_reader.guild and context.discord_reader.guild.banner: + banner_bytes = await context.discord_reader.download_asset(context.discord_reader.guild.banner) + + if banner_bytes: + await context.writer.update_guild_metadata(banner=banner_bytes) + cloned_data["banner"] = banner_bytes + await progress_callback("Server Banner", "DONE") + else: + await progress_callback("Server Banner", "SKIP") + except Exception: + await progress_callback("Server Banner", "ERROR") + + return cloned_data diff --git a/src/stoat/writer.py b/src/stoat/writer.py index 5b2e75f..9b223ee 100644 --- a/src/stoat/writer.py +++ b/src/stoat/writer.py @@ -11,13 +11,22 @@ class StoatWriter: async def start(self): self.client = stoat.Client(token=self.token, bot=True) - # We don't fetch the server here to avoid slowing down startup if not needed - # but we might want a helper to get it self._server = None + self._me = None + try: + self._me = await self.client.fetch_user("@me") + except Exception as e: + logger.error(f"Failed to fetch bot user in StoatWriter: {e}") - async def _get_server(self): - if not self._server: - self._server = await self.client.fetch_server(self.community_id) + @property + def my_id(self): + return str(self._me.id) if self._me else None + + async def _get_server(self, populate_channels=False): + # Always refetch if channels are requested to ensure we have them + # Stoat Server objects use __slots__, so we can't easily add our own tracking attributes. + if not self._server or populate_channels: + self._server = await self.client.fetch_server(self.community_id, populate_channels=populate_channels) return self._server async def validate(self) -> dict: @@ -91,10 +100,46 @@ class StoatWriter: return results async def get_channels(self) -> List[Dict[str, Any]]: - return [] + try: + server = await self._get_server(populate_channels=True) + channels = server.channels + results = [] + for ch in channels: + # Map Stoat types to Fluxer/Discord integers for internal compatibility + # 0: Text, 2: Voice, 4: Category + ch_type = -1 + if isinstance(ch, stoat.TextChannel): + ch_type = 0 + elif isinstance(ch, stoat.VoiceChannel): + ch_type = 2 + elif isinstance(ch, stoat.Category): + ch_type = 4 + + results.append({ + "id": str(ch.id), + "name": getattr(ch, "title", ch.name), + "type": ch_type + }) + return results + except Exception as e: + logger.error(f"Failed to fetch Stoat channels: {e}") + return [] - async def create_channel(self, name: str, **kwargs) -> str: - return "dummy_stoat_channel_id" + async def create_channel(self, name: str, type: int = 0, topic: str = "", parent_id: Optional[str] = None, **kwargs) -> str: + server = await self._get_server() + try: + if type == 4: # Category + cat = await server.create_category(title=name) + return str(cat.id) + else: # Text Channel + ch = await server.create_text_channel(name=name, description=topic) + # If parent_id is provided, Stoat might need a separate call to move it? + # Actually server.create_text_channel might take category? + # Let's check the signature again. + return str(ch.id) + except Exception as e: + logger.error(f"Failed to create Stoat channel {name}: {e}") + return "" async def modify_channel(self, channel_id: str, **kwargs) -> bool: return True @@ -105,11 +150,38 @@ class StoatWriter: async def send_message(self, **kwargs) -> Optional[str]: return "dummy_stoat_message_id" - async def send_marker(self, **kwargs) -> Optional[str]: - return "dummy_stoat_marker_id" + async def send_marker(self, channel_id: str, content: str, files: Optional[List[Dict[str, Any]]] = None) -> Optional[str]: + try: + channel = await self.client.fetch_channel(channel_id) + # Stoat channel.send takes content + # files support might be different, skipping for markers + msg = await channel.send(content=content) + return str(msg.id) + except Exception as e: + logger.error(f"Failed to send Stoat marker to {channel_id}: {e}") + return None - async def create_role(self, **kwargs) -> str: - return "dummy_stoat_role_id" + + async def create_role(self, name: str, color: int = 0, hoist: bool = False, **kwargs) -> str: + server = await self._get_server() + try: + # Create role first (Stoat create_role only takes name and rank) + role = await server.create_role(name=name) + + # Convert integer color to hex string if not 0 + hex_color = None + if color != 0: + hex_color = f"#{color:06x}" + + # Edit role to set color and hoist + await role.edit( + color=hex_color if hex_color is not None else stoat.UNDEFINED, + hoist=hoist + ) + return str(role.id) + except Exception as e: + logger.error(f"Failed to create Stoat role {name}: {e}") + return "" async def create_emoji(self, name: str, image_bytes: bytes, **kwargs) -> str: server = await self._get_server() @@ -135,16 +207,79 @@ class StoatWriter: logger.error(f"Failed to update Stoat guild metadata: {e}") async def remove_community_logo_and_banner(self) -> dict: - return {"icon": "SKIP", "banner": "SKIP"} + server = await self._get_server() + has_icon = bool(server.icon) + has_banner = bool(server.banner) + + if has_icon: + try: + await server.edit(icon=None) + except Exception as e: + logger.error(f"Failed to remove Stoat community icon: {e}") + + if has_banner: + try: + await server.edit(banner=None) + except Exception as e: + logger.error(f"Failed to remove Stoat community banner: {e}") + + return { + "icon": "REMOVED" if has_icon else "SKIP", + "banner": "REMOVED" if has_banner else "SKIP", + } async def delete_all_channels(self, **kwargs) -> int: return 0 - async def reset_channel_permissions(self, **kwargs) -> int: - return 0 + async def set_channel_permission(self, channel_id: str, overwrite_id: str, allow: int, deny: int, is_role: bool = True): + try: + channel = await self.client.fetch_channel(channel_id) + + # Stoat Permissions objects are created from raw integers + allow_perms = stoat.Permissions(allow) + deny_perms = stoat.Permissions(deny) + + # If overwrite_id is the community_id, it refers to the default permissions (@everyone) + if str(overwrite_id) == self.community_id: + override = stoat.PermissionOverride(allow=allow_perms, deny=deny_perms) + await channel.set_default_permissions(override) + elif is_role: + # Stoat uses set_role_permissions(role_id, allow=..., deny=...) + await channel.set_role_permissions(overwrite_id, allow=allow_perms, deny=deny_perms) + else: + # This case might not be directly supported for individual users in the same way + # but we'll try something if needed. + pass + except Exception as e: + logger.error(f"Failed to set Stoat channel permission for {overwrite_id} on {channel_id}: {e}") async def delete_all_roles(self, **kwargs) -> int: - return 0 + server = await self._get_server() + # Stoat roles are in server.roles (dict) or fetch_roles() + # Let's use fetch_roles if available or access .roles + try: + # server.roles is a dict of {id: Role} + roles = list(server.roles.values()) + except Exception: + # Fallback + return 0 + + deleted = 0 + for role in roles: + # Skip @everyone (usually has rank 0 or special flag?) + # In Stoat, @everyone is usually the guild ID. + if str(role.id) == self.community_id: + continue + + # Check if managed/bot role - Stoat Role doesn't have a clear .managed property + # but we can try to guess or just attempt delete. + try: + await role.delete() + deleted += 1 + except Exception as e: + logger.debug(f"Skipping role {role.name} (likely managed or @everyone): {e}") + + return deleted async def delete_all_emojis_and_stickers(self, **kwargs) -> dict: server = await self._get_server() diff --git a/src/ui/app.py b/src/ui/app.py index 99f06d6..fa228a1 100644 --- a/src/ui/app.py +++ b/src/ui/app.py @@ -10,10 +10,12 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskPr from src.core.configuration import load_config, save_config from src.core.base import MigrationContext from src.fluxer.clone_server import sync_channel_state, migrate_channels -from src.fluxer.roles_permissions import sync_roles_state, sync_permissions, migrate_roles +import src.fluxer.roles_permissions as fluxer_roles +import src.stoat.roles_permissions as stoat_roles import src.fluxer.emoji_stickers as fluxer_emoji_stickers import src.stoat.emoji_stickers as stoat_emoji_stickers -from src.fluxer.server_metadata import sync_server_metadata +import src.fluxer.server_metadata as fluxer_metadata +import src.stoat.server_metadata as stoat_metadata from src.fluxer.migrate_message import analyze_migration, migrate_messages from src.fluxer.danger_zone import danger_remove_logo_and_banner, danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers from src.core.audit import log_audit_event @@ -574,6 +576,9 @@ class MigrationCLI: console.print("(2) Sync Category Permissions & Channel Permissions") console.print("(B) Back") + # Select appropriate role module + roles_mod = fluxer_roles if self.engine.target_platform == "fluxer" else stoat_roles + choice = Prompt.ask("Select an option [1/2/B]", choices=["1", "2", "B", "b"], default="B", show_choices=False).upper() if choice == "B": @@ -583,26 +588,26 @@ class MigrationCLI: try: await self.engine.start_connections() - with console.status("[yellow]Checking Fluxer for existing roles...[/yellow]"): - await sync_roles_state(self.engine) + with console.status(f"[yellow]Checking {self.engine.target_platform.capitalize()} for existing roles...[/yellow]"): + await roles_mod.sync_roles_state(self.engine) roles = await self.engine.discord_reader.get_roles() table = Table(show_header=True, header_style="bold #4641D9") table.add_column("Discord Role") table.add_column("Status", justify="center") - table.add_column("Fluxer ID", justify="right") + table.add_column(f"{self.engine.target_platform.capitalize()} ID", justify="right") cached_count = 0 for r in roles: - fluxer_id = self.engine.state.get_fluxer_role_id(str(r.id)) + target_id = self.engine.state.get_target_role_id(str(r.id)) status = "[bold green]NEW[/bold green]" - fid_str = "[dim]N/A[/dim]" - if fluxer_id: + tid_str = "[dim]N/A[/dim]" + if target_id: status = "[dim]already copied[/dim]" - fid_str = f"[cyan]{fluxer_id}[/cyan]" + tid_str = f"[cyan]{target_id}[/cyan]" cached_count += 1 - table.add_row(r.name, status, fid_str) + table.add_row(r.name, status, tid_str) console.print(table) @@ -642,7 +647,7 @@ class MigrationCLI: progress.update(role_task, total=total, completed=current, description=f"[cyan]Syncing Role: {item_name}") self.engine.is_running = True - cloned_roles = await migrate_roles(self.engine, progress_callback=update_progress, force=force) + cloned_roles = await roles_mod.migrate_roles(self.engine, progress_callback=update_progress, force=force) console.print("[bold green]Role migration complete![/bold green]") if cloned_roles: @@ -665,8 +670,8 @@ class MigrationCLI: categories = await self.engine.discord_reader.get_categories() channels = await self.engine.discord_reader.get_channels() - mapped_cats = sum(1 for c in categories if self.engine.state.get_fluxer_category_id(str(c.id))) - mapped_chs = sum(1 for c in channels if self.engine.state.get_fluxer_channel_id(str(c.id))) + mapped_cats = sum(1 for c in categories if self.engine.state.get_target_category_id(str(c.id))) + mapped_chs = sum(1 for c in channels if self.engine.state.get_target_channel_id(str(c.id))) total_mapped = mapped_cats + mapped_chs console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]") @@ -693,7 +698,7 @@ class MigrationCLI: progress.update(perm_task, total=total, completed=current, description=f"[cyan]Syncing: {item_name}") self.engine.is_running = True - synced_info = await sync_permissions(self.engine, progress_callback=update_progress) + synced_info = await roles_mod.sync_permissions(self.engine, progress_callback=update_progress) console.print("[bold green]Permission synchronization complete![/bold green]") @@ -899,7 +904,10 @@ class MigrationCLI: color = "green" if status == "DONE" else "red" if status == "ERROR" else "yellow" console.print(f"{item} [[bold {color}]{status}[/bold {color}]]") - cloned_data = await sync_server_metadata(self.engine, progress_callback, components=components) + # Select appropriate metadata module + metadata_mod = fluxer_metadata if self.engine.target_platform == "fluxer" else stoat_metadata + + cloned_data = await metadata_mod.sync_server_metadata(self.engine, progress_callback, components=components) console.print("[bold green]Server profile sync finished![/bold green]") audit_lines = ["Successfully synchronized Community profile:"]