improvement: batch process backup messages
This commit is contained in:
parent
16e94013c7
commit
5b31b7fc50
2 changed files with 64 additions and 15 deletions
|
|
@ -392,26 +392,72 @@ class DiscordExporter:
|
||||||
thread_count = 0
|
thread_count = 0
|
||||||
thread_msg_count = 0
|
thread_msg_count = 0
|
||||||
|
|
||||||
|
BATCH_SIZE = 100 # Process messages in parallel batches
|
||||||
|
UI_LOG_INTERVAL = 10 # Only log message preview every N messages
|
||||||
|
USER_SAVE_INTERVAL = 100 # Save user_info.json every N new messages
|
||||||
|
|
||||||
# 1. Fetch new messages - Handle Forbidden gracefully
|
# 1. Fetch new messages - Handle Forbidden gracefully
|
||||||
try:
|
try:
|
||||||
|
batch_raw = []
|
||||||
async for msg in self.reader.fetch_message_history(channel_id, after_id=last_id):
|
async for msg in self.reader.fetch_message_history(channel_id, after_id=last_id):
|
||||||
if not self.is_running: break
|
if not self.is_running: break
|
||||||
await asyncio.sleep(0) # Yield control
|
batch_raw.append(msg)
|
||||||
msg_data = await self._format_message(msg, asset_dir, asset_prefix, avatar_dir, avatar_rel_base)
|
|
||||||
messages.append(msg_data)
|
# Process in batches for parallelism
|
||||||
new_count += 1
|
if len(batch_raw) >= BATCH_SIZE:
|
||||||
accumulated_count += 1
|
# Format all messages in the batch concurrently
|
||||||
|
batch_results = await asyncio.gather(
|
||||||
|
*(self._format_message(m, asset_dir, asset_prefix, avatar_dir, avatar_rel_base) for m in batch_raw)
|
||||||
|
)
|
||||||
|
messages.extend(batch_results)
|
||||||
|
new_count += len(batch_results)
|
||||||
|
accumulated_count += len(batch_results)
|
||||||
|
|
||||||
|
# Throttled UI update: show preview only for the last message in the batch
|
||||||
|
if progress_callback and new_count % UI_LOG_INTERVAL < BATCH_SIZE:
|
||||||
|
last_msg = batch_raw[-1]
|
||||||
|
author = getattr(last_msg, "author", None)
|
||||||
|
author_name = getattr(author, "display_name", "Unknown") if author else "Unknown"
|
||||||
|
content = last_msg.content or ""
|
||||||
|
attachments_len = len(last_msg.attachments) if hasattr(last_msg, "attachments") else 0
|
||||||
|
preview = content[:150] + ("..." if len(content) > 150 else "")
|
||||||
|
if attachments_len:
|
||||||
|
preview += f" [dim]({attachments_len} attachments)[/dim]"
|
||||||
|
if not preview:
|
||||||
|
preview = "[dim](no content)[/dim]"
|
||||||
|
await progress_callback(channel_name, accumulated_count, author_name=author_name, message_preview=preview)
|
||||||
|
elif progress_callback:
|
||||||
|
await progress_callback(channel_name, accumulated_count)
|
||||||
|
|
||||||
|
# Periodic save of user_info.json (every ~100 messages)
|
||||||
|
if new_count % USER_SAVE_INTERVAL < BATCH_SIZE:
|
||||||
|
await self._save_json(user_info_file, list(self.user_cache.values()))
|
||||||
|
|
||||||
|
batch_raw.clear()
|
||||||
|
|
||||||
|
# Process remaining messages in the last partial batch
|
||||||
|
if batch_raw and self.is_running:
|
||||||
|
batch_results = await asyncio.gather(
|
||||||
|
*(self._format_message(m, asset_dir, asset_prefix, avatar_dir, avatar_rel_base) for m in batch_raw)
|
||||||
|
)
|
||||||
|
messages.extend(batch_results)
|
||||||
|
new_count += len(batch_results)
|
||||||
|
accumulated_count += len(batch_results)
|
||||||
|
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
author = getattr(msg, "author", None)
|
last_msg = batch_raw[-1]
|
||||||
|
author = getattr(last_msg, "author", None)
|
||||||
author_name = getattr(author, "display_name", "Unknown") if author else "Unknown"
|
author_name = getattr(author, "display_name", "Unknown") if author else "Unknown"
|
||||||
content = msg.content or ""
|
content = last_msg.content or ""
|
||||||
attachments_len = len(msg.attachments) if hasattr(msg, "attachments") else 0
|
|
||||||
preview = content[:150] + ("..." if len(content) > 150 else "")
|
preview = content[:150] + ("..." if len(content) > 150 else "")
|
||||||
if attachments_len:
|
|
||||||
preview += f" [dim]({attachments_len} attachments)[/dim]"
|
|
||||||
if not preview:
|
if not preview:
|
||||||
preview = "[dim](no content)[/dim]"
|
preview = "[dim](no content)[/dim]"
|
||||||
await progress_callback(channel_name, accumulated_count, author_name=author_name, message_preview=preview)
|
await progress_callback(channel_name, accumulated_count, author_name=author_name, message_preview=preview)
|
||||||
|
|
||||||
|
# Final user save after last batch
|
||||||
|
await self._save_json(user_info_file, list(self.user_cache.values()))
|
||||||
|
batch_raw.clear()
|
||||||
|
|
||||||
except discord.Forbidden:
|
except discord.Forbidden:
|
||||||
logger.error(f"403 Forbidden: Missing Access to read messages in {channel_name} ({channel_id})")
|
logger.error(f"403 Forbidden: Missing Access to read messages in {channel_name} ({channel_id})")
|
||||||
if not messages: return accumulated_count
|
if not messages: return accumulated_count
|
||||||
|
|
@ -483,9 +529,6 @@ class DiscordExporter:
|
||||||
# Save channel messages
|
# Save channel messages
|
||||||
await asyncio.sleep(0) # Yield before writing large JSON
|
await asyncio.sleep(0) # Yield before writing large JSON
|
||||||
await self._save_json(json_file, output_data)
|
await self._save_json(json_file, output_data)
|
||||||
|
|
||||||
# Save/Update user_info.json (usually small, but consistent to thread it)
|
|
||||||
await self._save_json(user_info_file, list(self.user_cache.values()))
|
|
||||||
|
|
||||||
# If it's a forum, also export its threads into the sub-directory
|
# If it's a forum, also export its threads into the sub-directory
|
||||||
if is_forum:
|
if is_forum:
|
||||||
|
|
|
||||||
|
|
@ -388,9 +388,12 @@ class BackupPane(Container):
|
||||||
modal_prog.write(f"[cyan]{label}: {chan.name}[/cyan]")
|
modal_prog.write(f"[cyan]{label}: {chan.name}[/cyan]")
|
||||||
logger.info(f"{label} for channel: #{chan.name} ({chan.id})")
|
logger.info(f"{label} for channel: #{chan.name} ({chan.id})")
|
||||||
|
|
||||||
|
_msg_log_counter = 0
|
||||||
async def update_msg_count(name, count, author_name=None, message_preview=None):
|
async def update_msg_count(name, count, author_name=None, message_preview=None):
|
||||||
|
nonlocal _msg_log_counter
|
||||||
modal_prog.update_stats(messages=str(count))
|
modal_prog.update_stats(messages=str(count))
|
||||||
if author_name and message_preview:
|
_msg_log_counter += 1
|
||||||
|
if author_name and message_preview and _msg_log_counter % 10 == 0:
|
||||||
modal_prog.write(f"[bold]{author_name}:[/bold] {message_preview}")
|
modal_prog.write(f"[bold]{author_name}:[/bold] {message_preview}")
|
||||||
|
|
||||||
accumulated_msgs = await self.exporter.export_channel_messages(
|
accumulated_msgs = await self.exporter.export_channel_messages(
|
||||||
|
|
@ -513,9 +516,12 @@ class BackupPane(Container):
|
||||||
modal_prog.write(f"[cyan]Syncing: {chan.name}[/cyan]")
|
modal_prog.write(f"[cyan]Syncing: {chan.name}[/cyan]")
|
||||||
logger.info(f"Syncing backup for channel: #{chan.name} ({chan.id})")
|
logger.info(f"Syncing backup for channel: #{chan.name} ({chan.id})")
|
||||||
|
|
||||||
|
_msg_log_counter = 0
|
||||||
async def update_msg_count(name, count, author_name=None, message_preview=None):
|
async def update_msg_count(name, count, author_name=None, message_preview=None):
|
||||||
|
nonlocal _msg_log_counter
|
||||||
modal_prog.update_stats(messages=str(count))
|
modal_prog.update_stats(messages=str(count))
|
||||||
if author_name and message_preview:
|
_msg_log_counter += 1
|
||||||
|
if author_name and message_preview and _msg_log_counter % 10 == 0:
|
||||||
modal_prog.write(f"[bold]{author_name}:[/bold] {message_preview}")
|
modal_prog.write(f"[bold]{author_name}:[/bold] {message_preview}")
|
||||||
|
|
||||||
accumulated_msgs = await self.exporter.export_channel_messages(
|
accumulated_msgs = await self.exporter.export_channel_messages(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue