message migration resume function

This commit is contained in:
rambros 2026-02-23 15:40:11 +05:30
parent 1a17fad4a9
commit 4f557dc20c
4 changed files with 150 additions and 27 deletions

View file

@ -28,7 +28,7 @@ Fluxer Reaper is a simple tool to help you move an entire Discord server over to
### 5. Migrate Message History ### 5. Migrate Message History
* **Masquerade User**: Uses webhooks to mirror original user avatars and nicknames for a seamless transition. * **Masquerade User**: Uses webhooks to mirror original user avatars and nicknames for a seamless transition.
* **Contextual Pairing**: Easily map Discord channels to their Fluxer counterparts. * **Contextual Pairing**: Easily map Discord channels to their Fluxer counterparts.
* **Flexible Start Points**: Start from the oldest message or a specific custom message ID/link. * **Flexible Start Points**: Start from the oldest message, a specific custom message ID/link, or continue from where you left off.
* **Rich History**: Migrates message content, author markers, and attachments. * **Rich History**: Migrates message content, author markers, and attachments.
* **Thread Support**: Handles threads with dedicated markers in the target channel. * **Thread Support**: Handles threads with dedicated markers in the target channel.
@ -103,6 +103,9 @@ Simply run the downloaded file to start the tool. No Python installation is requ
- **windows**: run the **fluxer-reaper.exe** file - **windows**: run the **fluxer-reaper.exe** file
### Documentation
Detailed info about the code can be found at https://deepwiki.com/rambros3d/fluxer-reaper
## Vibe Code Notice ## Vibe Code Notice
- Code is provided as is; This tool was developed with AI. - Code is provided as is; This tool was developed with AI.

View file

