backup auto test in tui

This commit is contained in:
rambros 2026-03-30 22:42:03 +05:30
parent bd80760667
commit 2d33b16d2c

View file

@ -165,7 +165,7 @@ class OperationPane(Container):
yield Button("Danger Zone ⚠", id="op_danger", variant="error", disabled=True, flat=True, tooltip="Dangerous operations:\ndelete channels, roles, emojis on target\n(use with caution)") yield Button("Danger Zone ⚠", id="op_danger", variant="error", disabled=True, flat=True, tooltip="Dangerous operations:\ndelete channels, roles, emojis on target\n(use with caution)")
if self.cfg_name == "AutoTest": if self.cfg_name == "AutoTest":
yield Button("RUN AUTO TEST", id="op_autotest", variant="warning", flat="false", disabled=True, tooltip="Execute automated test sequence for the AutoTest profile") yield Button("RUN AUTO TEST", id="op_autotest", variant="warning", flat="false", tooltip="Execute automated test sequence for the AutoTest profile")
def on_mount(self) -> None: def on_mount(self) -> None:
self._rebuild_engine() self._rebuild_engine()
@ -329,7 +329,7 @@ class OperationPane(Container):
for pne in self.query("#op_target_pane"): pne.display = False for pne in self.query("#op_target_pane"): pne.display = False
enabled = (v.get("discord_token") and v.get("discord_server") and not d_missing) enabled = (v.get("discord_token") and v.get("discord_server") and not d_missing)
for bid in ("#op_backup_msgs", "#op_backup_sync"): for bid in ("#op_backup_msgs", "#op_backup_sync", "#op_autotest"):
for btn in self.query(bid): btn.disabled = not enabled for btn in self.query(bid): btn.disabled = not enabled
for btn in self.query("#op_backup_stats"): for btn in self.query("#op_backup_stats"):
@ -593,7 +593,7 @@ class OperationPane(Container):
from src.ui.backup_stats import BackupStatsScreen from src.ui.backup_stats import BackupStatsScreen
target_dir = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}" target_dir = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}"
self.app.push_screen(BackupStatsScreen(self.cfg_name, target_dir)) self.app.push_screen(BackupStatsScreen(self.cfg_name, target_dir))
elif bid == "op_autotest": elif bid == "op_autotest" or bid == "btn_autotest":
self.run_autotest_sequence() self.run_autotest_sequence()
@work(exclusive=True) @work(exclusive=True)
@ -667,39 +667,37 @@ class OperationPane(Container):
else: else:
modal.write("\n[bold yellow]Phase 5: Skipping Waterfall (Live mode selected)[/bold yellow]") modal.write("\n[bold yellow]Phase 5: Skipping Waterfall (Live mode selected)[/bold yellow]")
# 7. Individual Channel Migration (Automated) # 7. Individual Channel Migration (Automated)
modal.write("\n[bold cyan]Phase 7: Individual Channel Migration[/bold cyan]") modal.write("\n[bold cyan]Phase 7: Individual Channel Migration[/bold cyan]")
await self._logic_autotest_migrate_all_channels(modal=modal) await self._logic_autotest_migrate_all_channels(modal=modal)
async def _run_backup_autotest_logic(self, modal: ProgressScreen) -> None: async def _run_backup_autotest_logic(self, modal: ProgressScreen) -> None:
"""Executes the full backup test sequence.""" """Executes the full backup test sequence."""
import shutil
modal.set_status("AUTO TEST: Launching Backup Sequence...") modal.set_status("AUTO TEST: Launching Backup Sequence...")
modal.write("[bold yellow]Starting Backup Auto-Test...[/bold yellow]") modal.write("[bold yellow]Starting Backup Auto-Test...[/bold yellow]")
# 1. Clear old backup # 1. Clear old backup directory completely
modal.set_status("Clearing old backup database...") server_dir = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}"
db_path = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}" / "backup.db" if server_dir.exists():
if db_path.exists(): modal.write(f"[yellow]Deleting existing backup directory: {server_dir}[/yellow]")
modal.write(f"[yellow]Deleting existing database: {db_path.name}[/yellow]") try:
db_path.unlink() shutil.rmtree(server_dir)
except Exception as e:
modal.write(f"[red]Warning: Could not delete directory: {e}[/red]")
# 2. Setup exporter # 2. Setup & Metadata
modal.set_status("Initializing Discord connection...")
await self.engine.discord_reader.start() await self.engine.discord_reader.start()
await self.exporter.setup() await self.exporter.setup()
self.exporter.is_running = True self.exporter.is_running = True
# 3. Full Backup
modal.write("\n[bold cyan]Phase 1: Full Server Backup[/bold cyan]") modal.write("\n[bold cyan]Phase 1: Full Server Backup[/bold cyan]")
modal.show_stats()
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
await self.exporter.export_channels_structure()
await self.exporter.export_roles()
await self.exporter.export_assets()
# 3. Use unified backup logic in autotest mode
all_channels = await self.engine.discord_reader.get_channels()
eligible_channels = [ eligible_channels = [
c for c in await self.engine.discord_reader.get_channels() c for c in all_channels
if c.type in [ if c.type in [
self.engine.discord_reader.CHANNEL_TYPE_TEXT, self.engine.discord_reader.CHANNEL_TYPE_TEXT,
self.engine.discord_reader.CHANNEL_TYPE_NEWS, self.engine.discord_reader.CHANNEL_TYPE_NEWS,
@ -707,23 +705,12 @@ class OperationPane(Container):
] ]
] ]
total_chans = len(eligible_channels) await self._logic_full_backup(
for i, chan in enumerate(eligible_channels): modal=modal,
if not self.exporter.is_running: break selected_channels=eligible_channels,
force_overwrite=True,
modal.set_item_status(f"Backing up ({i+1}/{total_chans}): #{chan.name}") is_autotest=True
modal.set_progress(i, total_chans) )
async def update_backup(name, count, author=None, preview=None, threads=0, files=0):
modal.update_stats(messages=str(count), threads=str(threads), files=str(files))
await self.exporter.export_channel_messages(
chan.id, progress_callback=update_backup, force=True
)
modal.write(f"[green]Completed: #{chan.name}[/green]")
modal.set_progress(total_chans, total_chans)
# ── (1) clone server template (combined) ───────────────────────────── # ── (1) clone server template (combined) ─────────────────────────────
def _open_clone_menu(self): def _open_clone_menu(self):
@ -2289,6 +2276,7 @@ class OperationPane(Container):
@work(exclusive=True) @work(exclusive=True)
async def run_backup_messages(self) -> None: async def run_backup_messages(self) -> None:
"""UI entry point for full backup."""
modal_prog = ProgressScreen(log_level=self.config.log_level) modal_prog = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal_prog) self.app.push_screen(modal_prog)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@ -2298,38 +2286,6 @@ class OperationPane(Container):
await self.engine.discord_reader.start() await self.engine.discord_reader.start()
await self.exporter.setup() await self.exporter.setup()
# Check if profile is empty
profile_exists = False
if self.exporter.db:
try:
profile_exists = self.exporter.db.get_guild_profile() is not None
except Exception:
profile_exists = False
if not profile_exists:
modal_prog.set_status("First-Time Setup: Exporting Server Profile...")
modal_prog.write("[yellow]No existing profile found. Performing primary profile backup...[/yellow]")
modal_prog.write("[yellow]Exporting server metadata...[/yellow]")
await self.exporter.export_metadata()
modal_prog.write("[yellow]Syncing server assets (icon/banner)...[/yellow]")
await self.exporter.download_server_assets()
modal_prog.write("[yellow]Exporting server structure...[/yellow]")
await self.exporter.export_channels_structure()
modal_prog.write("[yellow]Exporting roles & permissions...[/yellow]")
await self.exporter.export_roles()
modal_prog.write("[yellow]Exporting custom emojis & stickers...[/yellow]")
await self.exporter.export_assets()
modal_prog.write("[bold green]Primary profile setup complete![/bold green]")
modal_prog.write("")
else:
modal_prog.write("[dim]Existing profile detected. Scanning structure...[/dim]")
await self.exporter.export_channels_structure()
all_channels = await self.engine.discord_reader.get_channels() all_channels = await self.engine.discord_reader.get_channels()
all_categories = await self.engine.discord_reader.get_categories() all_categories = await self.engine.discord_reader.get_categories()
cat_map = {c.id: c.name for c in all_categories} cat_map = {c.id: c.name for c in all_categories}
@ -2348,8 +2304,9 @@ class OperationPane(Container):
modal_prog.allow_close() modal_prog.allow_close()
return return
any_found = False # Analyze which are already backed up
backed_up_ids = set() backed_up_ids = set()
any_found = False
if self.exporter.db: if self.exporter.db:
channel_stats = self.exporter.db.get_stats_by_channel() channel_stats = self.exporter.db.get_stats_by_channel()
for chan in eligible_channels: for chan in eligible_channels:
@ -2357,87 +2314,98 @@ class OperationPane(Container):
any_found = True any_found = True
backed_up_ids.add(chan.id) backed_up_ids.add(chan.id)
self.app.pop_screen() # Manual selection
loop = asyncio.get_running_loop()
future = loop.create_future()
def check_channels(reply: dict | None) -> None:
if not future.done(): future.set_result(reply)
while True: self.app.push_screen(
loop = asyncio.get_running_loop() ChannelSelectScreen(eligible_channels, cat_map, backed_up_ids, any_found),
future = loop.create_future() check_channels,
)
def check_channels(reply: dict | None) -> None: reply = await future
if not future.done(): if not reply: return
future.set_result(reply)
self.app.push_screen( selected_ids = reply["channels"]
ChannelSelectScreen(eligible_channels, cat_map, backed_up_ids, any_found), force_overwrite = reply["force"]
check_channels, selected_channels = [c for c in eligible_channels if c.id in selected_ids]
# Confirmation phase
new_channels = [c for c in selected_channels if c.id not in backed_up_ids]
existing_channels = [c for c in selected_channels if c.id in backed_up_ids]
modal_confirm = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal_confirm)
await asyncio.sleep(0.1)
modal_confirm.set_status(f"Confirm to proceed with Backup of [bold]{len(selected_channels)}[/bold] channels")
modal_confirm.show_info(f"[cyan]Backup Channels[/cyan]", f"{len(new_channels)} new, {len(existing_channels)} existing")
choice = await modal_confirm.phase_wait_confirm(btn_start_label="Start Channel Backup", show_id=False)
if choice != "btn_start_first":
modal_confirm.dismiss()
return
await self._logic_full_backup(
modal=modal_confirm,
selected_channels=selected_channels,
force_overwrite=force_overwrite,
is_autotest=False
)
except Exception as e:
logger.error(f"Backup Error: {traceback.format_exc()}")
modal_prog.write(f"[bold red]Backup failed: {e}[/bold red]")
modal_prog.phase_report("Backup", "error", show_back=False)
finally:
await self.engine.close_connections()
async def _logic_full_backup(self, modal: ProgressScreen, selected_channels: list, force_overwrite: bool, is_autotest: bool = False) -> None:
"""Non-interactive core backup logic."""
if not self.exporter.is_running:
self.exporter.is_running = True
try:
# 1. Metadata and Assets
modal.set_status("Exporting Server Profile...")
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
await self.exporter.export_channels_structure()
await self.exporter.export_roles()
await self.exporter.export_assets()
# 2. Channel Messages
total_chans = len(selected_channels)
modal.write(f"\n[bold cyan]Backing up {total_chans} channels...[/bold cyan]")
modal.show_stats()
for i, chan in enumerate(selected_channels):
if not self.exporter.is_running: break
modal.set_item_status(f"[cyan]Processing ({i+1}/{total_chans}): #{chan.name}[/cyan]")
modal.set_progress(i, total_chans)
modal.write(f"[cyan]Backing up: #{chan.name}[/cyan]")
async def update_backup(name, count, author_name=None, message_preview=None, thread_count=0, file_count=0):
modal.update_stats(messages=str(count), threads=str(thread_count), files=str(file_count))
if author_name and message_preview and count % 20 == 0:
modal.write(f"[dim]{author_name}:[/dim] {message_preview}")
await self.exporter.export_channel_messages(
chan.id, progress_callback=update_backup, force=force_overwrite
) )
modal.write(f"[green]Completed: #{chan.name}[/green]")
reply = await future modal.set_progress(total_chans, total_chans)
if not reply: modal.write("[bold green]Backup complete![/bold green]")
return modal.phase_report("Full Backup", show_back=False)
selected_ids = reply["channels"] except Exception as e:
force_overwrite = reply["force"] modal.write(f"[bold red]Core backup failed: {e}[/bold red]")
selected_channels = [c for c in eligible_channels if c.id in selected_ids] logger.error(f"Core Backup Error: {traceback.format_exc()}")
raise e
# Phase 2: Confirmation
modal_prog = ProgressScreen(log_level=self.config.log_level) # Re-instantiate to avoid Textual re-push UI freeze
self.app.push_screen(modal_prog)
await asyncio.sleep(0.1)
new_channels = [c for c in selected_channels if c.id not in backed_up_ids]
existing_channels = [c for c in selected_channels if c.id in backed_up_ids]
server = getattr(self.engine.discord_reader, 'guild', None)
if server:
modal_prog.write(f"[bold cyan]Server Profile:[/bold cyan]")
modal_prog.write(f" Name: [green]{server.name}[/green]")
modal_prog.write(f" Icon: [green]{'Present' if server.icon else 'None'}[/green]")
modal_prog.write("")
modal_prog.set_status(f"Confirm to proceed with Backup of [bold]{len(selected_channels)}[/bold] channels")
modal_prog.show_info(f"[cyan]Backup Channels[/cyan]", f"{len(new_channels)} new, {len(existing_channels)} existing")
# Show categorized channel lists in the bottom log
if new_channels:
modal_prog.write("[bold green]New Backups to be created:[/bold green]")
for idx, c in enumerate(new_channels):
modal_prog.write(f" {idx+1}. #{c.name}")
if existing_channels:
action = "Overwritten" if force_overwrite else "Updated"
modal_prog.write(f"[bold yellow]\nExisting backups to be {action}:[/bold yellow]")
for idx, c in enumerate(existing_channels):
modal_prog.write(f" {idx+1}. #{c.name}")
choice = await modal_prog.phase_wait_confirm(btn_start_label="Start Channel Backup", show_id=False)
if choice == "btn_back":
modal_prog.dismiss()
continue
elif choice == "btn_start_id":
loop = asyncio.get_running_loop()
future = loop.create_future()
def id_callback(res: int | None) -> None:
if not future.done():
future.set_result(res)
id_modal = MessageIDInputModal(self.engine.discord_reader, selected_channels[0].id)
self.app.push_screen(id_modal, id_callback)
verified_id = await future
if verified_id is None:
# User cancelled the ID input
continue
after_id = verified_id
elif choice == "btn_main_menu":
modal_prog.dismiss()
return
# If we are here, proceeding either via Start First or Start from ID (after_id)
if choice == "btn_start_first":
after_id = None
break
modal_prog.phase_progress() modal_prog.phase_progress()
modal_prog.show_stats() modal_prog.show_stats()