add waterfall mode

This commit is contained in:
rambros 2026-03-28 16:58:38 +05:30
parent 011c0ca4e0
commit 220f97aad4
8 changed files with 828 additions and 36 deletions

View file

@ -9,8 +9,8 @@
![Disco Reaper](images/fluxer-reaper.jpg) ![Disco Reaper](images/fluxer-reaper.jpg)
### Modern Terminal Interface ### Video Guide - [Youtube](https://www.youtube.com/watch?v=SwIPQDxLzqA)
The tool now features a unified, intuitive TUI (Terminal User Interface) - no more text commands
| Features | Fluxer | Stoat | | Features | Fluxer | Stoat |
| :--- | :---: | :---: | | :--- | :---: | :---: |

View file

@ -791,6 +791,83 @@ class BackupDatabase:
return msg_list return msg_list
def get_global_messages_paged(self, limit: int = 100, offset: int = 0, after_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""Fetches messages across ALL channels globally, ordered by timestamp/ID ascending."""
with self._lock:
query = "SELECT * FROM messages"
params = []
if after_id:
query += " WHERE id > ?"
params.append(parse_snowflake(after_id))
query += " ORDER BY id ASC LIMIT ? OFFSET ?"
params.extend([limit, offset])
rows = self._conn.execute(query, params).fetchall()
msg_list = [dict(r) for r in rows]
if msg_list:
msg_ids = [m["id"] for m in msg_list]
placeholders = ",".join(["?"] * len(msg_ids))
att_rows = self._conn.execute(f"SELECT * FROM attachments WHERE message_id IN ({placeholders})", msg_ids).fetchall()
atts_by_msg = {}
for ar in att_rows:
mid = ar["message_id"]
if mid not in atts_by_msg: atts_by_msg[mid] = []
atts_by_msg[mid].append(dict(ar))
emb_rows = self._conn.execute(f"SELECT * FROM embeds WHERE message_id IN ({placeholders})", msg_ids).fetchall()
embs_by_msg = {}
for er in emb_rows:
mid = er["message_id"]
if mid not in embs_by_msg: embs_by_msg[mid] = []
e_dict = {
"title": er["title"],
"description": er["description"],
"url": er["url"],
"color": er["color"],
"timestamp": er["timestamp"],
"thumbnail": {"url": er["thumbnail_url"]} if er["thumbnail_url"] else None,
"image": {"url": er["image_url"]} if er["image_url"] else None,
"author": {
"name": er["author_name"],
"url": er["author_url"],
"icon_url": er["author_icon_url"]
} if er["author_name"] else None,
"footer": {
"text": er["footer_text"],
"icon_url": er["footer_icon_url"]
} if er["footer_text"] else None,
"fields": json.loads(er["fields"]) if er["fields"] else []
}
embs_by_msg[mid].append(e_dict)
rea_rows = self._conn.execute(f"SELECT * FROM reactions WHERE message_id IN ({placeholders})", msg_ids).fetchall()
reas_by_msg = {}
for rr in rea_rows:
mid = rr["message_id"]
if mid not in reas_by_msg: reas_by_msg[mid] = []
reas_by_msg[mid].append(dict(rr))
st_rows = self._conn.execute(f"SELECT * FROM message_stickers WHERE message_id IN ({placeholders})", msg_ids).fetchall()
sts_by_msg = {}
for sr in st_rows:
mid = sr["message_id"]
if mid not in sts_by_msg: sts_by_msg[mid] = []
sts_by_msg[mid].append(dict(sr))
for m in msg_list:
m_id = m["id"]
m["attachments"] = atts_by_msg.get(m_id, [])
m["embeds"] = embs_by_msg.get(m_id, [])
m["reactions"] = reas_by_msg.get(m_id, [])
m["stickers"] = sts_by_msg.get(m_id, [])
return msg_list
def delete_channel_messages(self, channel_id: Union[str, int]): def delete_channel_messages(self, channel_id: Union[str, int]):
"""Deletes all messages and related metadata for a specific channel and its threads.""" """Deletes all messages and related metadata for a specific channel and its threads."""
cid = parse_snowflake(channel_id) cid = parse_snowflake(channel_id)

View file

@ -1417,6 +1417,42 @@ class BackupReader:
if len(msgs) < batch_size: if len(msgs) < batch_size:
break break
async def fetch_global_message_history(
self,
limit: int = None,
after_id: int = None
) -> AsyncGenerator["BackupMessage", None]:
"""Yields BackupMessages globally from SQLite across all channels, natively ordered by timestamp/ID."""
if not self.db: return
offset = 0
batch_size = 100
count = 0
while True:
actual_limit = batch_size
if limit:
rem = limit - count
if rem <= 0: break
actual_limit = min(batch_size, rem)
msgs = self.db.get_global_messages_paged(
limit=actual_limit,
offset=offset,
after_id=str(after_id) if after_id else None
)
if not msgs:
break
for m in msgs:
yield self._hydrate_message(m)
count += 1
offset += len(msgs)
if len(msgs) < batch_size:
break
# ── download helpers ───────────────────────────────────────────────── # ── download helpers ─────────────────────────────────────────────────
async def download_emoji(self, emoji: BackupEmoji) -> bytes: async def download_emoji(self, emoji: BackupEmoji) -> bytes:

View file

@ -451,6 +451,35 @@ class MigrationDatabase:
return dict(row) return dict(row)
return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0} return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0}
def get_global_min_last_message_id(self, mapped_channel_ids: List[str]) -> Optional[str]:
"""Returns the minimum last_msg_id across all mapped channels. If any mapped channel has NO last_msg_id, returns None."""
if not mapped_channel_ids:
return None
conn = self._get_conn()
placeholders = ",".join(["?"] * len(mapped_channel_ids))
rows = conn.execute(f"SELECT last_msg_id FROM channel_tracking WHERE channel_id IN ({placeholders})", mapped_channel_ids).fetchall()
# If the number of tracked channels is less than mapped, it means some mapped channels haven't started.
if len(rows) < len(mapped_channel_ids):
return None
# Parse all ids
ids = []
for r in rows:
val = r["last_msg_id"]
if not val:
return None # One channel has no messages yet
try:
ids.append(int(val))
except ValueError:
pass
if not ids:
return None
return str(min(ids))
# Thread methods similar to channel methods # Thread methods similar to channel methods
def set_thread_message_mapping(self, channel_id: str, thread_id: str, source_id: str, target_id: str, timestamp: str = None): def set_thread_message_mapping(self, channel_id: str, thread_id: str, source_id: str, target_id: str, timestamp: str = None):
conn = self._get_conn() conn = self._get_conn()

View file

@ -215,6 +215,12 @@ class MigrationState:
return self.db.get_channel_tracking(str(target_channel_id)).get("last_msg_id") return self.db.get_channel_tracking(str(target_channel_id)).get("last_msg_id")
return None return None
def get_global_min_last_message_id(self, mapped_channel_ids: List[str]) -> str | None:
"""Returns the absolute minimum last_msg_id among the given list of mapped target channel IDs."""
if self._ensure_db():
return self.db.get_global_min_last_message_id(mapped_channel_ids)
return None
def get_thread_last_message_id(self, target_channel_id: str, thread_id: str) -> str | None: def get_thread_last_message_id(self, target_channel_id: str, thread_id: str) -> str | None:
if self._ensure_db(): if self._ensure_db():
return self.db.get_thread_tracking(str(target_channel_id), str(thread_id)).get("last_msg_id") return self.db.get_thread_tracking(str(target_channel_id), str(thread_id)).get("last_msg_id")

View file

@ -4,6 +4,7 @@ import re
import json import json
import io import io
from typing import Callable, Awaitable, Dict, Any, List from typing import Callable, Awaitable, Dict, Any, List
from pathlib import Path
try: try:
from lottie.objects import Animation from lottie.objects import Animation
@ -27,22 +28,25 @@ def clean_mentions(content: str, guild, user_mentions=None, role_mentions=None,
uid = int(match.group(1)) uid = int(match.group(1))
if anonymize_users and state: if anonymize_users and state:
alias = state.get_user_alias(str(uid)) alias = state.get_user_alias(str(uid))
return f"`@{alias}`" return f"`@{alias}`" if alias else "`@Anonymized User`"
# 1. Try provided guild # 1. Try provided guild
member = guild.get_member(uid) member = guild.get_member(uid)
if member: if member:
return f"`@{member.display_name}`" return f"`@{member.display_name}`"
# 2. Try message's user_mentions
# 2. Try provided user_mentions
if user_mentions: if user_mentions:
for u in user_mentions: m = next((u for u in user_mentions if u.id == uid), None)
if u.id == uid: if m:
return f"`@{getattr(u, 'display_name', u.name)}`" return f"`@{m.display_name}`"
# 3. Try global cache via guild.client # 3. Try global cache via guild.client
if hasattr(guild, 'client'): if hasattr(guild, 'client'):
user = guild.client.get_user(uid) user = guild.client.get_user(uid)
if user: if user:
return f"`@{user.name}`" return f"`@{user.name}`"
return f"`@Unknown User`" return "`@Unknown User`"
def replace_role(match): def replace_role(match):
rid = int(match.group(1)) rid = int(match.group(1))
@ -557,23 +561,22 @@ async def migrate_messages(
if thread_name and stats["messages"] == 0: if thread_name and stats["messages"] == 0:
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
# Get or generate alias # Always ensure alias is created/retrieved to populate user_alias table
alias = context.state.get_user_alias(str(msg.author.id)) alias = context.state.get_user_alias(str(msg.author.id))
if context.config.anonymize_users: anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
author_name = alias if anonymize_users:
avatar_url = f"https://api.dicebear.com/9.x/fun-emoji/jpg?seed={alias}" author_name = alias or "Anonymized User"
author_avatar_url = None
else: else:
author_name = msg.author.display_name author_name = msg.author.display_name
avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
if avatar_url and not avatar_url.startswith("http"):
avatar_url = None
logger.debug(f"Fluxer: Calling send_message for Discord ID {msg.id}") logger.debug(f"Fluxer: Calling send_message for Discord ID {msg.id}")
fluxer_msg_id = await context.fluxer_writer.send_message( fluxer_msg_id = await context.fluxer_writer.send_message(
channel_id=target_channel_id, channel_id=target_channel_id,
author_name=author_name, author_name=author_name,
author_avatar_url=avatar_url, author_avatar_url=author_avatar_url,
content=content, content=content,
timestamp=int(msg.created_at.timestamp()), timestamp=int(msg.created_at.timestamp()),
files=files if files else None, files=files if files else None,
@ -663,3 +666,232 @@ async def migrate_messages(
pass pass
return stats return stats
async def analyze_global_migration(context: MigrationContext, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None) -> Dict[str, int]:
"""
Scans the entire server history to count messages, threads, and attachments globally.
"""
stats = {"messages": 0, "threads": 0, "attachments": 0}
# In global mode, thread messages are returned natively in timestamp order by global fetch if they're in the DB
# However we just count them if the fetcher yields them.
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running:
break
if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY,
context.discord_reader.MESSAGE_TYPE_THREAD_STARTER,
context.discord_reader.MESSAGE_TYPE_FORWARD,
context.discord_reader.MESSAGE_TYPE_CHAT_INPUT_COMMAND,
context.discord_reader.MESSAGE_TYPE_CONTEXT_MENU_COMMAND,
context.discord_reader.MESSAGE_TYPE_POLL_RESULT,
context.discord_reader.MESSAGE_TYPE_AUTO_MODERATION_ACTION
]:
continue
stats["messages"] += 1
stats["attachments"] += len(msg.attachments)
if hasattr(msg, 'thread') and msg.thread:
# We don't recursively traverse here, we just count the fact there is a thread
# The actual thread messages are also fetched by the global fetcher because they have their own timestamp/id
stats["threads"] += 1
if progress_callback and stats["messages"] % 100 == 0:
await progress_callback(stats)
if progress_callback:
await progress_callback(stats)
return stats
async def migrate_global_messages(
context: MigrationContext,
after_message_id: int | None = None,
inclusive: bool = False,
progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None
) -> Dict[str, Any]:
"""
Migrates messages across all channels chronologically.
"""
stats = {
"messages": 0,
"threads": 0,
"attachments": 0,
"last_message_content": "",
"last_message_author": "",
"first_message_url": None,
"last_message_url": None
}
processed_threads = set()
logger.info("Starting Global Waterfall Migration for Fluxer...")
# Keep track of active thread mapping natively to pass parent target IDs if needed
thread_to_target_channel = {}
# Emojis and mapped users cache setup
emoji_map = context.state.emoji_map
db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {}
target_server_id = getattr(context.fluxer_writer, "server_id", None)
try:
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running:
logger.warning("Global migration interrupted by user")
break
if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY,
context.discord_reader.MESSAGE_TYPE_THREAD_STARTER,
context.discord_reader.MESSAGE_TYPE_FORWARD,
context.discord_reader.MESSAGE_TYPE_CHAT_INPUT_COMMAND,
context.discord_reader.MESSAGE_TYPE_CONTEXT_MENU_COMMAND,
context.discord_reader.MESSAGE_TYPE_POLL_RESULT,
context.discord_reader.MESSAGE_TYPE_AUTO_MODERATION_ACTION
]:
continue
# Determine target channel
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id:
logger.debug(f"Skipping msg {msg.id}: channel {msg.channel.id} not mapped.")
continue
# If it's a thread message, we need to handle it based on if it's the thread starter or a reply
parent_target_id = None
if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id:
processed_threads.add(msg.thread.id)
stats["threads"] += 1
elif msg.channel.type in [11, 12]: # Thread channels
# It's a message IN a thread.
# In Fluxer, threads might just be linear messages or threaded replies depending on schema
# For basic migration we just send it to the parent mapped target channel.
# The parent mapped target channel ID should already be calculated correctly by get_target_channel_id (which returns mapped thread or parent channel)
pass
# Formatting
files = []
file_names = []
# Always ensure alias is created/retrieved to populate user_alias table
alias = context.state.get_user_alias(str(msg.author.id))
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
if anonymize_users:
author_name = alias or "Anonymized User"
author_avatar_url = None
else:
author_name = msg.author.display_name
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
for att in msg.attachments:
media_info = db_media.get(att.local_hash) if db_media else None
local_path = None
if media_info:
local_path = Path(media_info["local_path"])
elif hasattr(att, 'read'):
# Fallback
pass
if local_path and local_path.exists():
files.append(local_path)
file_names.append(att.filename)
content = msg.content or ""
# Stickers
for sticker in msg.stickers:
sticker_name = sticker.name
sticker_url = sticker.url
# Check for uploaded media pool logic first
s_hash = sticker.local_hash
sticker_file = None
s_media = db_media.get(s_hash) if db_media and s_hash else None
if s_media:
s_path = Path(s_media["local_path"])
if s_path.exists():
sticker_file = s_path
content += f"\n[Sticker: {sticker_name}]"
if sticker_file:
files.append(sticker_file)
file_names.append(f"sticker_{sticker_name}.png")
content = clean_mentions(
content=content,
guild=context.discord_reader.guild,
user_mentions=msg.mentions,
role_mentions=msg.role_mentions,
channel_mentions=msg.channel_mentions,
emoji_map=emoji_map,
channel_map=context.state.channel_map,
state=context.state,
target_server_id=target_server_id,
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
anonymize_users=anonymize_users
)
if not content and not files:
logger.debug(f"Message {msg.id} empty after processing, skipping.")
continue
timestamp_int = int(msg.created_at.timestamp())
if msg.reference and msg.reference.message_id:
# Resolve the author of the message being replied to
source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id)
if source_ref_msg and source_ref_msg.author:
ref_author_id = str(source_ref_msg.author.id)
if anonymize_users:
ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User"
else:
ref_name = source_ref_msg.author.display_name
content = f"`@{ref_name}`\n{content}"
else:
# Fallback if author cannot be resolved (e.g. deleted/missing from backup)
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
if tgt_reply:
content = f"[Reply to {tgt_reply}]\n{content}"
try:
fluxer_msg_id = await context.fluxer_writer.send_message(
channel_id=target_channel_id,
author_name=author_name,
author_avatar_url=author_avatar_url,
content=content,
files=files,
timestamp=timestamp_int,
embeds=msg.embeds
)
if fluxer_msg_id:
context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id)
context.state.update_last_message_id(target_channel_id, msg.id)
stats["attachments"] += len(files) if files else 0
stats["messages"] += 1
stats["last_message_content"] = content
stats["last_message_author"] = author_name
if not stats["first_message_url"]:
stats["first_message_url"] = msg.jump_url
stats["last_message_url"] = msg.jump_url
if progress_callback:
await progress_callback(stats)
except Exception as e:
logger.error(f"Failed to process global message {msg.id}: {e}")
except (KeyboardInterrupt, asyncio.CancelledError):
context.is_running = False
pass
return stats

View file

@ -4,6 +4,7 @@ import re
import json import json
import io import io
from typing import Callable, Awaitable, Dict, Any, List from typing import Callable, Awaitable, Dict, Any, List
from pathlib import Path
try: try:
from lottie.objects import Animation from lottie.objects import Animation
@ -27,22 +28,25 @@ def clean_mentions(content: str, guild, user_mentions=None, role_mentions=None,
uid = int(match.group(1)) uid = int(match.group(1))
if anonymize_users and state: if anonymize_users and state:
alias = state.get_user_alias(str(uid)) alias = state.get_user_alias(str(uid))
return f"`@{alias}`" return f"`@{alias}`" if alias else "`@Anonymized User`"
# 1. Try provided guild # 1. Try provided guild
member = guild.get_member(uid) member = guild.get_member(uid)
if member: if member:
return f"`@{member.display_name}`" return f"`@{member.display_name}`"
# 2. Try message's user_mentions
# 2. Try provided user_mentions
if user_mentions: if user_mentions:
for u in user_mentions: m = next((u for u in user_mentions if u.id == uid), None)
if u.id == uid: if m:
return f"`@{getattr(u, 'display_name', u.name)}`" return f"`@{m.display_name}`"
# 3. Try global cache via guild.client # 3. Try global cache via guild.client
if hasattr(guild, 'client'): if hasattr(guild, 'client'):
user = guild.client.get_user(uid) user = guild.client.get_user(uid)
if user: if user:
return f"`@{user.name}`" return f"`@{user.name}`"
return f"`@Unknown User`" return "`@Unknown User`"
def replace_role(match): def replace_role(match):
rid = int(match.group(1)) rid = int(match.group(1))
@ -560,22 +564,24 @@ async def migrate_messages(
if thread_name and stats["messages"] == 0: if thread_name and stats["messages"] == 0:
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
# Get or generate alias # Always ensure alias is created/retrieved to populate user_alias table
alias = context.state.get_user_alias(str(msg.author.id)) alias = context.state.get_user_alias(str(msg.author.id))
if context.config.anonymize_users: anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
author_name = alias if anonymize_users:
avatar_url = f"https://api.dicebear.com/9.x/fun-emoji/jpg?seed={alias}" author_name = alias or "Anonymized User"
author_avatar_url = None
else: else:
author_name = msg.author.display_name author_name = msg.author.display_name
avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None
if avatar_url and not avatar_url.startswith("http"): if author_avatar_url and not author_avatar_url.startswith("http"):
avatar_url = None author_avatar_url = None
logger.debug(f"Stoat: Calling send_message for Discord ID {msg.id}")
stoat_msg_id = await context.stoat_writer.send_message( stoat_msg_id = await context.stoat_writer.send_message(
channel_id=target_channel_id, channel_id=target_channel_id,
author_name=author_name, author_name=author_name,
author_avatar_url=avatar_url, author_avatar_url=author_avatar_url,
content=content, content=content,
timestamp=int(msg.created_at.timestamp()), timestamp=int(msg.created_at.timestamp()),
files=files if files else None, files=files if files else None,
@ -664,3 +670,212 @@ async def migrate_messages(
pass pass
return stats return stats
async def analyze_global_migration(context: MigrationContext, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None) -> Dict[str, int]:
"""
Scans the entire server history to count messages, threads, and attachments globally.
"""
stats = {"messages": 0, "threads": 0, "attachments": 0}
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running:
break
if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY,
context.discord_reader.MESSAGE_TYPE_THREAD_STARTER,
context.discord_reader.MESSAGE_TYPE_FORWARD,
context.discord_reader.MESSAGE_TYPE_CHAT_INPUT_COMMAND,
context.discord_reader.MESSAGE_TYPE_CONTEXT_MENU_COMMAND,
context.discord_reader.MESSAGE_TYPE_POLL_RESULT,
context.discord_reader.MESSAGE_TYPE_AUTO_MODERATION_ACTION
]:
continue
stats["messages"] += 1
stats["attachments"] += len(msg.attachments)
if hasattr(msg, 'thread') and msg.thread:
stats["threads"] += 1
if progress_callback and stats["messages"] % 100 == 0:
await progress_callback(stats)
if progress_callback:
await progress_callback(stats)
return stats
async def migrate_global_messages(
context: MigrationContext,
after_message_id: int | None = None,
inclusive: bool = False,
progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None
) -> Dict[str, Any]:
"""
Migrates messages across all channels chronologically to Stoat.
"""
stats = {
"messages": 0,
"threads": 0,
"attachments": 0,
"last_message_content": "",
"last_message_author": "",
"first_message_url": None,
"last_message_url": None
}
processed_threads = set()
logger.info("Starting Global Waterfall Migration for Stoat...")
emoji_map = context.state.emoji_map
db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {}
target_server_id = getattr(context.stoat_writer, "community_id", None)
try:
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running:
logger.warning("Global migration interrupted by user")
break
if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY,
context.discord_reader.MESSAGE_TYPE_THREAD_STARTER,
context.discord_reader.MESSAGE_TYPE_FORWARD,
context.discord_reader.MESSAGE_TYPE_CHAT_INPUT_COMMAND,
context.discord_reader.MESSAGE_TYPE_CONTEXT_MENU_COMMAND,
context.discord_reader.MESSAGE_TYPE_POLL_RESULT,
context.discord_reader.MESSAGE_TYPE_AUTO_MODERATION_ACTION
]:
continue
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id:
logger.debug(f"Skipping msg {msg.id}: channel {msg.channel.id} not mapped.")
continue
parent_target_id = None
if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id:
processed_threads.add(msg.thread.id)
stats["threads"] += 1
elif msg.channel.type in [11, 12]:
pass
# Formatting
files = []
# Always ensure alias is created/retrieved to populate user_alias table
alias = context.state.get_user_alias(str(msg.author.id))
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
if anonymize_users:
author_name = alias or "Anonymized User"
author_avatar_url = None
else:
author_name = msg.author.display_name
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
for att in msg.attachments:
media_info = db_media.get(att.local_hash) if db_media else None
local_path = None
if media_info:
local_path = Path(media_info["local_path"])
if local_path and local_path.exists():
try:
with open(local_path, "rb") as f:
files.append({"filename": att.filename, "data": f.read()})
except Exception as fe:
logger.error(f"Failed to read file {local_path}: {fe}")
content = msg.content or ""
for sticker in msg.stickers:
sticker_name = sticker.name
s_hash = sticker.local_hash
sticker_file = None
s_media = db_media.get(s_hash) if db_media and s_hash else None
if s_media:
s_path = Path(s_media["local_path"])
if s_path.exists():
sticker_file = s_path
content += f"\n[Sticker: {sticker_name}]"
if sticker_file:
files.append(sticker_file)
file_names.append(f"sticker_{sticker_name}.png")
content = clean_mentions(
content=content,
guild=context.discord_reader.guild,
user_mentions=msg.mentions,
role_mentions=msg.role_mentions,
channel_mentions=msg.channel_mentions,
emoji_map=emoji_map,
channel_map=context.state.channel_map,
state=context.state,
target_server_id=target_server_id,
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
anonymize_users=anonymize_users
)
if not content and not files:
logger.debug(f"Message {msg.id} empty after processing, skipping.")
continue
timestamp_int = int(msg.created_at.timestamp())
if msg.reference and msg.reference.message_id:
# Resolve the author of the message being replied to
source_ref_msg = await context.discord_reader.get_message(msg.channel_id, msg.reference.message_id)
if source_ref_msg and source_ref_msg.author:
ref_author_id = str(source_ref_msg.author.id)
if anonymize_users:
ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User"
else:
ref_name = source_ref_msg.author.display_name
content = f"`@{ref_name}`\n{content}"
else:
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
if tgt_reply:
content = f"[Reply to {tgt_reply}]\n{content}"
try:
stoat_msg_id = await context.stoat_writer.send_message(
channel_id=target_channel_id,
author_name=author_name,
author_avatar_url=author_avatar_url,
content=content,
files=files,
timestamp=timestamp_int,
embeds=msg.embeds
)
if stoat_msg_id:
context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id)
context.state.update_last_message_id(target_channel_id, msg.id)
stats["attachments"] += len(files) if files else 0
stats["messages"] += 1
stats["last_message_content"] = content
stats["last_message_author"] = author_name
if not stats["first_message_url"]:
stats["first_message_url"] = msg.jump_url
stats["last_message_url"] = msg.jump_url
if progress_callback:
await progress_callback(stats)
except Exception as e:
logger.error(f"Failed to process global message {msg.id}: {e}")
except (KeyboardInterrupt, asyncio.CancelledError):
context.is_running = False
pass
return stats

View file

@ -160,6 +160,7 @@ class OperationPane(Container):
yield Button("Clone Server Template", id="op_clone", disabled=True, tooltip="Clone server roles, categories, and channels to the target community") yield Button("Clone Server Template", id="op_clone", disabled=True, tooltip="Clone server roles, categories, and channels to the target community")
yield Button("Sync Server Settings", id="op_sync", disabled=True, tooltip="Sync emojis, stickers, server name, and icon to the target community") yield Button("Sync Server Settings", id="op_sync", disabled=True, tooltip="Sync emojis, stickers, server name, and icon to the target community")
yield Button("Migrate Message History", id="op_messages", disabled=True, variant="primary", tooltip="Migrate message history from Discord to the target platform") yield Button("Migrate Message History", id="op_messages", disabled=True, variant="primary", tooltip="Migrate message history from Discord to the target platform")
yield Button("Waterfall Migration", id="op_waterfall", disabled=True, variant="primary", tooltip="Migrate all messages globally in chronological order to prevent broken links.\n(Available for Local Backups)")
yield Rule(id="footer_rule") yield Rule(id="footer_rule")
yield Button("Danger Zone ⚠", id="op_danger", variant="error", disabled=True, flat=True, tooltip="Dangerous operations:\ndelete channels, roles, emojis on target\n(use with caution)") yield Button("Danger Zone ⚠", id="op_danger", variant="error", disabled=True, flat=True, tooltip="Dangerous operations:\ndelete channels, roles, emojis on target\n(use with caution)")
@ -394,7 +395,7 @@ class OperationPane(Container):
lbl.update(f"{t_status}") lbl.update(f"{t_status}")
# Buttons # Buttons
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"): for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger"):
for btn in self.query(bid): btn.disabled = not self.tokens_valid for btn in self.query(bid): btn.disabled = not self.tokens_valid
# ── validation ──────────────────────────────────────────────────────── # ── validation ────────────────────────────────────────────────────────
@ -415,7 +416,7 @@ class OperationPane(Container):
# Disable all operation buttons while validation is in progress # Disable all operation buttons while validation is in progress
if self.view_mode == "shuttle": if self.view_mode == "shuttle":
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"): for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger"):
for btn in self.query(bid): btn.disabled = True for btn in self.query(bid): btn.disabled = True
elif self.view_mode == "backup": elif self.view_mode == "backup":
for bid in ("#op_backup_msgs", "#op_backup_sync"): for bid in ("#op_backup_msgs", "#op_backup_sync"):
@ -552,7 +553,12 @@ class OperationPane(Container):
target_ok = v.get("target_token") and v.get("target_community") target_ok = v.get("target_token") and v.get("target_community")
self.tokens_valid = bool(discord_ok and target_ok) self.tokens_valid = bool(discord_ok and target_ok)
# Post validation adjustments
if self.tokens_valid:
is_backup = (self.config.tool_mode == "backup_transfer")
for btn in self.query("#op_waterfall"):
btn.disabled = not is_backup
btn.display = is_backup
self._update_info_labels() self._update_info_labels()
@ -570,6 +576,8 @@ class OperationPane(Container):
self._open_sync_menu() self._open_sync_menu()
elif bid == "op_messages": elif bid == "op_messages":
self.run_migrate_messages() self.run_migrate_messages()
elif bid == "op_waterfall":
self.run_waterfall_migration()
elif bid == "op_danger": elif bid == "op_danger":
self._open_danger_menu() self._open_danger_menu()
@ -1294,6 +1302,14 @@ class OperationPane(Container):
try: try:
self.engine.is_running = True self.engine.is_running = True
# Ensure state is initialized (database exists)
if self.target_platform == "stoat":
tid = self.config.stoat_server_id
else:
tid = self.config.fluxer_server_id
self.engine.ensure_state_initialized(str(tid or ""), platform_name)
stats_analysis = await migrate_mod.analyze_migration( stats_analysis = await migrate_mod.analyze_migration(
self.engine, self.engine,
source_channel_id=source_channel.id, source_channel_id=source_channel.id,
@ -1399,6 +1415,187 @@ class OperationPane(Container):
else: else:
modal.write(f"[bold red]Error: {err}[/bold red]") modal.write(f"[bold red]Error: {err}[/bold red]")
modal.phase_report("Message Migration", "error", show_back=False) modal.phase_report("Message Migration", "error", show_back=False)
import traceback
logger.error(f"Migration Error: {traceback.format_exc()}")
finally:
self.engine.is_running = False
await self.engine.close_connections()
@work(exclusive=True)
async def run_waterfall_migration(self) -> None:
if not self.tokens_valid:
return
migrate_mod = fluxer_migrate if self.target_platform == "fluxer" else stoat_migrate
platform_name = self.target_platform.capitalize()
modal = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal)
await asyncio.sleep(0.1)
try:
modal.show_info("[bold cyan]Waterfall Migration Ready[/bold cyan]", "Checking mapping and missing channels...")
modal.set_status("Connecting to Servers...")
await self.engine.start_connections()
modal.set_status("Synchronizing entity mappings...")
await self._perform_auto_matching()
# 1. Missing channels check
full_d = await self.engine.discord_reader.get_channels()
if hasattr(self.engine.discord_reader, "get_backed_up_channel_ids"):
valid_ids = await self.engine.discord_reader.get_backed_up_channel_ids()
d_channels = [c for c in full_d if c.id in valid_ids and c.type in [0, 5]]
else:
d_channels = [c for c in full_d if c.type in [0, 5]]
missing_channels = []
for d in d_channels:
tgt_id = self.engine.state.get_target_channel_id(str(d.id))
if not tgt_id:
missing_channels.append(d)
if missing_channels:
modal.write(f"\n[bold yellow]Found {len(missing_channels)} channels with backups but no target mapping.[/bold yellow]")
modal.write("[dim]Do you want to automatically create these missing channels now?[/dim]")
choice = await modal.phase_wait_confirm(
show_continue=False,
show_id=True,
btn_start_label=f"Yes, Create {len(missing_channels)} Missing Channels",
btn_id_label="No, Skip Them",
btn_start_variant="primary",
btn_start_tooltip="Create channels and map them",
btn_id_tooltip="Skip them (Warning: may cause broken mentions)"
)
if choice == "btn_back":
modal.dismiss()
return
elif choice == "btn_main_menu":
modal.dismiss()
self.app.switch_screen("config_selection")
return
if choice == "btn_start_first":
modal.set_status("Creating missing channels...")
for mc in missing_channels:
try:
modal.write(f"Creating channel '#{mc.name}'...")
new_id = await self.engine.writer.create_channel(name=mc.name)
# Link them
self.engine.state.set_target_channel_id(str(mc.id), new_id, self.engine.platform)
modal.write(f"[green]Created {mc.name} ({new_id})[/green]")
except Exception as e:
modal.write(f"[red]Failed to create {mc.name}: {e}[/red]")
# 2. Resumption check
all_mapped_tgt_ids = []
# Check regular channels
for did in [str(c.id) for c in d_channels]:
tid = self.engine.state.get_target_channel_id(did)
if tid: all_mapped_tgt_ids.append(tid)
# Also check threads
if hasattr(self.engine.discord_reader, "get_active_threads"):
threads = await self.engine.discord_reader.get_active_threads()
for t in threads:
tid = self.engine.state.get_target_channel_id(str(t.id))
if tid: all_mapped_tgt_ids.append(tid)
min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids)
modal.write(f"\n[bold cyan]Waterfall Migration Resume Point:[/bold cyan]")
if min_last_id:
modal.write(f"Minimum unmigrated message ID found: [green]{min_last_id}[/green]")
else:
modal.write("No previous migration state found. Starting from the beginning.")
choice = await modal.phase_wait_confirm(
show_continue=bool(min_last_id),
show_id=False,
btn_start_label="Start From Beginning",
btn_start_tooltip="Safe, skips duplicates automatically",
btn_start_variant="default" if min_last_id else "primary",
btn_continue_label=f"Continue from ID {min_last_id}" if min_last_id else "Continue Migration",
btn_continue_tooltip="Fastest"
)
if choice == "btn_back":
modal.dismiss()
await self.engine.close_connections()
return
elif choice == "btn_main_menu":
modal.dismiss()
await self.engine.close_connections()
self.app.switch_screen("config_selection")
return
after_id = None
if choice == "btn_continue" and min_last_id:
after_id = int(min_last_id)
# Phase 3: Progress
modal.cancel_callback = lambda: setattr(self.engine, "is_running", False)
modal.phase_progress()
modal.set_status("Migrating messages Globally...")
self.engine.is_running = True
# Ensure state is initialized (database exists)
if self.target_platform == "stoat":
tid = self.config.stoat_server_id
else:
tid = self.config.fluxer_server_id
self.engine.ensure_state_initialized(str(tid or ""), platform_name)
modal.write("Scanning global footprint for totals ...")
stats_analysis = await migrate_mod.analyze_global_migration(self.engine, after_message_id=after_id)
total_messages = stats_analysis["messages"]
modal.write(f"[bold cyan]Global Migration Started:[/bold cyan] {total_messages} total messages to process.")
modal.update_stats(messages=f"0/{total_messages}", threads=str(stats_analysis["threads"]), files=str(stats_analysis["attachments"]))
async def update_msg(current_stats):
c_msgs = current_stats["messages"]
c_threads = current_stats["threads"]
c_files = current_stats["attachments"]
msg_stat = f"{c_msgs}/{total_messages}" if total_messages > 0 else str(c_msgs)
modal.set_progress(c_msgs, total_messages or 100)
modal.update_stats(messages=msg_stat, threads=str(c_threads), files=str(c_files))
content = current_stats.get("last_message_content", "")
author = current_stats.get("last_message_author", "Unknown")
if content:
disp_content = (content[:100] + '...') if len(content) > 100 else content
modal.write(f"[bold]{author}:[/bold] {disp_content}")
result = await migrate_mod.migrate_global_messages(
self.engine,
after_message_id=after_id,
inclusive=False,
progress_callback=update_msg,
)
if self.engine.is_running:
modal.write(f"[bold green]Success! {result['messages']} messages migrated globally.[/bold green]")
modal.phase_report("Waterfall Migration", show_back=False)
else:
modal.write(f"[bold yellow]Interrupted! {result['messages']} messages migrated.[/bold yellow]")
modal.phase_report("Waterfall Migration", "stopped", show_back=False)
lines = [f"Migrated Server Globally → {platform_name}:"]
lines.append(f"{result['messages']} messages, {result['attachments']} attachments, {result['threads']} threads")
await log_audit_event(self.engine, "Waterfall Migration", "\n".join(lines))
except Exception as e:
err = str(e)
modal.write(f"[bold red]Error: {err}[/bold red]")
modal.phase_report("Waterfall Migration", "error", show_back=False)
import traceback
logger.error(traceback.format_exc())
finally: finally:
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_connections() await self.engine.close_connections()