implement basic discord backup

This commit is contained in:
rambros 2026-02-27 17:21:05 +05:30
parent dddaaf7de9
commit b9a89a4d2b
5 changed files with 626 additions and 0 deletions

3
.gitignore vendored
View file

@ -25,6 +25,7 @@ wheels/
# Virtual Environment
venv/
ENV/
reference/
# Configuration and Secrets
configs/
@ -39,3 +40,5 @@ config.yaml
test_*.py
test_release.zip
test_release/
EXPORT-*/

73
discord-exodus.py Normal file
View file

@ -0,0 +1,73 @@
import sys
import asyncio
import logging
from src.ui.exodus_app import run_exodus
from src.core.configuration import load_config
def setup_logging():
try:
config = load_config()
log_level_str = config.migration.log_level.upper()
level = getattr(logging, log_level_str, logging.INFO)
except Exception:
level = logging.INFO
handlers = [logging.FileHandler('exodus.log', mode='a')]
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def relaunch_in_terminal():
"""Detects if running without a terminal on Linux and relaunches in one."""
import os
import sys
import subprocess
import shutil
if sys.platform != "linux":
return
is_tty = sys.stdin.isatty() or sys.stdout.isatty()
if is_tty or os.environ.get("EXODUS_RELAUNCHED"):
return
terminals = [
("gnome-terminal", ["--"]),
("ptyxis", ["--"]),
("x-terminal-emulator", ["-e"]),
("kgx", ["-e"]),
("konsole", ["-e"]),
("xfce4-terminal", ["-e"]),
("xterm", ["-e"]),
]
executable = sys.executable if not getattr(sys, 'frozen', False) else os.path.abspath(sys.argv[0])
args = [executable] + sys.argv[1:]
env = os.environ.copy()
env["EXODUS_RELAUNCHED"] = "1"
for term, cmd_args in terminals:
if shutil.which(term):
try:
subprocess.Popen([term] + cmd_args + args, env=env)
sys.exit(0)
except Exception:
continue
async def main():
relaunch_in_terminal()
setup_logging()
try:
await run_exodus()
except KeyboardInterrupt:
print("\nExiting...")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())

33
launch-exodus-LINUX.sh Executable file
View file

@ -0,0 +1,33 @@
#!/bin/bash
# Get the directory where the script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# Check if venv exists, create it if not
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating one..."
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
# Activate and install requirements
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip
pip install -r "$DIR/requirements.txt"
else
echo "Warning: requirements.txt not found. Skipping dependency installation."
fi
else
# Activate existing virtual environment
source "$DIR/venv/bin/activate"
fi
# Run the application
python3 discord-exodus.py "$@"

301
src/exodus/exporter.py Normal file
View file