@ -1,11 +1,37 @@
import asyncio import asyncio
import logging import logging
from typing import Callable, Awaitable, Dict import re
from typing import Callable, Awaitable, Dict, Any
from src.core.base import MigrationContext from src.core.base import MigrationContext
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Matches user (<@id> / <@!id>), role (<@&id>) and channel (<#id>) mention tokens.
# Group 1 is the sigil that identifies the mention kind, group 2 the numeric ID.
_MENTION_PATTERN = re.compile(r'<(@!?|@&|#)([0-9]+)>')


def clean_mentions(content: str, guild) -> str:
    """Replace raw Discord mention tokens in *content* with readable names.

    User mentions become ``@<display_name>``, role mentions ``@<role name>``
    and channel mentions ``#<channel name>``, resolved through
    ``guild.get_member``, ``guild.get_role`` and ``guild.get_channel``.
    A mention whose lookup returns nothing is left exactly as written.

    Args:
        content: Message text that may contain mention tokens.
        guild: Object exposing ``get_member``, ``get_role`` and
            ``get_channel``; each takes an integer ID.

    Returns:
        The text with resolvable mentions replaced. ``content`` is returned
        unchanged when it is empty or when ``guild`` is falsy.
    """
    if not content or not guild:
        return content

    def resolve(match):
        sigil = match.group(1)
        snowflake = int(match.group(2))
        if sigil == "@&":
            role = guild.get_role(snowflake)
            return f"@{role.name}" if role else match.group(0)
        if sigil == "#":
            channel = guild.get_channel(snowflake)
            return f"#{channel.name}" if channel else match.group(0)
        # Remaining sigils are "@" and "@!": both denote a user mention.
        member = guild.get_member(snowflake)
        return f"@{member.display_name}" if member else match.group(0)

    # One pass over the text: names inserted by a replacement are never
    # rescanned, so a name that happens to look like a mention token is
    # not substituted a second time.
    return _MENTION_PATTERN.sub(resolve, content)
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, int]: async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, int]:
""" """
Scans channel history to count messages, threads, and attachments. Scans channel history to count messages, threads, and attachments.
@ -34,9 +60,15 @@ async def analyze_migration(context: MigrationContext, source_channel_id: int, a
return stats return stats
async def migrate_messages(context: MigrationContext, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None): async def migrate_messages(context: MigrationContext, source_channel_id: int, target_channel_id: str, after_message_id: int | None = None, progress_callback: Callable[[int], Awaitable[None]] | None = None) -> Dict[str, Any]:
"""Migrate messages for a specific channel.""" """Migrate messages for a specific channel and returns detailed statistics."""
message_count = 0 stats = {
"messages": 0,
"attachments": 0,
"threads": 0,
"first_message_url": None,
"last_message_url": None
}
async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id):
if not context.is_running: if not context.is_running:
break break
@ -52,7 +84,7 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
is_forwarded = msg.flags.forwarded is_forwarded = msg.flags.forwarded
# If forwarded, the content and attachments might be in message_snapshots (discord.py 2.5+) # If forwarded, the content and attachments might be in message_snapshots (discord.py 2.5+)
content = msg.content content = msg.clean_content
if is_forwarded: if is_forwarded:
logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}") logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}")
if hasattr(msg, 'message_snapshots') and msg.message_snapshots: if hasattr(msg, 'message_snapshots') and msg.message_snapshots:
@ -60,6 +92,8 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
snapshot = msg.message_snapshots[0] snapshot = msg.message_snapshots[0]
if not content: if not content:
content = snapshot.content content = snapshot.content
if hasattr(msg, 'guild') and msg.guild:
content = clean_mentions(content, msg.guild)
# Add snapshot attachments to the list to process # Add snapshot attachments to the list to process
attachments_to_process.extend(snapshot.attachments) attachments_to_process.extend(snapshot.attachments)
logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments") logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments")
@ -68,6 +102,7 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
try: try:
att_data = await context.discord_reader.download_attachment(att) att_data = await context.discord_reader.download_attachment(att)
files.append({"filename": att.filename, "data": att_data}) files.append({"filename": att.filename, "data": att_data})
stats["attachments"] += 1
except Exception as e: except Exception as e:
logger.error(f"Failed to download attachment {att.filename}: {e}") logger.error(f"Failed to download attachment {att.filename}: {e}")
@ -97,6 +132,7 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
logger.info(f"Detected thread '{thread.name}' on message {msg.id}") logger.info(f"Detected thread '{thread.name}' on message {msg.id}")
# Send Start Marker # Send Start Marker
stats["threads"] += 1
await context.fluxer_writer.send_marker( await context.fluxer_writer.send_marker(
channel_id=target_channel_id, channel_id=target_channel_id,
content=f"> <<< THREAD: **{thread.name}** >>>" content=f"> <<< THREAD: **{thread.name}** >>>"
@ -105,11 +141,14 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
# Migrate thread messages # Migrate thread messages
# We don't pass a progress callback here to avoid confusing the UI # We don't pass a progress callback here to avoid confusing the UI
# but we do want to track count if possible. # but we do want to track count if possible.
await migrate_messages( thread_stats = await migrate_messages(
context=context, context=context,
source_channel_id=thread.id, source_channel_id=thread.id,
target_channel_id=target_channel_id target_channel_id=target_channel_id
) )
stats["messages"] += thread_stats["messages"]
stats["attachments"] += thread_stats["attachments"]
stats["threads"] += thread_stats["threads"]
# Send End Marker # Send End Marker
await context.fluxer_writer.send_marker( await context.fluxer_writer.send_marker(
@ -118,9 +157,17 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
) )
context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at))
message_count += 1 context.state.update_last_message_id(str(source_channel_id), str(msg.id))
stats["messages"] += 1
# Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers)
# The 'after_message_id' param usually means it's the main function call and not a thread recursive call
if not stats["first_message_url"]:
stats["first_message_url"] = msg.jump_url
stats["last_message_url"] = msg.jump_url
if progress_callback: if progress_callback:
await progress_callback(message_count) await progress_callback(stats["messages"])
except Exception as e: except Exception as e:
logger.error(f"Failed to process message {msg.id}: {e}") logger.error(f"Failed to process message {msg.id}: {e}")
import traceback import traceback
@ -129,4 +176,4 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta
# Delay for rate limit safety # Delay for rate limit safety
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
return message_count return stats

View file

