show preview for confirmation

This commit is contained in:
rambros 2026-03-03 22:42:41 +05:30
parent 217ec83e6d
commit 8cf97f9739

View file

@ -314,11 +314,43 @@ class ShuttlePane(Container):
modal = ProgressScreen() modal = ProgressScreen()
self.app.push_screen(modal) self.app.push_screen(modal)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
discord_started = False
try: try:
# Phase 1: Connect early to fetch source server structure for preview
modal.set_status("Connecting to Source Server (Discord) for Preview...")
try:
await self.engine.discord_reader.start()
discord_started = True
except Exception as e:
logger.warning(f"Could not pre-connect to Discord for Clone preview: {e}")
modal.set_status(f"Awaiting Confirmation for {len(selections)} Operations...") modal.set_status(f"Awaiting Confirmation for {len(selections)} Operations...")
# Fetch and display live preview from Discord
preview = await self._fetch_clone_preview(selections) if discord_started else {}
if "roles" in preview:
roles = preview["roles"]
modal.write(f"[bold cyan]Roles to be Cloned ({len(roles)}):[/bold cyan]")
for r in roles[:15]:
modal.write(f" - {r}")
if len(roles) > 15:
modal.write(f" [dim]... and {len(roles)-15} more[/dim]")
modal.write("")
if "structure" in preview:
structure = preview["structure"]
total_ch = sum(len(chans) for chans in structure.values())
modal.write(f"[bold cyan]Server Structure ({len(structure)} Categories, {total_ch} Channels):[/bold cyan]")
for cat_name, channels in structure.items():
modal.write(f" [bold yellow]📁 {cat_name}[/bold yellow]")
for ch_name in channels:
modal.write(f" - # {ch_name}")
modal.write("")
choice = await modal.phase_wait_confirm( choice = await modal.phase_wait_confirm(
btn_start_label="Start Cloning", btn_start_label="Start Cloning",
btn_id_label="Force Copy", btn_id_label="Force Clone (may create duplicates)",
show_id=True show_id=True
) )
if choice == "btn_back": if choice == "btn_back":
@ -333,6 +365,8 @@ class ShuttlePane(Container):
force_mode = (choice == "btn_start_id") force_mode = (choice == "btn_start_id")
modal.cancel_callback = lambda: setattr(self.engine, "is_running", False) modal.cancel_callback = lambda: setattr(self.engine, "is_running", False)
modal.phase_progress() modal.phase_progress()
# Re-confirm connections (reader is already started, writer starts now)
await self.engine.start_connections() await self.engine.start_connections()
self.engine.is_running = True self.engine.is_running = True
@ -789,89 +823,47 @@ class ShuttlePane(Container):
self.run_batch_danger(selections) self.run_batch_danger(selections)
self.app.push_screen(OptionSelectModal("⚠ DANGER ZONE ⚠", options), on_result) self.app.push_screen(OptionSelectModal("⚠ DANGER ZONE ⚠", options), on_result)
async def _analyze_danger_zone(self, selections: list[str]) -> dict:
"""Analyzes the target server to find items that will be deleted."""
results = {}
writer = self.engine.fluxer_writer if self.target_platform == "fluxer" else self.engine.writer
for sel in selections:
if sel == "dz_del_channels":
chans = await writer.get_channels()
# filter reaper logs
chans = [c for c in chans if str(c.get("name", "")).lower() not in ["reaper-logs", "reaper_logs"]]
results["channels"] = [c.get("name", "Unknown") for c in chans]
elif sel == "dz_del_roles":
roles = []
if self.target_platform == "fluxer":
role_data = await writer.client.get_guild_roles(writer.community_id)
roles = [r.get("name", "Unknown") for r in role_data if not r.get("managed") and r.get("name") != "@everyone"]
else:
server = await writer._get_server()
# Stoat roles are in a dict .roles
for r_id, role in server.roles.items():
if str(r_id) != writer.community_id:
roles.append(getattr(role, "name", "Unknown Role"))
results["roles"] = roles
elif sel == "dz_del_assets":
assets = []
if self.target_platform == "fluxer":
emojis = await writer.client.get_guild_emojis(writer.community_id)
stickers = await writer.client.get_guild_stickers(writer.community_id)
assets.extend([e.get("name", "Unknown Emoji") for e in emojis])
assets.extend([s.get("name", "Unknown Sticker") for s in stickers])
else:
server = await writer._get_server()
emojis = await server.fetch_emojis()
assets.extend([getattr(e, "name", "Unknown Emoji") for e in emojis])
results["assets"] = assets
elif sel == "dz_reset_perms":
# For reset perms, we just list channels affected.
if "channels" not in results:
chans = await writer.get_channels()
chans = [c for c in chans if str(c.get("name", "")).lower() not in ["reaper-logs", "reaper_logs"]]
results["channels_perms"] = [c.get("name", "Unknown") for c in chans]
else:
results["channels_perms"] = results["channels"]
return results
@work(exclusive=True) @work(exclusive=True)
async def run_batch_danger(self, selections: list[str]) -> None: async def run_batch_danger(self, selections: list[str]) -> None:
modal = ProgressScreen() modal = ProgressScreen()
self.app.push_screen(modal) self.app.push_screen(modal)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
target_started = False
try: try:
modal.set_status("Analyzing Target Server...") # Phase 1: Connect early to fetch real item names for preview
modal.write("[bold cyan]Analyzing target server for destructive impact...[/bold cyan]") modal.set_status("Connecting to Target Server for Preview...")
try:
await self.engine.start_target_only() await self.engine.start_target_only()
analysis = await self._analyze_danger_zone(selections) target_started = True
except Exception as e:
logger.warning(f"Could not pre-connect for DZ preview: {e}")
modal.set_status(f"Awaiting Confirmation for {len(selections)} Destructive Operations...") modal.set_status(f"Awaiting Confirmation for {len(selections)} Destructive Operations...")
modal.write("[bold red]WARNING: THIS WILL DELETE DATA PERMANENTLY! MUST PROCEED TO CONTINUE.[/bold red]") modal.write("[bold red]WARNING: THIS WILL DELETE DATA PERMANENTLY! MUST PROCEED TO CONTINUE.[/bold red]")
modal.write("") modal.write("")
modal.write("[bold cyan]Scheduled for Destruction/Reset:[/bold cyan]")
if "channels" in analysis: # Fetch and display live item names from target server
names = analysis["channels"] preview = await self._fetch_dz_preview(selections) if target_started else {}
modal.write(f"- [red]Channels ({len(names)}):[/red] {', '.join(names[:10])}{'...' if len(names) > 10 else ''}")
if "roles" in analysis: dz_labels = {
names = analysis["roles"] "dz_del_channels": "Channels & Categories to be Deleted",
modal.write(f"- [red]Roles ({len(names)}):[/red] {', '.join(names[:10])}{'...' if len(names) > 10 else ''}") "dz_reset_perms": "Channels with Permissions to Reset",
"dz_del_roles": "Roles to be Deleted",
if "assets" in analysis: "dz_del_assets": "Emojis & Stickers to be Deleted"
names = analysis["assets"] }
modal.write(f"- [red]Emojis/Stickers ({len(names)}):[/red] {', '.join(names[:10])}{'...' if len(names) > 10 else ''}") for sel in selections:
section_title = dz_labels.get(sel, sel)
if "channels_perms" in analysis and "channels" not in analysis: items = preview.get(sel, [])
names = analysis["channels_perms"] if items:
modal.write(f"- [red]Permission Resets on {len(names)} Channels[/red]") modal.write(f"[bold cyan]{section_title} ({len(items)}):[/bold cyan]")
for name in items[:20]: # cap at 20 to avoid flooding the log
modal.write("") modal.write(f" [red]- {name}[/red]")
if len(items) > 20:
modal.write(f" [dim]... and {len(items) - 20} more[/dim]")
else:
modal.write(f"[bold cyan]{section_title}:[/bold cyan]")
modal.write(f" [dim](could not fetch list)[/dim]")
modal.write("")
choice = await modal.phase_wait_confirm(btn_start_label="WIPE ALL DATA", show_id=False) choice = await modal.phase_wait_confirm(btn_start_label="WIPE ALL DATA", show_id=False)
if choice == "btn_back": if choice == "btn_back":
@ -886,7 +878,9 @@ class ShuttlePane(Container):
modal.cancel_callback = lambda: setattr(self.engine, "is_running", False) modal.cancel_callback = lambda: setattr(self.engine, "is_running", False)
modal.phase_progress() modal.phase_progress()
self.engine.is_running = True self.engine.is_running = True
# Connection already started above # Writer already started above, no need to reconnect
if not target_started:
await self.engine.start_target_only()
for sel in selections: for sel in selections:
if not self.engine.is_running: break if not self.engine.is_running: break
@ -902,12 +896,117 @@ class ShuttlePane(Container):
modal.phase_report("Danger Zone Operations Complete") modal.phase_report("Danger Zone Operations Complete")
modal.write("[bold green]All selected destructive operations finished.[/bold green]") modal.write("[bold green]All selected destructive operations finished.[/bold green]")
except Exception as e: except Exception as e:
modal.write(f"[bold red]Error during analysis: {e}[/bold red]") modal.write(f"[bold red]Error: {e}[/bold red]")
modal.phase_report("Danger Zone Batch", "error") modal.phase_report("Danger Zone Batch", "error")
finally: finally:
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_target_only() await self.engine.close_target_only()
async def _fetch_dz_preview(self, selections: list[str]) -> dict[str, list[str]]:
"""Fetches real item names from the target server for each Danger Zone selection.
Returns a dict mapping selection ID -> list of item names."""
preview: dict[str, list[str]] = {}
writer = self.engine.writer
is_fluxer = self.target_platform == "fluxer"
try:
if "dz_del_channels" in selections or "dz_reset_perms" in selections:
channels_raw = await writer.get_channels()
protected = ["reaper-logs", "reaper_logs"]
channel_names = [
c.get("name", "Unknown") for c in channels_raw
if c.get("type") != 4 and str(c.get("name", "")).lower() not in protected
]
category_names = [
c.get("name", "Unknown") for c in channels_raw
if c.get("type") == 4 and str(c.get("name", "")).lower() not in protected
]
all_names = category_names + channel_names
if "dz_del_channels" in selections:
preview["dz_del_channels"] = all_names
if "dz_reset_perms" in selections:
preview["dz_reset_perms"] = channel_names # only channels, not categories
except Exception as e:
logger.warning(f"DZ preview: failed to fetch channels: {e}")
try:
if "dz_del_roles" in selections:
if is_fluxer:
community_id = self.engine.config.target_server_id
roles_raw = await writer.client.get_guild_roles(community_id)
role_names = [
r.get("name", "Unknown") for r in roles_raw
if not r.get("managed") and r.get("name") != "@everyone"
]
else:
# Stoat: server.roles is a dict {id: Role}
server = await writer._get_server()
role_names = [
role.name for role in server.roles.values()
if str(role.id) != writer.community_id
]
preview["dz_del_roles"] = role_names
except Exception as e:
logger.warning(f"DZ preview: failed to fetch roles: {e}")
try:
if "dz_del_assets" in selections:
asset_names = []
if is_fluxer:
community_id = self.engine.config.target_server_id
emojis = await writer.client.get_guild_emojis(community_id)
asset_names += [f"{e.get('name', '?')} (emoji)" for e in emojis]
try:
stickers = await writer.client.get_guild_stickers(community_id)
asset_names += [f"{s.get('name', '?')} (sticker)" for s in stickers]
except Exception:
pass # Stickers may not exist
else:
server = await writer._get_server()
emojis = await server.fetch_emojis()
asset_names += [f"{e.name} (emoji)" for e in emojis]
preview["dz_del_assets"] = asset_names
except Exception as e:
logger.warning(f"DZ preview: failed to fetch assets: {e}")
return preview
async def _fetch_clone_preview(self, selections: list[str]) -> dict[str, Any]:
"""Fetches preview data from Discord (source server) for cloning confirmation."""
preview = {}
reader = self.engine.discord_reader
try:
if "sub_clone_roles" in selections:
roles = await reader.get_roles()
preview["roles"] = [r.name for r in roles]
except Exception as e:
logger.warning(f"Clone Preview: failed to fetch roles: {e}")
try:
if "sub_clone_channels" in selections:
# Build hierarchy
categories = await reader.get_categories()
channels = await reader.get_channels()
# group channels by category parent
structure = {}
# Handle categorization
for cat in categories:
# In discord.py fetch_channels() returns full objects
structure[cat.name] = [ch.name for ch in channels if ch.category_id == cat.id]
# Handle uncategorized
uncategorized = [ch.name for ch in channels if ch.category_id is None]
if uncategorized:
structure["No Category"] = uncategorized
preview["structure"] = structure
except Exception as e:
logger.warning(f"Clone Preview: failed to fetch structure: {e}")
return preview
async def _logic_dz_delete_channels(self, modal: ProgressScreen) -> None: async def _logic_dz_delete_channels(self, modal: ProgressScreen) -> None:
if self.target_platform == "fluxer": if self.target_platform == "fluxer":
from src.fluxer.danger_zone import danger_delete_all_channels from src.fluxer.danger_zone import danger_delete_all_channels