message migration for stoat

This commit is contained in:
rambros 2026-02-25 03:11:10 +05:30
parent bd7d8c68b6
commit f9a664b2ee
3 changed files with 284 additions and 19 deletions

View file

@ -0,0 +1,175 @@
import asyncio
import logging
import re
from typing import Callable, Awaitable, Dict, Any
from src.core.base import MigrationContext
logger = logging.getLogger(__name__)
def clean_mentions(content: str, guild) -> str:
if not content or not guild:
return content
def replace_user(match):
uid = int(match.group(1))
member = guild.get_member(uid)
return f"@{member.display_name}" if member else match.group(0)
def replace_role(match):
rid = int(match.group(1))
role = guild.get_role(rid)
return f"@{role.name}" if role else match.group(0)
def replace_channel(match):
cid = int(match.group(1))
channel = guild.get_channel(cid)
return f"#{channel.name}" if channel else match.group(0)
content = re.sub(r'<@!?([0-9]+)>', replace_user, content)
content = re.sub(r'<@&([0-9]+)>', replace_role, content)
content = re.sub(r'<#([0-9]+)>', replace_channel, content)
return content
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, int]:
"""
Scans channel history to count messages, threads, and attachments.
"""
stats = {"messages": 0, "threads": 0, "attachments": 0}
async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id):
if not context.is_running:
break
stats["messages"] += 1
stats["attachments"] += len(msg.attachments)
# Count thread messages and markers
if hasattr(msg, 'thread') and msg.thread:
stats["threads"] += 1
thread_stats = await analyze_migration(context, msg.thread.id)
stats["messages"] += thread_stats["messages"]
stats["attachments"] += thread_stats["attachments"]
stats["threads"] += thread_stats["threads"]
if progress_callback and stats["messages"] % 10 == 0:
await progress_callback(stats["messages"])
return stats
async def migrate_messages(context: MigrationContext, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, Any]:
"""Migrate messages for a specific channel using Stoat masquerade for author impersonation."""
stats = {
"messages": 0,
"attachments": 0,
"threads": 0,
"first_message_url": None,
"last_message_url": None
}
async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id):
if not context.is_running:
break
# Process attachments
files = []
attachments_to_process = list(msg.attachments)
# Check if this message is forwarded
is_forwarded = False
if hasattr(msg.flags, 'forwarded'):
is_forwarded = msg.flags.forwarded
# Get clean content
content = msg.clean_content
if is_forwarded:
logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}")
if hasattr(msg, 'message_snapshots') and msg.message_snapshots:
snapshot = msg.message_snapshots[0]
if not content:
content = snapshot.content
if hasattr(msg, 'guild') and msg.guild:
content = clean_mentions(content, msg.guild)
attachments_to_process.extend(snapshot.attachments)
logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments")
for att in attachments_to_process:
try:
att_data = await context.discord_reader.download_attachment(att)
files.append({"filename": att.filename, "data": att_data})
stats["attachments"] += 1
except Exception as e:
logger.error(f"Failed to download attachment {att.filename}: {e}")
try:
# Check if this message is a reply
reply_to_stoat_id = None
if msg.reference and msg.reference.message_id:
reply_to_stoat_id = context.state.get_target_message_id(str(msg.reference.message_id))
stoat_msg_id = await context.stoat_writer.send_message(
channel_id=target_channel_id,
author_name=msg.author.display_name,
author_avatar_url=str(msg.author.display_avatar.url),
content=content,
timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"),
files=files if files else None,
reply_to_message_id=reply_to_stoat_id,
is_forwarded=is_forwarded
)
if stoat_msg_id:
context.state.set_message_mapping(str(msg.id), stoat_msg_id)
# Check for associated thread
if hasattr(msg, 'thread') and msg.thread:
thread = msg.thread
logger.info(f"Detected thread '{thread.name}' on message {msg.id}")
# Send Start Marker
stats["threads"] += 1
await context.stoat_writer.send_marker(
channel_id=target_channel_id,
content=f"> <<< THREAD: **{thread.name}** >>>"
)
# Migrate thread messages recursively
thread_stats = await migrate_messages(
context=context,
source_channel_id=thread.id,
target_channel_id=target_channel_id
)
stats["messages"] += thread_stats["messages"]
stats["attachments"] += thread_stats["attachments"]
stats["threads"] += thread_stats["threads"]
# Send End Marker
await context.stoat_writer.send_marker(
channel_id=target_channel_id,
content=f"> <<< END OF THREAD >>>"
)
context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at))
context.state.update_last_message_id(str(source_channel_id), str(msg.id))
stats["messages"] += 1
# Update Link Tracking
if not stats["first_message_url"]:
stats["first_message_url"] = msg.jump_url
stats["last_message_url"] = msg.jump_url
if progress_callback:
await progress_callback(stats["messages"])
except Exception as e:
# If it's a permission error, stop the entire migration
if "MissingPermission" in str(e):
raise
logger.error(f"Failed to process message {msg.id}: {e}")
import traceback
logger.error(traceback.format_exc())
# Delay for rate limit safety
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
return stats

