From 071de5296b590dee23c92ebaf9c8a32f05a95df7 Mon Sep 17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 12:52:50 +0530 Subject: [PATCH] unify message_migrate & waterfall --- src/fluxer/migrate_message.py | 395 +++++++++++++++++----------------- src/stoat/migrate_message.py | 389 +++++++++++++++++---------------- 2 files changed, 410 insertions(+), 374 deletions(-) diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index c2b0aea..fc596c7 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -158,7 +158,190 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]: return threads + +async def _process_and_send_message( + context: MigrationContext, + msg: Any, + target_channel_id: str, + stats: Dict[str, Any], + thread_id: str | None = None, + parent_target_id: str | None = None, + thread_name: str | None = None, + processed_threads: set | None = None +) -> str | None: + """ + Internal helper to process a single Discord message (mentions, attachments, stickers) + and send it to the Fluxer platform. + """ + # 1. Formatting + content = msg.content or "" + + # Check for forwarded flag + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # Always ensure alias is created/retrieved to populate user_alias table + alias = context.state.get_user_alias(str(msg.author.id)) + anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False + + # Process Stickers + files = [] + if hasattr(msg, 'stickers') and msg.stickers: + for s in msg.stickers: + try: + sticker_data = await context.discord_reader.download_sticker(s) + if not sticker_data: continue + + format_val = getattr(s, 'format', 'png') + if hasattr(format_val, 'name'): + ext = format_val.name.lower() + elif isinstance(format_val, int): + format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'} + ext = format_map.get(format_val, 'png') + else: + ext = str(format_val).lower() + + # Conversion logic (Simplified for unification) + if ext == 'lottie' and HAS_LOTTIE: + try: + lottie_data = json.loads(sticker_data) + def _convert_lottie(data): + anim = Animation.load(data) + buf = io.BytesIO() + export_gif(anim, buf) + buf.seek(0) + return buf + gif_buf = await asyncio.to_thread(_convert_lottie, lottie_data) + from PIL import Image + def _convert_gif_to_webp(buf): + img = Image.open(buf) + w_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + img.save(w_buf, format='WEBP', save_all=True, loop=0, quality=80) + else: + img.save(w_buf, format='WEBP', quality=80) + return w_buf.getvalue() + sticker_data = await asyncio.to_thread(_convert_gif_to_webp, gif_buf) + ext = 'webp' + except Exception: ext = 'json' + elif ext in ('apng', 'gif'): + try: + from PIL import Image + def _process_animated_sticker(data): + img = Image.open(io.BytesIO(data)) + webp_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + img.save(webp_buf, format='WEBP', save_all=True, loop=0, quality=80) + else: + img.save(webp_buf, format='WEBP', quality=80) + return webp_buf.getvalue() + sticker_data = await asyncio.to_thread(_process_animated_sticker, sticker_data) + ext = 'webp' + except Exception: pass + + files.append({"filename": f"sticker_{s.name}_{s.id}.{ext}", "data": sticker_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + + # Process Attachments + attachments_to_process = list(msg.attachments) + if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots: + snapshot = msg.message_snapshots[0] + if not content: + content = snapshot.content + attachments_to_process.extend(snapshot.attachments) + + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + + # Clean Mentions + content = clean_mentions( + content=content, + guild=context.discord_reader.guild, + user_mentions=msg.mentions, + role_mentions=msg.role_mentions, + channel_mentions=msg.channel_mentions, + emoji_map=context.state.emoji_map, + channel_map=context.state.channel_map, + state=context.state, + target_server_id=context.fluxer_writer.community_id, + channel_names=context.channel_names if hasattr(context, 'channel_names') else None, + anonymize_users=anonymize_users + ) + + if not content and not files: + return None + + # Reply Resolution + reply_to_fluxer_id = None + if msg.reference and msg.reference.message_id: + reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id)) + + # Fallback author tagging for replies if mapping not found + if not reply_to_fluxer_id: + try: + source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) + if source_ref_msg and source_ref_msg.author: + ref_name = context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name + content = f"`@{ref_name}`\n{content}" + else: + tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) + if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}" + except Exception: pass + + # Thread logic + if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0: + reply_to_fluxer_id = parent_target_id + if thread_name and stats["messages"] == 0: + content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" + + # Send Message + if anonymize_users: + author_name = alias or "Anonymized User" + author_avatar_url = None + else: + author_name = msg.author.display_name + author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None + + fluxer_msg_id = await context.fluxer_writer.send_message( + channel_id=target_channel_id, + author_name=author_name, + author_avatar_url=author_avatar_url, + content=content, + timestamp=int(msg.created_at.timestamp()), + files=files if files else None, + reply_to_message_id=reply_to_fluxer_id, + is_forwarded=is_forwarded, + embeds=msg.embeds + ) + + if fluxer_msg_id: + if thread_id: + context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id) + context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) + context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) + context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) + else: + context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.update_last_message_id(target_channel_id, str(msg.id)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) + + stats["messages"] += 1 + stats["last_message_content"] = content + stats["last_message_author"] = msg.author.display_name + + return fluxer_msg_id + async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]: + """ Scans channel history to count messages, threads, and attachments. """ @@ -542,87 +725,30 @@ async def migrate_messages( logger.debug(f"Added sticker {s.name} as attachment (extension: {ext}, size: {sticker_size} bytes)") except Exception as e: logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") - - # Check for existing mapping to avoid duplicates when resuming + # Check for existing mapping to avoid duplicates when resuming if context.state.get_target_message_id(target_channel_id, str(msg.id)): continue try: - reply_to_fluxer_id = None - if msg.reference and msg.reference.message_id: - reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id)) - if reply_to_fluxer_id: - logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Fluxer ID {reply_to_fluxer_id}") - else: - logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.") - - # If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter - if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0: - reply_to_fluxer_id = parent_target_id - - # Prepend thread marker to the first message of the thread - if thread_name and stats["messages"] == 0: - content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - logger.debug(f"Fluxer: Calling send_message for Discord ID {msg.id}") - fluxer_msg_id = await context.fluxer_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - timestamp=int(msg.created_at.timestamp()), - files=files if files else None, - reply_to_message_id=reply_to_fluxer_id, - is_forwarded=is_forwarded, - embeds=msg.embeds + fluxer_msg_id = await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + thread_id=thread_id, + parent_target_id=parent_target_id, + thread_name=thread_name, + processed_threads=processed_threads ) - if fluxer_msg_id: - if thread_id: - context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id) - else: - context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id) - else: - logger.warning(f"Fluxer: send_message returned None for Discord ID {msg.id} (message might have been skipped or timed out)") - - if thread_id: - context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) - context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) - context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) - else: - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.update_last_message_id(target_channel_id, str(msg.id)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = msg.author.display_name - - # Check for associated thread (Normal case: parent message is migrated) + # Check for associated thread (Individual mode recursion) if hasattr(msg, 'thread') and msg.thread: thread = msg.thread if thread.id not in processed_threads: processed_threads.add(thread.id) - # Track thread entry stats["threads"] += 1 - # Fetch last migrated message ID for this thread thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id)) - if thread_after_id: - logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}") - - # Migrate thread messages recursively thread_stats = await migrate_messages( context=context, source_channel_id=thread.id, @@ -637,22 +763,19 @@ async def migrate_messages( stats["attachments"] += thread_stats["attachments"] stats["threads"] += thread_stats["threads"] - # Send End Marker if context.is_running: await context.fluxer_writer.send_marker( channel_id=target_channel_id, content=f"> <<< END OF THREAD >>>" ) - # Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers) - # The 'after_message_id' param usually means it's the main function call and not a thread recursive call + # Update Link Tracking (Parent pointer updates) if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url stats["last_message_url"] = msg.jump_url if progress_callback: await progress_callback(stats) - logger.debug(f"Fluxer: Finished processing message Discord ID {msg.id}") except Exception as e: logger.error(f"Failed to process message {msg.id}: {e}") import traceback @@ -735,7 +858,7 @@ async def migrate_global_messages( progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None ) -> Dict[str, Any]: """ - Migrates messages across all channels chronologically. + Migrates messages across all channels chronologically to Fluxer. """ stats = { "messages": 0, @@ -750,14 +873,6 @@ async def migrate_global_messages( processed_threads = set() logger.info("Starting Global Waterfall Migration for Fluxer...") - # Keep track of active thread mapping natively to pass parent target IDs if needed - thread_to_target_channel = {} - - # Emojis and mapped users cache setup - emoji_map = context.state.emoji_map - db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {} - target_server_id = getattr(context.fluxer_writer, "server_id", None) - # Fetch global progress map to skip migrated messages efficiently progress_map = context.state.get_all_last_message_ids() @@ -794,124 +909,18 @@ async def migrate_global_messages( continue # If it's a thread message, we need to handle it based on if it's the thread starter or a reply - parent_target_id = None if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id: processed_threads.add(msg.thread.id) stats["threads"] += 1 - elif msg.channel.type in [11, 12]: # Thread channels - # It's a message IN a thread. - # In Fluxer, threads might just be linear messages or threaded replies depending on schema - # For basic migration we just send it to the parent mapped target channel. - # The parent mapped target channel ID should already be calculated correctly by get_target_channel_id (which returns mapped thread or parent channel) - pass - - # Formatting - files = [] - file_names = [] - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - for att in msg.attachments: - media_info = db_media.get(att.local_hash) if db_media else None - local_path = None - if media_info: - local_path = Path(media_info["local_path"]) - elif hasattr(att, 'read'): - # Fallback - pass - - if local_path and local_path.exists(): - files.append(local_path) - file_names.append(att.filename) - - content = msg.content or "" - - # Stickers - for sticker in msg.stickers: - sticker_name = sticker.name - sticker_url = sticker.url - - # Check for uploaded media pool logic first - s_hash = getattr(sticker, "local_hash", None) - sticker_file = None - s_media = db_media.get(s_hash) if db_media and s_hash else None - if s_media: - s_path = Path(s_media["local_path"]) - if s_path.exists(): - sticker_file = s_path - - content += f"\n[Sticker: {sticker_name}]" - if sticker_file: - files.append(sticker_file) - file_names.append(f"sticker_{sticker_name}.png") - - content = clean_mentions( - content=content, - guild=context.discord_reader.guild, - user_mentions=msg.mentions, - role_mentions=msg.role_mentions, - channel_mentions=msg.channel_mentions, - emoji_map=emoji_map, - channel_map=context.state.channel_map, - state=context.state, - target_server_id=target_server_id, - channel_names=context.channel_names if hasattr(context, 'channel_names') else None, - anonymize_users=anonymize_users - ) - - if not content and not files: - logger.debug(f"Message {msg.id} empty after processing, skipping.") - continue - - timestamp_int = int(msg.created_at.timestamp()) - - if msg.reference and msg.reference.message_id: - # Resolve the author of the message being replied to - source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) - if source_ref_msg and source_ref_msg.author: - ref_author_id = str(source_ref_msg.author.id) - if anonymize_users: - ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User" - else: - ref_name = source_ref_msg.author.display_name - content = f"`@{ref_name}`\n{content}" - else: - # Fallback if author cannot be resolved (e.g. deleted/missing from backup) - tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) - if tgt_reply: - content = f"[Reply to {tgt_reply}]\n{content}" try: - fluxer_msg_id = await context.fluxer_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - files=files, - timestamp=timestamp_int, - embeds=msg.embeds + await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + processed_threads=processed_threads ) - - if fluxer_msg_id: - context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id) - context.state.update_last_message_id(target_channel_id, msg.id) - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - stats["attachments"] += len(files) if files else 0 - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = author_name if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url diff --git a/src/stoat/migrate_message.py b/src/stoat/migrate_message.py index 4a3d9a3..19af6bb 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -155,7 +155,192 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]: return threads +async def _process_and_send_message( + context: MigrationContext, + msg: Any, + target_channel_id: str, + stats: Dict[str, Any], + thread_id: str | None = None, + parent_target_id: str | None = None, + thread_name: str | None = None, + processed_threads: set | None = None +) -> str | None: + """ + Internal helper to process a single Discord message (mentions, attachments, stickers) + and send it to the Stoat platform. + """ + # 1. Processing Flags + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # 2. Content & Formatting + content = msg.content or "" + anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False + alias = context.state.get_user_alias(str(msg.author.id)) + + # Process Stickers + files = [] + if hasattr(msg, 'stickers') and msg.stickers: + for s in msg.stickers: + try: + sticker_data = await context.discord_reader.download_sticker(s) + if not sticker_data: continue + + format_val = getattr(s, 'format', 'png') + if hasattr(format_val, 'name'): + ext = format_val.name.lower() + elif isinstance(format_val, int): + format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'} + ext = format_map.get(format_val, 'png') + else: + ext = str(format_val).lower() + + # Conversion logic for Stoat (WebP or GIF focus) + if ext == 'lottie' and HAS_LOTTIE: + try: + lottie_data = json.loads(sticker_data) + def _convert_lottie_to_gif(data): + animation = Animation.load(data) + output = io.BytesIO() + export_gif(animation, output) + return output.getvalue() + sticker_data = await asyncio.to_thread(_convert_lottie_to_gif, lottie_data) + ext = 'gif' + except Exception: ext = 'json' + elif ext == 'apng': + try: + from PIL import Image + def _convert_apng_to_gif(data): + img = Image.open(io.BytesIO(data)) + gif_buf = io.BytesIO() + if getattr(img, 'n_frames', 1) > 1: + frames = [] + durations = [] + for i in range(img.n_frames): + img.seek(i) + frame = img.convert('RGBA') + current_frame = Image.new('RGBA', img.size, (0,0,0,0)) + current_frame.paste(frame, (0, 0)) + frames.append(current_frame) + durations.append(img.info.get('duration', 100)) + frames[0].save(gif_buf, format='GIF', save_all=True, append_images=frames[1:], loop=0, duration=durations, disposal=2, transparency=0) + else: img.save(gif_buf, format='GIF') + return gif_buf.getvalue() + sticker_data = await asyncio.to_thread(_convert_apng_to_gif, sticker_data) + ext = 'gif' + except Exception: pass + + files.append({ + "filename": f"sticker_{s.name}_{s.id}.{ext}", + "data": sticker_data, + "content_type": f"image/{ext}" if ext != "json" else "application/json" + }) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + + # Process Attachments + attachments_to_process = list(msg.attachments) + if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots: + snapshot = msg.message_snapshots[0] + if not content: content = snapshot.content + attachments_to_process.extend(snapshot.attachments) + + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({ + "filename": att.filename, + "data": att_data, + "content_type": getattr(att, "content_type", None) + }) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + + # Clean Mentions + content = clean_mentions( + content=content, + guild=context.discord_reader.guild, + user_mentions=msg.mentions, + role_mentions=msg.role_mentions, + channel_mentions=msg.channel_mentions, + emoji_map=context.state.emoji_map, + channel_map=context.state.channel_map, + state=context.state, + target_server_id=context.stoat_writer.community_id, + channel_names=context.channel_names if hasattr(context, 'channel_names') else None, + anonymize_users=anonymize_users + ) + + if not content and not files: + return None + + # Reply Resolution + reply_to_stoat_id = None + if msg.reference and msg.reference.message_id: + reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id)) + if not reply_to_stoat_id: + # Fallback author tagging + try: + source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id) + if source_ref_msg and source_ref_msg.author: + ref_name = context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name + content = f"`@{ref_name}`\n{content}" + else: + tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) + if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}" + except Exception: pass + + # Thread logic + if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0: + reply_to_stoat_id = parent_target_id + if thread_name and stats["messages"] == 0: + content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" + + # Author resolution + if anonymize_users: + author_name = alias or "Anonymized User" + author_avatar_url = None + else: + author_name = msg.author.display_name + author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None + if author_avatar_url and not author_avatar_url.startswith("http"): author_avatar_url = None + + # Send Message + stoat_msg_id = await context.stoat_writer.send_message( + channel_id=target_channel_id, + author_name=author_name, + author_avatar_url=author_avatar_url, + content=content, + timestamp=int(msg.created_at.timestamp()), + files=files if files else None, + reply_to_message_id=reply_to_stoat_id, + is_forwarded=is_forwarded, + embeds=msg.embeds + ) + + if stoat_msg_id: + if thread_id: + context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id) + context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) + context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) + context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) + else: + context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.update_last_message_id(target_channel_id, str(msg.id)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) + + stats["messages"] += 1 + stats["last_message_content"] = content + stats["last_message_author"] = msg.author.display_name + + return stoat_msg_id + async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]: + """ Scans channel history to count messages, threads, and attachments. """ @@ -546,87 +731,30 @@ async def migrate_messages( except Exception as e: logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") + # Check for existing mapping to avoid duplicates when resuming + if context.state.get_target_message_id(target_channel_id, str(msg.id)): + continue + try: - # Check for existing mapping to avoid duplicates when resuming - if context.state.get_target_message_id(target_channel_id, str(msg.id)): - continue - - # Check if this message is a reply - reply_to_stoat_id = None - if msg.reference and msg.reference.message_id: - reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id)) - if reply_to_stoat_id: - logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Stoat ID {reply_to_stoat_id}") - else: - logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.") - - # If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter - if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0: - reply_to_stoat_id = parent_target_id - - # Prepend thread marker to the first message of the thread - if thread_name and stats["messages"] == 0: - content = f"> <<< THREAD: **{thread_name}** >>>\n{content}" - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None - if author_avatar_url and not author_avatar_url.startswith("http"): - author_avatar_url = None - - logger.debug(f"Stoat: Calling send_message for Discord ID {msg.id}") - stoat_msg_id = await context.stoat_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - timestamp=int(msg.created_at.timestamp()), - files=files if files else None, - reply_to_message_id=reply_to_stoat_id, - is_forwarded=is_forwarded, - embeds=msg.embeds + stoat_msg_id = await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + thread_id=thread_id, + parent_target_id=parent_target_id, + thread_name=thread_name, + processed_threads=processed_threads ) - - if stoat_msg_id: - if thread_id: - context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id) - else: - context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id) - if thread_id: - context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at)) - context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id)) - context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0) - else: - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.update_last_message_id(target_channel_id, str(msg.id)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = msg.author.display_name - - # Check for associated thread (Normal case: parent message is migrated) + # Check for associated thread (Individual mode recursion) if hasattr(msg, 'thread') and msg.thread: thread = msg.thread if thread.id not in processed_threads: processed_threads.add(thread.id) - # Track thread entry stats["threads"] += 1 - # Fetch last migrated message ID for this thread thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id)) - if thread_after_id: - logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}") - - # Migrate thread messages recursively thread_stats = await migrate_messages( context=context, source_channel_id=thread.id, @@ -641,7 +769,6 @@ async def migrate_messages( stats["attachments"] += thread_stats["attachments"] stats["threads"] += thread_stats["threads"] - # Send End Marker if context.is_running: await context.stoat_writer.send_marker( channel_id=target_channel_id, @@ -656,12 +783,11 @@ async def migrate_messages( if progress_callback: await progress_callback(stats) except Exception as e: - # If it's a permission error, stop the entire migration - if "MissingPermission" in str(e): - raise + if "MissingPermission" in str(e): raise logger.error(f"Failed to process message {msg.id}: {e}") import traceback logger.error(traceback.format_exc()) + # Mark thread as completed if we finished the loop without being interrupted if thread_id and context.is_running: @@ -795,114 +921,14 @@ async def migrate_global_messages( elif msg.channel.type in [11, 12]: pass - # Formatting - files = [] - - # Always ensure alias is created/retrieved to populate user_alias table - alias = context.state.get_user_alias(str(msg.author.id)) - - anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False - - if anonymize_users: - author_name = alias or "Anonymized User" - author_avatar_url = None - else: - author_name = msg.author.display_name - author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None - - for att in msg.attachments: - media_info = db_media.get(att.local_hash) if db_media else None - local_path = None - if media_info: - local_path = Path(media_info["local_path"]) - - if local_path and local_path.exists(): - try: - with open(local_path, "rb") as f: - files.append({"filename": att.filename, "data": f.read()}) - except Exception as fe: - logger.error(f"Failed to read file {local_path}: {fe}") - - content = msg.content or "" - - for sticker in msg.stickers: - sticker_name = getattr(sticker, "name", "unknown") - s_hash = getattr(sticker, "local_hash", None) - sticker_file = None - s_media = db_media.get(s_hash) if db_media and s_hash else None - if s_media: - s_path = Path(s_media["local_path"]) - if s_path.exists(): - sticker_file = s_path - - content += f"\n[Sticker: {sticker_name}]" - if sticker_file: - try: - with open(sticker_file, "rb") as f: - files.append({ - "filename": f"sticker_{sticker_name}.png", - "data": f.read(), - "content_type": "image/png" - }) - except Exception as e: - logger.error(f"Failed to read sticker file {sticker_file}: {e}") - - content = clean_mentions( - content=content, - guild=context.discord_reader.guild, - user_mentions=msg.mentions, - role_mentions=msg.role_mentions, - channel_mentions=msg.channel_mentions, - emoji_map=emoji_map, - channel_map=context.state.channel_map, - state=context.state, - target_server_id=target_server_id, - channel_names=context.channel_names if hasattr(context, 'channel_names') else None, - anonymize_users=anonymize_users - ) - - if not content and not files: - logger.debug(f"Message {msg.id} empty after processing, skipping.") - continue - - timestamp_int = int(msg.created_at.timestamp()) - - if msg.reference and msg.reference.message_id: - # Resolve the author of the message being replied to - source_ref_msg = await context.discord_reader.get_message(msg.channel_id, msg.reference.message_id) - if source_ref_msg and source_ref_msg.author: - ref_author_id = str(source_ref_msg.author.id) - if anonymize_users: - ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User" - else: - ref_name = source_ref_msg.author.display_name - content = f"`@{ref_name}`\n{content}" - else: - tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id) - if tgt_reply: - content = f"[Reply to {tgt_reply}]\n{content}" - try: - stoat_msg_id = await context.stoat_writer.send_message( - channel_id=target_channel_id, - author_name=author_name, - author_avatar_url=author_avatar_url, - content=content, - files=files, - timestamp=timestamp_int, - embeds=msg.embeds + await _process_and_send_message( + context=context, + msg=msg, + target_channel_id=target_channel_id, + stats=stats, + processed_threads=processed_threads ) - - if stoat_msg_id: - context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id) - context.state.update_last_message_id(target_channel_id, msg.id) - context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) - context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) - stats["attachments"] += len(files) if files else 0 - - stats["messages"] += 1 - stats["last_message_content"] = content - stats["last_message_author"] = author_name if not stats["first_message_url"]: stats["first_message_url"] = msg.jump_url @@ -913,6 +939,7 @@ async def migrate_global_messages( except Exception as e: logger.error(f"Failed to process global message {msg.id}: {e}") + except (KeyboardInterrupt, asyncio.CancelledError): context.is_running = False