@ -0,0 +1,301 @@
import os
import json
import logging
import asyncio
import discord
from pathlib import Path
from typing import Dict, Any, List, Optional, AsyncGenerator
logger = logging.getLogger(__name__)
class DiscordExporter:
"""Core logic for exporting Discord server data."""
def __init__(self, reader):
self.reader = reader
self.server_name = ""
self.server_id = ""
async def setup(self):
"""Prepares the output directory and fetches server metadata."""
metadata = await self.reader.get_server_metadata()
self.server_name = metadata.get("name", "Unknown Server")
self.server_id = metadata.get("id", "0")
# Create safe folder name
import re
safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', self.server_name)
self.export_path = Path(".") / f"EXPORT-{safe_name}-{self.server_id}"
self.export_path.mkdir(parents=True, exist_ok=True)
# Consolidate media into one folder
self.media_path = self.export_path / "server_media"
self.media_path.mkdir(exist_ok=True)
logger.info(f"Export directory set to: {self.export_path}")
return metadata
async def export_metadata(self):
"""Saves server metadata to a JSON file."""
metadata = await self.reader.get_server_metadata()
output_file = self.export_path / "server_metadata.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=4, ensure_ascii=False)
return metadata
async def export_roles(self):
"""Exports all roles to roles.json."""
roles = await self.reader.get_roles()
role_data = []
for r in roles:
role_data.append({
"id": str(r.id),
"name": r.name,
"color": str(r.color),
"position": r.position,
"permissions": r.permissions.value,
"hoist": r.hoist,
"mentionable": r.mentionable
})
output_file = self.export_path / "roles.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(role_data, f, indent=4, ensure_ascii=False)
return role_data
async def export_emojis_stickers(self):
"""Exports all emojis and stickers, plus server icon/banner, to server_media folder."""
emojis = await self.reader.get_emojis()
stickers = await self.reader.get_stickers()
metadata = await self.reader.get_server_metadata()
# Download Server Icon
if metadata.get("icon_url"):
try:
# We need a way to download from URL directly or use reader
# Since DiscordReader doesn't have a direct "download from URL" we might need one
# but for guild icon/banner we can use guild object if available in reader.
if self.reader.guild and self.reader.guild.icon:
data = await self.reader.download_asset(self.reader.guild.icon)
ext = "gif" if self.reader.guild.icon.is_animated() else "png"
with open(self.media_path / f"server_icon.{ext}", "wb") as f:
f.write(data)
except Exception as e:
logger.error(f"Failed to download server icon: {e}")
# Download Server Banner
if metadata.get("banner_url"):
try:
if self.reader.guild and self.reader.guild.banner:
data = await self.reader.download_asset(self.reader.guild.banner)
ext = "gif" if self.reader.guild.banner.is_animated() else "png"
with open(self.media_path / f"server_banner.{ext}", "wb") as f:
f.write(data)
except Exception as e:
logger.error(f"Failed to download server banner: {e}")
emoji_data = []
for e in emojis:
ext = "gif" if e.animated else "png"
filename = f"emoji_{e.name}_{e.id}.{ext}"
emoji_path = self.media_path / filename
try:
data = await self.reader.download_emoji(e)
with open(emoji_path, "wb") as f:
f.write(data)
emoji_data.append({
"id": str(e.id),
"name": e.name,
"animated": e.animated,
"filename": filename
})
except Exception as ex:
logger.error(f"Failed to download emoji {e.name}: {ex}")
sticker_data = []
for s in stickers:
ext = "png"
if s.url:
if ".json" in str(s.url): ext = "json"
elif ".gif" in str(s.url): ext = "gif"
elif ".webp" in str(s.url): ext = "webp"
filename = f"sticker_{s.name}_{s.id}.{ext}"
sticker_path = self.media_path / filename
try:
data = await self.reader.download_sticker(s)
with open(sticker_path, "wb") as f:
f.write(data)
sticker_data.append({
"id": str(s.id),
"name": s.name,
"filename": filename
})
except Exception as ex:
logger.error(f"Failed to download sticker {s.name}: {ex}")
with open(self.export_path / "emojis.json", "w", encoding="utf-8") as f:
json.dump(emoji_data, f, indent=4, ensure_ascii=False)
with open(self.export_path / "stickers.json", "w", encoding="utf-8") as f:
json.dump(sticker_data, f, indent=4, ensure_ascii=False)
return len(emoji_data), len(sticker_data)
async def export_channels_structure(self):
"""Exports categories and channels hierarchy."""
categories = await self.reader.get_categories()
channels = await self.reader.get_channels()
structure = []
for cat in categories:
cat_channels = [c for c in channels if c.category_id == cat.id]
structure.append({
"type": "category",
"id": str(cat.id),
"name": cat.name,
"position": cat.position,
"channels": [self._format_channel(c) for c in cat_channels]
})
# Uncategorized
uncategorized = [c for c in channels if not c.category_id]
if uncategorized:
structure.append({
"type": "category",
"id": "uncategorized",
"name": "Uncategorized",
"channels": [self._format_channel(c) for c in uncategorized]
})
output_file = self.export_path / "channels_structure.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(structure, f, indent=4, ensure_ascii=False)
return structure
def _format_channel(self, c):
return {
"id": str(c.id),
"name": c.name,
"type": str(c.type),
"position": c.position,
"topic": getattr(c, "topic", None),
"nsfw": getattr(c, "nsfw", False)
}
async def export_channel_messages(self, channel_id: int, progress_callback=None):
"""Exports all messages from a channel, including attachments, pins, reactions."""
channel = await self.reader.get_channel(channel_id)
channel_name = channel.name
# Create channel-specific folder
chan_dir = self.export_path / "channels" / f"{channel_name}_{channel_id}"
attach_dir = chan_dir / "attachments"
chan_dir.mkdir(parents=True, exist_ok=True)
attach_dir.mkdir(exist_ok=True)
messages = []
count = 0
async for msg in self.reader.fetch_message_history(channel_id):
msg_data = await self._format_message(msg, attach_dir)
messages.append(msg_data)
count += 1
if progress_callback:
await progress_callback(channel_name, count)
# Export pins
pins = []
if hasattr(channel, "pins"):
try:
pins_objects = await channel.pins()
pins = [str(p.id) for p in pins_objects]
except Exception as e:
logger.error(f"Failed to fetch pins for {channel_name}: {e}")
output_data = {
"channel_info": self._format_channel(channel),
"messages": messages,
"pins": pins
}
with open(chan_dir / "messages.json", "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=4, ensure_ascii=False)
# Also check for threads
if hasattr(channel, "threads"):
# We fetch threads separately if needed, or they might be returned by get_channels if we are not careful
pass
return count
async def _format_message(self, msg, attach_dir):
attachments = []
for a in msg.attachments:
# Using ID to avoid collisions
filename = f"{a.id}_{a.filename}"
try:
# Optimized download can be added later (e.g. queue)
# data = await self.reader.download_attachment(a)
# with open(attach_dir / filename, "wb") as f:
# f.write(data)
attachments.append({
"id": str(a.id),
"filename": a.filename,
"url": a.url,
"local_path": f"attachments/{filename}"
})
except Exception as e:
logger.error(f"Failed to download attachment {a.filename} from message {msg.id}: {e}")
reactions = []
for r in msg.reactions:
emoji_str = str(r.emoji) if not r.is_custom_emoji() else f"{r.emoji.name}:{r.emoji.id}"
reactions.append({
"emoji": emoji_str,
"count": r.count
})
return {
"id": str(msg.id),
"author": {
"id": str(msg.author.id),
"name": msg.author.name,
"discriminator": msg.author.discriminator,
"avatar": str(msg.author.avatar.url) if msg.author.avatar else None,
"bot": msg.author.bot
},
"content": msg.content,
"timestamp": msg.created_at.isoformat(),
"edited_timestamp": msg.edited_at.isoformat() if msg.edited_at else None,
"attachments": attachments,
"embeds": [e.to_dict() for e in msg.embeds],
"reactions": reactions,
"type": str(msg.type),
"is_pinned": msg.pinned
}
async def export_threads(self, channel_id: int):
"""Exports active and archived threads for a channel."""
channel = await self.reader.get_channel(channel_id)
if not hasattr(channel, "threads") and not hasattr(channel, "archived_threads"):
return 0
all_threads = []
try:
# Active threads
if hasattr(channel, "threads"):
all_threads.extend(channel.threads)
# Archived threads (can be private or public)
if hasattr(channel, "archived_threads"):
async for thread in channel.archived_threads(limit=None):
all_threads.append(thread)
except Exception as e:
logger.error(f"Failed to fetch threads for {channel.name}: {e}")
thread_count = 0
for thread in all_threads:
await self.export_channel_messages(thread.id)
thread_count += 1
return thread_count

216
src/ui/exodus_app.py Normal file
View file

@ -0,0 +1,216 @@
import sys
import asyncio
import logging
import time
from rich.console import Console
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from src.core.configuration import load_config, save_config
from src.core.base import MigrationContext
from src.exodus.exporter import DiscordExporter
console = Console()
class ExodusCLI:
"""CLI app to manage Discord server data export."""
def __init__(self, config_path="config.yaml"):
self.config_path = config_path
try:
self.config = load_config(self.config_path)
# We only need the Discord side of MigrationContext
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
except Exception as e:
console.print(f"[bold red]Failed to load config: {e}[/bold red]")
sys.exit(1)
self.tokens_valid = False
async def validate_config(self):
self.validation_results = {
"discord_token": False, "discord_bot_name": None,
"discord_server": False, "discord_server_name": None,
"discord_timeout": False
}
d_token = self.config.discord_bot_token
fillers = ["DISCORD_BOT_TOKEN", "000000000000000000", "DISCORD_SERVER_ID", "", None]
discord_dummy = d_token in fillers or self.config.discord_server_id in fillers
if discord_dummy:
console.print("[bold yellow]Discord setup incomplete in config.yaml[/bold yellow]")
return False
try:
with console.status("[yellow]Validating Discord token...[/yellow]"):
res = await self.engine.discord_reader.validate()
self.validation_results["discord_token"] = res.get("token", False)
self.validation_results["discord_bot_name"] = res.get("bot_name")
self.validation_results["discord_server"] = res.get("server", False)
self.validation_results["discord_server_name"] = res.get("server_name")
if not res.get("token"):
console.print("[bold red]Discord Token validation failed.[/bold red]")
elif not res.get("server"):
console.print("[bold red]Discord Server ID validation failed.[/bold red]")
else:
self.tokens_valid = True
return True
except Exception as e:
console.print(f"[bold red]Discord validation failed: {e}[/bold red]")
return False
async def run(self):
await self.validate_config()
while True:
console.print("")
console.print(Panel.fit("Discord Exodus - Server Exporter", style="bold green"))
d_name = self.validation_results.get("discord_server_name")
d_display = f"[bold green]\"{d_name}\"[/bold green]" if d_name else "[bold red]NOT CONNECTED[/bold red]"
console.print(f"[bold cyan]Source Server:[/bold cyan] {d_display}")
console.print("\n[bold]Main Menu[/bold]")
console.print("(1) Setup Export Folder & Metadata")
console.print("(2) Export Roles")
console.print("(3) Export Emojis & Stickers")
console.print("(4) Export Channels Structure")
console.print("(5) Export All Message History (Long operation)")
console.print("(C) Configuration")
console.print("(Q) Exit")
choice = Prompt.ask("\nSelect an option", choices=["1", "2", "3", "4", "5", "C", "Q"], default="Q", show_choices=False).upper()
if choice == "1":
await self.setup_export()
elif choice == "2":
await self.export_roles()
elif choice == "3":
await self.export_assets()
elif choice == "4":
await self.export_structure()
elif choice == "5":
await self.export_messages()
elif choice == "C":
await self.edit_configuration()
elif choice == "Q":
await self.engine.close_connections()
break
async def setup_export(self):
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
with console.status("[yellow]Setting up export directory...[/yellow]"):
meta = await self.exporter.setup()
await self.exporter.export_metadata()
console.print(f"[bold green]Export directory ready: {self.exporter.export_path}[/bold green]")
console.print(f"Server: [bold]{meta['name']}[/bold] ({meta['id']})")
except Exception as e:
console.print(f"[bold red]Setup failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def export_roles(self):
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
await self.exporter.setup()
with console.status("[yellow]Exporting roles...[/yellow]"):
roles = await self.exporter.export_roles()
console.print(f"[bold green]Exported {len(roles)} roles to roles.json[/bold green]")
except Exception as e:
console.print(f"[bold red]Role export failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def export_assets(self):
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
await self.exporter.setup()
with console.status("[yellow]Exporting emojis and stickers...[/yellow]"):
e_count, s_count = await self.exporter.export_emojis_stickers()
console.print(f"[bold green]Exported {e_count} emojis and {s_count} stickers.[/bold green]")
except Exception as e:
console.print(f"[bold red]Asset export failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def export_structure(self):
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
await self.exporter.setup()
with console.status("[yellow]Exporting channel structure...[/yellow]"):
struct = await self.exporter.export_channels_structure()
console.print(f"[bold green]Exported channel hierarchy to channels_structure.json[/bold green]")
except Exception as e:
console.print(f"[bold red]Structure export failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def export_messages(self):
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
await self.exporter.setup()
channels = await self.engine.discord_reader.get_channels()
console.print(f"\n[yellow]Found {len(channels)} channels to export.[/yellow]")
if not Confirm.ask("Start message export? This may take a while.", default=True):
return
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console
) as progress:
overall_task = progress.add_task("[cyan]Exporting Channels...", total=len(channels))
for chan in channels:
if chan.type not in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.voice]:
progress.advance(overall_task)
continue
progress.update(overall_task, description=f"[cyan]Exporting: {chan.name}")
await self.exporter.export_channel_messages(chan.id)
# Also export threads for this channel
await self.exporter.export_threads(chan.id)
progress.advance(overall_task)
console.print("[bold green]Message export complete![/bold green]")
except Exception as e:
console.print(f"[bold red]Message export failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def edit_configuration(self):
# reuse or implement simplified version of edit_configuration from app.py
console.print("\n[bold]Configuration Editor[/bold]")
d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token)
d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id)
if d_token != self.config.discord_bot_token or d_server != self.config.discord_server_id:
self.config.discord_bot_token = d_token
self.config.discord_server_id = d_server
save_config(self.config, self.config_path)
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
await self.validate_config()
console.print("[bold green]Config updated![/bold green]")
else:
console.print("[yellow]No changes.[/yellow]")
async def run_exodus(config_path="config.yaml"):
app = ExodusCLI(config_path)
await app.run()