From c581911f68b64f36c774c344d5abd060d50ab1e4 Mon Sep 17 00:00:00 2001 From: rambros3d Date: Mon, 30 Mar 2026 01:03:05 +0530 Subject: [PATCH 1/8] add windows build script --- build.bat | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 build.bat diff --git a/build.bat b/build.bat new file mode 100644 index 0000000..8702e45 --- /dev/null +++ b/build.bat @@ -0,0 +1,77 @@ +@echo off +setlocal enabledelayedexpansion +cd /d "%~dp0" + +echo --- Disco-Reaper Windows Build Script --- + +REM Check for venv +IF NOT EXIST "venv" ( + echo Creating virtual environment... + python -m venv venv + IF ERRORLEVEL 1 ( + echo Error: Failed to create virtual environment. + echo Ensure Python is installed and added to PATH. + IF NOT DEFINED AUTO_BUILD pause + exit /b 1 + ) +) + +echo Activating virtual environment... +call venv\Scripts\activate.bat + +REM Self-healing pip check +python -m pip --version >nul 2>&1 +IF ERRORLEVEL 1 ( + echo Warning: pip is missing or broken in venv. Attempting repair... + python -m ensurepip --default-pip + IF ERRORLEVEL 1 ( + echo Error: Failed to repair pip automatically. + echo Try recreating the venv: rmdir /s /q venv ^&^& python -m venv venv + IF NOT DEFINED AUTO_BUILD pause + exit /b 1 + ) +) + +echo Ensuring build dependencies are up to date... +REM python -m pip install --upgrade pip --quiet +python -m pip install pyinstaller --quiet || (echo Error: Failed to install PyInstaller. & (IF NOT DEFINED AUTO_BUILD pause) & exit /b 1) +python -m pip install -r requirements.txt --quiet || (echo Error: Failed to install requirements.txt dependencies. & (IF NOT DEFINED AUTO_BUILD pause) & exit /b 1) + +echo Cleaning previous build artifacts... +IF EXIST "build" rmdir /s /q build +IF EXIST "dist" rmdir /s /q dist + +echo Starting PyInstaller build... +REM Get git version tag +set "GIT_VERSION=Unknown" +for /f "tokens=*" %%i in ('git describe --tags --abbrev^=0 2^>nul') do set "GIT_VERSION=%%i" +echo Baking version: %GIT_VERSION% +echo __version__ = "%GIT_VERSION%"> src\core\_baked_version.py + +python -m PyInstaller --clean disco-reaper.spec +IF ERRORLEVEL 1 ( + echo Error: PyInstaller build failed. 
+ del /f src\core\_baked_version.py 2>nul + IF NOT DEFINED AUTO_BUILD pause + exit /b 1 +) + +echo Cleaning up baked version file... +del /f src\core\_baked_version.py 2>nul + +echo Packaging release: disco-reaper-windows.zip... +cd dist +powershell -Command "Compress-Archive -Path 'DiscoReaper.exe' -DestinationPath 'disco-reaper-windows.zip' -Force" 2>nul +IF ERRORLEVEL 1 ( + echo Warning: Failed to create zip. Files are available in dist\ directory. +) ELSE ( + echo Package created: dist\disco-reaper-windows.zip +) +cd .. + +echo ----------------------------------- +echo Build complete! +echo Standalone executable: dist\DiscoReaper.exe +echo Release Package: dist\disco-reaper-windows.zip +echo --- +IF NOT DEFINED AUTO_BUILD pause From 3f649b30627848d0927a8e989c99b4b5c8b2aedc Mon Sep 17 00:00:00 2001 From: rambros3d Date: Mon, 30 Mar 2026 01:26:04 +0530 Subject: [PATCH 2/8] improve log outputs --- disco-reaper.py | 22 +++++++++++++--------- src/fluxer/writer.py | 18 ++++++++++++++++++ src/stoat/writer.py | 37 ++++++++++++++++++++++++++++++------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/disco-reaper.py b/disco-reaper.py index 6b8b233..33d1212 100644 --- a/disco-reaper.py +++ b/disco-reaper.py @@ -1,17 +1,18 @@ import sys import logging +from logging.handlers import RotatingFileHandler from src.ui.main_app import run_disco_reaper_tui from src.core.configuration import load_config def setup_logging(): try: config = load_config(create_if_missing=False) - log_level_str = config.migration.log_level.upper() + log_level_str = config.log_level.upper() level = getattr(logging, log_level_str, logging.INFO) except Exception: level = logging.INFO - handlers = [logging.FileHandler('.reaper.log', mode='a')] + handlers = [RotatingFileHandler('.reaper.log', mode='a', maxBytes=10*1024*1024, backupCount=3)] logging.basicConfig( format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%H:%M:%S', @@ -92,18 +93,21 @@ def cleanup_old_update(): 
"""Removes the .old executable left behind by a Windows update.""" import os import sys + from pathlib import Path if sys.platform != "win32": return - - current_exe = sys.executable if getattr(sys, 'frozen', False) else sys.argv[0] - old_exe = current_exe + ".old" - if os.path.exists(old_exe): + # In frozen (PyInstaller) builds sys.executable is the bundled .exe itself; only sys._MEIPASS is the temp _MEIxxxxx dir. + # sys.argv[0] is whatever the user typed (may be a bare or relative name), so only use it when running from source. + current_exe = Path(sys.executable if getattr(sys, "frozen", False) else sys.argv[0]).resolve() + old_exe = current_exe.with_suffix(current_exe.suffix + ".old") + + if old_exe.exists(): try: - os.remove(old_exe) - except Exception: - pass + old_exe.unlink() + except Exception as e: + logging.getLogger(__name__).debug(f"Could not remove old update file {old_exe}: {e}") def main(): import os diff --git a/src/fluxer/writer.py b/src/fluxer/writer.py index dffa111..d271a7c 100644 --- a/src/fluxer/writer.py +++ b/src/fluxer/writer.py @@ -36,6 +36,7 @@ class FluxerWriter: guilds_list.append((label, str(g.id))) return guilds_list except Exception as e: + print(f"Failed to fetch Fluxer communities via HTTP: {e}") logger.error(f"Failed to fetch Fluxer communities via HTTP: {e}") raise @@ -61,6 +62,7 @@ class FluxerWriter: return w except Exception as e: print(f"Failed to manage webhook for channel {channel_id}: {e}") + logger.error(f"Failed to manage webhook for channel {channel_id}: {e}") return None async def start(self): @@ -322,6 +324,7 @@ class FluxerWriter: logger.debug(f"Fluxer: Webhook send complete, msg_id={msg.id if msg else 'None'}") return str(msg.id) if msg else None except asyncio.TimeoutError: + print(f"Fluxer: Webhook send timed out after 45s for channel {channel_id}") logger.error(f"Fluxer: Webhook send timed out after 45s for channel {channel_id}") return None else: @@ -353,6 +356,7 @@ class FluxerWriter: logger.debug(f"Fluxer: Bot send complete, msg_id={msg_data.get('id') if msg_data else 'None'}") return str(msg_data["id"]) if msg_data else None 
except asyncio.TimeoutError: + print(f"Fluxer: Bot send timed out after 45s for channel {channel_id}") logger.error(f"Fluxer: Bot send timed out after 45s for channel {channel_id}") return None except Exception as e: @@ -386,6 +390,7 @@ class FluxerWriter: return str(msg_data["id"]) if msg_data else None except Exception as e: print(f"Failed to send marker: {e}") + logger.error(f"Failed to send marker: {e}") return None async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool, permissions: int, position: Optional[int] = None) -> str: @@ -408,6 +413,7 @@ class FluxerWriter: return str(role["id"]) except Exception as e: print(f"Failed to copy role {name}: {e}") + logger.error(f"Failed to copy role {name}: {e}") return "" async def create_emoji(self, name: str, image_bytes: bytes) -> str: @@ -473,6 +479,7 @@ class FluxerWriter: ) except Exception as e: print(f"Failed to update community metadata: {e}") + logger.error(f"Failed to update community metadata: {e}") async def remove_community_logo_and_banner(self) -> dict: """ @@ -503,6 +510,7 @@ class FluxerWriter: ) except Exception as e: print(f"Failed to remove community icon: {e}") + logger.error(f"Failed to remove community icon: {e}") # 3. 
Remove banner if set if has_banner: @@ -513,6 +521,7 @@ class FluxerWriter: ) except Exception as e: print(f"Failed to remove community banner: {e}") + logger.error(f"Failed to remove community banner: {e}") return { "icon": "REMOVED" if has_icon else "SKIP", @@ -544,6 +553,7 @@ class FluxerWriter: await progress_callback(ch.get("name", "Unknown"), deleted, total) except Exception as e: print(f"Failed to delete channel {ch.get('name')}: {e}") + logger.error(f"Failed to delete channel {ch.get('name')}: {e}") return deleted async def reset_channel_permissions(self, progress_callback=None) -> int: @@ -576,12 +586,14 @@ class FluxerWriter: ) ) except Exception as e: + print(f"Failed to delete overwrite {ow['id']} for channel {ch['id']}: {e}") logger.error(f"Failed to delete overwrite {ow['id']} for channel {ch['id']}: {e}") processed += 1 if progress_callback: await progress_callback(ch.get("name", "Unknown"), processed, total) except Exception as e: print(f"Failed to reset permissions for channel {ch.get('name')}: {e}") + logger.error(f"Failed to reset permissions for channel {ch.get('name')}: {e}") return processed async def set_channel_permission(self, channel_id: str, overwrite_id: str, allow: int, deny: int, is_role: bool = True): @@ -603,6 +615,7 @@ class FluxerWriter: type=0 if is_role else 1 ) except Exception as e: + print(f"Failed to set permission on channel {channel_id} for overwrite {overwrite_id}: {e}") logger.error(f"Failed to set permission on channel {channel_id} for overwrite {overwrite_id}: {e}") @@ -644,6 +657,7 @@ class FluxerWriter: await progress_callback(role.get("name", "Unknown"), deleted, total) except Exception as e: print(f"Failed to delete role {role.get('name')}: {e}") + logger.error(f"Failed to delete role {role.get('name')}: {e}") return deleted async def delete_all_emojis_and_stickers(self, progress_callback=None) -> dict: @@ -667,8 +681,10 @@ class FluxerWriter: await progress_callback(emoji.get("name", "Unknown"), "Emoji", 
emoji_deleted, emoji_total) except Exception as e: print(f"Failed to delete emoji {emoji.get('name')}: {e}") + logger.error(f"Failed to delete emoji {emoji.get('name')}: {e}") except Exception as e: print(f"Failed to fetch emojis: {e}") + logger.error(f"Failed to fetch emojis: {e}") # Delete stickers try: @@ -682,8 +698,10 @@ class FluxerWriter: await progress_callback(sticker.get("name", "Unknown"), "Sticker", sticker_deleted, sticker_total) except Exception as e: print(f"Failed to delete sticker {sticker.get('name')}: {e}") + logger.error(f"Failed to delete sticker {sticker.get('name')}: {e}") except Exception as e: print(f"Failed to fetch stickers: {e}") + logger.error(f"Failed to fetch stickers: {e}") return {"emojis": emoji_deleted, "stickers": sticker_deleted} diff --git a/src/stoat/writer.py b/src/stoat/writer.py index 77ed74e..d3a8c2c 100644 --- a/src/stoat/writer.py +++ b/src/stoat/writer.py @@ -58,6 +58,7 @@ class StoatWriter: else: raise asyncio.TimeoutError("Timed out waiting for Stoat to be ready") except Exception as e: + print(f"Failed to fetch Stoat servers: {e}") logger.error(f"Failed to fetch Stoat servers: {e}") raise finally: @@ -88,6 +89,7 @@ class StoatWriter: try: self._me = await self.client.fetch_user("@me") except Exception as e: + print(f"Failed to fetch bot user in StoatWriter: {e}") logger.error(f"Failed to fetch bot user in StoatWriter: {e}") self.client = None # Reset if we can't even fetch @me @@ -235,6 +237,7 @@ class StoatWriter: return results except Exception as e: + print(f"Failed to fetch Stoat channels: {e}") logger.error(f"Failed to fetch Stoat channels: {e}") return [] @@ -272,6 +275,7 @@ class StoatWriter: self._server = None # Clear cache return str(ch.id) except Exception as e: + print(f"Failed to create Stoat channel {name}: {e}") logger.error(f"Failed to create Stoat channel {name}: {e}") return "" @@ -297,6 +301,7 @@ class StoatWriter: # clone_server.py now handles all parenting bulk logic return True except Exception 
as e: + print(f"Failed to modify Stoat channel {channel_id}: {e}") logger.error(f"Failed to modify Stoat channel {channel_id}: {e}") return False @@ -419,6 +424,7 @@ class StoatWriter: return str(msg.id) if msg else None raise # Re-raise MissingPermission and other errors except Exception as e: + print(f"Failed to send Stoat message to {channel_id}: {e}") logger.error(f"Failed to send Stoat message to {channel_id}: {e}") raise # Let caller handle (migration loop will stop for permission errors) @@ -442,6 +448,7 @@ class StoatWriter: ) return str(msg.id) except Exception as e: + print(f"Failed to send Stoat marker to {channel_id}: {e}") logger.error(f"Failed to send Stoat marker to {channel_id}: {e}") return None @@ -470,6 +477,7 @@ class StoatWriter: return str(role.id) except Exception as e: + print(f"Failed to create Stoat role {name}: {e}") logger.error(f"Failed to create Stoat role {name}: {e}") return "" @@ -526,6 +534,7 @@ class StoatWriter: await server.set_default_permissions(s_perms) return True except Exception as e: + print(f"Failed to update Stoat default permissions: {e}") logger.error(f"Failed to update Stoat default permissions: {e}") return False @@ -536,6 +545,7 @@ class StoatWriter: emoji = await server.create_server_emoji(name=name, image=image_bytes) return str(emoji.id) except Exception as e: + print(f"Failed to create Stoat emoji {name}: {e}") logger.error(f"Failed to create Stoat emoji {name}: {e}") return "" @@ -551,6 +561,7 @@ class StoatWriter: banner=banner if banner is not None else stoat.UNDEFINED ) except Exception as e: + print(f"Failed to update Stoat guild metadata: {e}") logger.error(f"Failed to update Stoat guild metadata: {e}") async def remove_community_logo_and_banner(self) -> dict: @@ -562,12 +573,14 @@ class StoatWriter: try: await server.edit(icon=None) except Exception as e: + print(f"Failed to remove Stoat community icon: {e}") logger.error(f"Failed to remove Stoat community icon: {e}") if has_banner: try: await 
server.edit(banner=None) except Exception as e: + print(f"Failed to remove Stoat community banner: {e}") logger.error(f"Failed to remove Stoat community banner: {e}") return { @@ -594,6 +607,7 @@ class StoatWriter: if progress_callback: await progress_callback(name, i, total) except Exception as e: + print(f"Failed to delete Stoat channel {ch.id}: {e}") logger.error(f"Failed to delete Stoat channel {ch.id}: {e}") # To delete categories, we can wipe the categories array via server.edit to avoid 404 endpoint @@ -618,6 +632,7 @@ class StoatWriter: await progress_callback(name, j, total) j += 1 except Exception as e: + print(f"Failed to wipe Stoat categories via edit: {e}") logger.error(f"Failed to wipe Stoat categories via edit: {e}") return count @@ -634,17 +649,23 @@ class StoatWriter: logger.info(f"Danger Zone: Skipping permission reset for audit channel {name}") total -= 1 continue - # In Stoat, clearing overrides might involve setting them to default or explicitly removing the role_permissions/default_permissions - # Since we don't know an explicit "clear_overrides" method, we'll wipe them by setting empty/none if possible. - # Actually Stoat allows overwriting. Setting allow=0 deny=0 for role overrides isn't explicitly clear. - # For safety, we will just pass. If the user expects it, we'd iterate over roles and set empty. - # A quick way is to edit the channel permissions to empty state if possible. - # Let's count them anyway. 
- # (Fluxer writer does a loop over existing overrides, we can just return 0 for now until we inspect Stoat `PermissionOverride` deletion) + + # Fetch fresh channel to get current role_permissions + fresh_ch = await self.client.fetch_channel(ch.id) + # Clear default permissions + if hasattr(fresh_ch, "default_permissions") and fresh_ch.default_permissions is not None: + await fresh_ch.set_default_permissions(None) + + # Clear all role overrides + if hasattr(fresh_ch, "role_permissions"): + for role_id in list(fresh_ch.role_permissions.keys()): + await fresh_ch.set_role_permissions(str(role_id), allow=stoat.Permissions.none(), deny=stoat.Permissions.none()) + count += 1 if progress_callback: await progress_callback(name, i, total) except Exception as e: + print(f"Failed to reset Stoat channel permissions for {ch.id}: {e}") logger.error(f"Failed to reset Stoat channel permissions for {ch.id}: {e}") return count @@ -671,6 +692,7 @@ class StoatWriter: if "MissingPermission" in err_msg and "ViewChannel" in err_msg: logger.error(f"Stoat LOCKOUT: Bot lacks 'ViewChannel' to edit {channel_id}. 
" "Ensure the bot has 'Manage Server' or a role with 'Allow View Channel' rank higher than @everyone.") + print(f"Failed to set Stoat channel permission for {overwrite_id} on {channel_id}: {e}") logger.error(f"Failed to set Stoat channel permission for {overwrite_id} on {channel_id}: {e}") @@ -712,6 +734,7 @@ class StoatWriter: await emoji.delete() count += 1 except Exception as e: + print(f"Failed to delete Stoat emoji {emoji.name}: {e}") logger.error(f"Failed to delete Stoat emoji {emoji.name}: {e}") return {"emojis": count, "stickers": 0} From 0cb678b848c1204b5fd16811b67f2ab98806cbcb Mon Sep 17 00:00:00 2001 From: rambros3d Date: Mon, 30 Mar 2026 01:38:12 +0530 Subject: [PATCH 3/8] fix name list errors --- src/core/database.py | 2 +- src/core/state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/database.py b/src/core/database.py index 47578ea..94ffbc0 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -4,7 +4,7 @@ import logging import json import random from pathlib import Path -from typing import Optional, Dict, Any, Union +from typing import Optional, Dict, Any, List, Union import threading import sys from src.core.utils import parse_snowflake diff --git a/src/core/state.py b/src/core/state.py index eb816c5..7388c40 100644 --- a/src/core/state.py +++ b/src/core/state.py @@ -232,7 +232,7 @@ class MigrationState: return None - def get_global_min_last_message_id(self, all_mapped_ids: List[str]) -> int | None: + def get_global_min_last_message_id(self, all_mapped_ids: list[str]) -> int | None: """Returns the absolute minimum last_msg_id among the given list of mapped target IDs (channels and threads).""" if self._ensure_db(): return self.db.get_global_min_last_message_id(all_mapped_ids) From 73d52d2183485d263fad4cd72c6cb91b4df67196 Mon Sep 17 00:00:00 2001 From: rambros3d Date: Mon, 30 Mar 2026 01:40:05 +0530 Subject: [PATCH 4/8] improve thread safety for backup db --- src/core/backup_database.py | 56 
++++++++++++++++++++++++++++++++++++- src/core/backup_reader.py | 38 +++++++++++-------------- 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/src/core/backup_database.py b/src/core/backup_database.py index 40e0011..381cdb7 100644 --- a/src/core/backup_database.py +++ b/src/core/backup_database.py @@ -114,7 +114,7 @@ class BackupDatabase: elif table == "forum_tags": conn.execute("CREATE TABLE forum_tags (id INTEGER PRIMARY KEY, forum_id INTEGER, name TEXT, moderated INTEGER, emoji_id INTEGER, emoji_name TEXT)") elif table == "server_assets": - conn.execute("CREATE TABLE server_assets (id INTEGER PRIMARY KEY, name TEXT, type TEXT, filename TEXT, url TEXT, content_type INTEGER)") + conn.execute("CREATE TABLE server_assets (id INTEGER PRIMARY KEY, name TEXT, type TEXT, filename TEXT, url TEXT, content_type TEXT)") old_cols = [c[1] for c in conn.execute(f"PRAGMA table_info({table}_old)").fetchall()] new_cols = [c[1] for c in conn.execute(f"PRAGMA table_info({table})").fetchall()] @@ -945,6 +945,60 @@ class BackupDatabase: return purged_count + def get_backed_up_channel_ids(self) -> List[int]: + """Returns a list of distinct channel IDs that have messages in the database.""" + with self._lock: + rows = self._conn.execute("SELECT DISTINCT channel_id FROM messages").fetchall() + return [parse_snowflake(r[0]) for r in rows if parse_snowflake(r[0])] + + def get_message_with_relations(self, message_id) -> Optional[Dict[str, Any]]: + """Fetches a single message with its attachments, embeds, reactions, and stickers.""" + with self._lock: + mid = parse_snowflake(message_id) + row = self._conn.execute("SELECT * FROM messages WHERE id = ?", (mid,)).fetchone() + if not row: + return None + data = dict(row) + + # Attachments + atts = self._conn.execute("SELECT * FROM attachments WHERE message_id = ?", (mid,)).fetchall() + data["attachments"] = [dict(a) for a in atts] + + # Embeds + embs = self._conn.execute("SELECT * FROM embeds WHERE message_id = ?", 
(mid,)).fetchall() + data["embeds"] = [] + for er in embs: + e_dict = { + "title": er["title"], + "description": er["description"], + "url": er["url"], + "color": er["color"], + "timestamp": er["timestamp"], + "thumbnail": {"url": er["thumbnail_url"]} if er["thumbnail_url"] else None, + "image": {"url": er["image_url"]} if er["image_url"] else None, + "author": { + "name": er["author_name"], + "url": er["author_url"], + "icon_url": er["author_icon_url"] + } if er["author_name"] else None, + "footer": { + "text": er["footer_text"], + "icon_url": er["footer_icon_url"] + } if er["footer_text"] else None, + "fields": json.loads(er["fields"]) if er["fields"] else [] + } + data["embeds"].append(e_dict) + + # Reactions + reas = self._conn.execute("SELECT * FROM reactions WHERE message_id = ?", (mid,)).fetchall() + data["reactions"] = [dict(r) for r in reas] + + # Stickers + sts = self._conn.execute("SELECT * FROM message_stickers WHERE message_id = ?", (mid,)).fetchall() + data["stickers"] = [dict(s) for s in sts] + + return data + def close(self): """Commits any pending writes and closes the connection.""" with self._lock: diff --git a/src/core/backup_reader.py b/src/core/backup_reader.py index c9d250d..36369c8 100644 --- a/src/core/backup_reader.py +++ b/src/core/backup_reader.py @@ -393,6 +393,20 @@ class BackupMember: # Fallback for unexpected data format self.id = 0 self.name = "Unknown" + self.display_name = "Unknown" + self.global_name = "Unknown" + self.bot = False + self.system = False + self.discriminator = "0000" + self.color = BackupColor(0) + self.roles = sorted(role_objects or [], key=lambda r: r.position, reverse=True) + self.guild_permissions = BackupPermissions(0) + self.created_at = datetime.now(timezone.utc) + self.joined_at = datetime.now(timezone.utc) + self.status = type("Status", (), {"value": "offline"})() + self.activity = None + self._avatar_url = None + self.avatar = BackupAsset(None) return self.id = parse_snowflake(data["id"]) self.name = 
data.get("username", "Unknown") @@ -1266,11 +1280,7 @@ class BackupReader: async def get_backed_up_channel_ids(self) -> List[int]: """Returns a list of channel IDs that have messages in the database.""" if not self.db: return [] - import sqlite3 - conn = sqlite3.connect(self.db.db_path) - rows = conn.execute("SELECT DISTINCT channel_id FROM messages").fetchall() - conn.close() - return [parse_snowflake(r[0]) for r in rows if parse_snowflake(r[0])] + return self.db.get_backed_up_channel_ids() async def get_channel(self, channel_id: int) -> BackupChannel | BackupThread | None: for c in self.channels: @@ -1351,23 +1361,9 @@ class BackupReader: async def get_message(self, channel_id: int, message_id: int) -> BackupMessage | None: """Fetch a specific message from SQLite.""" if not self.db: return None - import sqlite3 - conn = sqlite3.connect(self.db.db_path) - conn.row_factory = sqlite3.Row - row = conn.execute("SELECT * FROM messages WHERE id = ?", (str(message_id),)).fetchone() - if row: - data = dict(row) - # Fetch attachments - atts = conn.execute("SELECT * FROM attachments WHERE message_id = ?", (str(message_id),)).fetchall() - data["attachments"] = [dict(a) for a in atts] - - # Fetch stickers - sts = conn.execute("SELECT * FROM message_stickers WHERE message_id = ?", (str(message_id),)).fetchall() - data["stickers"] = [dict(s) for s in sts] - - conn.close() + data = self.db.get_message_with_relations(message_id) + if data: return self._hydrate_message(data) - conn.close() return None async def get_first_message(self, channel_id: int) -> BackupMessage | None: From 514a2e551c8e9a05f86941eb38d03f766af0132f Mon Sep 17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 02:21:11 +0530 Subject: [PATCH 5/8] fix sticker local_hash error --- src/core/backup_reader.py | 9 +++++---- src/fluxer/migrate_message.py | 2 +- src/stoat/migrate_message.py | 15 +++++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/core/backup_reader.py 
b/src/core/backup_reader.py index 36369c8..aa8f66c 100644 --- a/src/core/backup_reader.py +++ b/src/core/backup_reader.py @@ -530,12 +530,13 @@ class BackupEmoji: class BackupSticker: """Minimal stand-in for discord.GuildSticker.""" - __slots__ = ("id", "name", "url", "format", "_backup_root", "_file_path") + __slots__ = ("id", "name", "url", "format", "_backup_root", "_file_path", "local_hash") def __init__(self, data: dict, backup_root: Path | None = None, media_pool: dict | None = None): if not isinstance(data, dict): self.id = 0 self.name = "Sticker" + self.local_hash = None return self.id = parse_snowflake(data.get("id") or data.get("sticker_id", 0)) or 0 self.name = data.get("name", "Sticker") @@ -550,14 +551,14 @@ class BackupSticker: self._backup_root = backup_root # 1. Check if it's a CAS-based sticker (from message_stickers table) - local_hash = data.get("local_hash") - if local_hash and backup_root: + self.local_hash = data.get("local_hash") + if self.local_hash and backup_root: ext = ".png" if self.format == StickerFormatType.lottie: ext = ".json" elif self.format == StickerFormatType.apng: ext = ".png" elif self.format == StickerFormatType.gif: ext = ".gif" - self._file_path = backup_root / "attachments" / f"{local_hash}{ext}" + self._file_path = backup_root / "attachments" / f"{self.local_hash}{ext}" # 2. 
Check if it's a server asset sticker (legacy or manual save) elif data.get("filename") and backup_root: self._file_path = backup_root / "server_assets" / data["filename"] diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index 5a2c0ea..cf141c0 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -842,7 +842,7 @@ async def migrate_global_messages( sticker_url = sticker.url # Check for uploaded media pool logic first - s_hash = sticker.local_hash + s_hash = getattr(sticker, "local_hash", None) sticker_file = None s_media = db_media.get(s_hash) if db_media and s_hash else None if s_media: diff --git a/src/stoat/migrate_message.py b/src/stoat/migrate_message.py index 130579f..6861876 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -826,8 +826,8 @@ async def migrate_global_messages( content = msg.content or "" for sticker in msg.stickers: - sticker_name = sticker.name - s_hash = sticker.local_hash + sticker_name = getattr(sticker, "name", "unknown") + s_hash = getattr(sticker, "local_hash", None) sticker_file = None s_media = db_media.get(s_hash) if db_media and s_hash else None if s_media: @@ -837,8 +837,15 @@ async def migrate_global_messages( content += f"\n[Sticker: {sticker_name}]" if sticker_file: - files.append(sticker_file) - file_names.append(f"sticker_{sticker_name}.png") + try: + with open(sticker_file, "rb") as f: + files.append({ + "filename": f"sticker_{sticker_name}.png", + "data": f.read(), + "content_type": "image/png" + }) + except Exception as e: + logger.error(f"Failed to read sticker file {sticker_file}: {e}") content = clean_mentions( content=content, From ef2e94547732f131c56a966b0e0ab2d3ac019d18 Mon Sep 17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 02:48:19 +0530 Subject: [PATCH 6/8] update ignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 308e0cc..3facd9e 100644 --- a/.gitignore +++ 
b/.gitignore @@ -21,6 +21,7 @@ wheels/ .installed.cfg *.egg *.txt +*.exe # Virtual Environment venv/ @@ -44,7 +45,7 @@ tmp/ test_*.py test_release.zip test_release/ -DiscoReaper-* +DiscoReaper *.zip # App data files From 5b315ab2bfc08577c44f33c9f9171a2f2d0fd1e2 Mon Sep 17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 03:03:00 +0530 Subject: [PATCH 7/8] improve waterfall resume operation --- src/core/base.py | 27 +++++++++++++++++++++++++-- src/core/database.py | 9 +++++++++ src/core/state.py | 14 +++++--------- src/fluxer/migrate_message.py | 3 ++- src/stoat/migrate_message.py | 3 ++- src/ui/shuttle_ops.py | 15 +++++++++------ 6 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/core/base.py b/src/core/base.py index 81943f8..cbf6145 100644 --- a/src/core/base.py +++ b/src/core/base.py @@ -97,11 +97,17 @@ class MigrationContext: } # CONSISTENCY: Once target metadata is known, initialize the flat SQLite DB. - if results["target_community"] and results["target_community_name"]: + if results["target_community"]: tid = self.config.fluxer_server_id if self.target_platform == "fluxer" else self.config.stoat_server_id + + # Prefer the original discord community name for the DB file if available (e.g. from live load or backup) + db_name = results.get("discord_server_name") + if not db_name or db_name == "Not Found" or db_name == "Unknown": + db_name = results.get("target_community_name") or "Unknown" + self.ensure_state_initialized( str(tid or ""), - results["target_community_name"] + db_name ) return results @@ -120,6 +126,23 @@ class MigrationContext: return import re + import json + + # Override the target name explicitly with the original Discord source name if available. 
+ # This fixes naming collisions and UI confusion like "Fluxer-123456.db" instead of "MyServer-123456.db" + try: + if hasattr(self.discord_reader, "guild") and getattr(self.discord_reader, "guild", None): + community_name = getattr(self.discord_reader, "guild").name + elif getattr(self, "source_mode", "live") == "backup" and hasattr(self.discord_reader, "backup_dir"): + b_dir = getattr(self.discord_reader, "backup_dir") + if b_dir and b_dir.exists(): + meta_file = b_dir / "metadata.json" + if meta_file.exists(): + data = json.loads(meta_file.read_text(encoding="utf-8")) + community_name = data.get("name") or community_name + except Exception: + pass + clean_name = re.sub(r'[^\w\s-]', '', community_name).strip() clean_name = re.sub(r'[-\s]+', '_', clean_name) diff --git a/src/core/database.py b/src/core/database.py index 94ffbc0..a956224 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -560,6 +560,15 @@ class MigrationDatabase: conn.execute("DELETE FROM thread_tracking WHERE channel_id = ?", (str(channel_id),)) conn.commit() logger.info(f"Cleared all tracking and mapping data for channel: {channel_id}") + def clear_all_migration_data(self): + """Purge all mappings and tracking data for ALL channels and threads.""" + conn = self._get_conn() + conn.execute("DELETE FROM message_mappings") + conn.execute("DELETE FROM thread_mappings") + conn.execute("DELETE FROM channel_tracking") + conn.execute("DELETE FROM thread_tracking") + conn.commit() + logger.info("Cleared ALL tracking and message mapping data globally.") def close(self): if hasattr(self._local, "conn"): diff --git a/src/core/state.py b/src/core/state.py index 7388c40..997d310 100644 --- a/src/core/state.py +++ b/src/core/state.py @@ -238,16 +238,12 @@ class MigrationState: return self.db.get_global_min_last_message_id(all_mapped_ids) return None - def set_waterfall_last_id(self, last_id: str | int): - if self.db: - self.db.set_metadata("waterfall_last_id", str(last_id)) + + def clear_all_migration_data(self): + 
"""Clears all message mapping and tracking state globally.""" + if self._ensure_db(): + self.db.clear_all_migration_data() - def get_waterfall_last_id(self) -> int | None: - if self.db: - val = self.db.get_metadata("waterfall_last_id") - return int(val) if val else None - return None - def get_all_last_message_ids(self) -> Dict[str, str]: """Returns a combined map of channel_id/thread_id -> last_msg_id.""" if self._ensure_db(): diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index cf141c0..c2b0aea 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -905,7 +905,8 @@ async def migrate_global_messages( if fluxer_msg_id: context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id) context.state.update_last_message_id(target_channel_id, msg.id) - context.state.set_waterfall_last_id(msg.id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) stats["attachments"] += len(files) if files else 0 stats["messages"] += 1 diff --git a/src/stoat/migrate_message.py b/src/stoat/migrate_message.py index 6861876..4a3d9a3 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -896,7 +896,8 @@ async def migrate_global_messages( if stoat_msg_id: context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id) context.state.update_last_message_id(target_channel_id, msg.id) - context.state.set_waterfall_last_id(msg.id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) stats["attachments"] += len(files) if files else 0 stats["messages"] += 1 diff --git a/src/ui/shuttle_ops.py b/src/ui/shuttle_ops.py index fc17583..2ca44a7 100644 --- a/src/ui/shuttle_ops.py +++ b/src/ui/shuttle_ops.py @@ -1551,10 +1551,8 @@ class 
OperationPane(Container): if filtered_tgt_ids: all_mapped_tgt_ids = filtered_tgt_ids - # 2.6 Resume Point: Prioritize Global waterfall tracker, fallback to channel minimums - min_last_id = self.engine.state.get_waterfall_last_id() - if min_last_id is None: - min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids) + # 2.6 Resume Point: Calculate from global channel minimums + min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids) modal.write(f"\n[bold cyan]Waterfall Migration Resume Point:[/bold cyan]") if min_last_id is not None: @@ -1566,7 +1564,7 @@ class OperationPane(Container): show_continue=min_last_id is not None, show_id=False, btn_start_label="Start From Beginning", - btn_start_tooltip="Safe, skips duplicates automatically", + btn_start_tooltip="Wipes migration progress and restarts from the beginning; may create duplicates", btn_start_variant="default" if min_last_id is not None else "primary", btn_continue_label=f"Continue from ID {min_last_id if min_last_id is not None else 0}" if min_last_id is not None else "Continue Migration", btn_continue_tooltip="Fastest" @@ -1582,7 +1580,11 @@ class OperationPane(Container): return after_id = None - if choice == "btn_continue" and min_last_id is not None: + if choice == "btn_start_first": + logger.info("Proceeding with 'Start from Beginning' (global clean sink).") + self.engine.state.clear_all_migration_data() + after_id = None + elif choice == "btn_continue" and min_last_id is not None: after_id = int(min_last_id) # Phase 3: Progress @@ -1599,6 +1601,7 @@ class OperationPane(Container): tid = self.config.fluxer_server_id self.engine.ensure_state_initialized(str(tid or ""), platform_name) + modal.show_stats() modal.write("Scanning global footprint for totals ...") stats_analysis = await migrate_mod.analyze_global_migration(self.engine, after_message_id=after_id) total_messages = stats_analysis["messages"] From 071de5296b590dee23c92ebaf9c8a32f05a95df7 Mon Sep 
17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 12:52:50 +0530 Subject: [PATCH 8/8] unify message_migrate & waterfall --- src/fluxer/migrate_message.py | 395 +++++++++++++++++----------------- src/stoat/migrate_message.py | 389 +++++++++++++++++---------------- 2 files changed, 410 insertions(+), 374 deletions(-) diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index c2b0aea..fc596c7 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -158,7 +158,190 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]: return threads + +async def _process_and_send_message( + context: MigrationContext, + msg: Any, + target_channel_id: str, + stats: Dict[str, Any], + thread_id: str | None = None, + parent_target_id: str | None = None, + thread_name: str | None = None, + processed_threads: set | None = None +) -> str | None: + """ + Internal helper to process a single Discord message (mentions, attachments, stickers) + and send it to the Fluxer platform. + """ + # 1. 
Formatting + content = msg.content or "" + + # Check for forwarded flag + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # Always ensure alias is created/retrieved to populate user_alias table + alias = context.state.get_user_alias(str(msg.author.id)) + anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False + + # Process Stickers + files = [] + if hasattr(msg, 'stickers') and msg.stickers: + for s in msg.stickers: + try: + sticker_data = await context.discord_reader.download_sticker(s) + if not sticker_data: continue + + format_val = getattr(s, 'format', 'png') + if hasattr(format_val, 'name'): + ext = format_val.name.lower() + elif isinstance(format_val, int): + format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'} + ext = format_map.get(format_val, 'png') + else: + ext = str(format_val).lower() + + # Conversion logic (Simplified for unification) + if ext == 'lottie' and HAS_LOTTIE: + try: + lottie_data = json.loads(sticker_data) + def _convert_lottie(data): + anim = Animation.load(data) + buf = io.BytesIO() + export_gif(anim, buf) + buf.seek(0) + return buf + gif_buf = await asyncio.to_thread(_convert_lottie, lottie_data) + from PIL import Image + def _convert_gif_to_webp(buf): + img = Image.open(buf) + w_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + img.save(w_buf, format='WEBP', save_all=True, loop=0, quality=80) + else: + img.save(w_buf, format='WEBP', quality=80) + return w_buf.getvalue() + sticker_data = await asyncio.to_thread(_convert_gif_to_webp, gif_buf) + ext = 'webp' + except Exception: ext = 'json' + elif ext in ('apng', 'gif'): + try: + from PIL import Image + def _process_animated_sticker(data): + img = Image.open(io.BytesIO(data)) + webp_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + img.save(webp_buf, format='WEBP', save_all=True, loop=0, quality=80) + else: + img.save(webp_buf, format='WEBP', quality=80) + return 
webp_buf.getvalue() + sticker_data = await asyncio.to_thread(_process_animated_sticker, sticker_data) + ext = 'webp' + except Exception: pass + + files.append({"filename": f"sticker_{s.name}_{s.id}.{ext}", "data": sticker_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + + # Process Attachments + attachments_to_process = list(msg.attachments) + if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots: + snapshot = msg.message_snapshots[0] + if not content: + content = snapshot.content + attachments_to_process.extend(snapshot.attachments) + + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + + # Clean Mentions + content = clean_mentions( + content=content, + guild=context.discord_reader.guild, + user_mentions=msg.mentions, + role_mentions=msg.role_mentions, + channel_mentions=msg.channel_mentions, + emoji_map=context.state.emoji_map, + channel_map=context.state.channel_map, + state=context.state, + target_server_id=context.fluxer_writer.community_id, + channel_names=context.channel_names if hasattr(context, 'channel_names') else None, + anonymize_users=anonymize_users + ) + + if not content and not files: + return None + + # Reply Resolution + reply_to_fluxer_id = None + if msg.reference and msg.reference.message_id: + reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id)) + + # Fallback author tagging for replies if mapping not found + if not reply_to_fluxer_id: + try: + source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) + if source_ref_msg and source_ref_msg.author: + ref_name = 
context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name + content = f"`@{ref_name}`\n{content}" + else: + tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) + if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}" + except Exception: pass + + # Thread logic + if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0: + reply_to_fluxer_id = parent_target_id + if thread_name and stats["messages"] == 0: + content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" + + # Send Message + if anonymize_users: + author_name = alias or "Anonymized User" + author_avatar_url = None + else: + author_name = msg.author.display_name + author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None + + fluxer_msg_id = await context.fluxer_writer.send_message( + channel_id=target_channel_id, + author_name=author_name, + author_avatar_url=author_avatar_url, + content=content, + timestamp=int(msg.created_at.timestamp()), + files=files if files else None, + reply_to_message_id=reply_to_fluxer_id, + is_forwarded=is_forwarded, + embeds=msg.embeds + ) + + if fluxer_msg_id: + if thread_id: + context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id) + context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) + context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) + context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) + else: + context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.update_last_message_id(target_channel_id, str(msg.id)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files 
else 0) + + stats["messages"] += 1 + stats["last_message_content"] = content + stats["last_message_author"] = msg.author.display_name + + return fluxer_msg_id + async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]: + """ Scans channel history to count messages, threads, and attachments. """ @@ -542,87 +725,30 @@ async def migrate_messages( logger.debug(f"Added sticker {s.name} as attachment (extension: {ext}, size: {sticker_size} bytes)") except Exception as e: logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") - - # Check for existing mapping to avoid duplicates when resuming + # Check for existing mapping to avoid duplicates when resuming if context.state.get_target_message_id(target_channel_id, str(msg.id)): continue try: - reply_to_fluxer_id = None - if msg.reference and msg.reference.message_id: - reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id)) - if reply_to_fluxer_id: - logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Fluxer ID {reply_to_fluxer_id}") - else: - logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.") - - # If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter - if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0: - reply_to_fluxer_id = parent_target_id - - # Prepend thread marker to the first message of the thread - if thread_name and stats["messages"] == 0: - content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = 
context.config.anonymize_users if hasattr(context, 'config') else False - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - logger.debug(f"Fluxer: Calling send_message for Discord ID {msg.id}") - fluxer_msg_id = await context.fluxer_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - timestamp=int(msg.created_at.timestamp()), - files=files if files else None, - reply_to_message_id=reply_to_fluxer_id, - is_forwarded=is_forwarded, - embeds=msg.embeds + fluxer_msg_id = await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + thread_id=thread_id, + parent_target_id=parent_target_id, + thread_name=thread_name, + processed_threads=processed_threads ) - if fluxer_msg_id: - if thread_id: - context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id) - else: - context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id) - else: - logger.warning(f"Fluxer: send_message returned None for Discord ID {msg.id} (message might have been skipped or timed out)") - - if thread_id: - context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) - context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) - context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) - else: - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.update_last_message_id(target_channel_id, str(msg.id)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - - stats["messages"] += 1 - 
stats["last_message_content"] = content - stats["last_message_author"] = msg.author.display_name - - # Check for associated thread (Normal case: parent message is migrated) + # Check for associated thread (Individual mode recursion) if hasattr(msg, 'thread') and msg.thread: thread = msg.thread if thread.id not in processed_threads: processed_threads.add(thread.id) - # Track thread entry stats["threads"] += 1 - # Fetch last migrated message ID for this thread thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id)) - if thread_after_id: - logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}") - - # Migrate thread messages recursively thread_stats = await migrate_messages( context=context, source_channel_id=thread.id, @@ -637,22 +763,19 @@ async def migrate_messages( stats["attachments"] += thread_stats["attachments"] stats["threads"] += thread_stats["threads"] - # Send End Marker if context.is_running: await context.fluxer_writer.send_marker( channel_id=target_channel_id, content=f"> <<< END OF THREAD >>>" ) - # Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers) - # The 'after_message_id' param usually means it's the main function call and not a thread recursive call + # Update Link Tracking (Parent pointer updates) if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url stats["last_message_url"] = msg.jump_url if progress_callback: await progress_callback(stats) - logger.debug(f"Fluxer: Finished processing message Discord ID {msg.id}") except Exception as e: logger.error(f"Failed to process message {msg.id}: {e}") import traceback @@ -735,7 +858,7 @@ async def migrate_global_messages( progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None ) -> Dict[str, Any]: """ - Migrates messages across all channels chronologically. + Migrates messages across all channels chronologically to Fluxer. 
""" stats = { "messages": 0, @@ -750,14 +873,6 @@ async def migrate_global_messages( processed_threads = set() logger.info("Starting Global Waterfall Migration for Fluxer...") - # Keep track of active thread mapping natively to pass parent target IDs if needed - thread_to_target_channel = {} - - # Emojis and mapped users cache setup - emoji_map = context.state.emoji_map - db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {} - target_server_id = getattr(context.fluxer_writer, "server_id", None) - # Fetch global progress map to skip migrated messages efficiently progress_map = context.state.get_all_last_message_ids() @@ -794,124 +909,18 @@ async def migrate_global_messages( continue # If it's a thread message, we need to handle it based on if it's the thread starter or a reply - parent_target_id = None if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id: processed_threads.add(msg.thread.id) stats["threads"] += 1 - elif msg.channel.type in [11, 12]: # Thread channels - # It's a message IN a thread. - # In Fluxer, threads might just be linear messages or threaded replies depending on schema - # For basic migration we just send it to the parent mapped target channel. 
- # The parent mapped target channel ID should already be calculated correctly by get_target_channel_id (which returns mapped thread or parent channel) - pass - - # Formatting - files = [] - file_names = [] - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - for att in msg.attachments: - media_info = db_media.get(att.local_hash) if db_media else None - local_path = None - if media_info: - local_path = Path(media_info["local_path"]) - elif hasattr(att, 'read'): - # Fallback - pass - - if local_path and local_path.exists(): - files.append(local_path) - file_names.append(att.filename) - - content = msg.content or "" - - # Stickers - for sticker in msg.stickers: - sticker_name = sticker.name - sticker_url = sticker.url - - # Check for uploaded media pool logic first - s_hash = getattr(sticker, "local_hash", None) - sticker_file = None - s_media = db_media.get(s_hash) if db_media and s_hash else None - if s_media: - s_path = Path(s_media["local_path"]) - if s_path.exists(): - sticker_file = s_path - - content += f"\n[Sticker: {sticker_name}]" - if sticker_file: - files.append(sticker_file) - file_names.append(f"sticker_{sticker_name}.png") - - content = clean_mentions( - content=content, - guild=context.discord_reader.guild, - user_mentions=msg.mentions, - role_mentions=msg.role_mentions, - channel_mentions=msg.channel_mentions, - emoji_map=emoji_map, - channel_map=context.state.channel_map, - state=context.state, - target_server_id=target_server_id, - channel_names=context.channel_names if hasattr(context, 'channel_names') else None, - 
anonymize_users=anonymize_users - ) - - if not content and not files: - logger.debug(f"Message {msg.id} empty after processing, skipping.") - continue - - timestamp_int = int(msg.created_at.timestamp()) - - if msg.reference and msg.reference.message_id: - # Resolve the author of the message being replied to - source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) - if source_ref_msg and source_ref_msg.author: - ref_author_id = str(source_ref_msg.author.id) - if anonymize_users: - ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User" - else: - ref_name = source_ref_msg.author.display_name - content = f"`@{ref_name}`\n{content}" - else: - # Fallback if author cannot be resolved (e.g. deleted/missing from backup) - tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) - if tgt_reply: - content = f"[Reply to {tgt_reply}]\n{content}" try: - fluxer_msg_id = await context.fluxer_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - files=files, - timestamp=timestamp_int, - embeds=msg.embeds + await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + processed_threads=processed_threads ) - - if fluxer_msg_id: - context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id) - context.state.update_last_message_id(target_channel_id, msg.id) - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - stats["attachments"] += len(files) if files else 0 - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = author_name if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url diff --git a/src/stoat/migrate_message.py 
b/src/stoat/migrate_message.py index 4a3d9a3..19af6bb 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -155,7 +155,192 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]: return threads +async def _process_and_send_message( + context: MigrationContext, + msg: Any, + target_channel_id: str, + stats: Dict[str, Any], + thread_id: str | None = None, + parent_target_id: str | None = None, + thread_name: str | None = None, + processed_threads: set | None = None +) -> str | None: + """ + Internal helper to process a single Discord message (mentions, attachments, stickers) + and send it to the Stoat platform. + """ + # 1. Processing Flags + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # 2. Content & Formatting + content = msg.content or "" + anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False + alias = context.state.get_user_alias(str(msg.author.id)) + + # Process Stickers + files = [] + if hasattr(msg, 'stickers') and msg.stickers: + for s in msg.stickers: + try: + sticker_data = await context.discord_reader.download_sticker(s) + if not sticker_data: continue + + format_val = getattr(s, 'format', 'png') + if hasattr(format_val, 'name'): + ext = format_val.name.lower() + elif isinstance(format_val, int): + format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'} + ext = format_map.get(format_val, 'png') + else: + ext = str(format_val).lower() + + # Conversion logic for Stoat (WebP or GIF focus) + if ext == 'lottie' and HAS_LOTTIE: + try: + lottie_data = json.loads(sticker_data) + def _convert_lottie_to_gif(data): + animation = Animation.load(data) + output = io.BytesIO() + export_gif(animation, output) + return output.getvalue() + sticker_data = await asyncio.to_thread(_convert_lottie_to_gif, lottie_data) + ext = 'gif' + except Exception: ext = 'json' + elif ext == 'apng': + try: + from PIL import Image + def 
_convert_apng_to_gif(data): + img = Image.open(io.BytesIO(data)) + gif_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + frames = [] + durations = [] + for i in range(img.n_frames): + img.seek(i) + frame = img.convert('RGBA') + current_frame = Image.new('RGBA', img.size, (0,0,0,0)) + current_frame.paste(frame, (0, 0)) + frames.append(current_frame) + durations.append(img.info.get('duration', 100)) + frames[0].save(gif_buf, format='GIF', save_all=True, append_images=frames[1:], loop=0, duration=durations, disposal=2, transparency=0) + else: img.save(gif_buf, format='GIF') + return gif_buf.getvalue() + sticker_data = await asyncio.to_thread(_convert_apng_to_gif, sticker_data) + ext = 'gif' + except Exception: pass + + files.append({ + "filename": f"sticker_{s.name}_{s.id}.{ext}", + "data": sticker_data, + "content_type": f"image/{ext}" if ext != "json" else "application/json" + }) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + + # Process Attachments + attachments_to_process = list(msg.attachments) + if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots: + snapshot = msg.message_snapshots[0] + if not content: content = snapshot.content + attachments_to_process.extend(snapshot.attachments) + + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({ + "filename": att.filename, + "data": att_data, + "content_type": getattr(att, "content_type", None) + }) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + + # Clean Mentions + content = clean_mentions( + content=content, + guild=context.discord_reader.guild, + user_mentions=msg.mentions, + role_mentions=msg.role_mentions, + channel_mentions=msg.channel_mentions, + emoji_map=context.state.emoji_map, + channel_map=context.state.channel_map, + 
state=context.state, + target_server_id=context.stoat_writer.community_id, + channel_names=context.channel_names if hasattr(context, 'channel_names') else None, + anonymize_users=anonymize_users + ) + + if not content and not files: + return None + + # Reply Resolution + reply_to_stoat_id = None + if msg.reference and msg.reference.message_id: + reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id)) + if not reply_to_stoat_id: + # Fallback author tagging + try: + source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) + if source_ref_msg and source_ref_msg.author: + ref_name = context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name + content = f"`@{ref_name}`\n{content}" + else: + tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) + if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}" + except Exception: pass + + # Thread logic + if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0: + reply_to_stoat_id = parent_target_id + if thread_name and stats["messages"] == 0: + content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" + + # Author resolution + if anonymize_users: + author_name = alias or "Anonymized User" + author_avatar_url = None + else: + author_name = msg.author.display_name + author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None + if author_avatar_url and not author_avatar_url.startswith("http"): author_avatar_url = None + + # Send Message + stoat_msg_id = await context.stoat_writer.send_message( + channel_id=target_channel_id, + author_name=author_name, + author_avatar_url=author_avatar_url, + content=content, + timestamp=int(msg.created_at.timestamp()), + files=files if files else None, + reply_to_message_id=reply_to_stoat_id, + is_forwarded=is_forwarded, + embeds=msg.embeds + ) + + 
if stoat_msg_id: + if thread_id: + context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id) + context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) + context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) + context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) + else: + context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.update_last_message_id(target_channel_id, str(msg.id)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) + + stats["messages"] += 1 + stats["last_message_content"] = content + stats["last_message_author"] = msg.author.display_name + + return stoat_msg_id + async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]: + """ Scans channel history to count messages, threads, and attachments. 
""" @@ -546,87 +731,30 @@ async def migrate_messages( except Exception as e: logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + # Check for existing mapping to avoid duplicates when resuming + if context.state.get_target_message_id(target_channel_id, str(msg.id)): + continue + try: - # Check for existing mapping to avoid duplicates when resuming - if context.state.get_target_message_id(target_channel_id, str(msg.id)): - continue - - # Check if this message is a reply - reply_to_stoat_id = None - if msg.reference and msg.reference.message_id: - reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id)) - if reply_to_stoat_id: - logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Stoat ID {reply_to_stoat_id}") - else: - logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.") - - # If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter - if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0: - reply_to_stoat_id = parent_target_id - - # Prepend thread marker to the first message of the thread - if thread_name and stats["messages"] == 0: - content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None - if author_avatar_url and not author_avatar_url.startswith("http"): - author_avatar_url = None - - logger.debug(f"Stoat: Calling send_message for Discord ID {msg.id}") - stoat_msg_id = await 
context.stoat_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - timestamp=int(msg.created_at.timestamp()), - files=files if files else None, - reply_to_message_id=reply_to_stoat_id, - is_forwarded=is_forwarded, - embeds=msg.embeds + stoat_msg_id = await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + thread_id=thread_id, + parent_target_id=parent_target_id, + thread_name=thread_name, + processed_threads=processed_threads ) - - if stoat_msg_id: - if thread_id: - context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id) - else: - context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id) - if thread_id: - context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) - context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) - context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) - else: - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.update_last_message_id(target_channel_id, str(msg.id)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = msg.author.display_name - - # Check for associated thread (Normal case: parent message is migrated) + # Check for associated thread (Individual mode recursion) if hasattr(msg, 'thread') and msg.thread: thread = msg.thread if thread.id not in processed_threads: processed_threads.add(thread.id) - # Track thread entry stats["threads"] += 1 - # Fetch last migrated message ID for this thread thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id)) - if 
thread_after_id: - logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}") - - # Migrate thread messages recursively thread_stats = await migrate_messages( context=context, source_channel_id=thread.id, @@ -641,7 +769,6 @@ async def migrate_messages( stats["attachments"] += thread_stats["attachments"] stats["threads"] += thread_stats["threads"] - # Send End Marker if context.is_running: await context.stoat_writer.send_marker( channel_id=target_channel_id, @@ -656,12 +783,11 @@ async def migrate_messages( if progress_callback: await progress_callback(stats) except Exception as e: - # If it's a permission error, stop the entire migration - if "MissingPermission" in str(e): - raise + if "MissingPermission" in str(e): raise logger.error(f"Failed to process message {msg.id}: {e}") import traceback logger.error(traceback.format_exc()) + # Mark thread as completed if we finished the loop without being interrupted if thread_id and context.is_running: @@ -795,114 +921,14 @@ async def migrate_global_messages( elif msg.channel.type in [11, 12]: pass - # Formatting - files = [] - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - for att in msg.attachments: - media_info = db_media.get(att.local_hash) if db_media else None - local_path = None - if media_info: - local_path = Path(media_info["local_path"]) - - if local_path and local_path.exists(): - try: - with open(local_path, "rb") as f: - files.append({"filename": att.filename, "data": f.read()}) - except Exception as fe: - logger.error(f"Failed to read file {local_path}: 
{fe}") - - content = msg.content or "" - - for sticker in msg.stickers: - sticker_name = getattr(sticker, "name", "unknown") - s_hash = getattr(sticker, "local_hash", None) - sticker_file = None - s_media = db_media.get(s_hash) if db_media and s_hash else None - if s_media: - s_path = Path(s_media["local_path"]) - if s_path.exists(): - sticker_file = s_path - - content += f"\n[Sticker: {sticker_name}]" - if sticker_file: - try: - with open(sticker_file, "rb") as f: - files.append({ - "filename": f"sticker_{sticker_name}.png", - "data": f.read(), - "content_type": "image/png" - }) - except Exception as e: - logger.error(f"Failed to read sticker file {sticker_file}: {e}") - - content = clean_mentions( - content=content, - guild=context.discord_reader.guild, - user_mentions=msg.mentions, - role_mentions=msg.role_mentions, - channel_mentions=msg.channel_mentions, - emoji_map=emoji_map, - channel_map=context.state.channel_map, - state=context.state, - target_server_id=target_server_id, - channel_names=context.channel_names if hasattr(context, 'channel_names') else None, - anonymize_users=anonymize_users - ) - - if not content and not files: - logger.debug(f"Message {msg.id} empty after processing, skipping.") - continue - - timestamp_int = int(msg.created_at.timestamp()) - - if msg.reference and msg.reference.message_id: - # Resolve the author of the message being replied to - source_ref_msg = await context.discord_reader.get_message(msg.channel_id, msg.reference.message_id) - if source_ref_msg and source_ref_msg.author: - ref_author_id = str(source_ref_msg.author.id) - if anonymize_users: - ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User" - else: - ref_name = source_ref_msg.author.display_name - content = f"`@{ref_name}`\n{content}" - else: - tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) - if tgt_reply: - content = f"[Reply to {tgt_reply}]\n{content}" - try: - stoat_msg_id = await 
context.stoat_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - files=files, - timestamp=timestamp_int, - embeds=msg.embeds + await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + processed_threads=processed_threads ) - - if stoat_msg_id: - context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id) - context.state.update_last_message_id(target_channel_id, msg.id) - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - stats["attachments"] += len(files) if files else 0 - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = author_name if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url @@ -913,6 +939,7 @@ async def migrate_global_messages( except Exception as e: logger.error(f"Failed to process global message {msg.id}: {e}") + except (KeyboardInterrupt, asyncio.CancelledError): context.is_running = False