implement incremental message backup
This commit is contained in:
parent
5091fc217f
commit
769c9a5ece
2 changed files with 240 additions and 65 deletions
|
|
@ -15,6 +15,7 @@ class DiscordExporter:
|
|||
self.reader = reader
|
||||
self.server_name = ""
|
||||
self.server_id = ""
|
||||
self.user_cache = {}
|
||||
|
||||
async def setup(self):
|
||||
"""Prepares the output directory and fetches server metadata."""
|
||||
|
|
@ -281,67 +282,187 @@ class DiscordExporter:
|
|||
async def export_channel_messages(self, channel_id: int, progress_callback=None):
    """Incrementally export a channel's messages into ``message_backup``.

    Files written under ``self.export_path / "message_backup"``:

    * ``<name>-<id>.json`` -- channel metadata plus the full message list
      (threads are written to the ``threads/`` sub-directory instead).
    * ``<name>-<id>/`` -- asset directory passed to ``_format_message``.
    * ``user_avatars/`` and ``user_info.json`` -- shared by all channels.

    When a backup file already exists its messages are kept, and only
    messages after the stored ``lastMessageID`` are requested.

    Args:
        channel_id: ID of the channel (or thread) to export.
        progress_callback: Optional awaitable invoked as
            ``progress_callback(channel_name, total_so_far)`` after each
            newly fetched message.

    Returns:
        Total number of messages held in the backup after this run, or 0
        when the channel cannot be resolved, or when fetching failed and
        no earlier backup exists.
    """
    channel = await self.reader.get_channel(channel_id)
    if not channel:
        return 0

    # Read the name only after the guard above: ``channel`` may be falsy.
    channel_name = channel.name
    # Path separators in a name would turn the file name into a nested path
    # whose parent directory does not exist, so neutralise them as well.
    safe_name = (
        channel_name.replace(" ", "-").replace("/", "-").replace("\\", "-").lower()
    )

    # Threads are grouped one level below the shared avatar folder, so their
    # relative avatar links need a "../" prefix.
    is_thread = isinstance(channel, discord.Thread)
    backup_root = self.export_path / "message_backup"
    if is_thread:
        backup_dir = backup_root / "threads"
        avatar_rel_base = "../user_avatars"
    else:
        backup_dir = backup_root
        avatar_rel_base = "user_avatars"
    backup_dir.mkdir(parents=True, exist_ok=True)

    # Shared avatars directory (always at the root of message_backup).
    avatar_dir = backup_root / "user_avatars"
    avatar_dir.mkdir(exist_ok=True)

    # Re-hydrate the user cache once, so that rewriting user_info.json at the
    # end of this call does not drop users recorded by earlier runs.
    user_info_file = backup_root / "user_info.json"
    if not self.user_cache and user_info_file.exists():
        try:
            with open(user_info_file, "r", encoding="utf-8") as f:
                known_users = json.load(f)
            restored = {}
            for entry in known_users:
                # Records are stored under "userID"; "id" is accepted only
                # as a fallback for files written with that key.
                uid = entry.get("userID", entry.get("id"))
                if uid is not None:
                    restored[str(uid)] = entry
            self.user_cache = restored
        except Exception as e:
            logger.warning(f"Could not load {user_info_file}: {e}")
            self.user_cache = {}

    base_filename = f"{safe_name}-{channel_id}"
    json_file = backup_dir / f"{base_filename}.json"
    asset_dir = backup_dir / base_filename
    asset_dir.mkdir(exist_ok=True)

    messages = []
    last_id = None

    # Load existing messages for incremental sync.
    if json_file.exists():
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                old_data = json.load(f)
            messages = old_data.get("messages") or []
            # "lastMessageID" is stored as null for an empty channel, so test
            # the value rather than the key's presence.
            stored_last = old_data.get("lastMessageID")
            if stored_last is not None:
                last_id = int(stored_last)
            elif messages:
                # NOTE(review): assumes the stored list ends with the newest
                # message -- confirm the order fetch_message_history yields.
                last_id = int(messages[-1]["messageID"])
        except Exception as e:
            logger.warning(f"Could not load existing backup for sync in {channel_name}: {e}")
            # Fall back to a full export rather than resuming from bad state.
            messages = []
            last_id = None

    count = len(messages)
    new_count = 0

    # 1. Fetch new messages. A failure is fatal only when there is no earlier
    #    backup to keep; otherwise what is already stored is re-saved.
    try:
        async for msg in self.reader.fetch_message_history(channel_id, after_id=last_id):
            msg_data = await self._format_message(
                msg, asset_dir, base_filename, avatar_dir, avatar_rel_base
            )
            messages.append(msg_data)
            new_count += 1
            if progress_callback:
                await progress_callback(channel_name, count + new_count)
    except discord.Forbidden:
        logger.error(f"403 Forbidden: Missing Access to read messages in {channel_name} ({channel_id})")
        if not messages:
            return 0
    except Exception as e:
        logger.error(f"Error fetching messages for {channel_name}: {e}")
        if not messages:
            return 0

    # 2. Collect the channel's threads for the summary counters only; their
    #    messages are not exported here.
    all_threads = []
    try:
        # Active threads are fetched explicitly because channel.threads is a
        # cache property and might be empty when running without a gateway.
        if self.reader.guild:
            active_threads_data = await self.reader.guild.fetch_active_threads()
            all_threads.extend(
                t for t in active_threads_data.threads if t.parent_id == channel_id
            )

        # Archived threads
        if hasattr(channel, "archived_threads"):
            async for thread in channel.archived_threads(limit=None):
                all_threads.append(thread)
    except Exception as e:
        logger.warning(f"Failed to fetch threads for count in {channel_name}: {e}")

    thread_count = len(all_threads)
    thread_msg_count = sum(t.message_count or 0 for t in all_threads)

    output_data = {
        "channelName": channel_name,
        "channelID": str(channel_id),
        "messageCount": len(messages),
        "threadCount": thread_count,
        "lastMessageID": str(messages[-1]["messageID"]) if messages else None,
        "threadMessagesCount": thread_msg_count,
        "lastBackup": discord.utils.utcnow().isoformat(),
        "messages": messages,
    }

    if is_thread:
        output_data["parentID"] = str(channel.parent_id)
        output_data["parentName"] = getattr(channel.parent, "name", "Unknown")

    # Save channel messages
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, indent=4, ensure_ascii=False)

    # Save/Update user_info.json
    with open(user_info_file, "w", encoding="utf-8") as f:
        json.dump(list(self.user_cache.values()), f, indent=4, ensure_ascii=False)

    # Report the full backup size, not just what was on disk before this run.
    return len(messages)
