unify message_migrate & waterfall
This commit is contained in:
parent
5b315ab2bf
commit
071de5296b
2 changed files with 410 additions and 374 deletions
|
|
@ -158,7 +158,190 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]:
|
||||||
return threads
|
return threads
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_and_send_message(
|
||||||
|
context: MigrationContext,
|
||||||
|
msg: Any,
|
||||||
|
target_channel_id: str,
|
||||||
|
stats: Dict[str, Any],
|
||||||
|
thread_id: str | None = None,
|
||||||
|
parent_target_id: str | None = None,
|
||||||
|
thread_name: str | None = None,
|
||||||
|
processed_threads: set | None = None
|
||||||
|
) -> str | None:
|
||||||
|
"""
|
||||||
|
Internal helper to process a single Discord message (mentions, attachments, stickers)
|
||||||
|
and send it to the Fluxer platform.
|
||||||
|
"""
|
||||||
|
# 1. Formatting
|
||||||
|
content = msg.content or ""
|
||||||
|
|
||||||
|
# Check for forwarded flag
|
||||||
|
is_forwarded = False
|
||||||
|
if hasattr(msg.flags, 'forwarded'):
|
||||||
|
is_forwarded = msg.flags.forwarded
|
||||||
|
|
||||||
|
# Always ensure alias is created/retrieved to populate user_alias table
|
||||||
|
alias = context.state.get_user_alias(str(msg.author.id))
|
||||||
|
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
||||||
|
|
||||||
|
# Process Stickers
|
||||||
|
files = []
|
||||||
|
if hasattr(msg, 'stickers') and msg.stickers:
|
||||||
|
for s in msg.stickers:
|
||||||
|
try:
|
||||||
|
sticker_data = await context.discord_reader.download_sticker(s)
|
||||||
|
if not sticker_data: continue
|
||||||
|
|
||||||
|
format_val = getattr(s, 'format', 'png')
|
||||||
|
if hasattr(format_val, 'name'):
|
||||||
|
ext = format_val.name.lower()
|
||||||
|
elif isinstance(format_val, int):
|
||||||
|
format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'}
|
||||||
|
ext = format_map.get(format_val, 'png')
|
||||||
|
else:
|
||||||
|
ext = str(format_val).lower()
|
||||||
|
|
||||||
|
# Conversion logic (Simplified for unification)
|
||||||
|
if ext == 'lottie' and HAS_LOTTIE:
|
||||||
|
try:
|
||||||
|
lottie_data = json.loads(sticker_data)
|
||||||
|
def _convert_lottie(data):
|
||||||
|
anim = Animation.load(data)
|
||||||
|
buf = io.BytesIO()
|
||||||
|
export_gif(anim, buf)
|
||||||
|
buf.seek(0)
|
||||||
|
return buf
|
||||||
|
gif_buf = await asyncio.to_thread(_convert_lottie, lottie_data)
|
||||||
|
from PIL import Image
|
||||||
|
def _convert_gif_to_webp(buf):
|
||||||
|
img = Image.open(buf)
|
||||||
|
w_buf = io.BytesIO()
|
||||||
|
if getattr(img, 'n_frames', 1) > 1:
|
||||||
|
img.save(w_buf, format='WEBP', save_all=True, loop=0, quality=80)
|
||||||
|
else:
|
||||||
|
img.save(w_buf, format='WEBP', quality=80)
|
||||||
|
return w_buf.getvalue()
|
||||||
|
sticker_data = await asyncio.to_thread(_convert_gif_to_webp, gif_buf)
|
||||||
|
ext = 'webp'
|
||||||
|
except Exception: ext = 'json'
|
||||||
|
elif ext in ('apng', 'gif'):
|
||||||
|
try:
|
||||||
|
from PIL import Image
|
||||||
|
def _process_animated_sticker(data):
|
||||||
|
img = Image.open(io.BytesIO(data))
|
||||||
|
webp_buf = io.BytesIO()
|
||||||
|
if getattr(img, 'n_frames', 1) > 1:
|
||||||
|
img.save(webp_buf, format='WEBP', save_all=True, loop=0, quality=80)
|
||||||
|
else:
|
||||||
|
img.save(webp_buf, format='WEBP', quality=80)
|
||||||
|
return webp_buf.getvalue()
|
||||||
|
sticker_data = await asyncio.to_thread(_process_animated_sticker, sticker_data)
|
||||||
|
ext = 'webp'
|
||||||
|
except Exception: pass
|
||||||
|
|
||||||
|
files.append({"filename": f"sticker_{s.name}_{s.id}.{ext}", "data": sticker_data})
|
||||||
|
stats["attachments"] += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
||||||
|
|
||||||
|
# Process Attachments
|
||||||
|
attachments_to_process = list(msg.attachments)
|
||||||
|
if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots:
|
||||||
|
snapshot = msg.message_snapshots[0]
|
||||||
|
if not content:
|
||||||
|
content = snapshot.content
|
||||||
|
attachments_to_process.extend(snapshot.attachments)
|
||||||
|
|
||||||
|
for att in attachments_to_process:
|
||||||
|
try:
|
||||||
|
att_data = await context.discord_reader.download_attachment(att)
|
||||||
|
files.append({"filename": att.filename, "data": att_data})
|
||||||
|
stats["attachments"] += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to download attachment {att.filename}: {e}")
|
||||||
|
|
||||||
|
# Clean Mentions
|
||||||
|
content = clean_mentions(
|
||||||
|
content=content,
|
||||||
|
guild=context.discord_reader.guild,
|
||||||
|
user_mentions=msg.mentions,
|
||||||
|
role_mentions=msg.role_mentions,
|
||||||
|
channel_mentions=msg.channel_mentions,
|
||||||
|
emoji_map=context.state.emoji_map,
|
||||||
|
channel_map=context.state.channel_map,
|
||||||
|
state=context.state,
|
||||||
|
target_server_id=context.fluxer_writer.community_id,
|
||||||
|
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
|
||||||
|
anonymize_users=anonymize_users
|
||||||
|
)
|
||||||
|
|
||||||
|
if not content and not files:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Reply Resolution
|
||||||
|
reply_to_fluxer_id = None
|
||||||
|
if msg.reference and msg.reference.message_id:
|
||||||
|
reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id))
|
||||||
|
|
||||||
|
# Fallback author tagging for replies if mapping not found
|
||||||
|
if not reply_to_fluxer_id:
|
||||||
|
try:
|
||||||
|
source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id)
|
||||||
|
if source_ref_msg and source_ref_msg.author:
|
||||||
|
ref_name = context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name
|
||||||
|
content = f"`@{ref_name}`\n{content}"
|
||||||
|
else:
|
||||||
|
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
|
||||||
|
if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}"
|
||||||
|
except Exception: pass
|
||||||
|
|
||||||
|
# Thread logic
|
||||||
|
if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0:
|
||||||
|
reply_to_fluxer_id = parent_target_id
|
||||||
|
if thread_name and stats["messages"] == 0:
|
||||||
|
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
|
||||||
|
|
||||||
|
# Send Message
|
||||||
|
if anonymize_users:
|
||||||
|
author_name = alias or "Anonymized User"
|
||||||
|
author_avatar_url = None
|
||||||
|
else:
|
||||||
|
author_name = msg.author.display_name
|
||||||
|
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
|
||||||
|
|
||||||
|
fluxer_msg_id = await context.fluxer_writer.send_message(
|
||||||
|
channel_id=target_channel_id,
|
||||||
|
author_name=author_name,
|
||||||
|
author_avatar_url=author_avatar_url,
|
||||||
|
content=content,
|
||||||
|
timestamp=int(msg.created_at.timestamp()),
|
||||||
|
files=files if files else None,
|
||||||
|
reply_to_message_id=reply_to_fluxer_id,
|
||||||
|
is_forwarded=is_forwarded,
|
||||||
|
embeds=msg.embeds
|
||||||
|
)
|
||||||
|
|
||||||
|
if fluxer_msg_id:
|
||||||
|
if thread_id:
|
||||||
|
context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id)
|
||||||
|
context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at))
|
||||||
|
context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id))
|
||||||
|
context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0)
|
||||||
|
else:
|
||||||
|
context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id)
|
||||||
|
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
||||||
|
context.state.update_last_message_id(target_channel_id, str(msg.id))
|
||||||
|
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
||||||
|
|
||||||
|
stats["messages"] += 1
|
||||||
|
stats["last_message_content"] = content
|
||||||
|
stats["last_message_author"] = msg.author.display_name
|
||||||
|
|
||||||
|
return fluxer_msg_id
|
||||||
|
|
||||||
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]:
|
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Scans channel history to count messages, threads, and attachments.
|
Scans channel history to count messages, threads, and attachments.
|
||||||
"""
|
"""
|
||||||
|
|
@ -542,87 +725,30 @@ async def migrate_messages(
|
||||||
logger.debug(f"Added sticker {s.name} as attachment (extension: {ext}, size: {sticker_size} bytes)")
|
logger.debug(f"Added sticker {s.name} as attachment (extension: {ext}, size: {sticker_size} bytes)")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
||||||
|
|
||||||
# Check for existing mapping to avoid duplicates when resuming
|
# Check for existing mapping to avoid duplicates when resuming
|
||||||
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
|
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reply_to_fluxer_id = None
|
fluxer_msg_id = await _process_and_send_message(
|
||||||
if msg.reference and msg.reference.message_id:
|
context=context,
|
||||||
reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id))
|
msg=msg,
|
||||||
if reply_to_fluxer_id:
|
target_channel_id=target_channel_id,
|
||||||
logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Fluxer ID {reply_to_fluxer_id}")
|
stats=stats,
|
||||||
else:
|
thread_id=thread_id,
|
||||||
logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.")
|
parent_target_id=parent_target_id,
|
||||||
|
thread_name=thread_name,
|
||||||
# If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter
|
processed_threads=processed_threads
|
||||||
if not reply_to_fluxer_id and parent_target_id and stats["messages"] == 0:
|
|
||||||
reply_to_fluxer_id = parent_target_id
|
|
||||||
|
|
||||||
# Prepend thread marker to the first message of the thread
|
|
||||||
if thread_name and stats["messages"] == 0:
|
|
||||||
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
|
|
||||||
|
|
||||||
# Always ensure alias is created/retrieved to populate user_alias table
|
|
||||||
alias = context.state.get_user_alias(str(msg.author.id))
|
|
||||||
|
|
||||||
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
|
||||||
if anonymize_users:
|
|
||||||
author_name = alias or "Anonymized User"
|
|
||||||
author_avatar_url = None
|
|
||||||
else:
|
|
||||||
author_name = msg.author.display_name
|
|
||||||
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
|
|
||||||
|
|
||||||
logger.debug(f"Fluxer: Calling send_message for Discord ID {msg.id}")
|
|
||||||
fluxer_msg_id = await context.fluxer_writer.send_message(
|
|
||||||
channel_id=target_channel_id,
|
|
||||||
author_name=author_name,
|
|
||||||
author_avatar_url=author_avatar_url,
|
|
||||||
content=content,
|
|
||||||
timestamp=int(msg.created_at.timestamp()),
|
|
||||||
files=files if files else None,
|
|
||||||
reply_to_message_id=reply_to_fluxer_id,
|
|
||||||
is_forwarded=is_forwarded,
|
|
||||||
embeds=msg.embeds
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if fluxer_msg_id:
|
# Check for associated thread (Individual mode recursion)
|
||||||
if thread_id:
|
|
||||||
context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), fluxer_msg_id)
|
|
||||||
else:
|
|
||||||
context.state.set_message_mapping(target_channel_id, str(msg.id), fluxer_msg_id)
|
|
||||||
else:
|
|
||||||
logger.warning(f"Fluxer: send_message returned None for Discord ID {msg.id} (message might have been skipped or timed out)")
|
|
||||||
|
|
||||||
if thread_id:
|
|
||||||
context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at))
|
|
||||||
context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id))
|
|
||||||
context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0)
|
|
||||||
else:
|
|
||||||
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
|
||||||
context.state.update_last_message_id(target_channel_id, str(msg.id))
|
|
||||||
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
|
||||||
|
|
||||||
stats["messages"] += 1
|
|
||||||
stats["last_message_content"] = content
|
|
||||||
stats["last_message_author"] = msg.author.display_name
|
|
||||||
|
|
||||||
# Check for associated thread (Normal case: parent message is migrated)
|
|
||||||
if hasattr(msg, 'thread') and msg.thread:
|
if hasattr(msg, 'thread') and msg.thread:
|
||||||
thread = msg.thread
|
thread = msg.thread
|
||||||
if thread.id not in processed_threads:
|
if thread.id not in processed_threads:
|
||||||
processed_threads.add(thread.id)
|
processed_threads.add(thread.id)
|
||||||
# Track thread entry
|
|
||||||
stats["threads"] += 1
|
stats["threads"] += 1
|
||||||
|
|
||||||
# Fetch last migrated message ID for this thread
|
|
||||||
thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id))
|
thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id))
|
||||||
if thread_after_id:
|
|
||||||
logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}")
|
|
||||||
|
|
||||||
# Migrate thread messages recursively
|
|
||||||
thread_stats = await migrate_messages(
|
thread_stats = await migrate_messages(
|
||||||
context=context,
|
context=context,
|
||||||
source_channel_id=thread.id,
|
source_channel_id=thread.id,
|
||||||
|
|
@ -637,22 +763,19 @@ async def migrate_messages(
|
||||||
stats["attachments"] += thread_stats["attachments"]
|
stats["attachments"] += thread_stats["attachments"]
|
||||||
stats["threads"] += thread_stats["threads"]
|
stats["threads"] += thread_stats["threads"]
|
||||||
|
|
||||||
# Send End Marker
|
|
||||||
if context.is_running:
|
if context.is_running:
|
||||||
await context.fluxer_writer.send_marker(
|
await context.fluxer_writer.send_marker(
|
||||||
channel_id=target_channel_id,
|
channel_id=target_channel_id,
|
||||||
content=f"> <<< END OF THREAD >>>"
|
content=f"> <<< END OF THREAD >>>"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update Link Tracking (but prevent threaded messages from overwriting the parent channel pointers)
|
# Update Link Tracking (Parent pointer updates)
|
||||||
# The 'after_message_id' param usually means it's the main function call and not a thread recursive call
|
|
||||||
if not stats["first_message_url"]:
|
if not stats["first_message_url"]:
|
||||||
stats["first_message_url"] = msg.jump_url
|
stats["first_message_url"] = msg.jump_url
|
||||||
stats["last_message_url"] = msg.jump_url
|
stats["last_message_url"] = msg.jump_url
|
||||||
|
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
await progress_callback(stats)
|
await progress_callback(stats)
|
||||||
logger.debug(f"Fluxer: Finished processing message Discord ID {msg.id}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to process message {msg.id}: {e}")
|
logger.error(f"Failed to process message {msg.id}: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
|
|
@ -735,7 +858,7 @@ async def migrate_global_messages(
|
||||||
progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None
|
progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Migrates messages across all channels chronologically.
|
Migrates messages across all channels chronologically to Fluxer.
|
||||||
"""
|
"""
|
||||||
stats = {
|
stats = {
|
||||||
"messages": 0,
|
"messages": 0,
|
||||||
|
|
@ -750,14 +873,6 @@ async def migrate_global_messages(
|
||||||
processed_threads = set()
|
processed_threads = set()
|
||||||
logger.info("Starting Global Waterfall Migration for Fluxer...")
|
logger.info("Starting Global Waterfall Migration for Fluxer...")
|
||||||
|
|
||||||
# Keep track of active thread mapping natively to pass parent target IDs if needed
|
|
||||||
thread_to_target_channel = {}
|
|
||||||
|
|
||||||
# Emojis and mapped users cache setup
|
|
||||||
emoji_map = context.state.emoji_map
|
|
||||||
db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {}
|
|
||||||
target_server_id = getattr(context.fluxer_writer, "server_id", None)
|
|
||||||
|
|
||||||
# Fetch global progress map to skip migrated messages efficiently
|
# Fetch global progress map to skip migrated messages efficiently
|
||||||
progress_map = context.state.get_all_last_message_ids()
|
progress_map = context.state.get_all_last_message_ids()
|
||||||
|
|
||||||
|
|
@ -794,125 +909,19 @@ async def migrate_global_messages(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If it's a thread message, we need to handle it based on if it's the thread starter or a reply
|
# If it's a thread message, we need to handle it based on if it's the thread starter or a reply
|
||||||
parent_target_id = None
|
|
||||||
if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id:
|
if hasattr(msg, 'thread') and msg.thread and msg.id == msg.thread.id:
|
||||||
processed_threads.add(msg.thread.id)
|
processed_threads.add(msg.thread.id)
|
||||||
stats["threads"] += 1
|
stats["threads"] += 1
|
||||||
elif msg.channel.type in [11, 12]: # Thread channels
|
|
||||||
# It's a message IN a thread.
|
|
||||||
# In Fluxer, threads might just be linear messages or threaded replies depending on schema
|
|
||||||
# For basic migration we just send it to the parent mapped target channel.
|
|
||||||
# The parent mapped target channel ID should already be calculated correctly by get_target_channel_id (which returns mapped thread or parent channel)
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Formatting
|
|
||||||
files = []
|
|
||||||
file_names = []
|
|
||||||
|
|
||||||
# Always ensure alias is created/retrieved to populate user_alias table
|
|
||||||
alias = context.state.get_user_alias(str(msg.author.id))
|
|
||||||
|
|
||||||
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
|
||||||
|
|
||||||
if anonymize_users:
|
|
||||||
author_name = alias or "Anonymized User"
|
|
||||||
author_avatar_url = None
|
|
||||||
else:
|
|
||||||
author_name = msg.author.display_name
|
|
||||||
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
|
|
||||||
|
|
||||||
for att in msg.attachments:
|
|
||||||
media_info = db_media.get(att.local_hash) if db_media else None
|
|
||||||
local_path = None
|
|
||||||
if media_info:
|
|
||||||
local_path = Path(media_info["local_path"])
|
|
||||||
elif hasattr(att, 'read'):
|
|
||||||
# Fallback
|
|
||||||
pass
|
|
||||||
|
|
||||||
if local_path and local_path.exists():
|
|
||||||
files.append(local_path)
|
|
||||||
file_names.append(att.filename)
|
|
||||||
|
|
||||||
content = msg.content or ""
|
|
||||||
|
|
||||||
# Stickers
|
|
||||||
for sticker in msg.stickers:
|
|
||||||
sticker_name = sticker.name
|
|
||||||
sticker_url = sticker.url
|
|
||||||
|
|
||||||
# Check for uploaded media pool logic first
|
|
||||||
s_hash = getattr(sticker, "local_hash", None)
|
|
||||||
sticker_file = None
|
|
||||||
s_media = db_media.get(s_hash) if db_media and s_hash else None
|
|
||||||
if s_media:
|
|
||||||
s_path = Path(s_media["local_path"])
|
|
||||||
if s_path.exists():
|
|
||||||
sticker_file = s_path
|
|
||||||
|
|
||||||
content += f"\n[Sticker: {sticker_name}]"
|
|
||||||
if sticker_file:
|
|
||||||
files.append(sticker_file)
|
|
||||||
file_names.append(f"sticker_{sticker_name}.png")
|
|
||||||
|
|
||||||
content = clean_mentions(
|
|
||||||
content=content,
|
|
||||||
guild=context.discord_reader.guild,
|
|
||||||
user_mentions=msg.mentions,
|
|
||||||
role_mentions=msg.role_mentions,
|
|
||||||
channel_mentions=msg.channel_mentions,
|
|
||||||
emoji_map=emoji_map,
|
|
||||||
channel_map=context.state.channel_map,
|
|
||||||
state=context.state,
|
|
||||||
target_server_id=target_server_id,
|
|
||||||
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
|
|
||||||
anonymize_users=anonymize_users
|
|
||||||
)
|
|
||||||
|
|
||||||
if not content and not files:
|
|
||||||
logger.debug(f"Message {msg.id} empty after processing, skipping.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
timestamp_int = int(msg.created_at.timestamp())
|
|
||||||
|
|
||||||
if msg.reference and msg.reference.message_id:
|
|
||||||
# Resolve the author of the message being replied to
|
|
||||||
source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id)
|
|
||||||
if source_ref_msg and source_ref_msg.author:
|
|
||||||
ref_author_id = str(source_ref_msg.author.id)
|
|
||||||
if anonymize_users:
|
|
||||||
ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User"
|
|
||||||
else:
|
|
||||||
ref_name = source_ref_msg.author.display_name
|
|
||||||
content = f"`@{ref_name}`\n{content}"
|
|
||||||
else:
|
|
||||||
# Fallback if author cannot be resolved (e.g. deleted/missing from backup)
|
|
||||||
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
|
|
||||||
if tgt_reply:
|
|
||||||
content = f"[Reply to {tgt_reply}]\n{content}"
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fluxer_msg_id = await context.fluxer_writer.send_message(
|
await _process_and_send_message(
|
||||||
channel_id=target_channel_id,
|
context=context,
|
||||||
author_name=author_name,
|
msg=msg,
|
||||||
author_avatar_url=author_avatar_url,
|
target_channel_id=target_channel_id,
|
||||||
content=content,
|
stats=stats,
|
||||||
files=files,
|
processed_threads=processed_threads
|
||||||
timestamp=timestamp_int,
|
|
||||||
embeds=msg.embeds
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if fluxer_msg_id:
|
|
||||||
context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id)
|
|
||||||
context.state.update_last_message_id(target_channel_id, msg.id)
|
|
||||||
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
|
||||||
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
|
||||||
stats["attachments"] += len(files) if files else 0
|
|
||||||
|
|
||||||
stats["messages"] += 1
|
|
||||||
stats["last_message_content"] = content
|
|
||||||
stats["last_message_author"] = author_name
|
|
||||||
|
|
||||||
if not stats["first_message_url"]:
|
if not stats["first_message_url"]:
|
||||||
stats["first_message_url"] = msg.jump_url
|
stats["first_message_url"] = msg.jump_url
|
||||||
stats["last_message_url"] = msg.jump_url
|
stats["last_message_url"] = msg.jump_url
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,192 @@ async def get_channel_threads(reader: Any, channel_id: int) -> List[Any]:
|
||||||
return threads
|
return threads
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_and_send_message(
|
||||||
|
context: MigrationContext,
|
||||||
|
msg: Any,
|
||||||
|
target_channel_id: str,
|
||||||
|
stats: Dict[str, Any],
|
||||||
|
thread_id: str | None = None,
|
||||||
|
parent_target_id: str | None = None,
|
||||||
|
thread_name: str | None = None,
|
||||||
|
processed_threads: set | None = None
|
||||||
|
) -> str | None:
|
||||||
|
"""
|
||||||
|
Internal helper to process a single Discord message (mentions, attachments, stickers)
|
||||||
|
and send it to the Stoat platform.
|
||||||
|
"""
|
||||||
|
# 1. Processing Flags
|
||||||
|
is_forwarded = False
|
||||||
|
if hasattr(msg.flags, 'forwarded'):
|
||||||
|
is_forwarded = msg.flags.forwarded
|
||||||
|
|
||||||
|
# 2. Content & Formatting
|
||||||
|
content = msg.content or ""
|
||||||
|
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
||||||
|
alias = context.state.get_user_alias(str(msg.author.id))
|
||||||
|
|
||||||
|
# Process Stickers
|
||||||
|
files = []
|
||||||
|
if hasattr(msg, 'stickers') and msg.stickers:
|
||||||
|
for s in msg.stickers:
|
||||||
|
try:
|
||||||
|
sticker_data = await context.discord_reader.download_sticker(s)
|
||||||
|
if not sticker_data: continue
|
||||||
|
|
||||||
|
format_val = getattr(s, 'format', 'png')
|
||||||
|
if hasattr(format_val, 'name'):
|
||||||
|
ext = format_val.name.lower()
|
||||||
|
elif isinstance(format_val, int):
|
||||||
|
format_map = {1: 'png', 2: 'apng', 3: 'lottie', 4: 'gif'}
|
||||||
|
ext = format_map.get(format_val, 'png')
|
||||||
|
else:
|
||||||
|
ext = str(format_val).lower()
|
||||||
|
|
||||||
|
# Conversion logic for Stoat (WebP or GIF focus)
|
||||||
|
if ext == 'lottie' and HAS_LOTTIE:
|
||||||
|
try:
|
||||||
|
lottie_data = json.loads(sticker_data)
|
||||||
|
def _convert_lottie_to_gif(data):
|
||||||
|
animation = Animation.load(data)
|
||||||
|
output = io.BytesIO()
|
||||||
|
export_gif(animation, output)
|
||||||
|
return output.getvalue()
|
||||||
|
sticker_data = await asyncio.to_thread(_convert_lottie_to_gif, lottie_data)
|
||||||
|
ext = 'gif'
|
||||||
|
except Exception: ext = 'json'
|
||||||
|
elif ext == 'apng':
|
||||||
|
try:
|
||||||
|
from PIL import Image
|
||||||
|
def _convert_apng_to_gif(data):
|
||||||
|
img = Image.open(io.BytesIO(data))
|
||||||
|
gif_buf = io.BytesIO()
|
||||||
|
if getattr(img, 'n_frames', 1) > 1:
|
||||||
|
frames = []
|
||||||
|
durations = []
|
||||||
|
for i in range(img.n_frames):
|
||||||
|
img.seek(i)
|
||||||
|
frame = img.convert('RGBA')
|
||||||
|
current_frame = Image.new('RGBA', img.size, (0,0,0,0))
|
||||||
|
current_frame.paste(frame, (0, 0))
|
||||||
|
frames.append(current_frame)
|
||||||
|
durations.append(img.info.get('duration', 100))
|
||||||
|
frames[0].save(gif_buf, format='GIF', save_all=True, append_images=frames[1:], loop=0, duration=durations, disposal=2, transparency=0)
|
||||||
|
else: img.save(gif_buf, format='GIF')
|
||||||
|
return gif_buf.getvalue()
|
||||||
|
sticker_data = await asyncio.to_thread(_convert_apng_to_gif, sticker_data)
|
||||||
|
ext = 'gif'
|
||||||
|
except Exception: pass
|
||||||
|
|
||||||
|
files.append({
|
||||||
|
"filename": f"sticker_{s.name}_{s.id}.{ext}",
|
||||||
|
"data": sticker_data,
|
||||||
|
"content_type": f"image/{ext}" if ext != "json" else "application/json"
|
||||||
|
})
|
||||||
|
stats["attachments"] += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
||||||
|
|
||||||
|
# Process Attachments
|
||||||
|
attachments_to_process = list(msg.attachments)
|
||||||
|
if is_forwarded and hasattr(msg, 'message_snapshots') and msg.message_snapshots:
|
||||||
|
snapshot = msg.message_snapshots[0]
|
||||||
|
if not content: content = snapshot.content
|
||||||
|
attachments_to_process.extend(snapshot.attachments)
|
||||||
|
|
||||||
|
for att in attachments_to_process:
|
||||||
|
try:
|
||||||
|
att_data = await context.discord_reader.download_attachment(att)
|
||||||
|
files.append({
|
||||||
|
"filename": att.filename,
|
||||||
|
"data": att_data,
|
||||||
|
"content_type": getattr(att, "content_type", None)
|
||||||
|
})
|
||||||
|
stats["attachments"] += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to download attachment {att.filename}: {e}")
|
||||||
|
|
||||||
|
# Clean Mentions
|
||||||
|
content = clean_mentions(
|
||||||
|
content=content,
|
||||||
|
guild=context.discord_reader.guild,
|
||||||
|
user_mentions=msg.mentions,
|
||||||
|
role_mentions=msg.role_mentions,
|
||||||
|
channel_mentions=msg.channel_mentions,
|
||||||
|
emoji_map=context.state.emoji_map,
|
||||||
|
channel_map=context.state.channel_map,
|
||||||
|
state=context.state,
|
||||||
|
target_server_id=context.stoat_writer.community_id,
|
||||||
|
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
|
||||||
|
anonymize_users=anonymize_users
|
||||||
|
)
|
||||||
|
|
||||||
|
if not content and not files:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Reply Resolution
|
||||||
|
reply_to_stoat_id = None
|
||||||
|
if msg.reference and msg.reference.message_id:
|
||||||
|
reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id))
|
||||||
|
if not reply_to_stoat_id:
|
||||||
|
# Fallback author tagging
|
||||||
|
try:
|
||||||
|
source_ref_msg = await context.discord_reader.get_message(msg.channel.id, msg.reference.message_id)
|
||||||
|
if source_ref_msg and source_ref_msg.author:
|
||||||
|
ref_name = context.state.get_user_alias(str(source_ref_msg.author.id)) if anonymize_users else source_ref_msg.author.display_name
|
||||||
|
content = f"`@{ref_name}`\n{content}"
|
||||||
|
else:
|
||||||
|
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
|
||||||
|
if tgt_reply: content = f"[Reply to {tgt_reply}]\n{content}"
|
||||||
|
except Exception: pass
|
||||||
|
|
||||||
|
# Thread logic
|
||||||
|
if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0:
|
||||||
|
reply_to_stoat_id = parent_target_id
|
||||||
|
if thread_name and stats["messages"] == 0:
|
||||||
|
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
|
||||||
|
|
||||||
|
# Author resolution
|
||||||
|
if anonymize_users:
|
||||||
|
author_name = alias or "Anonymized User"
|
||||||
|
author_avatar_url = None
|
||||||
|
else:
|
||||||
|
author_name = msg.author.display_name
|
||||||
|
author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None
|
||||||
|
if author_avatar_url and not author_avatar_url.startswith("http"): author_avatar_url = None
|
||||||
|
|
||||||
|
# Send Message
|
||||||
|
stoat_msg_id = await context.stoat_writer.send_message(
|
||||||
|
channel_id=target_channel_id,
|
||||||
|
author_name=author_name,
|
||||||
|
author_avatar_url=author_avatar_url,
|
||||||
|
content=content,
|
||||||
|
timestamp=int(msg.created_at.timestamp()),
|
||||||
|
files=files if files else None,
|
||||||
|
reply_to_message_id=reply_to_stoat_id,
|
||||||
|
is_forwarded=is_forwarded,
|
||||||
|
embeds=msg.embeds
|
||||||
|
)
|
||||||
|
|
||||||
|
if stoat_msg_id:
|
||||||
|
if thread_id:
|
||||||
|
context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id)
|
||||||
|
context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at))
|
||||||
|
context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id))
|
||||||
|
context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0)
|
||||||
|
else:
|
||||||
|
context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id)
|
||||||
|
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
||||||
|
context.state.update_last_message_id(target_channel_id, str(msg.id))
|
||||||
|
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
||||||
|
|
||||||
|
stats["messages"] += 1
|
||||||
|
stats["last_message_content"] = content
|
||||||
|
stats["last_message_author"] = msg.author.display_name
|
||||||
|
|
||||||
|
return stoat_msg_id
|
||||||
|
|
||||||
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]:
|
async def analyze_migration(context: MigrationContext, source_channel_id: int, after_message_id: int | None = None, inclusive: bool = False, progress_callback: Callable[[Dict[str, Any]], Awaitable[None]] | None = None, processed_threads: set | None = None) -> Dict[str, int]:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Scans channel history to count messages, threads, and attachments.
|
Scans channel history to count messages, threads, and attachments.
|
||||||
"""
|
"""
|
||||||
|
|
@ -546,87 +731,30 @@ async def migrate_messages(
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
|
||||||
|
|
||||||
try:
|
|
||||||
# Check for existing mapping to avoid duplicates when resuming
|
# Check for existing mapping to avoid duplicates when resuming
|
||||||
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
|
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Check if this message is a reply
|
try:
|
||||||
reply_to_stoat_id = None
|
stoat_msg_id = await _process_and_send_message(
|
||||||
if msg.reference and msg.reference.message_id:
|
context=context,
|
||||||
reply_to_stoat_id = context.state.get_target_message_id(target_channel_id, str(msg.reference.message_id))
|
msg=msg,
|
||||||
if reply_to_stoat_id:
|
target_channel_id=target_channel_id,
|
||||||
logger.debug(f"Detected reply to Discord ID {msg.reference.message_id} -> Stoat ID {reply_to_stoat_id}")
|
stats=stats,
|
||||||
else:
|
thread_id=thread_id,
|
||||||
logger.debug(f"Reply target Discord ID {msg.reference.message_id} not found in current session map.")
|
parent_target_id=parent_target_id,
|
||||||
|
thread_name=thread_name,
|
||||||
# If this is the FIRST thread message and we have a parent_target_id, force it as reply to the starter
|
processed_threads=processed_threads
|
||||||
if not reply_to_stoat_id and parent_target_id and stats["messages"] == 0:
|
|
||||||
reply_to_stoat_id = parent_target_id
|
|
||||||
|
|
||||||
# Prepend thread marker to the first message of the thread
|
|
||||||
if thread_name and stats["messages"] == 0:
|
|
||||||
content = f"> <<< THREAD: **{thread_name}** >>>\n{content}"
|
|
||||||
|
|
||||||
# Always ensure alias is created/retrieved to populate user_alias table
|
|
||||||
alias = context.state.get_user_alias(str(msg.author.id))
|
|
||||||
|
|
||||||
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
|
||||||
if anonymize_users:
|
|
||||||
author_name = alias or "Anonymized User"
|
|
||||||
author_avatar_url = None
|
|
||||||
else:
|
|
||||||
author_name = msg.author.display_name
|
|
||||||
author_avatar_url = str(msg.author.display_avatar.url) if msg.author.display_avatar.url else None
|
|
||||||
if author_avatar_url and not author_avatar_url.startswith("http"):
|
|
||||||
author_avatar_url = None
|
|
||||||
|
|
||||||
logger.debug(f"Stoat: Calling send_message for Discord ID {msg.id}")
|
|
||||||
stoat_msg_id = await context.stoat_writer.send_message(
|
|
||||||
channel_id=target_channel_id,
|
|
||||||
author_name=author_name,
|
|
||||||
author_avatar_url=author_avatar_url,
|
|
||||||
content=content,
|
|
||||||
timestamp=int(msg.created_at.timestamp()),
|
|
||||||
files=files if files else None,
|
|
||||||
reply_to_message_id=reply_to_stoat_id,
|
|
||||||
is_forwarded=is_forwarded,
|
|
||||||
embeds=msg.embeds
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if stoat_msg_id:
|
# Check for associated thread (Individual mode recursion)
|
||||||
if thread_id:
|
|
||||||
context.state.set_thread_message_mapping(target_channel_id, thread_id, str(msg.id), stoat_msg_id)
|
|
||||||
else:
|
|
||||||
context.state.set_message_mapping(target_channel_id, str(msg.id), stoat_msg_id)
|
|
||||||
|
|
||||||
if thread_id:
|
|
||||||
context.state.update_thread_last_message_timestamp(target_channel_id, thread_id, str(msg.created_at))
|
|
||||||
context.state.update_thread_last_message_id(target_channel_id, thread_id, str(msg.id))
|
|
||||||
context.state.increment_thread_stats(target_channel_id, thread_id, messages=1, files=len(files) if files else 0)
|
|
||||||
else:
|
|
||||||
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
|
||||||
context.state.update_last_message_id(target_channel_id, str(msg.id))
|
|
||||||
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
|
||||||
|
|
||||||
stats["messages"] += 1
|
|
||||||
stats["last_message_content"] = content
|
|
||||||
stats["last_message_author"] = msg.author.display_name
|
|
||||||
|
|
||||||
# Check for associated thread (Normal case: parent message is migrated)
|
|
||||||
if hasattr(msg, 'thread') and msg.thread:
|
if hasattr(msg, 'thread') and msg.thread:
|
||||||
thread = msg.thread
|
thread = msg.thread
|
||||||
if thread.id not in processed_threads:
|
if thread.id not in processed_threads:
|
||||||
processed_threads.add(thread.id)
|
processed_threads.add(thread.id)
|
||||||
# Track thread entry
|
|
||||||
stats["threads"] += 1
|
stats["threads"] += 1
|
||||||
|
|
||||||
# Fetch last migrated message ID for this thread
|
|
||||||
thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id))
|
thread_after_id = context.state.get_thread_last_message_id(target_channel_id, str(thread.id))
|
||||||
if thread_after_id:
|
|
||||||
logger.info(f"Resuming thread '{thread.name}' from after message ID: {thread_after_id}")
|
|
||||||
|
|
||||||
# Migrate thread messages recursively
|
|
||||||
thread_stats = await migrate_messages(
|
thread_stats = await migrate_messages(
|
||||||
context=context,
|
context=context,
|
||||||
source_channel_id=thread.id,
|
source_channel_id=thread.id,
|
||||||
|
|
@ -641,7 +769,6 @@ async def migrate_messages(
|
||||||
stats["attachments"] += thread_stats["attachments"]
|
stats["attachments"] += thread_stats["attachments"]
|
||||||
stats["threads"] += thread_stats["threads"]
|
stats["threads"] += thread_stats["threads"]
|
||||||
|
|
||||||
# Send End Marker
|
|
||||||
if context.is_running:
|
if context.is_running:
|
||||||
await context.stoat_writer.send_marker(
|
await context.stoat_writer.send_marker(
|
||||||
channel_id=target_channel_id,
|
channel_id=target_channel_id,
|
||||||
|
|
@ -656,13 +783,12 @@ async def migrate_messages(
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
await progress_callback(stats)
|
await progress_callback(stats)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# If it's a permission error, stop the entire migration
|
if "MissingPermission" in str(e): raise
|
||||||
if "MissingPermission" in str(e):
|
|
||||||
raise
|
|
||||||
logger.error(f"Failed to process message {msg.id}: {e}")
|
logger.error(f"Failed to process message {msg.id}: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
# Mark thread as completed if we finished the loop without being interrupted
|
# Mark thread as completed if we finished the loop without being interrupted
|
||||||
if thread_id and context.is_running:
|
if thread_id and context.is_running:
|
||||||
context.state.update_thread_completed(target_channel_id, thread_id, completed=True)
|
context.state.update_thread_completed(target_channel_id, thread_id, completed=True)
|
||||||
|
|
@ -795,115 +921,15 @@ async def migrate_global_messages(
|
||||||
elif msg.channel.type in [11, 12]:
|
elif msg.channel.type in [11, 12]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Formatting
|
|
||||||
files = []
|
|
||||||
|
|
||||||
# Always ensure alias is created/retrieved to populate user_alias table
|
|
||||||
alias = context.state.get_user_alias(str(msg.author.id))
|
|
||||||
|
|
||||||
anonymize_users = context.config.anonymize_users if hasattr(context, 'config') else False
|
|
||||||
|
|
||||||
if anonymize_users:
|
|
||||||
author_name = alias or "Anonymized User"
|
|
||||||
author_avatar_url = None
|
|
||||||
else:
|
|
||||||
author_name = msg.author.display_name
|
|
||||||
author_avatar_url = msg.author.avatar.url if hasattr(msg.author, 'avatar') and msg.author.avatar else None
|
|
||||||
|
|
||||||
for att in msg.attachments:
|
|
||||||
media_info = db_media.get(att.local_hash) if db_media else None
|
|
||||||
local_path = None
|
|
||||||
if media_info:
|
|
||||||
local_path = Path(media_info["local_path"])
|
|
||||||
|
|
||||||
if local_path and local_path.exists():
|
|
||||||
try:
|
try:
|
||||||
with open(local_path, "rb") as f:
|
await _process_and_send_message(
|
||||||
files.append({"filename": att.filename, "data": f.read()})
|
context=context,
|
||||||
except Exception as fe:
|
msg=msg,
|
||||||
logger.error(f"Failed to read file {local_path}: {fe}")
|
target_channel_id=target_channel_id,
|
||||||
|
stats=stats,
|
||||||
content = msg.content or ""
|
processed_threads=processed_threads
|
||||||
|
|
||||||
for sticker in msg.stickers:
|
|
||||||
sticker_name = getattr(sticker, "name", "unknown")
|
|
||||||
s_hash = getattr(sticker, "local_hash", None)
|
|
||||||
sticker_file = None
|
|
||||||
s_media = db_media.get(s_hash) if db_media and s_hash else None
|
|
||||||
if s_media:
|
|
||||||
s_path = Path(s_media["local_path"])
|
|
||||||
if s_path.exists():
|
|
||||||
sticker_file = s_path
|
|
||||||
|
|
||||||
content += f"\n[Sticker: {sticker_name}]"
|
|
||||||
if sticker_file:
|
|
||||||
try:
|
|
||||||
with open(sticker_file, "rb") as f:
|
|
||||||
files.append({
|
|
||||||
"filename": f"sticker_{sticker_name}.png",
|
|
||||||
"data": f.read(),
|
|
||||||
"content_type": "image/png"
|
|
||||||
})
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to read sticker file {sticker_file}: {e}")
|
|
||||||
|
|
||||||
content = clean_mentions(
|
|
||||||
content=content,
|
|
||||||
guild=context.discord_reader.guild,
|
|
||||||
user_mentions=msg.mentions,
|
|
||||||
role_mentions=msg.role_mentions,
|
|
||||||
channel_mentions=msg.channel_mentions,
|
|
||||||
emoji_map=emoji_map,
|
|
||||||
channel_map=context.state.channel_map,
|
|
||||||
state=context.state,
|
|
||||||
target_server_id=target_server_id,
|
|
||||||
channel_names=context.channel_names if hasattr(context, 'channel_names') else None,
|
|
||||||
anonymize_users=anonymize_users
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not content and not files:
|
|
||||||
logger.debug(f"Message {msg.id} empty after processing, skipping.")
|
|
||||||
continue
|
|
||||||
|
|
||||||
timestamp_int = int(msg.created_at.timestamp())
|
|
||||||
|
|
||||||
if msg.reference and msg.reference.message_id:
|
|
||||||
# Resolve the author of the message being replied to
|
|
||||||
source_ref_msg = await context.discord_reader.get_message(msg.channel_id, msg.reference.message_id)
|
|
||||||
if source_ref_msg and source_ref_msg.author:
|
|
||||||
ref_author_id = str(source_ref_msg.author.id)
|
|
||||||
if anonymize_users:
|
|
||||||
ref_name = context.state.get_user_alias(ref_author_id) or "Anonymized User"
|
|
||||||
else:
|
|
||||||
ref_name = source_ref_msg.author.display_name
|
|
||||||
content = f"`@{ref_name}`\n{content}"
|
|
||||||
else:
|
|
||||||
tgt_reply = context.state.get_target_message_id(target_channel_id, msg.reference.message_id)
|
|
||||||
if tgt_reply:
|
|
||||||
content = f"[Reply to {tgt_reply}]\n{content}"
|
|
||||||
|
|
||||||
try:
|
|
||||||
stoat_msg_id = await context.stoat_writer.send_message(
|
|
||||||
channel_id=target_channel_id,
|
|
||||||
author_name=author_name,
|
|
||||||
author_avatar_url=author_avatar_url,
|
|
||||||
content=content,
|
|
||||||
files=files,
|
|
||||||
timestamp=timestamp_int,
|
|
||||||
embeds=msg.embeds
|
|
||||||
)
|
|
||||||
|
|
||||||
if stoat_msg_id:
|
|
||||||
context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id)
|
|
||||||
context.state.update_last_message_id(target_channel_id, msg.id)
|
|
||||||
context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at))
|
|
||||||
context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0)
|
|
||||||
stats["attachments"] += len(files) if files else 0
|
|
||||||
|
|
||||||
stats["messages"] += 1
|
|
||||||
stats["last_message_content"] = content
|
|
||||||
stats["last_message_author"] = author_name
|
|
||||||
|
|
||||||
if not stats["first_message_url"]:
|
if not stats["first_message_url"]:
|
||||||
stats["first_message_url"] = msg.jump_url
|
stats["first_message_url"] = msg.jump_url
|
||||||
stats["last_message_url"] = msg.jump_url
|
stats["last_message_url"] = msg.jump_url
|
||||||
|
|
@ -914,6 +940,7 @@ async def migrate_global_messages(
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to process global message {msg.id}: {e}")
|
logger.error(f"Failed to process global message {msg.id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
except (KeyboardInterrupt, asyncio.CancelledError):
|
except (KeyboardInterrupt, asyncio.CancelledError):
|
||||||
context.is_running = False
|
context.is_running = False
|
||||||
pass
|
pass
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue