diff --git a/src/core/engine.py b/src/core/engine.py index be204ce..e4e41a0 100644 --- a/src/core/engine.py +++ b/src/core/engine.py @@ -110,8 +110,13 @@ class MigrationEngine: except Exception: await progress_callback("Server Banner", "ERROR") - async def migrate_channels(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): - """Clones categories and text channels.""" + async def migrate_channels(self, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False): + """Clones categories and text channels. + + Args: + progress_callback: Optional callback receiving (item_name, status, current, total) + force: If True, re-create channels even if they exist in state. + """ categories = await self.discord_reader.get_categories() channels = await self.discord_reader.get_channels() @@ -121,21 +126,30 @@ class MigrationEngine: # Migrate Categories first for cat in categories: if not self.is_running: break - fluxer_id = self.state.get_fluxer_channel_id(str(cat.id)) + + state_key = str(cat.id) + fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key) + status = "Copying" + if not fluxer_id: # 4 corresponds to Category type in Discord/Fluxer typically fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4) - self.state.set_channel_mapping(str(cat.id), fluxer_id) + self.state.set_channel_mapping(state_key, fluxer_id) + else: + status = "Skipping" current_idx += 1 - if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total) + if progress_callback: await progress_callback(f"Cat: {cat.name}", status, current_idx, total) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) # Migrate Text Channels for channel in channels: if not self.is_running: break - fluxer_id = self.state.get_fluxer_channel_id(str(channel.id)) + state_key = str(channel.id) + fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key) + status = "Copying" + if not fluxer_id: topic = channel.topic if channel.topic else "" parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None @@ -146,10 +160,12 @@ class MigrationEngine: type=0, parent_id=parent_id ) - self.state.set_channel_mapping(str(channel.id), fluxer_id) + self.state.set_channel_mapping(state_key, fluxer_id) + else: + status = "Skipping" current_idx += 1 - if progress_callback: await progress_callback(channel.name, current_idx, total) + if progress_callback: await progress_callback(channel.name, status, current_idx, total) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) async def sync_permissions(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): @@ -184,6 +200,33 @@ class MigrationEngine: if progress_callback: await progress_callback(channel.name, current_idx, total) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) + async def analyze_migration(self, source_channel_id: int, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, int]: + """ + Scans channel history to count messages, threads, and attachments. + """ + stats = {"messages": 0, "threads": 0, "attachments": 0} + + async for msg in self.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): + if not self.is_running: + break + + stats["messages"] += 1 + stats["attachments"] += len(msg.attachments) + + # Count thread messages and markers + if hasattr(msg, 'thread') and msg.thread: + stats["threads"] += 1 + # Recursively count thread content + thread_stats = await self.analyze_migration(msg.thread.id) + stats["messages"] += thread_stats["messages"] + stats["attachments"] += thread_stats["attachments"] + stats["threads"] += thread_stats["threads"] # Nested threads (rare in Discord but possible in forum channels) + + if progress_callback and stats["messages"] % 10 == 0: + await progress_callback(stats["messages"]) + + return stats + async def migrate_messages(self, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None): """Migrate messages for a specific channel.""" message_count = 0 @@ -241,12 +284,39 @@ class MigrationEngine: if fluxer_msg_id: self.state.set_message_mapping(str(msg.id), fluxer_msg_id) + # Check for associated thread + if hasattr(msg, 'thread') and msg.thread: + thread = msg.thread + logger.info(f"Detected thread '{thread.name}' on message {msg.id}") + + # Send Start Marker + await self.fluxer_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< THREAD: **{thread.name}** >>>" + ) + + # Migrate thread messages + # We don't pass a progress callback here to avoid confusing the UI + # but we do want to track count if possible. + await self.migrate_messages( + source_channel_id=thread.id, + target_channel_id=target_channel_id + ) + + # Send End Marker + await self.fluxer_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< END OF THREAD >>>" + ) + self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) message_count += 1 if progress_callback: await progress_callback(message_count) except Exception as e: - logger.error(f"Failed to send message to Fluxer: {e}") + logger.error(f"Failed to process message {msg.id}: {e}") + import traceback + logger.error(traceback.format_exc()) # Delay for rate limit safety await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) @@ -345,7 +415,10 @@ class MigrationEngine: async def danger_delete_all_channels(self, progress_callback=None) -> int: """Deletes every channel and category in the Fluxer community.""" - return await self.fluxer_writer.delete_all_channels(progress_callback=progress_callback) + count = await self.fluxer_writer.delete_all_channels(progress_callback=progress_callback) + self.state.clear_channel_mappings() + self.state.clear_message_history() + return count async def danger_reset_channel_permissions(self, progress_callback=None) -> int: """Resets all permission overwrites on every channel and category.""" @@ -353,9 +426,13 @@ class MigrationEngine: async def danger_delete_all_roles(self, progress_callback=None) -> int: """Deletes all deletable roles (skips managed/bot roles and @everyone).""" - return await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback) + count = await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback) + self.state.clear_role_mappings() + return count async def danger_delete_all_emojis_and_stickers(self, progress_callback=None) -> dict: """Deletes all custom emojis and stickers. Returns {"emojis": int, "stickers": int}.""" - return await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback) + counts = await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback) + self.state.clear_asset_mappings() + return counts diff --git a/src/core/state.py b/src/core/state.py index 9991b85..f1f98fc 100644 --- a/src/core/state.py +++ b/src/core/state.py @@ -56,3 +56,31 @@ class MigrationState: def update_last_message_timestamp(self, channel_id: str, timestamp: str): self.last_message_timestamps[str(channel_id)] = timestamp self.save() + + def clear_channel_mappings(self): + """Clears all channel and category mappings (excludes roles/emojis/stickers).""" + to_remove = [k for k in self.channel_map.keys() if k.isdigit()] + for k in to_remove: + del self.channel_map[k] + self.save() + + def clear_role_mappings(self): + """Clears all role mappings.""" + to_remove = [k for k in self.channel_map.keys() if k.startswith("role_")] + for k in to_remove: + del self.channel_map[k] + self.role_map.clear() + self.save() + + def clear_asset_mappings(self): + """Clears all emoji and sticker mappings.""" + to_remove = [k for k in self.channel_map.keys() if k.startswith("emoji_") or k.startswith("sticker_")] + for k in to_remove: + del self.channel_map[k] + self.save() + + def clear_message_history(self): + """Clears all message mappings and timestamps.""" + self.message_map.clear() + self.last_message_timestamps.clear() + self.save() diff --git a/src/fluxer_bot/writer.py b/src/fluxer_bot/writer.py index 37d2908..917df40 100644 --- a/src/fluxer_bot/writer.py +++ b/src/fluxer_bot/writer.py @@ -193,6 +193,20 @@ class FluxerWriter: print(err_msg) return None + async def send_marker(self, channel_id: str, content: str) -> Optional[str]: + """ + Sends a simple marker message (e.g., thread start/end) using the bot directly. + """ + assert self.client is not None + try: + msg_data = await self.client.send_message( + channel_id=channel_id, + content=content + ) + return str(msg_data["id"]) if msg_data else None + except Exception as e: + print(f"Failed to send marker: {e}") + return None async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: """ diff --git a/src/ui/app.py b/src/ui/app.py index c25a985..2d030dc 100644 --- a/src/ui/app.py +++ b/src/ui/app.py @@ -245,9 +245,22 @@ class MigrationCLI: console.print("") - if not Confirm.ask("Are you sure you want to clone channels and categories?"): + # Check for existing mappings to determine if we should suggest a force re-copy + all_ids = [str(cat.id) for cat in categories] + [str(ch.id) for ch in channels] + cached_count = sum(1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k)) + + force = False + if cached_count > 0: + console.print(f"[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) already in state.json cache.[/yellow]") + force = Confirm.ask("Force re-clone anyway?", default=False) + elif not Confirm.ask("Are you sure you want to clone channels and categories?"): await self.engine.close_connections() return + + if cached_count == 0 or force: + if not force and not Confirm.ask("Are you sure you want to clone channels and categories?"): + await self.engine.close_connections() + return console.print("\n[bold green]Starting Channel Cloning...[/bold green]") try: @@ -261,11 +274,12 @@ class MigrationCLI: channel_task = progress.add_task("[cyan]Copying Channels...", total=100) - async def update_progress(item_name: str, current: int, total: int): - progress.update(channel_task, total=total, completed=current, description=f"[cyan]Copying Channel: {item_name}") + async def update_progress(item_name: str, status: str, current: int, total: int): + color = "cyan" if status == "Copying" else "yellow" + progress.update(channel_task, total=total, completed=current, description=f"[{color}]{status} Channel: {item_name}") self.engine.is_running = True - await self.engine.migrate_channels(progress_callback=update_progress) + await self.engine.migrate_channels(progress_callback=update_progress, force=force) console.print("[bold green]Server Template cloned![/bold green]") @@ -561,7 +575,39 @@ class MigrationCLI: console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]") return - # 4. Final Confirmation + # 4. Analysis and Confirmation + console.print("\n[yellow]Analyzing channel content...[/yellow]") + self.engine.is_running = True + stats = {"messages": 0, "threads": 0, "attachments": 0} + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console + ) as progress: + task = progress.add_task("[cyan]Scanning history...", total=None) + async def update_scan_progress(count: int): + progress.update(task, description=f"[cyan]Scanned {count} items...") + + stats = await self.engine.analyze_migration( + source_channel_id=source_channel.id, + after_message_id=after_id, + progress_callback=update_scan_progress + ) + finally: + self.engine.is_running = False + + console.print(f"\n[bold]Migration Summary:[/bold]") + console.print(f"Number of messages: [bold cyan]{stats['messages']}[/bold cyan]") + console.print(f"Number of threads: [bold cyan]{stats['threads']}[/bold cyan]") + console.print(f"Number of attachments: [bold cyan]{stats['attachments']}[/bold cyan]") + + console.print("\n[bold yellow]Estimated Overhead:[/bold yellow]") + msg_time = stats['messages'] * self.config.migration.rate_limit_delay_seconds + console.print(f"- [bold]Messages:[/bold] ~{msg_time}s delay (rate limiting), {stats['messages']} API writes.") + console.print(f"- [bold]Threads:[/bold] {stats['threads'] * 2} extra marker messages, {stats['threads']} extra history fetches.") + console.print(f"- [bold]Attachments:[/bold] {stats['attachments']} downloads and uploads (bandwidth & API calls).") + if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [magenta]#{target_channel.get('name')}[/magenta]?"): return