diff --git a/src/core/engine.py b/src/core/engine.py index eee4f70..1013598 100644 --- a/src/core/engine.py +++ b/src/core/engine.py @@ -176,34 +176,37 @@ class MigrationEngine: if progress_callback: await progress_callback(channel.name, current_idx, total) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) - async def migrate_messages(self, channel_id: int): + async def migrate_messages(self, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None): """Migrate messages for a specific channel.""" - fluxer_channel_id = self.state.get_fluxer_channel_id(str(channel_id)) - if not fluxer_channel_id: - logger.error(f"Cannot migrate messages: channel {channel_id} not mapped.") - return - message_count = 0 - async for msg in self.discord_reader.fetch_message_history(channel_id): + async for msg in self.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): if not self.is_running: break # Process attachments files = [] for att in msg.attachments: - att_data = await self.discord_reader.download_attachment(att) - files.append({"filename": att.filename, "data": att_data}) + try: + att_data = await self.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") - await self.fluxer_writer.send_message( - channel_id=fluxer_channel_id, - author_name=msg.author.name, - content=msg.content, - timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), - files=files if files else None - ) - - self.state.update_last_message_timestamp(str(channel_id), str(msg.created_at)) - message_count += 1 + try: + await self.fluxer_writer.send_message( + channel_id=target_channel_id, + author_name=msg.author.name, + content=msg.content, + timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), + files=files if files else None + ) + + self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) + message_count += 1 + if progress_callback: + await progress_callback(message_count) + except Exception as e: + logger.error(f"Failed to send message to Fluxer: {e}") # Delay for rate limit safety await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) diff --git a/src/discord_bot/reader.py b/src/discord_bot/reader.py index b70a841..f62f683 100644 --- a/src/discord_bot/reader.py +++ b/src/discord_bot/reader.py @@ -97,12 +97,32 @@ class DiscordReader: all_channels = [c for c in all_channels if c.category_id == category_id] return all_channels - async def fetch_message_history(self, channel_id: int, limit: int = None) -> AsyncGenerator[discord.Message, None]: - """Yields messages from a given channel, optionally handling pagination.""" - channel = await self.client.fetch_channel(channel_id) + async def get_channel(self, channel_id: int): + """Returns a channel object.""" + return await self.client.fetch_channel(channel_id) + + async def get_message(self, channel_id: int, message_id: int): + """Returns a specific message.""" + channel = await self.get_channel(channel_id) + if hasattr(channel, "fetch_message"): + return await channel.fetch_message(message_id) + return None + + async def get_first_message(self, channel_id: int): + """Returns the first (oldest) message in a channel.""" + channel = await self.get_channel(channel_id) if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread): + async for message in channel.history(limit=1, oldest_first=True): + return message + return None + + async def fetch_message_history(self, channel_id: int, limit: int = None, after_id: int = None) -> AsyncGenerator[discord.Message, None]: + """Yields messages from a given channel, optionally handling pagination.""" + channel = await self.get_channel(channel_id) + if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread): + after = discord.Object(id=after_id) if after_id else None # To avoid exploding RAM, we yield items one by one - async for message in channel.history(limit=limit, oldest_first=True): + async for message in channel.history(limit=limit, oldest_first=True, after=after): yield message async def download_emoji(self, emoji: discord.Emoji) -> bytes: diff --git a/src/fluxer_bot/writer.py b/src/fluxer_bot/writer.py index 017cdde..fb1f84b 100644 --- a/src/fluxer_bot/writer.py +++ b/src/fluxer_bot/writer.py @@ -1,22 +1,47 @@ -# Supposing fluxer.py has an API similar to discord.py or requests based -# Since we don't have the exact library reference, we create a conceptual skeleton. - +import asyncio from typing import Optional, List, Dict, Any -from fluxer.http import HTTPClient +from fluxer import Bot class FluxerWriter: def __init__(self, token: str, community_id: str): self.token = token self.community_id = str(community_id) - self.client: Optional[HTTPClient] = None + self.bot: Optional[Bot] = None + self._bot_task: Optional[asyncio.Task] = None + self._ready_event = asyncio.Event() async def start(self): - """Authenticate with Fluxer.""" - self.client = HTTPClient(token=self.token, is_bot=True) + """Authenticate with Fluxer and start the background bot session.""" + if self.bot and self._bot_task and not self._bot_task.done(): + return + + self.bot = Bot() + self._ready_event.clear() + + # Define a simple on_ready listener to signal when we're connected + @self.bot.event + async def on_ready(): + self._ready_event.set() + + # Start the bot in the background + self._bot_task = asyncio.create_task(self.bot.start(self.token)) + + # Wait for the bot to be ready (timeout of 10s to be safe) + try: + await asyncio.wait_for(self._ready_event.wait(), timeout=10.0) + except asyncio.TimeoutError: + # If it's not ready, it might still work for some REST calls, + # but send_message might fail later. + pass + + @property + def client(self): + """Helper to access the underlying HTTP client.""" + return self.bot._http if self.bot else None async def validate(self) -> dict: """Validates the token and community ID.""" - if not self.client: + if not self.bot or not self._ready_event.is_set(): await self.start() is_token_valid = False @@ -25,10 +50,15 @@ class FluxerWriter: community_name = None try: # Check token by fetching me - me = await self.client.get_current_user() - if me: + if self.bot and self.bot.user: is_token_valid = True - bot_name = me.get("username") + bot_name = self.bot.user.username + else: + # Fallback if on_ready didn't fire yet but we want to check token + me = await self.client.get_current_user() + if me: + is_token_valid = True + bot_name = me.get("username") # Check community guild = await self.client.get_guild(self.community_id) @@ -51,27 +81,34 @@ class FluxerWriter: Returns the new Fluxer channel ID. """ assert self.client is not None - payload = { - "name": name, - "type": type, - } - if topic: - payload["topic"] = topic - if parent_id: - payload["parent_id"] = parent_id - - guild_channel = await self.client.request( - self.client._route("POST", "/guilds/{guild_id}/channels", guild_id=self.community_id), - json=payload + + guild_channel = await self.client.create_guild_channel( + guild_id=self.community_id, + name=name, + type=type, + topic=topic or None, + parent_id=parent_id ) return str(guild_channel["id"]) + async def get_channels(self) -> List[Dict[str, Any]]: + """Returns all channels in the community.""" + assert self.client is not None + return await self.client.get_guild_channels(self.community_id) + async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, files: Optional[List[Dict[str, Any]]] = None) -> None: """ Sends a message to the target channel. """ assert self.client is not None + # Ensure we are ready before sending (wait a bit if needed) + if not self._ready_event.is_set(): + try: + await asyncio.wait_for(self._ready_event.wait(), timeout=5.0) + except asyncio.TimeoutError: + pass + prefix = f"**[{timestamp}] {author_name}**:\n" final_content = prefix + content if content else prefix @@ -83,7 +120,10 @@ class FluxerWriter: ) except Exception as e: # Handle empty messages if an attachment is the only content - print(f"Failed to copy message: {e}") + err_msg = f"Failed to copy message: {e}" + if hasattr(e, 'errors') and e.errors: + err_msg += f" - Details: {e.errors}" + print(err_msg) async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: """ @@ -170,6 +210,14 @@ class FluxerWriter: print(f"Failed to update community metadata: {e}") async def close(self): - """Cleanly close connection.""" - if self.client: - await self.client.close() + """Cleanly close connection and stop bot task.""" + if self.bot: + await self.bot.close() + if self._bot_task: + self._bot_task.cancel() + try: + await self._bot_task + except asyncio.CancelledError: + pass + self._ready_event.clear() + diff --git a/src/ui/app.py b/src/ui/app.py index deab79c..90fdec3 100644 --- a/src/ui/app.py +++ b/src/ui/app.py @@ -40,7 +40,7 @@ class MigrationCLI: start_time = asyncio.get_event_loop().time() done, pending = await asyncio.wait( [discord_task, fluxer_task], - timeout=3.0, + timeout=10.0, return_when=asyncio.ALL_COMPLETED ) @@ -59,7 +59,7 @@ class MigrationCLI: except Exception as e: console.print(f"[bold red]Discord validation failed with error: {e}[/bold red]") else: - console.print("[bold red]Discord bot token validation timed out after 3 seconds.[/bold red]") + console.print("[bold red]Discord bot token validation timed out after 10 seconds.[/bold red]") discord_task.cancel() # Process Fluxer Result @@ -77,7 +77,7 @@ class MigrationCLI: except Exception as e: console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]") else: - console.print("[bold red]Fluxer bot token validation timed out after 3 seconds.[/bold red]") + console.print("[bold red]Fluxer bot token validation timed out after 10 seconds.[/bold red]") fluxer_task.cancel() self.tokens_valid = all(self.validation_results.values()) @@ -108,7 +108,7 @@ class MigrationCLI: console.print("(2) Copy Roles & Permissions") console.print("(3) Copy Emojis & Stickers") console.print("(4) Sync Server Name, Logo and Banner") - console.print("(5) Migrate message history [NOT IMPLEMENTED]") + console.print("(5) Migrate message history") val_status = "[bold green][VALID][/bold green]" if self.tokens_valid else "[bold red][INVALID][/bold red]" console.print(f"(6) Configuration {val_status}") @@ -131,6 +131,7 @@ class MigrationCLI: await self.edit_configuration() elif choice == "Q": console.print("[yellow]Exiting tool...[/yellow]") + await self.engine.close_connections() break async def edit_configuration(self): @@ -445,24 +446,116 @@ class MigrationCLI: await self.engine.close_connections() async def migrate_message_history(self): - if not Confirm.ask("Are you sure you want to migrate message history?"): - return - - console.print("\n[bold green]Starting Message History Migration...[/bold green]") + console.print("\n[bold]Message History Migration (Manual Selection)[/bold]") + + if not self.tokens_valid: + console.print("[bold red]Error: You must have a valid configuration (Option 6) to migrate messages.[/bold red]") + return + try: + # Note: We don't call start_connections here because engine methods handles it + # or we handle it manually if we need to fetch data before migration. + # Actually, to get channels, we need connections. await self.engine.start_connections() - # Mock example of passing message progress. - console.print("[cyan]Migrating messages for the first channel (Demo)...[/cyan]") - channels = await self.engine.discord_reader.get_channels() - if channels: - self.engine.is_running = True - await self.engine.migrate_messages(channels[0].id) - console.print("[bold green]Message history migration complete![/bold green]") + + # 1. Select Source Discord Channel + d_channels = await self.engine.discord_reader.get_channels() + if not d_channels: + console.print("[yellow]No text channels found in Discord server.[/yellow]") + return + + console.print("\n[bold]Select Source Discord Channel:[/bold]") + for i, ch in enumerate(d_channels): + console.print(f"({i+1}) {ch.name}") + + d_choices = [str(i+1) for i in range(len(d_channels))] + d_choice = Prompt.ask("Select Discord Channel", choices=d_choices) + source_channel = d_channels[int(d_choice) - 1] + + # 2. Select Target Fluxer Channel + f_channels = await self.engine.fluxer_writer.get_channels() + if not f_channels: + console.print("[yellow]No channels found in Fluxer community.[/yellow]") + return + + console.print("\n[bold]Select Target Fluxer Channel:[/bold]") + for i, ch in enumerate(f_channels): + console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}") + + f_choices = [str(i+1) for i in range(len(f_channels))] + f_choice = Prompt.ask("Select Fluxer Channel", choices=f_choices) + target_channel = f_channels[int(f_choice) - 1] + + # 3. Handle Starting Message + first_msg = await self.engine.discord_reader.get_first_message(source_channel.id) + after_id = None + + if first_msg: + # Link format: https://discord.com/channels/GUILD_ID/CHANNEL_ID/MESSAGE_ID + first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}" + console.print(f"\n[bold green]Channel Found![/bold green]") + console.print(f"Oldest Message Link: {first_msg_link}") + console.print(f"Author: [blue]{first_msg.author.name}[/blue]") + console.print(f"Content Preview: {first_msg.content[:100]}...") + + prompt_text = "Start migration from this oldest message? (Y for Yes, S for Specific Link, B to Back)" + start_mode = Prompt.ask(prompt_text, choices=["Y", "S", "B"], default="Y").upper() + + if start_mode == "B": + return + elif start_mode == "S": + custom_link = Prompt.ask("Enter Discord Message Link") + try: + # Extract message ID from end of link + custom_id = int(custom_link.split("/")[-1]) + # Check if message exists to give feedback + msg = await self.engine.discord_reader.get_message(source_channel.id, custom_id) + if msg: + # To include this message, we start 'after' the ID before it + after_id = custom_id - 1 + console.print(f"[green]Confirmed: Starting from {msg.author.name}'s message.[/green]") + else: + console.print("[red]Warning: Message ID not found in this channel. Starting from beginning.[/red]") + except Exception as e: + console.print(f"[red]Error parsing link: {e}. Starting from beginning.[/red]") + else: + console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]") + return + + # 4. Final Confirmation + if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [magenta]#{target_channel.get('name')}[/magenta]?"): + return + + # 5. Migration Execution + console.print("\n[bold green]Starting Migration...[/bold green]") + self.engine.is_running = True + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console + ) as progress: + task = progress.add_task("[cyan]Migrating messages...", total=None) + + async def update_msg_progress(count: int): + progress.update(task, description=f"[cyan]Migrated {count} messages...") + + count = await self.engine.migrate_messages( + source_channel_id=source_channel.id, + target_channel_id=target_channel.get("id"), + after_message_id=after_id, + progress_callback=update_msg_progress + ) + + console.print(f"\n[bold green]Success! {count} messages migrated to {target_channel.get('name')}.[/bold green]") + except Exception as e: - console.print(f"[bold red]Error during message migration: {str(e)}[/bold red]") + console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]") finally: - await self.engine.close_connections() self.engine.is_running = False + await self.engine.close_connections() async def run_cli(): cli = MigrationCLI()