diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index 7c00d20..37d5c06 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -69,111 +69,115 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta "first_message_url": None, "last_message_url": None } - async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): - if not context.is_running: - break + try: + async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): + if not context.is_running: + break + + # Process attachments + files = [] + attachments_to_process = list(msg.attachments) - # Process attachments - files = [] - attachments_to_process = list(msg.attachments) - - # Check if this message is forwarded - # Discord flags: forwarded (is bit 28 / 0x10000000) - is_forwarded = False - if hasattr(msg.flags, 'forwarded'): - is_forwarded = msg.flags.forwarded - - # If forwarded, the content and attachments might be in message_snapshots (discord.py 2.5+) - content = msg.clean_content - if is_forwarded: - logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}") - if hasattr(msg, 'message_snapshots') and msg.message_snapshots: - # For now we handle the first snapshot - snapshot = msg.message_snapshots[0] - if not content: - content = snapshot.content - if hasattr(msg, 'guild') and msg.guild: - content = clean_mentions(content, msg.guild) - # Add snapshot attachments to the list to process - attachments_to_process.extend(snapshot.attachments) - logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments") + # Check if this message is forwarded + # Discord flags: forwarded (is bit 28 / 0x10000000) + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # If forwarded, the content and attachments might be in message_snapshots (discord.py 2.5+) + content = msg.clean_content + if is_forwarded: + logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}") + if hasattr(msg, 'message_snapshots') and msg.message_snapshots: + # For now we handle the first snapshot + snapshot = msg.message_snapshots[0] + if not content: + content = snapshot.content + if hasattr(msg, 'guild') and msg.guild: + content = clean_mentions(content, msg.guild) + # Add snapshot attachments to the list to process + attachments_to_process.extend(snapshot.attachments) + logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments") - for att in attachments_to_process: + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + try: - att_data = await context.discord_reader.download_attachment(att) - files.append({"filename": att.filename, "data": att_data}) - stats["attachments"] += 1 + # Check if this message is a reply + reply_to_fluxer_id = None + if msg.reference and msg.reference.message_id: + reply_to_fluxer_id = context.state.get_fluxer_message_id(str(msg.reference.message_id)) + + fluxer_msg_id = await context.fluxer_writer.send_message( + channel_id=target_channel_id, + author_name=msg.author.display_name, + author_avatar_url=str(msg.author.display_avatar.url), + content=content, + timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), + files=files if files else None, + reply_to_message_id=reply_to_fluxer_id, + is_forwarded=is_forwarded + ) + + if fluxer_msg_id: + context.state.set_message_mapping(str(msg.id), fluxer_msg_id) + + # Check for associated thread + if hasattr(msg, 'thread') and msg.thread: + thread = msg.thread + logger.info(f"Detected thread '{thread.name}' on message {msg.id}") + + # Send Start Marker + stats["threads"] += 1 + await context.fluxer_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< THREAD: **{thread.name}** >>>" + ) + + # Migrate thread messages + # We don't pass a progress callback here to avoid confusing the UI + # but we do want to track count if possible. + thread_stats = await migrate_messages( + context=context, + source_channel_id=thread.id, + target_channel_id=target_channel_id + ) + stats["messages"] += thread_stats["messages"] + stats["attachments"] += thread_stats["attachments"] + stats["threads"] += thread_stats["threads"] + + # Send End Marker + await context.fluxer_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< END OF THREAD >>>" + ) + + context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) + context.state.update_last_message_id(str(source_channel_id), str(msg.id)) + stats["messages"] += 1 + + # Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers) + # The 'after_message_id' param usually means it's the main function call and not a thread recursive call + if not stats["first_message_url"]: + stats["first_message_url"] = msg.jump_url + stats["last_message_url"] = msg.jump_url + + if progress_callback: + await progress_callback(stats["messages"]) except Exception as e: - logger.error(f"Failed to download attachment {att.filename}: {e}") + logger.error(f"Failed to process message {msg.id}: {e}") + import traceback + logger.error(traceback.format_exc()) - try: - # Check if this message is a reply - reply_to_fluxer_id = None - if msg.reference and msg.reference.message_id: - reply_to_fluxer_id = context.state.get_fluxer_message_id(str(msg.reference.message_id)) - - fluxer_msg_id = await context.fluxer_writer.send_message( - channel_id=target_channel_id, - author_name=msg.author.display_name, - author_avatar_url=str(msg.author.display_avatar.url), - content=content, - timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), - files=files if files else None, - reply_to_message_id=reply_to_fluxer_id, - is_forwarded=is_forwarded - ) - - if fluxer_msg_id: - context.state.set_message_mapping(str(msg.id), fluxer_msg_id) - - # Check for associated thread - if hasattr(msg, 'thread') and msg.thread: - thread = msg.thread - logger.info(f"Detected thread '{thread.name}' on message {msg.id}") - - # Send Start Marker - stats["threads"] += 1 - await context.fluxer_writer.send_marker( - channel_id=target_channel_id, - content=f"> <<< THREAD: **{thread.name}** >>>" - ) - - # Migrate thread messages - # We don't pass a progress callback here to avoid confusing the UI - # but we do want to track count if possible. - thread_stats = await migrate_messages( - context=context, - source_channel_id=thread.id, - target_channel_id=target_channel_id - ) - stats["messages"] += thread_stats["messages"] - stats["attachments"] += thread_stats["attachments"] - stats["threads"] += thread_stats["threads"] - - # Send End Marker - await context.fluxer_writer.send_marker( - channel_id=target_channel_id, - content=f"> <<< END OF THREAD >>>" - ) - - context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) - context.state.update_last_message_id(str(source_channel_id), str(msg.id)) - stats["messages"] += 1 - - # Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers) - # The 'after_message_id' param usually means it's the main function call and not a thread recursive call - if not stats["first_message_url"]: - stats["first_message_url"] = msg.jump_url - stats["last_message_url"] = msg.jump_url - - if progress_callback: - await progress_callback(stats["messages"]) - except Exception as e: - logger.error(f"Failed to process message {msg.id}: {e}") - import traceback - logger.error(traceback.format_exc()) - - # Delay for rate limit safety - await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) - + # Delay for rate limit safety + await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) + except (KeyboardInterrupt, asyncio.CancelledError): + context.is_running = False + pass + return stats diff --git a/src/stoat/migrate_message.py b/src/stoat/migrate_message.py index 37404ae..c2842af 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -68,108 +68,112 @@ async def migrate_messages(context: MigrationContext, source_channel_id: int, ta "first_message_url": None, "last_message_url": None } - async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): - if not context.is_running: - break + try: + async for msg in context.discord_reader.fetch_message_history(source_channel_id, after_id=after_message_id): + if not context.is_running: + break + + # Process attachments + files = [] + attachments_to_process = list(msg.attachments) - # Process attachments - files = [] - attachments_to_process = list(msg.attachments) - - # Check if this message is forwarded - is_forwarded = False - if hasattr(msg.flags, 'forwarded'): - is_forwarded = msg.flags.forwarded - - # Get clean content - content = msg.clean_content - if is_forwarded: - logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}") - if hasattr(msg, 'message_snapshots') and msg.message_snapshots: - snapshot = msg.message_snapshots[0] - if not content: - content = snapshot.content - if hasattr(msg, 'guild') and msg.guild: - content = clean_mentions(content, msg.guild) - attachments_to_process.extend(snapshot.attachments) - logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments") + # Check if this message is forwarded + is_forwarded = False + if hasattr(msg.flags, 'forwarded'): + is_forwarded = msg.flags.forwarded + + # Get clean content + content = msg.clean_content + if is_forwarded: + logger.debug(f"Detected forwarded message: ID={msg.id}, Flags={msg.flags.value}") + if hasattr(msg, 'message_snapshots') and msg.message_snapshots: + snapshot = msg.message_snapshots[0] + if not content: + content = snapshot.content + if hasattr(msg, 'guild') and msg.guild: + content = clean_mentions(content, msg.guild) + attachments_to_process.extend(snapshot.attachments) + logger.debug(f"Found forwarded snapshot content: {content[:50]}... and {len(snapshot.attachments)} attachments") - for att in attachments_to_process: + for att in attachments_to_process: + try: + att_data = await context.discord_reader.download_attachment(att) + files.append({"filename": att.filename, "data": att_data}) + stats["attachments"] += 1 + except Exception as e: + logger.error(f"Failed to download attachment {att.filename}: {e}") + try: - att_data = await context.discord_reader.download_attachment(att) - files.append({"filename": att.filename, "data": att_data}) - stats["attachments"] += 1 + # Check if this message is a reply + reply_to_stoat_id = None + if msg.reference and msg.reference.message_id: + reply_to_stoat_id = context.state.get_target_message_id(str(msg.reference.message_id)) + + stoat_msg_id = await context.stoat_writer.send_message( + channel_id=target_channel_id, + author_name=msg.author.display_name, + author_avatar_url=str(msg.author.display_avatar.url), + content=content, + timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), + files=files if files else None, + reply_to_message_id=reply_to_stoat_id, + is_forwarded=is_forwarded + ) + + if stoat_msg_id: + context.state.set_message_mapping(str(msg.id), stoat_msg_id) + + # Check for associated thread + if hasattr(msg, 'thread') and msg.thread: + thread = msg.thread + logger.info(f"Detected thread '{thread.name}' on message {msg.id}") + + # Send Start Marker + stats["threads"] += 1 + await context.stoat_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< THREAD: **{thread.name}** >>>" + ) + + # Migrate thread messages recursively + thread_stats = await migrate_messages( + context=context, + source_channel_id=thread.id, + target_channel_id=target_channel_id + ) + stats["messages"] += thread_stats["messages"] + stats["attachments"] += thread_stats["attachments"] + stats["threads"] += thread_stats["threads"] + + # Send End Marker + await context.stoat_writer.send_marker( + channel_id=target_channel_id, + content=f"> <<< END OF THREAD >>>" + ) + + context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) + context.state.update_last_message_id(str(source_channel_id), str(msg.id)) + stats["messages"] += 1 + + # Update Link Tracking + if not stats["first_message_url"]: + stats["first_message_url"] = msg.jump_url + stats["last_message_url"] = msg.jump_url + + if progress_callback: + await progress_callback(stats["messages"]) except Exception as e: - logger.error(f"Failed to download attachment {att.filename}: {e}") + # If it's a permission error, stop the entire migration + if "MissingPermission" in str(e): + raise + logger.error(f"Failed to process message {msg.id}: {e}") + import traceback + logger.error(traceback.format_exc()) - try: - # Check if this message is a reply - reply_to_stoat_id = None - if msg.reference and msg.reference.message_id: - reply_to_stoat_id = context.state.get_target_message_id(str(msg.reference.message_id)) - - stoat_msg_id = await context.stoat_writer.send_message( - channel_id=target_channel_id, - author_name=msg.author.display_name, - author_avatar_url=str(msg.author.display_avatar.url), - content=content, - timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), - files=files if files else None, - reply_to_message_id=reply_to_stoat_id, - is_forwarded=is_forwarded - ) - - if stoat_msg_id: - context.state.set_message_mapping(str(msg.id), stoat_msg_id) - - # Check for associated thread - if hasattr(msg, 'thread') and msg.thread: - thread = msg.thread - logger.info(f"Detected thread '{thread.name}' on message {msg.id}") - - # Send Start Marker - stats["threads"] += 1 - await context.stoat_writer.send_marker( - channel_id=target_channel_id, - content=f"> <<< THREAD: **{thread.name}** >>>" - ) - - # Migrate thread messages recursively - thread_stats = await migrate_messages( - context=context, - source_channel_id=thread.id, - target_channel_id=target_channel_id - ) - stats["messages"] += thread_stats["messages"] - stats["attachments"] += thread_stats["attachments"] - stats["threads"] += thread_stats["threads"] - - # Send End Marker - await context.stoat_writer.send_marker( - channel_id=target_channel_id, - content=f"> <<< END OF THREAD >>>" - ) - - context.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) - context.state.update_last_message_id(str(source_channel_id), str(msg.id)) - stats["messages"] += 1 - - # Update Link Tracking - if not stats["first_message_url"]: - stats["first_message_url"] = msg.jump_url - stats["last_message_url"] = msg.jump_url - - if progress_callback: - await progress_callback(stats["messages"]) - except Exception as e: - # If it's a permission error, stop the entire migration - if "MissingPermission" in str(e): - raise - logger.error(f"Failed to process message {msg.id}: {e}") - import traceback - logger.error(traceback.format_exc()) - - # Delay for rate limit safety - await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) + # Delay for rate limit safety + await asyncio.sleep(context.config.migration.rate_limit_delay_seconds) + except (KeyboardInterrupt, asyncio.CancelledError): + context.is_running = False + pass return stats diff --git a/src/ui/app.py b/src/ui/app.py index 8fb6f33..a06bef6 100644 --- a/src/ui/app.py +++ b/src/ui/app.py @@ -1335,23 +1335,29 @@ class MigrationCLI: progress_callback=update_msg_progress ) - console.print(f"\n[bold green]Success! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold green]") + if not self.engine.is_running: + console.print(f"\n[bold yellow]Migration Interrupted! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold yellow]") + event_title = "Message History Migration Interrupted" + else: + console.print(f"\n[bold green]Success! {result_stats['messages']} messages migrated to {target_channel.get('name')}.[/bold green]") + event_title = "Message History Migrated" - lines = [f"Successfully migrated messages from Discord #{source_channel.name} to {platform_name} #{target_channel.get('name')}:"] + lines = [f"Migrated messages from Discord #{source_channel.name} to {platform_name} #{target_channel.get('name')}:"] if result_stats.get('first_message_url') or result_stats.get('last_message_url'): lines.append(f"**Message Info:**") if result_stats.get('first_message_url'): - lines.append(f"- First message: <{result_stats['first_message_url']}>") + lines.append(f"First message: <{result_stats['first_message_url']}>") if result_stats.get('last_message_url'): - lines.append(f"- Last message: <{result_stats['last_message_url']}>") + lines.append(f"Last message: <{result_stats['last_message_url']}>") + lines.append(f"**Stats:**") - lines.append(f"- {result_stats['messages']} messages") - lines.append(f"- {result_stats['attachments']} attachments") - lines.append(f"- {result_stats['threads']} threads") + lines.append(f"{result_stats['messages']} messages") + lines.append(f"{result_stats['attachments']} attachments") + lines.append(f"{result_stats['threads']} threads") audit_desc = "\n".join(lines) - await log_audit_event(self.engine, "Message History Migrated", audit_desc) + await log_audit_event(self.engine, event_title, audit_desc) except Exception as e: err_str = str(e)