@ -22,6 +22,7 @@ class MigrationState:
# message tracking # message tracking
self.message_map: Dict[str, str] = {} self.message_map: Dict[str, str] = {}
self.last_message_timestamps: Dict[str, str] = {} self.last_message_timestamps: Dict[str, str] = {}
self.last_message_ids: Dict[str, str] = {}
self.load() self.load()
@ -44,6 +45,7 @@ class MigrationState:
if "messages" in data or "last_message_timestamps" in data: if "messages" in data or "last_message_timestamps" in data:
self.message_map = data.get("messages", {}) self.message_map = data.get("messages", {})
self.last_message_timestamps = data.get("last_message_timestamps", {}) self.last_message_timestamps = data.get("last_message_timestamps", {})
self.last_message_ids = data.get("last_message_ids", {})
migrated_messages = True # We found legacy data, we should write it out to messages.json later migrated_messages = True # We found legacy data, we should write it out to messages.json later
# Legacy Migration: Move role_, emoji_, sticker_ from channel_map to dedicated maps # Legacy Migration: Move role_, emoji_, sticker_ from channel_map to dedicated maps
@ -68,6 +70,7 @@ class MigrationState:
msg_data = json.load(f) msg_data = json.load(f)
self.message_map = msg_data.get("messages", {}) self.message_map = msg_data.get("messages", {})
self.last_message_timestamps = msg_data.get("last_message_timestamps", {}) self.last_message_timestamps = msg_data.get("last_message_timestamps", {})
self.last_message_ids = msg_data.get("last_message_ids", {})
migrated_messages = False # No need to force a migrating save since it already exists migrated_messages = False # No need to force a migrating save since it already exists
# 3. Save if we migrated any legacy data to separate maps/files # 3. Save if we migrated any legacy data to separate maps/files
@ -93,6 +96,7 @@ class MigrationState:
"""Saves only the message tracking data.""" """Saves only the message tracking data."""
data = { data = {
"last_message_timestamps": self.last_message_timestamps, "last_message_timestamps": self.last_message_timestamps,
"last_message_ids": self.last_message_ids,
"messages": self.message_map "messages": self.message_map
} }
with open(self.messages_file, "w", encoding="utf-8") as f: with open(self.messages_file, "w", encoding="utf-8") as f:
@ -168,6 +172,13 @@ class MigrationState:
self.last_message_timestamps[str(channel_id)] = timestamp self.last_message_timestamps[str(channel_id)] = timestamp
self.save_messages() self.save_messages()
def update_last_message_id(self, channel_id: str, message_id: str):
    """Record the last migrated message ID for a channel and save message state.

    Args:
        channel_id: Source channel identifier; stored as a string key.
        message_id: Identifier of the most recently migrated message.
    """
    # Coerce the value as well as the key: the map is declared Dict[str, str],
    # so a caller passing a raw integer ID must not leave a mixed-type entry.
    self.last_message_ids[str(channel_id)] = str(message_id)
    self.save_messages()
def get_last_message_id(self, channel_id: str) -> str | None:
return self.last_message_ids.get(str(channel_id))
# --- Danger Zone Clearing --- # --- Danger Zone Clearing ---
def clear_channel_mappings(self): def clear_channel_mappings(self):
@ -191,4 +202,5 @@ class MigrationState:
"""Clears all message mappings and timestamps.""" """Clears all message mappings and timestamps."""
self.message_map.clear() self.message_map.clear()
self.last_message_timestamps.clear() self.last_message_timestamps.clear()
self.last_message_ids.clear()
self.save_messages() self.save_messages()

View file

