refactor codebase for TUI

This commit is contained in:
rambros 2026-03-03 01:01:31 +05:30
parent f821638301
commit add35ccd1f
18 changed files with 1864 additions and 1066 deletions

View file

@ -1,9 +1,6 @@
import sys
import argparse
import asyncio
import logging
from src.ui.reaper_app import run_disco_reaper
from src.ui.reaper_tui import run_disco_reaper_tui
from src.ui.main_app import run_disco_reaper_tui
from src.core.configuration import load_config
def setup_logging():
@ -60,23 +57,14 @@ def relaunch_in_terminal():
except Exception:
continue
async def main():
def main():
relaunch_in_terminal()
setup_logging()
parser = argparse.ArgumentParser(description="Disco Reaper")
parser.add_argument("--cli", action="store_true", help="Run the legacy CLI interface")
parser.add_argument("--tui", action="store_true", help="Run the Textual TUI interface (default)")
args, _ = parser.parse_known_args()
if args.cli:
await run_disco_reaper()
else:
await run_disco_reaper_tui()
run_disco_reaper_tui()
if __name__ == "__main__":
try:
asyncio.run(main())
main()
except KeyboardInterrupt:
print("\nOperation terminated by user.")
sys.exit(0)

View file

@ -1,43 +0,0 @@
#!/bin/bash
# Get the directory where the script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# Check if venv exists, create it if not
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating one..."
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
# Activate and install requirements
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip
pip install -r "$DIR/requirements.txt"
else
echo "Warning: requirements.txt not found. Skipping dependency installation."
fi
else
# Activate existing virtual environment
source "$DIR/venv/bin/activate"
fi
# Run the application
if [[ "$1" == "--cli" ]]; then
echo "Starting Disco Reaper in CLI mode..."
python3 disco-reaper.py --cli
elif [[ "$1" == "--tui" ]]; then
echo "Starting Disco Reaper in TUI mode..."
python3 disco-reaper.py --tui
else
# Default to TUI if no arguments or other arguments
echo "Starting Disco Reaper (Default: TUI)..."
python3 disco-reaper.py --tui "$@"
fi

View file

@ -1,34 +0,0 @@
#!/bin/bash
# Get the directory where the script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# Check if venv exists, create it if not
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating one..."
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
# Activate and install requirements
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip
pip install -r "$DIR/requirements.txt"
else
echo "Warning: requirements.txt not found. Skipping dependency installation."
fi
else
# Activate existing virtual environment
source "$DIR/venv/bin/activate"
fi
# Run the application
python3 server-shuttle.py "$@"

View file

@ -1,34 +0,0 @@
#!/bin/bash
# Get the directory where the script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# Check if venv exists, create it if not
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating one..."
# On macOS, python3 is the standard command
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
# Activate and install requirements
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip
pip install -r "$DIR/requirements.txt"
else
echo "Warning: requirements.txt not found. Skipping dependency installation."
fi
else
# Activate existing virtual environment
source "$DIR/venv/bin/activate"
fi
# Run the application
python3 server-shuttle.py "$@"

View file

@ -1,38 +0,0 @@
@echo off
SET "DIR=%~dp0"
cd /d "%DIR%"
:: Check if venv exists, create it if not
if not exist "%DIR%venv" (
echo Virtual environment not found. Creating one...
python -m venv venv
if %ERRORLEVEL% neq 0 (
echo Error: Failed to create virtual environment.
pause
exit /b 1
)
:: Activate and install requirements
call venv\Scripts\activate
if exist "requirements.txt" (
echo Installing requirements...
python -m pip install --upgrade pip
pip install -r requirements.txt
) else (
echo Warning: requirements.txt not found. Skipping dependency installation.
)
) else (
:: Activate existing virtual environment
call venv\Scripts\activate
)
:: Run the application
python server-shuttle.py %*
if %ERRORLEVEL% neq 0 (
echo.
echo application exited with error code %ERRORLEVEL%
pause
)

View file

@ -1,34 +0,0 @@
#!/bin/bash
# Get the directory where the script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# Check if venv exists, create it if not
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating one..."
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
# Activate and install requirements
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip
pip install -r "$DIR/requirements.txt"
else
echo "Warning: requirements.txt not found. Skipping dependency installation."
fi
else
# Activate existing virtual environment
source "$DIR/venv/bin/activate"
fi
# Run the application
echo "Starting Unified TUI..."
python3 unifier.py "$@"

26
launch.sh Executable file
View file

@ -0,0 +1,26 @@
#!/bin/bash
# Disco Reaper — unified launcher
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"
# ── Virtualenv setup ─────────────────────────────────────────────────────────
if [ ! -d "$DIR/venv" ]; then
echo "Virtual environment not found. Creating..."
python3 -m venv "$DIR/venv"
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment."
exit 1
fi
source "$DIR/venv/bin/activate"
if [ -f "$DIR/requirements.txt" ]; then
echo "Installing requirements..."
pip install --upgrade pip -q
pip install -r "$DIR/requirements.txt" -q
fi
else
source "$DIR/venv/bin/activate"
fi
# ── Launch ───────────────────────────────────────────────────────────────────
python3 "$DIR/disco-reaper.py" "$@"

View file

@ -1,162 +0,0 @@
import sys
import asyncio
import logging
from pathlib import Path
from src.ui.shuttle_app import run_cli
from src.core.configuration import load_config
def setup_logging():
try:
config = load_config()
log_level_str = config.migration.log_level.upper()
level = getattr(logging, log_level_str, logging.INFO)
except Exception:
level = logging.INFO
handlers = [logging.FileHandler('.shuttle.log', mode='a')]
if level == logging.DEBUG:
handlers.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def select_platform(config):
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
console = Console()
fillers = [ "DISCORD_BOT_TOKEN", "FLUXER_BOT_TOKEN", "STOAT_BOT_TOKEN",
"000000000000000000",
"DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID",
"", None
]
fluxer_set = config.fluxer_bot_token not in fillers and config.fluxer_community_id not in fillers
stoat_set = config.stoat_bot_token not in fillers and config.stoat_server_id not in fillers
if fluxer_set and not stoat_set:
return "fluxer"
elif stoat_set and not fluxer_set:
return "stoat"
elif fluxer_set and stoat_set:
console.print(Panel.fit(
"[bold]Both Fluxer and Stoat configurations found.[/bold]\n"
"Which one do you want to use?",
title="[bold cyan]Platform Selection[/bold cyan]"
))
console.print("(1) [bold blue]Fluxer[/bold blue]")
console.print("(2) [bold red]Stoat[/bold red]")
console.print("(Q) [bold dim]Quit[/bold dim]")
console.print("")
choice = Prompt.ask("Select an option [[bold cyan]1/2/Q[/bold cyan]]", choices=["1", "2", "Q", "q"], show_choices=False).upper()
if choice == "1":
return "fluxer"
elif choice == "2":
return "stoat"
else:
sys.exit(0)
else:
# Both are fillers
console.print(Panel.fit(
"[bold]First setup, Tool configuration[/bold]\n"
"Which platform do you want to migrate to?",
title="[bold cyan]Initial Setup[/bold cyan]"
))
console.print("(1) [bold blue]Fluxer[/bold blue]")
console.print("(2) [bold red]Stoat[/bold red]")
console.print("(Q) [bold dim]Quit[/bold dim]")
console.print("")
choice = Prompt.ask("Select an option [[bold cyan]1/2/Q[/bold cyan]]", choices=["1", "2", "Q", "q"], show_choices=False).upper()
if choice == "1":
return "fluxer"
elif choice == "2":
return "stoat"
else:
sys.exit(0)
def relaunch_in_terminal():
"""Detects if running without a terminal on Linux and relaunches in one."""
import os
import sys
import subprocess
import shutil
# Only attempt on Linux
if sys.platform != "linux":
return
# Check if we have a TTY on stdin or stdout, or if already relaunched
is_tty = sys.stdin.isatty() or sys.stdout.isatty()
if is_tty or os.environ.get("SERVER_SHUTTLE_RELAUNCHED"):
return
# Diagnostic logging to help debug why it fails on some distros
debug_log = "/tmp/shuttle_terminal_debug.log"
with open(debug_log, "a") as f:
f.write(f"Relaunching... isatty={is_tty}, env={os.environ.get('SERVER_SHUTTLE_RELAUNCHED')}\n")
# List of terminals to try with their specific execution flags
terminals = [
("gnome-terminal", ["--"]), # Modern GNOME Terminal
("ptyxis", ["--"]), # Fedora/Modern GNOME
("x-terminal-emulator", ["-e"]), # Ubuntu/Debian standard
("kgx", ["-e"]), # GNOME Console
("konsole", ["-e"]),
("xfce4-terminal", ["-e"]),
("lxterminal", ["-e"]),
("mate-terminal", ["-e"]),
("alacritty", ["-e"]),
("kitty", []),
("xterm", ["-e"]),
]
# Resolve the absolute path to ourselves.
# frozen=True means we are running from a PyInstaller bundle.
if getattr(sys, 'frozen', False):
executable = os.path.abspath(sys.argv[0])
else:
executable = sys.executable
args = [executable] + sys.argv[1:]
# Set env var to prevent loops
env = os.environ.copy()
env["SERVER_SHUTTLE_RELAUNCHED"] = "1"
for term, cmd_args in terminals:
if shutil.which(term):
with open(debug_log, "a") as f:
f.write(f"Found terminal: {term}\n")
try:
# Construct command: term [args] executable [sys.argv]
subprocess.Popen([term] + cmd_args + args, env=env)
sys.exit(0)
except Exception as e:
with open(debug_log, "a") as f:
f.write(f"Failed to launch {term}: {e}\n")
continue
def main():
relaunch_in_terminal()
config = load_config()
setup_logging()
try:
platform = select_platform(config)
asyncio.run(run_cli(target_platform=platform))
except KeyboardInterrupt:
print("\nOperation terminated by user.")
sys.exit(0)
except Exception as e:
print(f"Failed to start tool: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -10,35 +10,27 @@ from src.stoat.writer import StoatWriter
logger = logging.getLogger(__name__)
class MigrationContext:
"""Holds state and connections for reading from Discord and writing to Fluxer."""
"""Holds state and connections for reading from Discord and writing to the target platform."""
def __init__(self, config: AppConfig, target_platform: str = "fluxer"):
def __init__(self, config: AppConfig, target_platform: str | None = None):
self.config = config
self.target_platform = target_platform
# If caller didn't specify, fall back to config value
self.target_platform = target_platform or config.target_platform or "fluxer"
# Use the target server/community ID for state file naming so each
# target server gets its own independent migration history.
if target_platform == "stoat":
server_id = config.stoat_server_id
else:
server_id = config.fluxer_community_id
# Fallback for unconfigured platforms
if not server_id or server_id in [
"000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID"
]:
server_id = config.target_server_id or "unconfigured"
fillers = {"000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID",
"STOAT_SERVER_ID", "TARGET_SERVER_ID", ""}
if server_id in fillers:
server_id = "unconfigured"
# Try to find an existing folder for this server_id
# Try to find an existing state folder for this server_id
import os
from pathlib import Path
state_file: str | Path = ""
messages_file: str | Path = ""
if server_id and server_id not in ["000000000000000000", "DISCORD_SERVER_ID", "FLUXER_COMMUNITY_ID", "STOAT_SERVER_ID", "unconfigured", ""]:
# If a folder doesn't exist yet, we stick with the generic server_id names,
# but they won't be saved until set_folder is called.
if server_id != "unconfigured":
for d in Path(".").iterdir():
if d.is_dir() and d.name.endswith(f"-{server_id}"):
state_file = d / "state-migration.json"
@ -55,20 +47,19 @@ class MigrationContext:
server_id=config.discord_server_id
)
self.fluxer_writer = FluxerWriter(
token=config.fluxer_bot_token or "",
community_id=config.fluxer_community_id or "",
api_url=config.fluxer_api_url or "default"
)
self.stoat_writer = StoatWriter(
token=config.stoat_bot_token or "",
community_id=config.stoat_server_id or "",
api_url=config.stoat_api_url or "default"
)
self.writer = self.fluxer_writer if target_platform == "fluxer" else self.stoat_writer
# Build the writer for the active target platform only
token = config.target_bot_token or ""
community_id = config.target_server_id or ""
api_url = config.target_api_url or "default"
if self.target_platform == "stoat":
self.writer = StoatWriter(token=token, community_id=community_id, api_url=api_url)
self.stoat_writer = self.writer
self.fluxer_writer = FluxerWriter(token="", community_id="", api_url="default")
else:
self.writer = FluxerWriter(token=token, community_id=community_id, api_url=api_url)
self.fluxer_writer = self.writer
self.stoat_writer = StoatWriter(token="", community_id="", api_url="default")
self.is_running = False
@ -76,7 +67,7 @@ class MigrationContext:
"""Returns connection validation status as a dictionary."""
try:
d_valid = await self.discord_reader.validate()
f_valid = await self.fluxer_writer.validate()
t_valid = await self.writer.validate()
return {
"discord_token": d_valid.get("token", False),
"discord_bot_name": d_valid.get("bot_name"),
@ -84,19 +75,19 @@ class MigrationContext:
"discord_server_name": d_valid.get("server_name"),
"discord_intents": d_valid.get("intents", {}),
"discord_permissions": d_valid.get("permissions", {}),
"fluxer_token": f_valid.get("token", False),
"fluxer_bot_name": f_valid.get("bot_name"),
"fluxer_community": f_valid.get("community", False),
"fluxer_community_name": f_valid.get("community_name"),
"fluxer_permissions": f_valid.get("permissions", {})
"target_token": t_valid.get("token", False),
"target_bot_name": t_valid.get("bot_name"),
"target_community": t_valid.get("community", False),
"target_community_name": t_valid.get("community_name"),
"target_permissions": t_valid.get("permissions", {})
}
except Exception as e:
logger.error(f"Validation failed with exception: {e}")
return {
"discord_token": False,
"discord_server": False,
"fluxer_token": False,
"fluxer_community": False
"target_token": False,
"target_community": False
}
async def start_connections(self):

View file

@ -11,29 +11,55 @@ class MigrationSettings(BaseModel):
class AppConfig(BaseModel):
discord_bot_token: str
discord_server_id: str
fluxer_bot_token: Optional[str] = Field(default=None)
fluxer_community_id: Optional[str] = Field(default=None)
fluxer_api_url: Optional[str] = Field(default=None)
stoat_bot_token: Optional[str] = Field(default=None)
stoat_server_id: Optional[str] = Field(default=None)
stoat_api_url: Optional[str] = Field(default=None)
use_stoat: bool = Field(default=False)
use_fluxer: bool = Field(default=False)
tool_mode: str = Field(default="backup_only") # direct_transfer | backup_transfer | backup_only
target_platform: str = Field(default="none") # fluxer | stoat | none
target_bot_token: Optional[str] = Field(default=None)
target_server_id: Optional[str] = Field(default=None)
target_api_url: Optional[str] = Field(default="default")
migration: MigrationSettings = Field(default_factory=MigrationSettings)
# ── backwardcompat shims (readonly) ────────────────────────────────
# The rest of the codebase (fluxer/stoat modules) still reads these.
# They all delegate to the unified target_* fields.
@property
def use_fluxer(self) -> bool:
return self.target_platform == "fluxer"
@property
def use_stoat(self) -> bool:
return self.target_platform == "stoat"
@property
def fluxer_bot_token(self) -> Optional[str]:
return self.target_bot_token if self.target_platform == "fluxer" else None
@property
def fluxer_community_id(self) -> Optional[str]:
return self.target_server_id if self.target_platform == "fluxer" else None
@property
def fluxer_api_url(self) -> Optional[str]:
return self.target_api_url if self.target_platform == "fluxer" else None
@property
def stoat_bot_token(self) -> Optional[str]:
return self.target_bot_token if self.target_platform == "stoat" else None
@property
def stoat_server_id(self) -> Optional[str]:
return self.target_server_id if self.target_platform == "stoat" else None
@property
def stoat_api_url(self) -> Optional[str]:
return self.target_api_url if self.target_platform == "stoat" else None
def load_config(config_path: Union[str, Path] = "config.yaml") -> AppConfig:
path = Path(config_path)
if not path.exists():
# Create dummy config if missing
config = AppConfig(
discord_bot_token="DISCORD_BOT_TOKEN",
discord_server_id="DISCORD_SERVER_ID",
fluxer_bot_token="FLUXER_BOT_TOKEN",
fluxer_community_id="FLUXER_COMMUNITY_ID",
fluxer_api_url="default",
stoat_bot_token="STOAT_BOT_TOKEN",
stoat_server_id="STOAT_SERVER_ID",
stoat_api_url="default"
)
save_config(config, path)
print(f"Created default configuration: {config_path}")
@ -45,11 +71,46 @@ def load_config(config_path: Union[str, Path] = "config.yaml") -> AppConfig:
if not data:
raise ValueError("Configuration file is empty or invalid YAML.")
# ── migrate legacy configs that still have separate fluxer/stoat fields ──
if "fluxer_bot_token" in data or "stoat_bot_token" in data:
if data.get("fluxer_bot_token") and data["fluxer_bot_token"] not in ("FLUXER_BOT_TOKEN", None):
data.setdefault("target_platform", "fluxer")
data.setdefault("target_bot_token", data["fluxer_bot_token"])
data.setdefault("target_server_id", data.get("fluxer_community_id"))
data.setdefault("target_api_url", data.get("fluxer_api_url", "default"))
elif data.get("stoat_bot_token") and data["stoat_bot_token"] not in ("STOAT_BOT_TOKEN", None):
data.setdefault("target_platform", "stoat")
data.setdefault("target_bot_token", data["stoat_bot_token"])
data.setdefault("target_server_id", data.get("stoat_server_id"))
data.setdefault("target_api_url", data.get("stoat_api_url", "default"))
# Remove legacy keys so they don't conflict with the model
for key in ("fluxer_bot_token", "fluxer_community_id", "fluxer_api_url",
"stoat_bot_token", "stoat_server_id", "stoat_api_url",
"use_fluxer", "use_stoat"):
data.pop(key, None)
return AppConfig(**data)
def save_config(config: AppConfig, config_path: Union[str, Path] = "config.yaml"):
path = Path(config_path)
# Dump model to dictionary, excluding None values to keep the YAML clean
data = config.model_dump(exclude_none=True)
with open(path, "w", encoding="utf-8") as f:
yaml.safe_dump(data, f, default_flow_style=False, sort_keys=False)
def get_available_configs() -> list[str]:
"""Returns a list of available configuration names from `Reaper-*` folders."""
configs = []
for item in Path(".").iterdir():
if item.is_dir() and item.name.startswith("Reaper-"):
config_name = item.name[len("Reaper-"):]
if (item / "config.yaml").exists():
configs.append(config_name)
return sorted(configs)
def create_new_config(name: str) -> Path:
"""Creates a new configuration folder and default config file."""
folder_path = Path(f"Reaper-{name}")
folder_path.mkdir(exist_ok=True)
config_path = folder_path / "config.yaml"
load_config(config_path) # creates default
return folder_path

