add initial message migrate support

This commit is contained in:
rambros 2026-02-21 03:05:59 +05:30
parent 3de5447819
commit 3ca84f7c86
4 changed files with 231 additions and 67 deletions

View file

@ -176,34 +176,37 @@ class MigrationEngine:
if progress_callback: await progress_callback(channel.name, current_idx, total) if progress_callback: await progress_callback(channel.name, current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def migrate_messages(self, channel_id: int): async def migrate_messages(self, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None):
"""Migrate messages for a specific channel.""" """Migrate messages for a specific channel."""
fluxer_channel_id = self.state.get_fluxer_channel_id(str(channel_id))
if not fluxer_channel_id:
logger.error(f"Cannot migrate messages: channel {channel_id} not mapped.")
return
message_count = 0 message_count = 0
async for msg in self.discord_reader.fetch_message_history(channel_id): async for msg in self.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id):
if not self.is_running: if not self.is_running:
break break
# Process attachments # Process attachments
files = [] files = []
for att in msg.attachments: for att in msg.attachments:
try:
att_data = await self.discord_reader.download_attachment(att) att_data = await self.discord_reader.download_attachment(att)
files.append({"filename": att.filename, "data": att_data}) files.append({"filename": att.filename, "data": att_data})
except Exception as e:
logger.error(f"Failed to download attachment {att.filename}: {e}")
try:
await self.fluxer_writer.send_message( await self.fluxer_writer.send_message(
channel_id=fluxer_channel_id, channel_id=target_channel_id,
author_name=msg.author.name, author_name=msg.author.name,
content=msg.content, content=msg.content,
timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"),
files=files if files else None files=files if files else None
) )
self.state.update_last_message_timestamp(str(channel_id), str(msg.created_at)) self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at))
message_count += 1 message_count += 1
if progress_callback:
await progress_callback(message_count)
except Exception as e:
logger.error(f"Failed to send message to Fluxer: {e}")
# Delay for rate limit safety # Delay for rate limit safety
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)

View file

@ -97,12 +97,32 @@ class DiscordReader:
all_channels = [c for c in all_channels if c.category_id == category_id] all_channels = [c for c in all_channels if c.category_id == category_id]
return all_channels return all_channels
async def fetch_message_history(self, channel_id: int, limit: int = None) -> AsyncGenerator[discord.Message, None]: async def get_channel(self, channel_id: int):
"""Yields messages from a given channel, optionally handling pagination.""" """Returns a channel object."""
channel = await self.client.fetch_channel(channel_id) return await self.client.fetch_channel(channel_id)
async def get_message(self, channel_id: int, message_id: int):
"""Returns a specific message."""
channel = await self.get_channel(channel_id)
if hasattr(channel, "fetch_message"):
return await channel.fetch_message(message_id)
return None
async def get_first_message(self, channel_id: int):
"""Returns the first (oldest) message in a channel."""
channel = await self.get_channel(channel_id)
if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread): if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread):
async for message in channel.history(limit=1, oldest_first=True):
return message
return None
async def fetch_message_history(self, channel_id: int, limit: int = None, after_id: int = None) -> AsyncGenerator[discord.Message, None]:
"""Yields messages from a given channel, optionally handling pagination."""
channel = await self.get_channel(channel_id)
if isinstance(channel, discord.TextChannel) or isinstance(channel, discord.Thread):
after = discord.Object(id=after_id) if after_id else None
# To avoid exploding RAM, we yield items one by one # To avoid exploding RAM, we yield items one by one
async for message in channel.history(limit=limit, oldest_first=True): async for message in channel.history(limit=limit, oldest_first=True, after=after):
yield message yield message
async def download_emoji(self, emoji: discord.Emoji) -> bytes: async def download_emoji(self, emoji: discord.Emoji) -> bytes:

View file