|
||||
|
||||
async def _format_message(self, msg, attach_dir):
|
||||
async def _format_message(self, msg, asset_dir, asset_prefix, avatar_dir, avatar_rel_base):
|
||||
"""Formats a single message to match the reference format."""
|
||||
attachments = []
|
||||
for a in msg.attachments:
|
||||
# Using ID to avoid collisions
|
||||
filename = f"{a.id}_{a.filename}"
|
||||
# mimic reference asset naming (suffixing hash/id)
|
||||
safe_name = a.filename
|
||||
short_id = str(a.id)[-5:]
|
||||
stored_name = f"{Path(safe_name).stem}-{short_id}{Path(safe_name).suffix}"
|
||||
|
||||
try:
|
||||
# Optimized download can be added later (e.g. queue)
|
||||
# data = await self.reader.download_attachment(a)
|
||||
# with open(attach_dir / filename, "wb") as f:
|
||||
# f.write(data)
|
||||
# Check if exists, else download (basic cache)
|
||||
target = asset_dir / stored_name
|
||||
if not target.exists():
|
||||
data = await a.read()
|
||||
with open(target, "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
attachments.append({
|
||||
"id": str(a.id),
|
||||
"filename": a.filename,
|
||||
"url": a.url,
|
||||
"local_path": f"attachments/{filename}"
|
||||
"url": f"{asset_prefix}/{stored_name}",
|
||||
"fileName": a.filename,
|
||||
"fileSizeBytes": a.size
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to download attachment {a.filename} from message {msg.id}: {e}")
|
||||
logger.error(f"Failed to download attachment {a.filename}: {e}")
|
||||
|
||||
# Author info extraction and deduplication
|
||||
author = msg.author
|
||||
user_id = str(author.id)
|
||||
|
||||
if user_id not in self.user_cache:
|
||||
avatar_url = None
|
||||
if author.avatar:
|
||||
try:
|
||||
av_name = f"{user_id}.png"
|
||||
av_target = avatar_dir / av_name
|
||||
if not av_target.exists():
|
||||
await author.avatar.save(av_target)
|
||||
avatar_url = f"{avatar_rel_base}/{av_name}"
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save avatar for {author.name}: {e}")
|
||||
|
||||
roles = []
|
||||
if hasattr(author, "roles"):
|
||||
for r in author.roles:
|
||||
if r.is_default(): continue
|
||||
roles.append({
|
||||
"id": str(r.id),
|
||||
"name": r.name,
|
||||
"color": str(r.color),
|
||||
"position": r.position
|
||||
})
|
||||
|
||||
self.user_cache[user_id] = {
|
||||
"userID": user_id,
|
||||
"username": author.name,
|
||||
"userNickname": getattr(author, "display_name", author.name),
|
||||
"userColor": str(author.color) if hasattr(author, "color") else None,
|
||||
"userIsBot": author.bot,
|
||||
"userRoles": roles,
|
||||
"userAvatar": f"user_avatars/{user_id}.png" if author.avatar else None
|
||||
}
|
||||
|
||||
reactions = []
|
||||
for r in msg.reactions:
|
||||
|
|
@ -351,25 +472,44 @@ class DiscordExporter:
|
|||
"count": r.count
|
||||
})
|
||||
|
||||
return {
|
||||
"id": str(msg.id),
|
||||
"author": {
|
||||
"id": str(msg.author.id),
|
||||
"name": msg.author.name,
|
||||
"discriminator": msg.author.discriminator,
|
||||
"avatar": str(msg.author.avatar.url) if msg.author.avatar else None,
|
||||
"bot": msg.author.bot
|
||||
},
|
||||
"content": msg.content,
|
||||
# Determine message type (Override if it's a thread starter)
|
||||
msg_type = str(msg.type).split(".")[-1].capitalize()
|
||||
if msg.thread:
|
||||
msg_type = "ThreadStarter"
|
||||
|
||||
data = {
|
||||
"messageID": str(msg.id),
|
||||
"type": msg_type,
|
||||
"timestamp": msg.created_at.isoformat(),
|
||||
"edited_timestamp": msg.edited_at.isoformat() if msg.edited_at else None,
|
||||
"isPinned": msg.pinned,
|
||||
"content": msg.content,
|
||||
"userID": user_id,
|
||||
"attachments": attachments,
|
||||
"embeds": [e.to_dict() for e in msg.embeds],
|
||||
"reactions": reactions,
|
||||
"type": str(msg.type),
|
||||
"is_pinned": msg.pinned
|
||||
"embeds": [e.to_dict() for e in msg.embeds], # simplified
|
||||
"stickers": [],
|
||||
"reactions": reactions
|
||||
}
|
||||
|
||||
# Thread info for creation/starter messages
|
||||
if msg.thread:
|
||||
data["thread"] = {
|
||||
"id": str(msg.thread.id),
|
||||
"name": msg.thread.name,
|
||||
"messageCount": getattr(msg.thread, "message_count", 0),
|
||||
"archived": msg.thread.archived,
|
||||
"archiveDuration": msg.thread.auto_archive_duration,
|
||||
"locked": msg.thread.locked
|
||||
}
|
||||
|
||||
# Add reply reference if exists
|
||||
if msg.reference and msg.reference.message_id:
|
||||
data["reference"] = {
|
||||
"messageId": str(msg.reference.message_id),
|
||||
"channelId": str(msg.reference.channel_id)
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
async def export_threads(self, channel_id: int):
|
||||
"""Exports active and archived threads for a channel."""
|
||||
channel = await self.reader.get_channel(channel_id)
|
||||
|
|
@ -379,8 +519,9 @@ class DiscordExporter:
|
|||
all_threads = []
|
||||
try:
|
||||
# Active threads
|
||||
if hasattr(channel, "threads"):
|
||||
all_threads.extend(channel.threads)
|
||||
if self.reader.guild:
|
||||
active_threads_data = await self.reader.guild.fetch_active_threads()
|
||||
all_threads.extend([t for t in active_threads_data.threads if t.parent_id == channel_id])
|
||||
|
||||
# Archived threads (can be private or public)
|
||||
if hasattr(channel, "archived_threads"):
|
||||
|
|
|
|||
|
|
@ -177,12 +177,49 @@ class ExodusCLI:
|
|||
with console.status("[yellow]Exporting channel structure...[/yellow]"):
|
||||
await self.exporter.export_channels_structure()
|
||||
|
||||
# 2. Export messages
|
||||
channels = await self.engine.discord_reader.get_channels()
|
||||
console.print(f"\n[yellow]Found {len(channels)} channels to backup.[/yellow]")
|
||||
if not Confirm.ask("Start message backup? This may take a while.", default=True):
|
||||
# 2. Select Channels
|
||||
all_channels = await self.engine.discord_reader.get_channels()
|
||||
# Filter for exportable channels
|
||||
eligible_channels = [
|
||||
c for c in all_channels
|
||||
if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.voice]
|
||||
]
|
||||
|
||||
if not eligible_channels:
|
||||
console.print("[yellow]No text/news channels found to backup.[/yellow]")
|
||||
return
|
||||
|
||||
console.print(f"\n[bold]Select Channels to Backup ({len(eligible_channels)} total):[/bold]")
|
||||
for i, chan in enumerate(eligible_channels):
|
||||
console.print(f"({i+1}) {chan.name}")
|
||||
|
||||
console.print("(A) [bold green]All Channels[/bold green]")
|
||||
console.print("(B) Back")
|
||||
|
||||
choices = [str(i+1) for i in range(len(eligible_channels))] + ["A", "B", "a", "b"]
|
||||
selection_input = Prompt.ask("\nSelect option or indices (e.g. 1,2,5)", default="B")
|
||||
|
||||
if selection_input.upper() == "B":
|
||||
return
|
||||
|
||||
selected_channels = []
|
||||
if selection_input.upper() == "A":
|
||||
selected_channels = eligible_channels
|
||||
else:
|
||||
try:
|
||||
# Handle multiple indices like '1,2,5'
|
||||
indices = [int(i.strip()) - 1 for i in selection_input.split(",") if i.strip().isdigit()]
|
||||
selected_channels = [eligible_channels[i] for i in indices if 0 <= i < len(eligible_channels)]
|
||||
except Exception:
|
||||
console.print("[red]Invalid selection. Aborting.[/red]")
|
||||
return
|
||||
|
||||
if not selected_channels:
|
||||
console.print("[yellow]No valid channels selected.[/yellow]")
|
||||
return
|
||||
|
||||
console.print(f"\n[yellow]Starting backup for {len(selected_channels)} channels...[/yellow]")
|
||||
|
||||
with Progress(
|
||||
SpinnerColumn(),
|
||||
TextColumn("[progress.description]{task.description}"),
|
||||
|
|
@ -190,11 +227,8 @@ class ExodusCLI:
|
|||
TaskProgressColumn(),
|
||||
console=console
|
||||
) as progress:
|
||||
overall_task = progress.add_task("[cyan]Exporting Channels...", total=len(channels))
|
||||
for chan in channels:
|
||||
if chan.type not in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.voice]:
|
||||
progress.advance(overall_task)
|
||||
continue
|
||||
overall_task = progress.add_task("[cyan]Exporting Channels...", total=len(selected_channels))
|
||||
for chan in selected_channels:
|
||||
progress.update(overall_task, description=f"[cyan]Backing up: {chan.name}")
|
||||
await self.exporter.export_channel_messages(chan.id)
|
||||
await self.exporter.export_threads(chan.id)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue