show migration info and approx overhead

This commit is contained in:
rambros 2026-02-21 20:58:31 +05:30
parent 39dfb789e5
commit a89347f8ee
4 changed files with 182 additions and 17 deletions

View file

@ -110,8 +110,13 @@ class MigrationEngine:
except Exception: except Exception:
await progress_callback("Server Banner", "ERROR") await progress_callback("Server Banner", "ERROR")
async def migrate_channels(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): async def migrate_channels(self, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False):
"""Clones categories and text channels.""" """Clones categories and text channels.
Args:
progress_callback: Optional callback receiving (item_name, status, current, total)
force: If True, re-create channels even if they exist in state.
"""
categories = await self.discord_reader.get_categories() categories = await self.discord_reader.get_categories()
channels = await self.discord_reader.get_channels() channels = await self.discord_reader.get_channels()
@ -121,21 +126,30 @@ class MigrationEngine:
# Migrate Categories first # Migrate Categories first
for cat in categories: for cat in categories:
if not self.is_running: break if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(str(cat.id))
state_key = str(cat.id)
fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key)
status = "Copying"
if not fluxer_id: if not fluxer_id:
# 4 corresponds to Category type in Discord/Fluxer typically # 4 corresponds to Category type in Discord/Fluxer typically
fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4) fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4)
self.state.set_channel_mapping(str(cat.id), fluxer_id) self.state.set_channel_mapping(state_key, fluxer_id)
else:
status = "Skipping"
current_idx += 1 current_idx += 1
if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total) if progress_callback: await progress_callback(f"Cat: {cat.name}", status, current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
# Migrate Text Channels # Migrate Text Channels
for channel in channels: for channel in channels:
if not self.is_running: break if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(str(channel.id)) state_key = str(channel.id)
fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key)
status = "Copying"
if not fluxer_id: if not fluxer_id:
topic = channel.topic if channel.topic else "" topic = channel.topic if channel.topic else ""
parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None
@ -146,10 +160,12 @@ class MigrationEngine:
type=0, type=0,
parent_id=parent_id parent_id=parent_id
) )
self.state.set_channel_mapping(str(channel.id), fluxer_id) self.state.set_channel_mapping(state_key, fluxer_id)
else:
status = "Skipping"
current_idx += 1 current_idx += 1
if progress_callback: await progress_callback(channel.name, current_idx, total) if progress_callback: await progress_callback(channel.name, status, current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def sync_permissions(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None): async def sync_permissions(self, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None):
@ -184,6 +200,33 @@ class MigrationEngine:
if progress_callback: await progress_callback(channel.name, current_idx, total) if progress_callback: await progress_callback(channel.name, current_idx, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def analyze_migration(self, source_channel_id: int, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, int]:
"""
Scans channel history to count messages, threads, and attachments.
"""
stats = {"messages": 0, "threads": 0, "attachments": 0}
async for msg in self.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id):
if not self.is_running:
break
stats["messages"] += 1
stats["attachments"] += len(msg.attachments)
# Count thread messages and markers
if hasattr(msg, 'thread') and msg.thread:
stats["threads"] += 1
# Recursively count thread content
thread_stats = await self.analyze_migration(msg.thread.id)
stats["messages"] += thread_stats["messages"]
stats["attachments"] += thread_stats["attachments"]
stats["threads"] += thread_stats["threads"] # Nested threads (rare in Discord but possible in forum channels)
if progress_callback and stats["messages"] % 10 == 0:
await progress_callback(stats["messages"])
return stats
async def migrate_messages(self, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None): async def migrate_messages(self, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None):
"""Migrate messages for a specific channel.""" """Migrate messages for a specific channel."""
message_count = 0 message_count = 0
@ -241,12 +284,39 @@ class MigrationEngine:
if fluxer_msg_id: if fluxer_msg_id:
self.state.set_message_mapping(str(msg.id), fluxer_msg_id) self.state.set_message_mapping(str(msg.id), fluxer_msg_id)
# Check for associated thread
if hasattr(msg, 'thread') and msg.thread:
thread = msg.thread
logger.info(f"Detected thread '{thread.name}' on message {msg.id}")
# Send Start Marker
await self.fluxer_writer.send_marker(
channel_id=target_channel_id,
content=f"> <<< THREAD: **{thread.name}** >>>"
)
# Migrate thread messages
# We don't pass a progress callback here to avoid confusing the UI
# but we do want to track count if possible.
await self.migrate_messages(
source_channel_id=thread.id,
target_channel_id=target_channel_id
)
# Send End Marker
await self.fluxer_writer.send_marker(
channel_id=target_channel_id,
content=f"> <<< END OF THREAD >>>"
)
self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at))
message_count += 1 message_count += 1
if progress_callback: if progress_callback:
await progress_callback(message_count) await progress_callback(message_count)
except Exception as e: except Exception as e:
logger.error(f"Failed to send message to Fluxer: {e}") logger.error(f"Failed to process message {msg.id}: {e}")
import traceback
logger.error(traceback.format_exc())
# Delay for rate limit safety # Delay for rate limit safety
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
@ -345,7 +415,10 @@ class MigrationEngine:
async def danger_delete_all_channels(self, progress_callback=None) -> int: async def danger_delete_all_channels(self, progress_callback=None) -> int:
"""Deletes every channel and category in the Fluxer community.""" """Deletes every channel and category in the Fluxer community."""
return await self.fluxer_writer.delete_all_channels(progress_callback=progress_callback) count = await self.fluxer_writer.delete_all_channels(progress_callback=progress_callback)
self.state.clear_channel_mappings()
self.state.clear_message_history()
return count
async def danger_reset_channel_permissions(self, progress_callback=None) -> int: async def danger_reset_channel_permissions(self, progress_callback=None) -> int:
"""Resets all permission overwrites on every channel and category.""" """Resets all permission overwrites on every channel and category."""
@ -353,9 +426,13 @@ class MigrationEngine:
async def danger_delete_all_roles(self, progress_callback=None) -> int: async def danger_delete_all_roles(self, progress_callback=None) -> int:
"""Deletes all deletable roles (skips managed/bot roles and @everyone).""" """Deletes all deletable roles (skips managed/bot roles and @everyone)."""
return await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback) count = await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback)
self.state.clear_role_mappings()
return count
async def danger_delete_all_emojis_and_stickers(self, progress_callback=None) -> dict: async def danger_delete_all_emojis_and_stickers(self, progress_callback=None) -> dict:
"""Deletes all custom emojis and stickers. Returns {"emojis": int, "stickers": int}.""" """Deletes all custom emojis and stickers. Returns {"emojis": int, "stickers": int}."""
return await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback) counts = await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback)
self.state.clear_asset_mappings()
return counts

View file

@ -56,3 +56,31 @@ class MigrationState:
def update_last_message_timestamp(self, channel_id: str, timestamp: str): def update_last_message_timestamp(self, channel_id: str, timestamp: str):
self.last_message_timestamps[str(channel_id)] = timestamp self.last_message_timestamps[str(channel_id)] = timestamp
self.save() self.save()
def clear_channel_mappings(self):
"""Clears all channel and category mappings (excludes roles/emojis/stickers)."""
to_remove = [k for k in self.channel_map.keys() if k.isdigit()]
for k in to_remove:
del self.channel_map[k]
self.save()
def clear_role_mappings(self):
"""Clears all role mappings."""
to_remove = [k for k in self.channel_map.keys() if k.startswith("role_")]
for k in to_remove:
del self.channel_map[k]
self.role_map.clear()
self.save()
def clear_asset_mappings(self):
"""Clears all emoji and sticker mappings."""
to_remove = [k for k in self.channel_map.keys() if k.startswith("emoji_") or k.startswith("sticker_")]
for k in to_remove:
del self.channel_map[k]
self.save()
def clear_message_history(self):
"""Clears all message mappings and timestamps."""
self.message_map.clear()
self.last_message_timestamps.clear()
self.save()

View file

@ -193,6 +193,20 @@ class FluxerWriter:
print(err_msg) print(err_msg)
return None return None
async def send_marker(self, channel_id: str, content: str) -> Optional[str]:
"""
Sends a simple marker message (e.g., thread start/end) using the bot directly.
"""
assert self.client is not None
try:
msg_data = await self.client.send_message(
channel_id=channel_id,
content=content
)
return str(msg_data["id"]) if msg_data else None
except Exception as e:
print(f"Failed to send marker: {e}")
return None
async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str:
""" """

View file

@ -245,9 +245,22 @@ class MigrationCLI:
console.print("") console.print("")
if not Confirm.ask("Are you sure you want to clone channels and categories?"): # Check for existing mappings to determine if we should suggest a force re-copy
all_ids = [str(cat.id) for cat in categories] + [str(ch.id) for ch in channels]
cached_count = sum(1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k))
force = False
if cached_count > 0:
console.print(f"[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) already in state.json cache.[/yellow]")
force = Confirm.ask("Force re-clone anyway?", default=False)
elif not Confirm.ask("Are you sure you want to clone channels and categories?"):
await self.engine.close_connections() await self.engine.close_connections()
return return
if cached_count == 0 or force:
if not force and not Confirm.ask("Are you sure you want to clone channels and categories?"):
await self.engine.close_connections()
return
console.print("\n[bold green]Starting Channel Cloning...[/bold green]") console.print("\n[bold green]Starting Channel Cloning...[/bold green]")
try: try:
@ -261,11 +274,12 @@ class MigrationCLI:
channel_task = progress.add_task("[cyan]Copying Channels...", total=100) channel_task = progress.add_task("[cyan]Copying Channels...", total=100)
async def update_progress(item_name: str, current: int, total: int): async def update_progress(item_name: str, status: str, current: int, total: int):
progress.update(channel_task, total=total, completed=current, description=f"[cyan]Copying Channel: {item_name}") color = "cyan" if status == "Copying" else "yellow"
progress.update(channel_task, total=total, completed=current, description=f"[{color}]{status} Channel: {item_name}")
self.engine.is_running = True self.engine.is_running = True
await self.engine.migrate_channels(progress_callback=update_progress) await self.engine.migrate_channels(progress_callback=update_progress, force=force)
console.print("[bold green]Server Template cloned![/bold green]") console.print("[bold green]Server Template cloned![/bold green]")
@ -561,7 +575,39 @@ class MigrationCLI:
console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]") console.print("[yellow]Source channel appears to be empty. Nothing to migrate.[/yellow]")
return return
# 4. Final Confirmation # 4. Analysis and Confirmation
console.print("\n[yellow]Analyzing channel content...[/yellow]")
self.engine.is_running = True
stats = {"messages": 0, "threads": 0, "attachments": 0}
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console
) as progress:
task = progress.add_task("[cyan]Scanning history...", total=None)
async def update_scan_progress(count: int):
progress.update(task, description=f"[cyan]Scanned {count} items...")
stats = await self.engine.analyze_migration(
source_channel_id=source_channel.id,
after_message_id=after_id,
progress_callback=update_scan_progress
)
finally:
self.engine.is_running = False
console.print(f"\n[bold]Migration Summary:[/bold]")
console.print(f"Number of messages: [bold cyan]{stats['messages']}[/bold cyan]")
console.print(f"Number of threads: [bold cyan]{stats['threads']}[/bold cyan]")
console.print(f"Number of attachments: [bold cyan]{stats['attachments']}[/bold cyan]")
console.print("\n[bold yellow]Estimated Overhead:[/bold yellow]")
msg_time = stats['messages'] * self.config.migration.rate_limit_delay_seconds
console.print(f"- [bold]Messages:[/bold] ~{msg_time}s delay (rate limiting), {stats['messages']} API writes.")
console.print(f"- [bold]Threads:[/bold] {stats['threads'] * 2} extra marker messages, {stats['threads']} extra history fetches.")
console.print(f"- [bold]Attachments:[/bold] {stats['attachments']} downloads and uploads (bandwidth & API calls).")
if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [magenta]#{target_channel.get('name')}[/magenta]?"): if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [magenta]#{target_channel.get('name')}[/magenta]?"):
return return