@ -1,22 +1,47 @@
# Supposing fluxer.py has an API similar to discord.py or requests based import asyncio
# Since we don't have the exact library reference, we create a conceptual skeleton.
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
from fluxer.http import HTTPClient from fluxer import Bot
class FluxerWriter: class FluxerWriter:
def __init__(self, token: str, community_id: str): def __init__(self, token: str, community_id: str):
self.token = token self.token = token
self.community_id = str(community_id) self.community_id = str(community_id)
self.client: Optional[HTTPClient] = None self.bot: Optional[Bot] = None
self._bot_task: Optional[asyncio.Task] = None
self._ready_event = asyncio.Event()
async def start(self): async def start(self):
"""Authenticate with Fluxer.""" """Authenticate with Fluxer and start the background bot session."""
self.client = HTTPClient(token=self.token, is_bot=True) if self.bot and self._bot_task and not self._bot_task.done():
return
self.bot = Bot()
self._ready_event.clear()
# Define a simple on_ready listener to signal when we're connected
@self.bot.event
async def on_ready():
self._ready_event.set()
# Start the bot in the background
self._bot_task = asyncio.create_task(self.bot.start(self.token))
# Wait for the bot to be ready (timeout of 10s to be safe)
try:
await asyncio.wait_for(self._ready_event.wait(), timeout=10.0)
except asyncio.TimeoutError:
# If it's not ready, it might still work for some REST calls,
# but send_message might fail later.
pass
@property
def client(self):
"""Helper to access the underlying HTTP client."""
return self.bot._http if self.bot else None
async def validate(self) -> dict: async def validate(self) -> dict:
"""Validates the token and community ID.""" """Validates the token and community ID."""
if not self.client: if not self.bot or not self._ready_event.is_set():
await self.start() await self.start()
is_token_valid = False is_token_valid = False
@ -25,6 +50,11 @@ class FluxerWriter:
community_name = None community_name = None
try: try:
# Check token by fetching me # Check token by fetching me
if self.bot and self.bot.user:
is_token_valid = True
bot_name = self.bot.user.username
else:
# Fallback if on_ready didn't fire yet but we want to check token
me = await self.client.get_current_user() me = await self.client.get_current_user()
if me: if me:
is_token_valid = True is_token_valid = True
@ -51,27 +81,34 @@ class FluxerWriter:
Returns the new Fluxer channel ID. Returns the new Fluxer channel ID.
""" """
assert self.client is not None assert self.client is not None
payload = {
"name": name,
"type": type,
}
if topic:
payload["topic"] = topic
if parent_id:
payload["parent_id"] = parent_id
guild_channel = await self.client.request( guild_channel = await self.client.create_guild_channel(
self.client._route("POST", "/guilds/{guild_id}/channels", guild_id=self.community_id), guild_id=self.community_id,
json=payload name=name,
type=type,
topic=topic or None,
parent_id=parent_id
) )
return str(guild_channel["id"]) return str(guild_channel["id"])
async def get_channels(self) -> List[Dict[str, Any]]:
"""Returns all channels in the community."""
assert self.client is not None
return await self.client.get_guild_channels(self.community_id)
async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, files: Optional[List[Dict[str, Any]]] = None) -> None: async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, files: Optional[List[Dict[str, Any]]] = None) -> None:
""" """
Sends a message to the target channel. Sends a message to the target channel.
""" """
assert self.client is not None assert self.client is not None
# Ensure we are ready before sending (wait a bit if needed)
if not self._ready_event.is_set():
try:
await asyncio.wait_for(self._ready_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
pass
prefix = f"**[{timestamp}] {author_name}**:\n" prefix = f"**[{timestamp}] {author_name}**:\n"
final_content = prefix + content if content else prefix final_content = prefix + content if content else prefix
@ -83,7 +120,10 @@ class FluxerWriter:
) )
except Exception as e: except Exception as e:
# Handle empty messages if an attachment is the only content # Handle empty messages if an attachment is the only content
print(f"Failed to copy message: {e}") err_msg = f"Failed to copy message: {e}"
if hasattr(e, 'errors') and e.errors:
err_msg += f" - Details: {e.errors}"
print(err_msg)
async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str:
""" """
@ -170,6 +210,14 @@ class FluxerWriter:
print(f"Failed to update community metadata: {e}") print(f"Failed to update community metadata: {e}")
async def close(self): async def close(self):
"""Cleanly close connection.""" """Cleanly close connection and stop bot task."""
if self.client: if self.bot:
await self.client.close() await self.bot.close()
if self._bot_task:
self._bot_task.cancel()
try:
await self._bot_task
except asyncio.CancelledError:
pass
self._ready_event.clear()