View file

@ -76,7 +76,14 @@ class StoatWriter:
"manage_server": perms.manage_server, "manage_server": perms.manage_server,
"manage_permissions": perms.manage_permissions, "manage_permissions": perms.manage_permissions,
"manage_roles": perms.manage_roles, "manage_roles": perms.manage_roles,
"manage_customization": perms.manage_customization "manage_customization": perms.manage_customization,
"manage_messages": perms.manage_messages,
"send_messages": perms.send_messages,
"masquerade": perms.use_masquerade,
"upload_files": perms.upload_files,
"react": perms.react,
"mention_everyone": perms.mention_everyone,
"mention_roles": perms.mention_roles
} }
except stoat.NotFound: except stoat.NotFound:
logger.error(f"Bot member {current_user.id} not found in Stoat server {self.community_id}.") logger.error(f"Bot member {current_user.id} not found in Stoat server {self.community_id}.")
@ -199,15 +206,79 @@ class StoatWriter:
async def move_channel(self, channel_id: str, parent_id: Optional[str]) -> bool: async def move_channel(self, channel_id: str, parent_id: Optional[str]) -> bool:
return True return True
async def send_message(self, **kwargs) -> Optional[str]: async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str,
return "dummy_stoat_message_id" author_avatar_url: Optional[str] = None, files: Optional[List[Dict[str, Any]]] = None,
reply_to_message_id: Optional[str] = None, is_forwarded: bool = False) -> Optional[str]:
"""
Sends a message to the target channel using Stoat's masquerade feature.
Raises on permission errors caller must handle.
"""
try:
channel = await self.client.fetch_channel(channel_id)
# Build masquerade to impersonate original author
masquerade = stoat.MessageMasquerade(
name=f"{author_name} (discord)",
avatar=author_avatar_url
)
# Build content with timestamp prefix
prefix = f"##### {timestamp}\n"
if is_forwarded:
prefix += "##### -->*forwarded*\n"
display_content = content
if is_forwarded and content:
display_content = f"> {content}"
final_content = prefix + display_content if display_content else prefix
# Build replies list
replies = None
if reply_to_message_id:
replies = [stoat.Reply(id=reply_to_message_id, mention=False)]
# Build attachments list using ResolvableResource tuple format: (filename, bytes)
attachments = None
if files:
attachments = []
for f in files:
attachments.append((f["filename"], f["data"]))
try:
msg = await channel.send(
content=final_content,
masquerade=masquerade,
replies=replies,
attachments=attachments
)
return str(msg.id) if msg else None
except Exception as e:
# If file type not allowed, skip attachments and still send the message
if "FileTypeNotAllowed" in str(e) and attachments:
logger.warning(f"File type blocked by server, sending without attachments: {e}")
filenames = "\n".join(f"- {a[0]}" for a in attachments)
note = f"\n{len(attachments)} attachment(s) (file type not allowed by stoat)\n{filenames}"
msg = await channel.send(
content=final_content + note,
masquerade=masquerade,
replies=replies
)
return str(msg.id) if msg else None
raise # Re-raise MissingPermission and other errors
except Exception as e:
logger.error(f"Failed to send Stoat message to {channel_id}: {e}")
raise # Let caller handle (migration loop will stop for permission errors)
async def send_marker(self, channel_id: str, content: str, files: Optional[List[Dict[str, Any]]] = None) -> Optional[str]: async def send_marker(self, channel_id: str, content: str, files: Optional[List[Dict[str, Any]]] = None) -> Optional[str]:
try: try:
channel = await self.client.fetch_channel(channel_id) channel = await self.client.fetch_channel(channel_id)
# Stoat channel.send takes content attachments = None
# files support might be different, skipping for markers if files:
msg = await channel.send(content=content) attachments = []
for f in files:
attachments.append((f["filename"], f["data"]))
msg = await channel.send(content=content, attachments=attachments)
return str(msg.id) return str(msg.id)
except Exception as e: except Exception as e:
logger.error(f"Failed to send Stoat marker to {channel_id}: {e}") logger.error(f"Failed to send Stoat marker to {channel_id}: {e}")

View file