View file

@ -222,8 +222,9 @@ class MigrationState:
self.last_message_timestamps.clear()
self.save_messages()
def set_folder(self, server_id: str, clean_name: str):
new_folder = Path(f"{clean_name}-{server_id}")
def set_folder(self, server_id: str, clean_name: str, base_dir: Path | str = ""):
base = Path(base_dir) if base_dir else Path(".")
new_folder = base / f"{clean_name}-{server_id}"
# If we have an existing folder that is different, rename it
if self.state_file and self.state_file.parent.exists() and self.state_file.parent != new_folder:
@ -234,7 +235,7 @@ class MigrationState:
except Exception as e:
logger.debug(f"Could not rename {self.state_file.parent} to {new_folder}: {e}")
new_folder.mkdir(exist_ok=True)
new_folder.mkdir(parents=True, exist_ok=True)
self.state_file = new_folder / "state-migration.json"
self.messages_file = new_folder / "message-tracker.json"

View file

@ -11,11 +11,12 @@ logger = logging.getLogger(__name__)
class DiscordExporter:
"""Core logic for exporting Discord server data."""
def __init__(self, reader):
def __init__(self, reader, base_dir: Path | str = ""):
self.reader = reader
self.server_name = ""
self.server_id = ""
self.user_cache = {}
self.base_dir = Path(base_dir) if base_dir else Path(".")
async def setup(self):
"""Prepares the output directory and fetches server metadata."""
@ -26,7 +27,7 @@ class DiscordExporter:
# Create safe folder name
import re
safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', self.server_name)
self.export_path = Path(".") / f"EXPORT-{safe_name}-{self.server_id}"
self.export_path = self.base_dir / f"DISCORD-{self.server_id}"
self.export_path.mkdir(parents=True, exist_ok=True)
# Consolidate media into one folder

View file