@ -845,24 +845,72 @@ class MigrationCLI:
first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}" first_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{first_msg.id}"
server_name = self.validation_results.get("discord_server_name", "server") server_name = self.validation_results.get("discord_server_name", "server")
# Check for existing migration
last_migrated_id = self.engine.state.get_last_message_id(str(source_channel.id))
next_msg = None
if last_migrated_id:
try:
# Try to fetch the immediate next message
async for msg in self.engine.discord_reader.fetch_message_history(source_channel.id, limit=1, after_id=int(last_migrated_id)):
next_msg = msg
break
except Exception as e:
# Best-effort lookup: if the next message cannot be fetched, log a warning
# and leave next_msg as None so the prompt falls back to the non-resume options.
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Could not fetch next message after {last_migrated_id}: {e}")
while True: while True:
console.print(f"\n[bold green]Channel Found![/bold green]") console.print(f"\n[bold green]Channel Found![/bold green]")
console.print(f"Oldest Message Link: {first_msg_link}") console.print(f"Oldest Message")
console.print(f"Link: {first_msg_link}")
console.print(f"Author: [blue]{first_msg.author.name}[/blue]") console.print(f"Author: [blue]{first_msg.author.name}[/blue]")
console.print(f"Content Preview: {first_msg.content[:100]}...") console.print(f"Content Preview: {first_msg.content[:100]}")
console.print("\n(Y) [green]Yes, start from oldest message[/green]") choices = ["M", "m", "B", "b"]
console.print("(M) Provide Specific message link or ID") prompt_str = "Start migration"
console.print("(B) Back")
start_mode = Prompt.ask("Start migration [Y/M/B]", choices=["Y", "y", "M", "m", "B", "b"], default="Y", show_choices=False).upper() if next_msg:
next_msg_link = f"https://discord.com/channels/{self.config.discord_server_id}/{source_channel.id}/{next_msg.id}"
console.print(f"\n[bold yellow]Continue Next message[/bold yellow]")
console.print(f"Link: {next_msg_link}")
console.print(f"Author: [blue]{next_msg.author.name}[/blue]")
console.print(f"Content Preview: {next_msg.content[:100]}")
console.print("\n(F) [red]Force start from oldest message[/red]")
console.print("(M) Provide Specific message link or ID")
console.print("(Y) [green]Continue Migration[/green]")
console.print("(B) Back")
choices.extend(["Y", "y", "F", "f"])
prompt_str = "Select an option [Y/F/M/B]"
default_choice = "Y"
else:
console.print("\n(Y) [green]Yes, start from oldest message[/green]")
console.print("(M) Provide Specific message link or ID")
console.print("(B) Back")
choices.extend(["Y", "y"])
prompt_str = "Start migration [Y/M/B]"
default_choice = "Y"
start_mode = Prompt.ask(prompt_str, choices=choices, default=default_choice, show_choices=False).upper()
if start_mode == "B": if start_mode == "B":
return return
elif start_mode == "Y": elif start_mode == "Y":
if next_msg:
after_id = int(last_migrated_id)
break
elif start_mode == "F":
# User wants to ignore the previous migration and start from beginning
after_id = None
break break
elif start_mode == "M": elif start_mode == "M":
prompt_msg = "Enter Discord Message Link" prompt_msg = "Enter Discord Message Link or Message ID (or 'B' to go back)"
valid_message_found = False valid_message_found = False
while True: while True:
custom_link = Prompt.ask(prompt_msg) custom_link = Prompt.ask(prompt_msg)
@ -880,11 +928,11 @@ class MigrationCLI:
valid_message_found = True valid_message_found = True
break break
else: else:
console.print("\n[red]Error parsing link: 404 Not Found (error code: 10008): Unknown Message.[/red]") console.print("[red]Could not find message. Ensure it exists in this channel.[/red]")
prompt_msg = f"[yellow]Provide a valid Message link or Message ID from \"{server_name}\" (or 'B' to go back)[/yellow]" except ValueError:
console.print("[red]Invalid ID or Link. Please provide a valid numeric ID or Discord Message URL.[/red]")
except Exception as e: except Exception as e:
console.print(f"\n[red]Error parsing link: {e}[/red]") console.print(f"[red]Error fetching message: {str(e)}[/red]")
prompt_msg = f"[yellow]Provide a valid Message link or Message ID from \"{server_name}\" (or 'B' to go back)[/yellow]"
if valid_message_found: if valid_message_found:
break break
@ -947,6 +995,8 @@ class MigrationCLI:
console.print("\n[bold green]Starting Migration...[/bold green]") console.print("\n[bold green]Starting Migration...[/bold green]")
self.engine.is_running = True self.engine.is_running = True
total_messages = stats['messages']
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
@ -954,12 +1004,12 @@ class MigrationCLI:
TaskProgressColumn(), TaskProgressColumn(),
console=console console=console
) as progress: ) as progress:
task = progress.add_task("[cyan]Migrating messages...", total=None) task = progress.add_task(f"[cyan]Migrating 0/{total_messages} messages...", total=total_messages)
async def update_msg_progress(count: int): async def update_msg_progress(count: int):
progress.update(task, description=f"[cyan]Migrated {count} messages...") progress.update(task, completed=count, description=f"[cyan]Migrated {count}/{total_messages} messages...")
count = await migrate_messages( result_stats = await migrate_messages(
self.engine, self.engine,
source_channel_id=source_channel.id, source_channel_id=source_channel.id,
target_channel_id=target_channel.get("id"), target_channel_id=target_channel.get("id"),
@ -967,8 +1017,19 @@ class MigrationCLI:
progress_callback=update_msg_progress progress_callback=update_msg_progress
) )
console.print(f"\n[bold green]Success! {count} messages migrated to {target_channel.get('name')}.[/bold green]") console.print(f"\n[bold green]Success! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold green]")
await log_audit_event(self.engine, "Message History Migrated", f"Successfully migrated {count} messages to #{target_channel.get('name')}.")
audit_text = (
f"**Message info:**\n"
f"first message: {result_stats['first_message_url'] or 'N/A'}\n"
f"last message: {result_stats['last_message_url'] or 'N/A'}\n"
f"**Stats:**\n"
f"{stats['messages']} messages\n"
f"{stats['attachments']} attachments\n"
f"{stats['threads']} threads\n"
f"Successfully migrated to <#{target_channel.get('id')}>\n"
)
await log_audit_event(self.engine, "Message History Migrated", audit_text)
except Exception as e: except Exception as e:
console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]") console.print(f"[bold red]Migration encountered an error: {str(e)}[/bold red]")