@ -16,7 +16,8 @@ import src.fluxer.emoji_stickers as fluxer_emoji_stickers
import src.stoat.emoji_stickers as stoat_emoji_stickers import src.stoat.emoji_stickers as stoat_emoji_stickers
import src.fluxer.server_metadata as fluxer_metadata import src.fluxer.server_metadata as fluxer_metadata
import src.stoat.server_metadata as stoat_metadata import src.stoat.server_metadata as stoat_metadata
from src.fluxer.migrate_message import analyze_migration, migrate_messages import src.fluxer.migrate_message as fluxer_migrate
import src.stoat.migrate_message as stoat_migrate
from src.core.audit import log_audit_event from src.core.audit import log_audit_event
@ -295,6 +296,9 @@ class MigrationCLI:
choice = Prompt.ask("\nSelect an option [1/2/3/4/5/6/7/Q]", choices=["1", "2", "3", "4", "5", "6", "7", "Q", "q"], default="Q", show_choices=False).upper() choice = Prompt.ask("\nSelect an option [1/2/3/4/5/6/7/Q]", choices=["1", "2", "3", "4", "5", "6", "7", "Q", "q"], default="Q", show_choices=False).upper()
if choice in ("1", "2", "3", "4", "5", "7") and self.tokens_valid and not self.permissions_complete:
console.print("[bold red]Warning:[/bold red][bold yellow] Permission Missing - Check[/bold yellow] [bold blue](6) Configuration[/bold blue] [bold yellow]for more info[/bold yellow]")
if choice == "1": if choice == "1":
await self.clone_server_template() await self.clone_server_template()
elif choice == "2": elif choice == "2":
@ -347,7 +351,9 @@ class MigrationCLI:
perm_table.add_row( perm_table.add_row(
"[bold #FF8C00]Stoat Bot[/bold #FF8C00]", "[bold #FF8C00]Stoat Bot[/bold #FF8C00]",
f"• [bold]Permissions:[/bold] {fmt('Manage Channel', s_perms.get('manage_channels'))}, {fmt('Manage Server', s_perms.get('manage_server'))},\n" f"• [bold]Permissions:[/bold] {fmt('Manage Channel', s_perms.get('manage_channels'))}, {fmt('Manage Server', s_perms.get('manage_server'))},\n"
f" {fmt('Manage Permissions', s_perms.get('manage_permissions'))}, {fmt('Manage Roles', s_perms.get('manage_roles'))}, {fmt('Manage Customization', s_perms.get('manage_customization'))}" f" {fmt('Manage Permissions', s_perms.get('manage_permissions'))}, {fmt('Manage Roles', s_perms.get('manage_roles'))}, {fmt('Manage Customization', s_perms.get('manage_customization'))},\n"
f" {fmt('Manage Messages', s_perms.get('manage_messages'))}, {fmt('Send Messages', s_perms.get('send_messages'))}, {fmt('Masquerade', s_perms.get('masquerade'))},\n"
f" {fmt('Upload Files', s_perms.get('upload_files'))}, {fmt('React', s_perms.get('react'))}, {fmt('Mention Everyone', s_perms.get('mention_everyone'))}, {fmt('Mention Roles', s_perms.get('mention_roles'))}"
) )
console.print("\n") # Add spacing before panel console.print("\n") # Add spacing before panel
@ -680,7 +686,7 @@ class MigrationCLI:
total_mapped = mapped_cats + mapped_chs total_mapped = mapped_cats + mapped_chs
console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]") console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]")
console.print("[bold green](Y) Proceed with Permission synchronization[/bold green]") console.print("[bold green](Y) Proceed with Permission sync[/bold green]")
console.print("[bold yellow](B) Back[/bold yellow]") console.print("[bold yellow](B) Back[/bold yellow]")
sub_choice = Prompt.ask("Select an option [Y/B]", choices=["Y", "y", "B", "b"], default="Y", show_choices=False).upper() sub_choice = Prompt.ask("Select an option [Y/B]", choices=["Y", "y", "B", "b"], default="Y", show_choices=False).upper()
@ -978,11 +984,15 @@ class MigrationCLI:
source_channel = d_channels[int(d_choice) - 1] source_channel = d_channels[int(d_choice) - 1]
# 2. Select Target Fluxer Channel # Select appropriate migration module
with console.status("[yellow]Fetching Fluxer channels...[/yellow]"): migrate_mod = fluxer_migrate if self.engine.target_platform == "fluxer" else stoat_migrate
platform_name = self.engine.target_platform.capitalize()
# 2. Select Target Channel
with console.status(f"[yellow]Fetching {platform_name} channels...[/yellow]"):
f_channels = await self.engine.writer.get_channels() f_channels = await self.engine.writer.get_channels()
if not f_channels: if not f_channels:
console.print("[yellow]No channels found in Fluxer community.[/yellow]") console.print(f"[yellow]No channels found in {platform_name} community.[/yellow]")
return return
# Determine recommended channel # Determine recommended channel
@ -996,7 +1006,7 @@ class MigrationCLI:
if not recommended_channel: if not recommended_channel:
recommended_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None) recommended_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
console.print("\n[bold]Select Target Fluxer Channel:[/bold]") console.print(f"\n[bold]Select Target {platform_name} Channel:[/bold]")
for i, ch in enumerate(f_channels): for i, ch in enumerate(f_channels):
console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}") console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}")
@ -1015,7 +1025,7 @@ class MigrationCLI:
if recommended_channel: if recommended_channel:
prompt_parts.append("Y") prompt_parts.append("Y")
prompt_msg = f"Select Fluxer Channel [[bold cyan]{'/'.join(prompt_parts)}[/bold cyan]]" prompt_msg = f"Select {platform_name} Channel [[bold cyan]{'/'.join(prompt_parts)}[/bold cyan]]"
f_choice = Prompt.ask(prompt_msg, choices=f_choices, show_choices=False, default="Y" if recommended_channel else None).upper() f_choice = Prompt.ask(prompt_msg, choices=f_choices, show_choices=False, default="Y" if recommended_channel else None).upper()
@ -1028,7 +1038,7 @@ class MigrationCLI:
# Check if a channel with this name already exists # Check if a channel with this name already exists
target_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None) target_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
if not target_channel: if not target_channel:
with console.status(f"[yellow]Creating Fluxer channel #{source_channel.name}...[/yellow]"): with console.status(f"[yellow]Creating {platform_name} channel #{source_channel.name}...[/yellow]"):
parent_id = None parent_id = None
if source_channel.category_id: if source_channel.category_id:
parent_id = self.engine.state.get_fluxer_category_id(str(source_channel.category_id)) parent_id = self.engine.state.get_fluxer_category_id(str(source_channel.category_id))
@ -1175,7 +1185,7 @@ class MigrationCLI:
async def update_scan_progress(count: int): async def update_scan_progress(count: int):
progress.update(task, description=f"[cyan]Scanned {count} items...") progress.update(task, description=f"[cyan]Scanned {count} items...")
stats = await analyze_migration( stats = await migrate_mod.analyze_migration(
self.engine, self.engine,
source_channel_id=source_channel.id, source_channel_id=source_channel.id,
after_message_id=after_id, after_message_id=after_id,
@ -1209,7 +1219,7 @@ class MigrationCLI:
console.print("") console.print("")
console.print(table) console.print(table)
if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to Fluxer [#4641D9]#{target_channel.get('name')}[/#4641D9]?"): if not Confirm.ask(f"\nMigrate messages from Discord [cyan]#{source_channel.name}[/cyan] to {platform_name} [#4641D9]#{target_channel.get('name')}[/#4641D9]?"):
return return
# 5. Migration Execution # 5. Migration Execution
@ -1230,7 +1240,7 @@ class MigrationCLI:
async def update_msg_progress(count: int): async def update_msg_progress(count: int):
progress.update(task, completed=count, description=f"[cyan]Migrated {count}/{total_messages} messages...") progress.update(task, completed=count, description=f"[cyan]Migrated {count}/{total_messages} messages...")
result_stats = await migrate_messages( result_stats = await migrate_mod.migrate_messages(
self.engine, self.engine,
source_channel_id=source_channel.id, source_channel_id=source_channel.id,
target_channel_id=target_channel.get("id"), target_channel_id=target_channel.get("id"),
@ -1257,7 +1267,16 @@ class MigrationCLI:
await log_audit_event(self.engine, "Message History Migrated", audit_desc) await log_audit_event(self.engine, "Message History Migrated", audit_desc)
except Exception as e: except Exception as e:
console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]") err_str = str(e)
if "MissingPermission" in err_str and "Masquerade" in err_str:
console.print(f"\n[bold red]Migration stopped: Bot is missing the 'Masquerade' permission.[/bold red]")
console.print(f"[yellow]Grant the 'Masquerade' permission to the bot's role in your Stoat server settings, then retry.[/yellow]")
elif "MissingPermission" in err_str:
console.print(f"\n[bold red]Migration stopped: Bot is missing a required permission.[/bold red]")
console.print(f"[yellow]Error: {err_str}[/yellow]")
console.print(f"[yellow]Grant the required permission to the bot's role in your Stoat server settings, then retry.[/yellow]")
else:
console.print(f"[bold red]Migration encountered an error: {err_str}[/bold red]")
finally: finally:
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_connections() await self.engine.close_connections()