@ -146,38 +146,141 @@ class ProgressModal(ModalScreen[None]):
btn.variant = "success"
self.query_one("#progress_bar", ProgressBar).update(total=100, progress=100)
class MainScreen(Screen):
class BackupScreen(Screen):
"""Main screen of the application."""
CSS = """
#backup_scroll {
align: center middle;
}
#main_container {
width: 80%;
height: auto;
min-height: 20;
border: solid green;
padding: 1 2;
margin: 2 0;
}
#info_container {
height: auto;
margin-bottom: 2;
border: tall cyan;
padding: 1;
}
#actions_container {
height: auto;
layout: vertical;
align: center top;
margin-top: 1;
}
Button {
width: 100%;
margin-bottom: 1;
}
#config_dialog {
width: 60%;
height: 60%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#config_title { text-style: bold; margin-bottom: 1; }
#config_buttons { height: auto; margin-top: 1; }
#config_buttons Button { width: 1fr; margin: 0 1; }
#channel_dialog {
width: 80%;
height: 80%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#chan_title { text-style: bold; margin-bottom: 1; }
.category_header {
margin-top: 1;
background: $primary 10%;
text-style: bold;
padding-left: 1;
}
#channel_list_scroll {
height: 1fr;
border: solid $primary;
margin-bottom: 1;
padding: 0 1;
}
#select_all_buttons { height: auto; margin-bottom: 1; }
#select_all_buttons Button { width: auto; margin-right: 1; }
#confirm_buttons { height: auto; margin-top: 1; }
#confirm_buttons Button { width: auto; margin: 0 1; }
RadioButton:focus {
background: transparent;
border: none;
color: $text;
}
RadioButton > .radio-button--label {
padding: 0 1;
}
RadioButton:focus > .radio-button--label {
background: transparent;
text-style: none;
}
#progress_dialog {
width: 80%;
height: 80%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#progress_status { text-style: bold; margin-bottom: 1; }
#progress_bar { margin-bottom: 1; }
#progress_log { height: 1fr; margin-bottom: 1; border: solid $primary; }
.label_warning { color: yellow; margin-right: 1; content-align: center middle;}
"""
BINDINGS = [
("q", "app.exit", "Quit"),
("c", "config", "Config"),
("b", "app.pop_screen", "Back"),
]
def __init__(self):
super().__init__()
self.config_path = "config.yaml"
def __init__(self, cfg_name: str, cfg_path: Path, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cfg_name = cfg_name
self.config_path = cfg_path
self.config = load_config(self.config_path)
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
self.exporter = DiscordExporter(self.engine.discord_reader, base_dir=f"Reaper-{self.cfg_name}")
self.validation_results = {}
self.tokens_valid = False
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="main_container"):
yield Label("Disco Reaper - Server Backup Tool", id="title_label")
with Vertical(id="info_container"):
yield Label("Loading...", id="lbl_server")
yield Label("", id="lbl_bot")
yield Label("", id="lbl_backup")
with Vertical(id="actions_container"):
yield Button("Backup Server Profile", id="btn_backup_profile", disabled=True)
yield Button("Backup Channel Messages", id="btn_backup_msgs", disabled=True)
yield Button("Update & Sync Backup", id="btn_backup_sync", disabled=True)
yield Rule()
yield Button("Configuration", id="btn_config")
yield Button("Exit", id="btn_exit", variant="error")
with VerticalScroll(id="backup_scroll"):
with Container(id="main_container"):
yield Label("Disco Reaper - Server Backup Tool", id="title_label")
with Vertical(id="info_container"):
yield Label("Loading...", id="lbl_server")
yield Label("", id="lbl_bot")
yield Label("", id="lbl_backup")
with Vertical(id="actions_container"):
yield Button("Backup Server Profile", id="btn_backup_profile", disabled=True)
yield Button("Backup Channel Messages", id="btn_backup_msgs", disabled=True)
yield Button("Update & Sync Backup", id="btn_backup_sync", disabled=True)
yield Rule()
yield Button("Configuration", id="btn_config")
yield Button("Back", id="btn_back")
yield Button("Exit", id="btn_exit", variant="error")
yield Footer()
def on_mount(self) -> None:
@ -236,8 +339,7 @@ class MainScreen(Screen):
if not d_name or not d_id:
return None
safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', d_name)
export_path = Path(".") / f"EXPORT-{safe_name}-{d_id}"
export_path = Path(f"Reaper-{self.cfg_name}") / f"DISCORD-{d_id}"
profile_file = export_path / "server_profile.json"
if profile_file.exists():
@ -262,7 +364,7 @@ class MainScreen(Screen):
self.config.discord_server_id = reply["server"]
save_config(self.config, self.config_path)
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
self.exporter = DiscordExporter(self.engine.discord_reader, base_dir=f"Reaper-{self.cfg_name}")
self.validate_config()
self.app.push_screen(
@ -273,6 +375,8 @@ class MainScreen(Screen):
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_exit":
self.app.exit()
elif event.button.id == "btn_back":
self.app.pop_screen()
elif event.button.id == "btn_config":
self.open_config()
elif event.button.id == "btn_backup_profile":
@ -460,115 +564,4 @@ class MainScreen(Screen):
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
class ReaperUI(App):
CSS = """
Screen {
align: center middle;
}
#title_label {
text-style: bold;
color: green;
margin-bottom: 1;
content-align: center middle;
width: 100%;
}
#main_container {
width: 80%;
height: 80%;
border: solid green;
padding: 1 2;
}
#info_container {
height: auto;
margin-bottom: 2;
border: tall cyan;
padding: 1;
}
#actions_container {
height: auto;
layout: vertical;
align: center top;
margin-top: 1;
}
Button {
width: 100%;
margin-bottom: 1;
}
#config_dialog {
width: 60%;
height: 60%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#config_title { text-style: bold; margin-bottom: 1; }
#config_buttons { height: auto; margin-top: 1; }
#config_buttons Button { width: 1fr; margin: 0 1; }
#channel_dialog {
width: 80%;
height: 80%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#chan_title { text-style: bold; margin-bottom: 1; }
.category_header {
margin-top: 1;
background: $primary 10%;
text-style: bold;
padding-left: 1;
}
#channel_list_scroll {
height: 1fr;
border: solid $primary;
margin-bottom: 1;
padding: 0 1;
}
#select_all_buttons { height: auto; margin-bottom: 1; }
#select_all_buttons Button { width: auto; margin-right: 1; }
#confirm_buttons { height: auto; margin-top: 1; }
#confirm_buttons Button { width: auto; margin: 0 1; }
RadioButton:focus {
background: transparent;
border: none;
color: $text;
}
RadioButton > .radio-button--label {
padding: 0 1;
}
RadioButton:focus > .radio-button--label {
background: transparent;
text-style: none;
}
#progress_dialog {
width: 80%;
height: 80%;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
#progress_status { text-style: bold; margin-bottom: 1; }
#progress_bar { margin-bottom: 1; }
#progress_log { height: 1fr; margin-bottom: 1; border: solid $primary; }
.label_warning { color: yellow; margin-right: 1; content-align: center middle;}
"""
def on_mount(self) -> None:
self.push_screen(MainScreen())
async def run_disco_reaper_tui(config_path="config.yaml"):
app = ReaperUI()
await app.run_async()

346
src/ui/main_app.py Normal file
View file

@ -0,0 +1,346 @@
import re
import os
from pathlib import Path
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.widgets import (
Header, Footer, Button, Label, Input, ListItem,
ListView, Rule, RadioButton, RadioSet,
)
from textual.screen import Screen, ModalScreen
from src.core.configuration import (
get_available_configs, create_new_config, load_config, save_config,
)
# ──────────────────────────────────────────────────────────────────────────────
# Modal: create a new Reaper-* config folder
# ──────────────────────────────────────────────────────────────────────────────
class NewConfigModal(ModalScreen[str]):
"""Modal to enter a name for a new configuration."""
DEFAULT_CSS = """
NewConfigModal { align: center middle; }
#new_config_dialog {
width: 50; height: auto;
border: thick $background 80%; background: $surface; padding: 1 2;
}
#new_config_title { text-style: bold; margin-bottom: 1; }
#new_config_buttons { height: auto; margin-top: 1; }
#new_config_buttons Button { width: 1fr; margin: 0 1; }
"""
def compose(self) -> ComposeResult:
with Vertical(id="new_config_dialog"):
yield Label("Enter new configuration name:", id="new_config_title")
yield Input(placeholder="e.g. MyServer", id="new_config_input")
with Horizontal(id="new_config_buttons"):
yield Button("Create", variant="success", id="btn_create")
yield Button("Cancel", variant="primary", id="btn_cancel")
def _get_sanitized_name(self) -> str:
raw = self.query_one("#new_config_input", Input).value.strip()
return re.sub(r"[^a-zA-Z0-9_-]+", "-", raw).strip("-")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_create":
name = self._get_sanitized_name()
if name:
self.dismiss(name)
elif event.button.id == "btn_cancel":
self.dismiss(None)
def on_key(self, event) -> None:
if event.key == "enter":
name = self._get_sanitized_name()
if name:
self.dismiss(name)
elif event.key == "escape":
self.dismiss(None)
# ──────────────────────────────────────────────────────────────────────────────
# Screen 1: pick (or create) a Reaper-* config
# ──────────────────────────────────────────────────────────────────────────────
class ConfigSelectionScreen(Screen):
"""Screen to select or create a Reaper configuration."""
DEFAULT_CSS = """
ConfigSelectionScreen { align: center middle; }
#config_sel_container {
width: 60; height: auto;
border: solid green; padding: 1 2;
}
#config_sel_title {
text-style: bold; color: green; margin-bottom: 1;
content-align: center middle; width: 100%;
}
#config_list_container {
height: 10; max-height: 20;
border: solid $primary; margin-bottom: 1;
}
#config_sel_actions { height: auto; margin-top: 1; }
#config_sel_actions Button { width: 1fr; margin: 0 1; }
"""
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="config_sel_container"):
yield Label("Disco Reaper — Select Configuration", id="config_sel_title")
with VerticalScroll(id="config_list_container"):
yield ListView(id="config_list")
with Horizontal(id="config_sel_actions"):
yield Button("Create New Config", id="btn_new_config", variant="success")
yield Button("Exit", id="btn_exit", variant="error")
yield Footer()
def on_mount(self) -> None:
self.refresh_configs()
def on_screen_resume(self) -> None:
self.refresh_configs()
def refresh_configs(self) -> None:
configs = get_available_configs()
lv = self.query_one("#config_list", ListView)
lv.clear()
for c in configs:
lv.append(ListItem(Label(f"Reaper-{c}"), name=c))
def on_list_view_selected(self, event: ListView.Selected) -> None:
cfg_name = event.item.name
cfg_path = Path(f"Reaper-{cfg_name}") / "config.yaml"
self.app.push_screen(ConfigScreen(cfg_name, cfg_path))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_new_config":
def cb(name: str | None):
if name:
create_new_config(name)
self.refresh_configs()
self.app.push_screen(NewConfigModal(), cb)
elif event.button.id == "btn_exit":
self.app.exit()
# ──────────────────────────────────────────────────────────────────────────────
# Screen 2: edit config + pick mode + start
# ──────────────────────────────────────────────────────────────────────────────
_MODE_MAP = {
"radio_direct": "direct_transfer",
"radio_backup": "backup_transfer",
"radio_bkonly": "backup_only",
}
_MODE_LABELS = {
"direct_transfer": "radio_direct",
"backup_transfer": "radio_backup",
"backup_only": "radio_bkonly",
}
_PLAT_MAP = {
"radio_fluxer": "fluxer",
"radio_stoat": "stoat",
}
_PLAT_LABELS = {
"fluxer": "radio_fluxer",
"stoat": "radio_stoat",
}
class ConfigScreen(Screen):
"""Configuration screen — Discord config, tool mode, and target platform."""
DEFAULT_CSS = """
ConfigScreen { align: center middle; }
#cfg_scroll { width: 100%; height: 100%; align: center middle; }
#cfg_container {
width: 70; height: auto;
border: solid green; padding: 1 2; margin: 1 0;
}
#cfg_title {
text-style: bold; color: green; margin-bottom: 1;
content-align: center middle; width: 100%;
}
.section_title {
text-style: bold; color: cyan; margin-top: 1; margin-bottom: 0;
}
.field_label { margin-top: 1; }
#cfg_container Input { margin-bottom: 0; }
#mode_radio, #plat_radio {
height: auto; margin: 0 0 0 2;
}
#target_section { height: auto; }
#cfg_actions { height: auto; margin-top: 1; }
#cfg_actions Button { width: 1fr; margin: 0 1; }
"""
BINDINGS = [("escape", "go_back", "Back")]
def __init__(self, cfg_name: str, cfg_path: Path, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cfg_name = cfg_name
self.cfg_path = cfg_path
self.config = load_config(cfg_path)
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with VerticalScroll(id="cfg_scroll"):
with Container(id="cfg_container"):
yield Label(f"Configuration — {self.cfg_name}", id="cfg_title")
# ── Discord ──────────────────────────────────────────────
yield Label("Discord", classes="section_title")
yield Rule()
yield Label("Bot Token:", classes="field_label")
yield Input(
value=self.config.discord_bot_token or "",
id="inp_discord_token",
password=True,
)
yield Label("Server ID:", classes="field_label")
yield Input(
value=self.config.discord_server_id or "",
id="inp_discord_server",
)
# ── Reaper Mode ──────────────────────────────────────────
yield Label("Reaper Mode", classes="section_title")
yield Rule()
cur_mode = self.config.tool_mode or "backup_only"
with RadioSet(id="mode_radio"):
yield RadioButton(
"Shuttle Transfer (direct migration)",
id="radio_direct",
value=(cur_mode == "direct_transfer"),
)
yield RadioButton(
"Backup & Migrate (backup first, then migrate)",
id="radio_backup",
value=(cur_mode == "backup_transfer"),
)
yield RadioButton(
"Backup Only (local backup, no migration)",
id="radio_bkonly",
value=(cur_mode == "backup_only"),
)
# ── Target Platform (hidden for backup_only) ─────────────
with Vertical(id="target_section"):
yield Label("Target Platform", classes="section_title")
yield Rule()
cur_plat = self.config.target_platform or "none"
with RadioSet(id="plat_radio"):
yield RadioButton(
"Fluxer",
id="radio_fluxer",
value=(cur_plat == "fluxer"),
)
yield RadioButton(
"Stoat",
id="radio_stoat",
value=(cur_plat == "stoat"),
)
yield Label("Bot Token:", classes="field_label")
yield Input(
value=self.config.target_bot_token or "",
id="inp_target_token",
password=True,
)
yield Label("Community / Server ID:", classes="field_label")
yield Input(
value=self.config.target_server_id or "",
id="inp_target_server",
)
yield Rule()
with Horizontal(id="cfg_actions"):
yield Button("Save & Start", variant="success", id="btn_start")
yield Button("Save", variant="primary", id="btn_save")
yield Button("Back", id="btn_back")
yield Footer()
def on_mount(self) -> None:
self._toggle_target_section()
# ── show / hide the target platform section ──────────────────────────
def _get_selected_mode(self) -> str:
for rb in self.query("#mode_radio RadioButton"):
if rb.value:
return _MODE_MAP.get(rb.id, "backup_only")
return "backup_only"
def _get_selected_platform(self) -> str:
for rb in self.query("#plat_radio RadioButton"):
if rb.value:
return _PLAT_MAP.get(rb.id, "fluxer")
return "fluxer"
def _toggle_target_section(self) -> None:
section = self.query_one("#target_section")
section.display = self._get_selected_mode() != "backup_only"
def on_radio_set_changed(self, event: RadioSet.Changed) -> None:
if event.radio_set.id == "mode_radio":
self._toggle_target_section()
# ── save / start ─────────────────────────────────────────────────────
def _collect_and_save(self) -> None:
self.config.discord_bot_token = self.query_one("#inp_discord_token", Input).value.strip() or self.config.discord_bot_token
self.config.discord_server_id = self.query_one("#inp_discord_server", Input).value.strip() or self.config.discord_server_id
self.config.tool_mode = self._get_selected_mode()
if self.config.tool_mode != "backup_only":
self.config.target_platform = self._get_selected_platform()
self.config.target_bot_token = self.query_one("#inp_target_token", Input).value.strip() or self.config.target_bot_token
self.config.target_server_id = self.query_one("#inp_target_server", Input).value.strip() or self.config.target_server_id
else:
self.config.target_platform = "none"
save_config(self.config, self.cfg_path)
def _launch_mode(self) -> None:
mode = self.config.tool_mode
if mode == "direct_transfer":
from src.ui.shuttle_screen import ShuttleScreen
self.app.push_screen(ShuttleScreen(self.cfg_name, self.cfg_path))
elif mode == "backup_transfer":
from src.ui.backup_screen import BackupScreen
self.app.push_screen(BackupScreen(self.cfg_name, self.cfg_path))
else: # backup_only
from src.ui.backup_screen import BackupScreen
self.app.push_screen(BackupScreen(self.cfg_name, self.cfg_path))
def action_go_back(self) -> None:
self.app.pop_screen()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_back":
self.app.pop_screen()
elif event.button.id == "btn_save":
self._collect_and_save()
self.notify("Configuration saved.", severity="information")
elif event.button.id == "btn_start":
self._collect_and_save()
self._launch_mode()
# ──────────────────────────────────────────────────────────────────────────────
# App
# ──────────────────────────────────────────────────────────────────────────────
class ReaperApp(App):
def on_mount(self) -> None:
self.push_screen(ConfigSelectionScreen())
def run_disco_reaper_tui():
app = ReaperApp()
app.run()

View file

@ -1,350 +0,0 @@
import sys
import asyncio
import discord
import logging
import time
import re
import json
from datetime import datetime
from pathlib import Path
from rich.console import Console
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from src.core.configuration import load_config, save_config
from src.core.base import MigrationContext
from src.disco_reaper.exporter import DiscordExporter
console = Console()
class DiscoReaperCLI:
"""CLI app to manage Discord server data export."""
def __init__(self, config_path="config.yaml"):
self.config_path = config_path
try:
self.config = load_config(self.config_path)
# We only need the Discord side of MigrationContext
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
except Exception as e:
console.print(f"[bold red]Failed to load config: {e}[/bold red]")
sys.exit(1)
self.tokens_valid = False
async def validate_config(self):
self.validation_results = {
"discord_token": False, "discord_bot_name": None,
"discord_server": False, "discord_server_name": None,
"discord_timeout": False
}
d_token = self.config.discord_bot_token
fillers = ["DISCORD_BOT_TOKEN", "000000000000000000", "DISCORD_SERVER_ID", "", None]
discord_dummy = d_token in fillers or self.config.discord_server_id in fillers
if discord_dummy:
console.print("[bold yellow]Discord setup incomplete in config.yaml[/bold yellow]")
return False
try:
with console.status("[yellow]Validating Discord token...[/yellow]"):
res = await self.engine.discord_reader.validate()
self.validation_results["discord_token"] = res.get("token", False)
self.validation_results["discord_bot_name"] = res.get("bot_name")
self.validation_results["discord_server"] = res.get("server", False)
self.validation_results["discord_server_name"] = res.get("server_name")
if not res.get("token"):
console.print("[bold red]Discord Token validation failed.[/bold red]")
elif not res.get("server"):
console.print("[bold red]Discord Server ID validation failed.[/bold red]")
else:
self.tokens_valid = True
return True
except Exception as e:
console.print(f"[bold red]Discord validation failed: {e}[/bold red]")
return False
def get_backup_info(self):
"""Checks for existing backup and returns formatted timestamp."""
d_name = self.validation_results.get("discord_server_name")
d_id = self.config.discord_server_id
if not d_name or not d_id or d_id == "DISCORD_SERVER_ID":
return None
safe_name = re.sub(r'[^a-zA-Z0-9_\-\.]', '_', d_name)
export_path = Path(".") / f"EXPORT-{safe_name}-{d_id}"
profile_file = export_path / "server_profile.json"
if profile_file.exists():
try:
with open(profile_file, "r", encoding="utf-8") as f:
data = json.load(f)
ts_str = data.get("last_backup")
if ts_str:
dt = datetime.fromisoformat(ts_str)
return dt.strftime("%d-%b-%Y %H:%M")
except Exception:
pass
return None
async def run(self):
await self.validate_config()
try:
while True:
console.print("")
console.print(Panel.fit("Disco Reaper - Server Backup Tool", style="bold green"))
d_name = self.validation_results.get("discord_server_name")
d_display = f"[bold green]\"{d_name}\"[/bold green]" if d_name else "[bold red]NOT CONNECTED[/bold red]"
console.print(f"[bold cyan]Source Server:[/bold cyan] {d_display}")
b_name = self.validation_results.get("discord_bot_name")
b_display = f"[bold green]\"{b_name}\"[/bold green]" if b_name else "[bold red]UNKNOWN[/bold red]"
console.print(f"[bold cyan]Bot name:[/bold cyan] {b_display}")
backup_ts = self.get_backup_info()
if backup_ts:
console.print(f"[bold cyan]Backup Found:[/bold cyan] [bold yellow]{backup_ts}[/bold yellow]")
console.print("\n[bold]Main Menu[/bold]")
console.print("(1) Backup Server Profile")
console.print("(2) Backup Channel Messages")
console.print("(3) Update & Sync Backup")
console.print("(4) Configuration")
console.print("(Q) Exit")
choice = Prompt.ask("\nSelect an option", choices=["1", "2", "3", "4", "Q"], default="Q", show_choices=False).upper()
if choice == "1":
await self.backup_server_profile()
elif choice == "2":
await self.backup_messages()
elif choice == "3":
await self.sync_backup()
elif choice == "4":
await self.edit_configuration()
elif choice == "Q":
break
except (KeyboardInterrupt, asyncio.CancelledError):
console.print("\n[yellow]Operation terminated by user.[/yellow]")
finally:
await self.engine.close_connections()
async def backup_server_profile(self):
"""Option 1: Backup name, banner, logo, roles, structure, and custom assets."""
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
meta = await self.exporter.setup()
with console.status("[yellow]Backing up server profile & skeleton...[/yellow]"):
# 1. Profile & Assets
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
# 2. Roles
try:
role_data = await self.exporter.export_roles()
r_count = len(role_data)
except discord.Forbidden:
r_count = 0
console.print("[yellow]Warning: Could not backup roles (Missing Access).[/yellow]")
# 3. Structure
_, cat_count, chan_count = await self.exporter.export_channels_structure()
# 4. Emojis & Stickers
e_count, s_count = await self.exporter.export_assets()
console.print(f"[bold green]Server Profile backed up to: {self.exporter.export_path}[/bold green]")
console.print("\n[bold]Profile Backup Stats:[/bold]")
console.print("- name, logo & banner")
console.print(f"- {cat_count} categories & {chan_count} channels")
console.print(f"- {r_count} roles")
console.print(f"- {e_count} emojis, {s_count} stickers.")
except discord.Forbidden as e:
console.print(f"[bold red]Backup failed: {e}[/bold red]")
console.print("[yellow]Hint: Missing permissions. Check bot 'Guilds' and 'Channel' access.[/yellow]")
except Exception as e:
console.print(f"[bold red]Backup failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def backup_messages(self, force_overwrite=None, skip_found_prompt=False, auto_sync_existing=False):
"""Option 2: Backup Channels structure and all message history."""
if not self.tokens_valid: return
try:
await self.engine.discord_reader.start()
await self.exporter.setup()
# 1. Export structure first
with console.status("[yellow]Exporting channel structure...[/yellow]"):
await self.exporter.export_channels_structure()
# 2. Select Channels
with console.status("[yellow]Fetching channels & categories...[/yellow]"):
all_channels = await self.engine.discord_reader.get_channels()
all_categories = await self.engine.discord_reader.get_categories()
cat_map = {c.id: c.name for c in all_categories}
# Filter for exportable channels
eligible_channels = [
c for c in all_channels
if c.type in [discord.ChannelType.text, discord.ChannelType.news, discord.ChannelType.forum]
]
if not eligible_channels:
console.print("[yellow]No text/news channels found to backup.[/yellow]")
return
selected_channels = []
if auto_sync_existing:
selected_channels = [
c for c in eligible_channels
if (self.exporter.export_path / "message_backup" / f"{c.id}.json").exists()
]
if not selected_channels:
console.print("[yellow]No existing backups found to sync.[/yellow]")
return
# In auto-sync mode, we always use sync (force_overwrite=False) and skip prompts
force_overwrite = False
skip_found_prompt = True
else:
console.print(f"\n[bold]Select Channels to Backup ({len(eligible_channels)} total):[/bold]")
# Identify duplicate names to show categories for them
name_counts = {}
for chan in eligible_channels:
name_counts[chan.name] = name_counts.get(chan.name, 0) + 1
for i, chan in enumerate(eligible_channels):
display_name = chan.name
if name_counts[chan.name] > 1:
cat_name = cat_map.get(chan.category_id)
if cat_name:
display_name = f"{chan.name} [{cat_name}]"
# Check for existing backup
found_prefix = ""
backup_file = self.exporter.export_path / "message_backup" / f"{chan.id}.json"
if backup_file.exists():
found_prefix = "[green][FOUND][/green] "
console.print(f"({i+1}) {found_prefix}{display_name}")
console.print("(A) [bold green]All Channels[/bold green]")
console.print("(B) Back")
choices = [str(i+1) for i in range(len(eligible_channels))] + ["A", "B", "a", "b"]
selection_input = Prompt.ask("\nSelect option or indices (e.g. 1,2,5)", default="B")
if selection_input.upper() == "B":
return
if selection_input.upper() == "A":
selected_channels = eligible_channels
else:
try:
# Handle multiple indices like '1,2,5'
indices = [int(i.strip()) - 1 for i in selection_input.split(",") if i.strip().isdigit()]
selected_channels = [eligible_channels[i] for i in indices if 0 <= i < len(eligible_channels)]
except Exception:
console.print("[red]Invalid selection. Aborting.[/red]")
return
if not selected_channels:
console.print("[yellow]No valid channels selected.[/yellow]")
return
# Check if any have [FOUND]
any_found = False
for chan in selected_channels:
if (self.exporter.export_path / "message_backup" / f"{chan.id}.json").exists():
any_found = True
break
if force_overwrite is None:
force_overwrite = False
if any_found and not skip_found_prompt:
console.print("\n[bold yellow]Existing backup(s) found for some selected channels.[/bold yellow]")
console.print("(Y) Update & Sync Backup")
console.print("(F) [bold red]Force Overwrite Backup[/bold red]")
console.print("(B) Back")
sync_choice = Prompt.ask("\nSelect option", choices=["Y", "F", "B"], default="Y").upper()
if sync_choice == "B":
return
force_overwrite = (sync_choice == "F")
console.print(f"\n[yellow]Starting backup for {len(selected_channels)} channels...[/yellow]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TextColumn("- [bold yellow]{task.fields[msg_count]} {task.fields[suffix]}"),
console=console
) as progress:
for chan in selected_channels:
# Determine if it's a sync or fresh backup
backup_exists = (self.exporter.export_path / "message_backup" / f"{chan.id}.json").exists()
is_sync = backup_exists and not force_overwrite
label = "Syncing Backup" if is_sync else "Backing up"
suffix = "new messages" if is_sync else "messages"
# Create a specific task for each channel to keep it on its own line
task_id = progress.add_task(f"[cyan]{label}: {chan.name}", total=None, msg_count=0, suffix=suffix)
async def update_msg_count(name, count, tid=task_id):
progress.update(tid, msg_count=count)
await self.exporter.export_channel_messages(chan.id, progress_callback=update_msg_count, force=force_overwrite)
await self.exporter.export_threads(chan.id, progress_callback=update_msg_count, force=force_overwrite)
# Mark as finished (stop spinner)
progress.stop_task(task_id)
progress.update(task_id, description=f"[green]Completed: {chan.name}")
console.print("[bold green]Message backup complete![/bold green]")
# Update last_backup in server_profile.json
await self.exporter.export_metadata()
except Exception as e:
console.print(f"[bold red]Message backup failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
async def sync_backup(self):
"""Option 3: Update & Sync Backup (Automated incremental sync for existing backups)."""
console.print("[yellow]Syncing backup... (Updating existing data)[/yellow]")
await self.backup_server_profile()
# Automatically select and sync existing backups
await self.backup_messages(auto_sync_existing=True)
async def edit_configuration(self):
# reuse or implement simplified version of edit_configuration from app.py
console.print("\n[bold]Configuration Editor[/bold]")
d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token)
d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id)
if d_token != self.config.discord_bot_token or d_server != self.config.discord_server_id:
self.config.discord_bot_token = d_token
self.config.discord_server_id = d_server
save_config(self.config, self.config_path)
self.engine = MigrationContext(self.config, target_platform="fluxer")
self.exporter = DiscordExporter(self.engine.discord_reader)
await self.validate_config()
console.print("[bold green]Config updated![/bold green]")
else:
console.print("[yellow]No changes.[/yellow]")
async def run_disco_reaper(config_path="config.yaml"):
app = DiscoReaperCLI(config_path)
await app.run()

1157
src/ui/shuttle_screen.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -201,7 +201,7 @@ class ReaperScreen(Screen):
@property
def exporter(self): return self.app.exporter
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def validate_config(self) -> None:
def update_ui(server_text, bot_text, backup_text, buttons_enabled):
self.query_one("#lbl_server", Label).update(f"Source Server: {server_text}")
@ -213,17 +213,17 @@ class ReaperScreen(Screen):
d_token = self.config.discord_bot_token
fillers = ["DISCORD_BOT_TOKEN", "000000000000000000", "DISCORD_SERVER_ID", "", None]
if d_token in fillers or self.config.discord_server_id in fillers:
self.app.call_from_thread(update_ui, "[red]NOT CONNECTED[/red]", "[red]UNKNOWN[/red]", "", False)
update_ui("[red]NOT CONNECTED[/red]", "[red]UNKNOWN[/red]", "", False)
self.tokens_valid = False
return
self.app.call_from_thread(update_ui, "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "", False)
update_ui("[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "", False)
try:
res = await self.engine.discord_reader.validate()
except Exception as e:
res = {}
self.app.call_from_thread(update_ui, f"[red]Validation Error: {e}[/red]", "", "", False)
update_ui(f"[red]Validation Error: {e}[/red]", "", "", False)
self.tokens_valid = False
return
@ -246,7 +246,7 @@ class ReaperScreen(Screen):
if backup_ts:
backup_text = f"Backup Found: [yellow]{backup_ts}[/yellow]"
self.app.call_from_thread(update_ui, server_display, bot_display, backup_text, self.tokens_valid)
update_ui(server_display, bot_display, backup_text, self.tokens_valid)
def get_backup_info(self):
d_name = self.validation_results.get("discord_server_name")
@ -299,48 +299,48 @@ class ReaperScreen(Screen):
elif event.button.id == "btn_backup_sync":
self.run_backup_sync()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_backup_profile(self) -> None:
modal = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal)
self.app.push_screen(modal)
await asyncio.sleep(0.1)
try:
self.app.call_from_thread(modal.set_status, "Starting readers...")
modal.set_status("Starting readers...")
await self.engine.discord_reader.start()
meta = await self.exporter.setup()
self.app.call_from_thread(modal.write_to_log, "[yellow]Backing up server profile & skeleton...[/yellow]")
modal.write_to_log("[yellow]Backing up server profile & skeleton...[/yellow]")
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
self.app.call_from_thread(modal.write_to_log, "Exporting structure...")
modal.write_to_log("Exporting structure...")
_, cat_count, chan_count = await self.exporter.export_channels_structure()
self.app.call_from_thread(modal.write_to_log, "Exporting assets...")
modal.write_to_log("Exporting assets...")
e_count, s_count = await self.exporter.export_assets()
self.app.call_from_thread(modal.write_to_log, f"[bold green]Server Profile backed up to: {self.exporter.export_path}[/bold green]")
self.app.call_from_thread(modal.write_to_log, f"- {cat_count} categories & {chan_count} channels")
self.app.call_from_thread(modal.write_to_log, f"- {e_count} emojis, {s_count} stickers.")
modal.write_to_log(f"[bold green]Server Profile backed up to: {self.exporter.export_path}[/bold green]")
modal.write_to_log(f"- {cat_count} categories & {chan_count} channels")
modal.write_to_log(f"- {e_count} emojis, {s_count} stickers.")
except discord.Forbidden as e:
self.app.call_from_thread(modal.write_to_log, f"[bold red]Backup failed: {e}[/bold red]")
modal.write_to_log(f"[bold red]Backup failed: {e}[/bold red]")
except Exception as e:
self.app.call_from_thread(modal.write_to_log, f"[bold red]Error: {e}[/bold red]")
modal.write_to_log(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.app.call_from_thread(modal.set_status, "Finished.")
self.app.call_from_thread(modal.allow_close)
modal.set_status("Finished.")
modal.allow_close()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_backup_messages(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
try:
self.app.call_from_thread(modal_prog.set_status, "Fetching channels...")
modal_prog.set_status("Fetching channels...")
await self.engine.discord_reader.start()
await self.exporter.setup()
@ -355,8 +355,8 @@ class ReaperScreen(Screen):
]
if not eligible_channels:
self.app.call_from_thread(modal_prog.write_to_log, "[yellow]No text/news channels found to backup.[/yellow]")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.write_to_log("[yellow]No text/news channels found to backup.[/yellow]")
modal_prog.allow_close()
return
any_found = False
@ -366,7 +366,7 @@ class ReaperScreen(Screen):
any_found = True
backed_up_ids.add(chan.id)
self.app.call_from_thread(self.app.pop_screen)
self.app.pop_screen()
loop = asyncio.get_running_loop()
future = loop.create_future()
@ -375,8 +375,7 @@ class ReaperScreen(Screen):
if not future.done():
future.set_result(reply)
self.app.call_from_thread(
self.app.push_screen,
self.app.push_screen(
ChannelSelectModal(eligible_channels, cat_map, backed_up_ids, any_found),
check_channels
)
@ -390,50 +389,50 @@ class ReaperScreen(Screen):
selected_channels = [c for c in eligible_channels if c.id in selected_ids]
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
total_chans = len(selected_channels)
self.app.call_from_thread(modal_prog.set_status, "Backing up messages...")
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Starting backup for {total_chans} channels...[/yellow]")
modal_prog.set_status("Backing up messages...")
modal_prog.write_to_log(f"[yellow]Starting backup for {total_chans} channels...[/yellow]")
for chan in selected_channels:
backup_exists = (self.exporter.export_path / "message_backup" / f"{chan.id}.json").exists()
is_sync = backup_exists and not force_overwrite
label = "Syncing Backup" if is_sync else "Backing up"
self.app.call_from_thread(modal_prog.write_to_log, f"[cyan]{label}: {chan.name}[/cyan]")
modal_prog.write_to_log(f"[cyan]{label}: {chan.name}[/cyan]")
async def update_msg_count(name, count):
self.app.call_from_thread(modal_prog.set_status, f"{name}: {count} messages")
modal_prog.set_status(f"{name}: {count} messages")
await self.exporter.export_channel_messages(chan.id, progress_callback=update_msg_count, force=force_overwrite)
await self.exporter.export_threads(chan.id, progress_callback=update_msg_count, force=force_overwrite)
self.app.call_from_thread(modal_prog.write_to_log, f"[green]Completed: {chan.name}[/green]")
modal_prog.write_to_log(f"[green]Completed: {chan.name}[/green]")
await self.exporter.export_metadata()
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Message backup complete![/bold green]")
modal_prog.write_to_log("[bold green]Message backup complete![/bold green]")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Message backup failed: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Message backup failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_backup_sync(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
try:
self.app.call_from_thread(modal_prog.set_status, "Starting sync...")
modal_prog.set_status("Starting sync...")
await self.engine.discord_reader.start()
await self.exporter.setup()
self.app.call_from_thread(modal_prog.write_to_log, "Updating structure...")
modal_prog.write_to_log("Updating structure...")
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
await self.exporter.export_channels_structure()
@ -451,28 +450,28 @@ class ReaperScreen(Screen):
]
if not selected_channels:
self.app.call_from_thread(modal_prog.write_to_log, "[yellow]No existing backups found to sync.[/yellow]")
modal_prog.write_to_log("[yellow]No existing backups found to sync.[/yellow]")
else:
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Syncing {len(selected_channels)} channels...[/yellow]")
modal_prog.write_to_log(f"[yellow]Syncing {len(selected_channels)} channels...[/yellow]")
for chan in selected_channels:
self.app.call_from_thread(modal_prog.write_to_log, f"[cyan]Syncing: {chan.name}[/cyan]")
modal_prog.write_to_log(f"[cyan]Syncing: {chan.name}[/cyan]")
async def update_msg_count(name, count):
self.app.call_from_thread(modal_prog.set_status, f"{name}: {count} messages")
modal_prog.set_status(f"{name}: {count} messages")
await self.exporter.export_channel_messages(chan.id, progress_callback=update_msg_count, force=False)
await self.exporter.export_threads(chan.id, progress_callback=update_msg_count, force=False)
self.app.call_from_thread(modal_prog.write_to_log, f"[green]Synced: {chan.name}[/green]")
modal_prog.write_to_log(f"[green]Synced: {chan.name}[/green]")
await self.exporter.export_metadata()
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Sync operation complete![/bold green]")
modal_prog.write_to_log("[bold green]Sync operation complete![/bold green]")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Sync failed: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Sync failed: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
# -------------------------------------------------------------------------
# Shuttle Screen
@ -576,7 +575,7 @@ class ShuttleScreen(Screen):
is_stoat = getattr(self.config, 'use_stoat', False)
return self.app.stoat_engine if is_stoat else self.app.fluxer_engine
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def validate_config(self) -> None:
is_stoat = getattr(self.config, 'use_stoat', False)
platform_name = "Stoat" if is_stoat else "Fluxer"
@ -589,7 +588,7 @@ class ShuttleScreen(Screen):
for btn_id in ["#btn_clone", "#btn_roles", "#btn_emojis", "#btn_meta", "#btn_history", "#btn_danger"]:
self.query_one(btn_id, Button).disabled = not buttons_enabled
self.app.call_from_thread(update_ui, "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", False)
update_ui("[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]", False)
engine = self.engine
@ -667,10 +666,10 @@ class ShuttleScreen(Screen):
else:
perms_display = "[green]VALID[/green]"
self.app.call_from_thread(update_ui, t_display, d_display, perms_display, self.tokens_valid)
update_ui(t_display, d_display, perms_display, self.tokens_valid)
except Exception as e:
self.app.call_from_thread(update_ui, f"[red]Validation Error: {e}[/red]", "", "", False)
update_ui(f"[red]Validation Error: {e}[/red]", "", "", False)
self.tokens_valid = False
def on_button_pressed(self, event: Button.Pressed) -> None:
@ -699,10 +698,10 @@ class ShuttleScreen(Screen):
elif event.button.id == "btn_history":
pass # Implement later
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_clone(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
is_stoat = getattr(self.config, 'use_stoat', False)
@ -712,25 +711,23 @@ class ShuttleScreen(Screen):
from src.stoat.clone_server import sync_channel_state, migrate_channels
try:
self.app.call_from_thread(modal_prog.set_status, "Fetching structure...")
modal_prog.set_status("Fetching structure...")
await self.engine.start_connections()
self.app.call_from_thread(modal_prog.write_to_log, "Syncing channel state...")
modal_prog.write_to_log("Syncing channel state...")
await sync_channel_state(self.engine)
categories = await self.engine.discord_reader.get_categories()
channels = await self.engine.discord_reader.get_channels()
async def update_progress(item_name: str, status: str, current: int, total: int):
self.app.call_from_thread(modal_prog.set_status, f"{status} Channel: {item_name} ({current}/{total})")
modal_prog.set_status(f"{status} Channel: {item_name} ({current}/{total})")
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Starting Channel Cloning...[/yellow]")
modal_prog.write_to_log(f"[yellow]Starting Channel Cloning...[/yellow]")
self.engine.is_running = True
# Use False for force. UI logic should have an intermediate confirmation for Force in future iterations
# For this unified TUI, we default to sync
cloned_info = await migrate_channels(self.engine, progress_callback=update_progress, force=False)
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Server Template cloned![/bold green]")
modal_prog.write_to_log("[bold green]Server Template cloned![/bold green]")
if cloned_info and cloned_info.get("structure"):
await log_audit_event(self.engine, "Server Template Cloned", "Successfully cloned target structure.")
@ -738,37 +735,37 @@ class ShuttleScreen(Screen):
await log_audit_event(self.engine, "Server Template Cloned", "No new channels were cloned.")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Error during channel clone: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Error during channel clone: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_roles(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
is_stoat = getattr(self.config, 'use_stoat', False)
roles_mod = stoat_roles if is_stoat else fluxer_roles
try:
self.app.call_from_thread(modal_prog.set_status, "Fetching roles...")
modal_prog.set_status("Fetching roles...")
await self.engine.start_connections()
self.app.call_from_thread(modal_prog.write_to_log, "Syncing role state...")
modal_prog.write_to_log("Syncing role state...")
await roles_mod.sync_roles_state(self.engine)
async def update_progress(item_name: str, current: int, total: int):
self.app.call_from_thread(modal_prog.set_status, f"Copying Role: {item_name} ({current}/{total})")
modal_prog.set_status(f"Copying Role: {item_name} ({current}/{total})")
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Starting Role Migration...[/yellow]")
modal_prog.write_to_log(f"[yellow]Starting Role Migration...[/yellow]")
self.engine.is_running = True
cloned_roles = await roles_mod.migrate_roles(self.engine, progress_callback=update_progress, force=False)
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Role migration complete![/bold green]")
modal_prog.write_to_log("[bold green]Role migration complete![/bold green]")
if cloned_roles:
await log_audit_event(self.engine, "Roles Cloned", "Successfully cloned roles.")
@ -776,79 +773,79 @@ class ShuttleScreen(Screen):
await log_audit_event(self.engine, "Roles Cloned", "No new roles were cloned.")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Error during role clone: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Error during role clone: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_emojis(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
is_stoat = getattr(self.config, 'use_stoat', False)
emo_mod = stoat_emoji_stickers if is_stoat else fluxer_emoji_stickers
try:
self.app.call_from_thread(modal_prog.set_status, "Fetching emojis...")
modal_prog.set_status("Fetching emojis...")
await self.engine.start_connections()
self.app.call_from_thread(modal_prog.write_to_log, "Syncing emoji & sticker state...")
modal_prog.write_to_log("Syncing emoji & sticker state...")
await emo_mod.sync_emojis_state(self.engine)
await emo_mod.sync_stickers_state(self.engine)
async def update_progress(item_name: str, item_type: str, current: int, total: int):
self.app.call_from_thread(modal_prog.set_status, f"Copying {item_type}: {item_name} ({current}/{total})")
modal_prog.set_status(f"Copying {item_type}: {item_name} ({current}/{total})")
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Starting Asset Migration...[/yellow]")
modal_prog.write_to_log(f"[yellow]Starting Asset Migration...[/yellow]")
self.engine.is_running = True
# Using force=False
res_e, res_s = await emo_mod.migrate_emojis(self.engine, progress_callback=update_progress, force=False)
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Asset migration complete![/bold green]")
modal_prog.write_to_log("[bold green]Asset migration complete![/bold green]")
await log_audit_event(self.engine, "Assets Cloned", f"E: {res_e} S: {res_s}")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Error during asset migrate: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Error during asset migrate: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
@work(exclusive=True, thread=True)
@work(exclusive=True)
async def run_meta(self) -> None:
modal_prog = ProgressModal()
self.app.call_from_thread(self.app.push_screen, modal_prog)
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
is_stoat = getattr(self.config, 'use_stoat', False)
meta_mod = stoat_metadata if is_stoat else fluxer_metadata
try:
self.app.call_from_thread(modal_prog.set_status, "Connecting to platforms...")
modal_prog.set_status("Connecting to platforms...")
await self.engine.start_connections()
def cb(item, status):
self.app.call_from_thread(modal_prog.write_to_log, f"{item}: {status}")
modal_prog.write_to_log(f"{item}: {status}")
self.app.call_from_thread(modal_prog.write_to_log, f"[yellow]Starting Meta Migration...[/yellow]")
modal_prog.write_to_log(f"[yellow]Starting Meta Migration...[/yellow]")
self.engine.is_running = True
await meta_mod.migrate_server_metadata(self.engine, progress_callback=cb)
self.app.call_from_thread(modal_prog.write_to_log, "[bold green]Meta migration complete![/bold green]")
modal_prog.write_to_log("[bold green]Meta migration complete![/bold green]")
except Exception as e:
self.app.call_from_thread(modal_prog.write_to_log, f"[bold red]Error during meta migrate: {e}[/bold red]")
modal_prog.write_to_log(f"[bold red]Error during meta migrate: {e}[/bold red]")
finally:
await self.engine.close_connections()
self.engine.is_running = False
self.app.call_from_thread(modal_prog.set_status, "Finished.")
self.app.call_from_thread(modal_prog.allow_close)
modal_prog.set_status("Finished.")
modal_prog.allow_close()
# -------------------------------------------------------------------------

View file

@ -1,68 +0,0 @@
import sys
import asyncio
import logging
from src.ui.unifier_tui import run_unifier_tui
def setup_logging():
level = logging.INFO
handlers = [logging.FileHandler('.unifier.log', mode='a')]
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def relaunch_in_terminal():
"""Detects if running without a terminal on Linux and relaunches in one."""
import os
import sys
import subprocess
import shutil
if sys.platform != "linux":
return
is_tty = sys.stdin.isatty() or sys.stdout.isatty()
if is_tty or os.environ.get("DISCO_UNIFIER_RELAUNCHED"):
return
terminals = [
("gnome-terminal", ["--"]),
("ptyxis", ["--"]),
("x-terminal-emulator", ["-e"]),
("kgx", ["-e"]),
("konsole", ["-e"]),
("xfce4-terminal", ["-e"]),
("xterm", ["-e"]),
]
executable = sys.executable if not getattr(sys, 'frozen', False) else os.path.abspath(sys.argv[0])
args = [executable] + sys.argv[1:]
env = os.environ.copy()
env["DISCO_UNIFIER_RELAUNCHED"] = "1"
for term, cmd_args in terminals:
if shutil.which(term):
try:
subprocess.Popen([term] + cmd_args + args, env=env)
sys.exit(0)
except Exception:
continue
async def main():
relaunch_in_terminal()
setup_logging()
await run_unifier_tui()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nOperation terminated by user.")
sys.exit(0)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)