View file

@ -40,7 +40,7 @@ class MigrationCLI:
start_time = asyncio.get_event_loop().time() start_time = asyncio.get_event_loop().time()
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
[discord_task, fluxer_task], [discord_task, fluxer_task],
timeout=3.0, timeout=10.0,
return_when=asyncio.ALL_COMPLETED return_when=asyncio.ALL_COMPLETED
) )
@ -59,7 +59,7 @@ class MigrationCLI:
except Exception as e: except Exception as e:
console.print(f"[bold red]Discord validation failed with error: {e}[/bold red]") console.print(f"[bold red]Discord validation failed with error: {e}[/bold red]")
else: else:
console.print("[bold red]Discord bot token validation timed out after 3 seconds.[/bold red]") console.print("[bold red]Discord bot token validation timed out after 10 seconds.[/bold red]")
discord_task.cancel() discord_task.cancel()
# Process Fluxer Result # Process Fluxer Result
@ -77,7 +77,7 @@ class MigrationCLI:
except Exception as e: except Exception as e:
console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]") console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]")
else: else:
console.print("[bold red]Fluxer bot token validation timed out after 3 seconds.[/bold red]") console.print("[bold red]Fluxer bot token validation timed out after 10 seconds.[/bold red]")
fluxer_task.cancel() fluxer_task.cancel()
self.tokens_valid = all(self.validation_results.values()) self.tokens_valid = all(self.validation_results.values())
@ -108,7 +108,7 @@ class MigrationCLI:
console.print("(2) Copy Roles & Permissions") console.print("(2) Copy Roles & Permissions")
console.print("(3) Copy Emojis & Stickers") console.print("(3) Copy Emojis & Stickers")
console.print("(4) Sync Server Name, Logo and Banner") console.print("(4) Sync Server Name, Logo and Banner")
console.print("(5) Migrate message history [NOT IMPLEMENTED]") console.print("(5) Migrate message history")
val_status = "[bold green][VALID][/bold green]" if self.tokens_valid else "[bold red][INVALID][/bold red]" val_status = "[bold green][VALID][/bold green]" if self.tokens_valid else "[bold red][INVALID][/bold red]"
console.print(f"(6) Configuration {val_status}") console.print(f"(6) Configuration {val_status}")
@ -131,6 +131,7 @@ class MigrationCLI:
await self.edit_configuration() await self.edit_configuration()
elif choice == "Q": elif choice == "Q":
console.print("[yellow]Exiting tool...[/yellow]") console.print("[yellow]Exiting tool...[/yellow]")
await self.engine.close_connections()
break break
async def edit_configuration(self): async def edit_configuration(self):
@ -445,24 +446,116 @@ class MigrationCLI:
await self.engine.close_connections() await self.engine.close_connections()
async def migrate_message_history(self): async def migrate_message_history(self):
if not Confirm.ask("Are you sure you want to migrate message history?"): console.print("\n[bold]Message History Migration (Manual Selection)[/bold]")
if not self.tokens_valid:
console.print("[bold red]Error: You must have a valid configuration (Option 6) to migrate messages.[/bold red]")
return return
console.print("\n[bold green]Starting Message History Migration...[/bold green]")
try: try:
# Note: We don't call start_connections here because engine methods handles it
# or we handle it manually if we need to fetch data before migration.
# Actually, to get channels, we need connections.
await self.engine.start_connections() await self.engine.start_connections()
# Mock example of passing message progress.
console.print("[cyan]Migrating messages for the first channel (Demo)...[/cyan]") # 1. Select Source Discord Channel
channels = await self.engine.discord_reader.get_channels() d_channels = await self.engine.discord_reader.get_channels()
if channels: if not d_channels:
self.engine.is_running = True console.print("[yellow]No text channels found in Discord server.[/yellow]")
await self.engine.migrate_messages(channels[0].id) return
console.print("[bold green]Message history migration complete![/bold green]")
console.print("\n[bold]Select Source Discord Channel:[/bold]")
for i, ch in enumerate(d_channels):
console.print(f"({i+1}) {ch.name}")
d_choices = [str(i+1) for i in range(len(d_channels))]
d_choice = Prompt.ask("Select Discord Channel", choices=d_choices)
source_channel = d_channels[int(d_choice) - 1]
# 2. Select Target Fluxer Channel
f_channels = await self.engine.fluxer_writer.get_channels()
if not f_channels:
console.print("[yellow]No channels found in Fluxer community.[/yellow]")
return
console.print("\n[bold]Select Target Fluxer Channel:[/bold]")
for i, ch in enumerate(f_channels):
console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}")
f_choices = [str(i+1) for i in range(len(f_channels))]
f_choice = Prompt.ask("Select Fluxer Channel", choices=f_choices)
target_channel = f_channels[int(f_choice) - 1]
# 3. Handle Starting Message
first_msg = await self.engine.discord_reader.get_first_message(source_channel.id)
after_id = None
if first_msg:
# Link format: https://discord.com/channels/GUILD_ID/CHANNEL_ID/MESSAGE_ID
first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}"
console.print(f"\n[bold green]Channel Found![/bold green]")
console.print(f"Oldest Message Link: {first_msg_link}")
console.print(f"Author: [blue]{first_msg.author.name}[/blue]")
console.print(f"Content Preview: {first_msg.content[:100]}...")
prompt_text = "Start migration from this oldest message? (Y for Yes, S for Specific Link, B to Back)"
start_mode = Prompt.ask(prompt_text, choices=["Y", "S", "B"], default="Y").upper()
if start_mode == "B":
return
elif start_mode == "S":
custom_link = Prompt.ask("Enter Discord Message Link")
try:
# Extract message ID from end of link
custom_id = int(custom_link.split("/")[-1])
# Check if message exists to give feedback
msg = await self.engine.discord_reader.get_message(source_channel.id, custom_id)
if msg:
# To include this message, we start 'after' the ID before it
after_id = custom_id - 1
console.print(f"[green]Confirmed: Starting from {msg.author.name}'s message.[/green]")
else:
console.print("[red]Warning: Message ID not found in this channel. Starting from beginning.[/red]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error during message migration: {str(e)}[/bold red]") console.print(f"[red]Error parsing link: {e}. Starting from beginning.[/red]")
else:
console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]")
return
# 4. Final Confirmation
if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [magenta]#{target_channel.get('name')}[/magenta]?"):
return
# 5. Migration Execution
console.print("\n[bold green]Starting Migration...[/bold green]")
self.engine.is_running = True
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
task = progress.add_task("[cyan]Migrating messages...", total=None)
async def update_msg_progress(count: int):
progress.update(task, description=f"[cyan]Migrated {count} messages...")
count = await self.engine.migrate_messages(
source_channel_id=source_channel.id,
target_channel_id=target_channel.get("id"),
after_message_id=after_id,
progress_callback=update_msg_progress
)
console.print(f"\n[bold green]Success! {count} messages migrated to {target_channel.get('name')}.[/bold green]")
except Exception as e:
console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]")
finally: finally:
await self.engine.close_connections()
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_connections()
async def run_cli(): async def run_cli():
cli = MigrationCLI() cli = MigrationCLI()