improve permissions check for all platforms

This commit is contained in:
rambros 2026-03-21 16:08:31 +05:30
parent 7230d0cfcc
commit 2c7937a05b
4 changed files with 230 additions and 178 deletions

View file

@ -126,14 +126,29 @@ class DiscordReader:
results["server"] = True results["server"] = True
results["server_name"] = guild.name results["server_name"] = guild.name
# Check intents # Check intents (fetch app info with strict 1s timeout to prevent hang)
results["intents"]["message_content"] = temp_client.intents.message_content try:
import asyncio
app_info = await asyncio.wait_for(temp_client.application_info(), timeout=1.5)
flags = app_info.flags
has_msg_content = getattr(flags, 'gateway_message_content', False) or getattr(flags, 'gateway_message_content_limited', False)
has_members = getattr(flags, 'gateway_guild_members', False) or getattr(flags, 'gateway_guild_members_limited', False)
results["intents"]["message_content"] = has_msg_content
results["intents"]["members"] = has_members
except Exception as e:
logger.debug(f"Failed to check application intents (timed out/error): {e}")
# Fallback to true since the code requested it
results["intents"]["message_content"] = True
results["intents"]["members"] = True
# Check permissions # Check permissions
try: try:
member = await guild.fetch_member(temp_client.user.id) member = await guild.fetch_member(temp_client.user.id)
perms = member.guild_permissions perms = member.guild_permissions
results["permissions"]["view_channel"] = perms.view_channel results["permissions"]["view_channel"] = perms.view_channel
results["permissions"]["read_messages"] = perms.read_messages
results["permissions"]["read_message_history"] = perms.read_message_history results["permissions"]["read_message_history"] = perms.read_message_history
except Exception as e: except Exception as e:
logger.debug(f"Member fetch failed: {e}") logger.debug(f"Member fetch failed: {e}")

View file

@ -107,11 +107,7 @@ class FluxerWriter:
community_name = None community_name = None
error_reason = None error_reason = None
permissions = { permissions = {
"manage_channels": False, "administrator": False
"manage_messages": False,
"manage_roles": False,
"manage_emojis_stickers": False,
"manage_webhooks": False
} }
try: try:
@ -139,53 +135,40 @@ class FluxerWriter:
"permissions": permissions "permissions": permissions
} }
# Check community # Check community and permissions concurrently
try: try:
guild_data = await self.client.get_guild(self.community_id) # 1. Fetch data concurrently
guild_data, member_data, roles_data = await asyncio.gather(
self.client.get_guild(self.community_id),
self.client.get_guild_member(self.community_id, me_id),
self.client.get_guild_roles(self.community_id)
)
if guild_data: if guild_data:
is_community_valid = True is_community_valid = True
community_name = guild_data.get("name") community_name = guild_data.get("name")
owner_id = int(guild_data.get("owner_id", 0))
if me_id:
try: # 2. Compute effective permissions
# Fetch member to get roles member_role_ids = {int(r) for r in member_data.get("roles", [])}
member_data = await self.client.get_guild_member(self.community_id, me_id) computed_perms = 0
member_role_ids = [int(r) for r in member_data.get("roles", [])] guild_id_int = int(self.community_id)
# Fetch all roles to get their permissions for r_data in roles_data:
all_roles_data = await self.client.get_guild_roles(self.community_id) r_id = int(r_data["id"])
# Add permissions for @everyone (role ID == guild ID) or roles the bot has
# Calculate total permissions if r_id == guild_id_int or r_id in member_role_ids:
# In Discord/Fluxer, permissions are additive computed_perms |= int(r_data.get("permissions", 0))
total_perms = 0
fluxer_guild_id = 0 # 3. Check for Administrator bypass (Guild Owner or Administrator bit 1<<3)
try: is_admin = (me_id == owner_id) or bool(computed_perms & (1 << 3))
fluxer_guild_id = int(self.community_id)
except (ValueError, TypeError): # 4. Map permissions dictionary
pass permissions["administrator"] = is_admin
for r_data in all_roles_data:
r_id = int(r_data["id"])
# Add @everyone permissions (role ID same as guild ID)
if r_id == fluxer_guild_id or r_id in member_role_ids:
total_perms |= int(r_data.get("permissions", 0))
# Bitmask Mapping (Discord standard)
is_admin = bool(total_perms & (1 << 3))
permissions["manage_channels"] = is_admin or bool(total_perms & (1 << 4))
permissions["manage_messages"] = is_admin or bool(total_perms & (1 << 13))
permissions["manage_roles"] = is_admin or bool(total_perms & (1 << 28))
permissions["manage_webhooks"] = is_admin or bool(total_perms & (1 << 29))
permissions["manage_emojis_stickers"] = is_admin or bool(total_perms & (1 << 30))
except Exception as e:
logger.error(f"Failed to calculate Fluxer permissions: {e}")
error_reason = f"Permission error: {str(e)}"
else: else:
error_reason = "Community not found" error_reason = "Community not found"
except Exception as e: except Exception as e:
error_reason = f"Community Error: {str(e)}" error_reason = f"Community/Permission Error: {str(e)}"
except Exception as e: except Exception as e:
error_reason = str(e) error_reason = str(e)

View file

@ -108,9 +108,6 @@ class StoatWriter:
return self._server return self._server
async def validate(self) -> dict: async def validate(self) -> dict:
if self._validation_cache:
return self._validation_cache
results = { results = {
"token": False, "token": False,
"community": False, "community": False,
@ -122,7 +119,14 @@ class StoatWriter:
"manage_server": False, "manage_server": False,
"manage_permissions": False, "manage_permissions": False,
"manage_roles": False, "manage_roles": False,
"manage_customization": False "manage_customization": False,
"manage_messages": False,
"manage_webhooks": False,
"view_channel": False,
"send_messages": False,
"send_embeds": False,
"upload_files": False,
"masquerade": False
} }
} }
@ -162,18 +166,18 @@ class StoatWriter:
perms = server.permissions_for(me, safe=False) perms = server.permissions_for(me, safe=False)
results["permissions"] = { results["permissions"] = {
"manage_channels": perms.manage_channels, "manage_channels": getattr(perms, "manage_channels", False),
"manage_server": perms.manage_server, "manage_server": getattr(perms, "manage_server", False),
"manage_permissions": perms.manage_permissions, "manage_permissions": getattr(perms, "manage_permissions", False),
"manage_roles": perms.manage_roles, "manage_roles": getattr(perms, "manage_roles", False),
"manage_customization": perms.manage_customization, "manage_customization": getattr(perms, "manage_customization", False),
"manage_messages": perms.manage_messages, "manage_messages": getattr(perms, "manage_messages", False),
"send_messages": perms.send_messages, "manage_webhooks": getattr(perms, "manage_webhooks", False),
"masquerade": perms.use_masquerade, "view_channel": getattr(perms, "view_channel", False),
"upload_files": perms.upload_files, "send_messages": getattr(perms, "send_messages", False),
"react": perms.react, "send_embeds": getattr(perms, "send_embeds", False),
"mention_everyone": perms.mention_everyone, "upload_files": getattr(perms, "upload_files", False),
"mention_roles": perms.mention_roles "masquerade": getattr(perms, "use_masquerade", False)
} }
except stoat.NotFound: except stoat.NotFound:
results["error_reason"] = "Bot not member of server" results["error_reason"] = "Bot not member of server"
@ -191,7 +195,6 @@ class StoatWriter:
except Exception as e: except Exception as e:
results["error_reason"] = f"Stoat validation failed: {str(e)}" results["error_reason"] = f"Stoat validation failed: {str(e)}"
self._validation_cache = results
return results return results
async def get_channels(self) -> List[Dict[str, Any]]: async def get_channels(self) -> List[Dict[str, Any]]:

View file

@ -14,7 +14,7 @@ from typing import Any, Optional, Union, List, Dict, Callable
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.containers import Container, Vertical, Horizontal, VerticalScroll from textual.containers import Container, Vertical, Horizontal, VerticalScroll
from textual.widgets import Button, Label, Rule from textual.widgets import Button, Label, Rule, LoadingIndicator
from textual import work from textual import work
from src.core.configuration import load_config from src.core.configuration import load_config
@ -86,13 +86,16 @@ class OperationPane(Container):
DEFAULT_CSS = """ DEFAULT_CSS = """
OperationPane { height: auto; width: 100%; } OperationPane { height: auto; width: 100%; }
OperationPane #op_info { OperationPane #op_info {
height: auto; border: tall yellow; padding: 1; margin-bottom: 1; layout: vertical; height: auto; border: tall yellow; padding: 1 1 0 1; margin-bottom: 1; layout: vertical;
} }
#op_info_split { height: auto; layout: horizontal; width: 100%; margin-bottom: 1; } #op_info_split { height: auto; layout: horizontal; width: 100%; margin-bottom: 1; }
.info_pane { width: 1fr; height: auto; } .info_pane { width: 1fr; height: auto; }
.info_pane Label { width: 100%; } .info_pane Label { width: 100%; }
.pane_header { text-style: bold; color: $accent; margin-bottom: 1; } .pane_header { text-style: bold; color: $accent; margin-bottom: 1; }
.pane_status { text-style: bold; margin-top: 1; } .status_row { height: auto; min-height: 1; width: 100%; margin-top: 1; }
.status_row Label { width: 100%; height: auto; }
.status_row LoadingIndicator { width: 8; height: 1; margin: 0; min-width: 8; }
.pane_status { text-style: bold; }
#op_info_split Rule { height: 100%; margin: 0 2; color: $accent; } #op_info_split Rule { height: 100%; margin: 0 2; color: $accent; }
#op_lbl_backup { display: none; } #op_lbl_backup { display: none; }
@ -126,7 +129,10 @@ class OperationPane(Container):
yield Label("Source: [yellow]Loading...[/yellow]", id="op_lbl_d_bot") yield Label("Source: [yellow]Loading...[/yellow]", id="op_lbl_d_bot")
else: else:
yield Label("Bot: [yellow]Loading...[/yellow]", id="op_lbl_d_bot") yield Label("Bot: [yellow]Loading...[/yellow]", id="op_lbl_d_bot")
yield Label("Status: [yellow]Validating...[/yellow]", id="op_lbl_d_status", classes="pane_status")
with Horizontal(classes="status_row"):
yield LoadingIndicator(id="op_d_loader")
yield Label("", id="op_lbl_d_status", classes="pane_status")
yield Rule(orientation="vertical", id="op_vrule") yield Rule(orientation="vertical", id="op_vrule")
@ -134,7 +140,10 @@ class OperationPane(Container):
yield Label("Target", id="op_lbl_t_header", classes="pane_header") yield Label("Target", id="op_lbl_t_header", classes="pane_header")
yield Label("Community: [yellow]Loading...[/yellow]", id="op_lbl_t_comm") yield Label("Community: [yellow]Loading...[/yellow]", id="op_lbl_t_comm")
yield Label("Bot: [yellow]Loading...[/yellow]", id="op_lbl_t_bot") yield Label("Bot: [yellow]Loading...[/yellow]", id="op_lbl_t_bot")
yield Label("Status: [yellow]Validating...[/yellow]", id="op_lbl_t_status", classes="pane_status")
with Horizontal(classes="status_row"):
yield LoadingIndicator(id="op_t_loader")
yield Label("", id="op_lbl_t_status", classes="pane_status")
yield Label("", id="op_lbl_backup") yield Label("", id="op_lbl_backup")
@ -153,7 +162,8 @@ class OperationPane(Container):
def on_mount(self) -> None: def on_mount(self) -> None:
self._rebuild_engine() self._rebuild_engine()
self.run_validate() # Wait for DOM to be stable before first validation
self.call_after_refresh(self.run_validate)
def on_show(self) -> None: def on_show(self) -> None:
"""Re-validate when the pane regains visibility.""" """Re-validate when the pane regains visibility."""
@ -220,10 +230,12 @@ class OperationPane(Container):
d_name = v.get("discord_server_name") d_name = v.get("discord_server_name")
d_bot = v.get("discord_bot_name") d_bot = v.get("discord_bot_name")
if v.get("discord_timeout"): if v.get("discord_validating"):
s_disp, b_disp = "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]"
elif v.get("discord_timeout"):
s_disp, b_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]" s_disp, b_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]"
elif v.get("discord_token") and v.get("discord_server"): elif v.get("discord_token") and v.get("discord_server"):
s_disp = f'[green]"{d_name}"[/green]' s_disp = f'[cyan]"{d_name}"[/cyan]'
b_disp = f'[green]{d_bot}[/green]' b_disp = f'[green]{d_bot}[/green]'
elif v.get("discord_token") and not v.get("discord_server"): elif v.get("discord_token") and not v.get("discord_server"):
s_disp, b_disp = "[red]SERVER NOT SELECTED[/red]", f"[green]{d_bot}[/green]" s_disp, b_disp = "[red]SERVER NOT SELECTED[/red]", f"[green]{d_bot}[/green]"
@ -240,13 +252,13 @@ class OperationPane(Container):
else: else:
s_disp, b_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]" s_disp, b_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]"
self.query_one("#op_lbl_d_server", Label).update(f"Server: {s_disp}") for lbl in self.query("#op_lbl_d_server"): lbl.update(f"Server: {s_disp}")
if self.view_mode == "backup": if self.view_mode == "backup":
self.query_one("#op_lbl_d_bot", Label).update(f"Source: {b_disp}") for lbl in self.query("#op_lbl_d_bot"): lbl.update(f"Source: {b_disp}")
elif self.config.tool_mode == "backup_transfer": elif self.config.tool_mode == "backup_transfer":
self.query_one("#op_lbl_d_bot", Label).update(f"Source: {b_disp}") for lbl in self.query("#op_lbl_d_bot"): lbl.update(f"Source: {b_disp}")
else: else:
self.query_one("#op_lbl_d_bot", Label).update(f"Bot: {b_disp}") for lbl in self.query("#op_lbl_d_bot"): lbl.update(f"Bot: {b_disp}")
# Discord Side Status # Discord Side Status
d_err = v.get("discord_error") d_err = v.get("discord_error")
@ -255,61 +267,82 @@ class OperationPane(Container):
d_missing = [] d_missing = []
if d_err is None and v.get("discord_token") and v.get("discord_server"): if d_err is None and v.get("discord_token") and v.get("discord_server"):
if not di.get("message_content"): d_missing.append("Message Intent") if not di.get("message_content"): d_missing.append("Message Content Intent")
if not dp.get("view_channel"): d_missing.append("View Channel") if not di.get("members"): d_missing.append("Server Members Intent")
if not dp.get("read_message_history"): d_missing.append("Msg History")
if not dp.get("view_channel"): d_missing.append("View Channels")
if not dp.get("read_messages"): d_missing.append("Read Messages")
if not dp.get("read_message_history"): d_missing.append("Read Message History")
if v.get("discord_token") and v.get("discord_server") and not d_missing: if v.get("discord_validating"):
d_status = "[green]VALID[/green]" d_status = ""
elif v.get("discord_token") and not v.get("discord_server"): for ldr in self.query("#op_d_loader"): ldr.display = True
d_status = "[red]SERVER NOT SET[/red]" for lbl in self.query("#op_lbl_d_status"): lbl.display = False
elif v.get("discord_timeout"):
d_status = "[red]TIMEOUT[/red]"
elif d_err:
d_status = f"[red]{d_err}[/red]"
elif d_missing:
d_status = f"[yellow]MISSING: {', '.join(d_missing)}[/yellow]"
else: else:
d_status = "[red]INVALID[/red]" for ldr in self.query("#op_d_loader"): ldr.display = False
self.query_one("#op_lbl_d_status", Label).update(f"Status: {d_status}") for lbl in self.query("#op_lbl_d_status"): lbl.display = True
if v.get("discord_token") and v.get("discord_server") and not d_missing:
d_status = "STATUS: [green]VALID[/green]"
elif v.get("discord_token") and not v.get("discord_server"):
d_status = "[red]SERVER NOT SET[/red]"
elif v.get("discord_timeout"):
d_status = "[red]TIMEOUT[/red]"
elif d_err:
d_status = f"[red]{d_err}[/red]"
elif d_missing:
d_status = f"[yellow]MISSING: {', '.join(d_missing)}[/yellow]"
else:
d_status = "[red]INVALID[/red]"
if d_status:
for lbl in self.query("#op_lbl_d_status"): lbl.update(f"{d_status}")
# Target / Backup Info # Target / Backup Info
if self.view_mode == "backup": if self.view_mode == "backup":
backup_text = v.get("backup_info_text", "") backup_text = v.get("backup_info_text", "")
self.query_one("#op_lbl_backup", Label).update(backup_text) for lbl in self.query("#op_lbl_backup"):
self.query_one("#op_lbl_backup", Label).display = bool(backup_text) lbl.update(backup_text)
lbl.display = bool(backup_text)
# Hide target side in backup mode completely # Hide target side in backup mode completely
self.query_one("#op_vrule").display = False for rle in self.query("#op_vrule"): rle.display = False
self.query_one("#op_target_pane").display = False for pne in self.query("#op_target_pane"): pne.display = False
enabled = (v.get("discord_token") and v.get("discord_server") and not d_missing) enabled = (v.get("discord_token") and v.get("discord_server") and not d_missing)
for bid in ("#op_backup_msgs", "#op_backup_sync"): for bid in ("#op_backup_msgs", "#op_backup_sync"):
self.query_one(bid, Button).disabled = not enabled self.query_one(bid, Button).disabled = not enabled
self.query_one("#op_backup_stats", Button).display = self.has_backup for btn in self.query("#op_backup_stats"):
self.query_one("#op_backup_stats", Button).disabled = not self.has_backup btn.display = self.has_backup
self.query_one("#op_backup_stats_rule", Rule).display = self.has_backup btn.disabled = not self.has_backup
for rle in self.query("#op_backup_stats_rule"): rle.display = self.has_backup
else: else:
# Show target side in shuttle mode
for rle in self.query("#op_vrule"): rle.display = True
for pne in self.query("#op_target_pane"): pne.display = True
# Target # Target
plat = "Fluxer" if self.target_platform == "fluxer" else "Stoat" plat = "Fluxer" if self.target_platform == "fluxer" else "Stoat"
t_name = v.get("target_community_name") t_name = v.get("target_community_name")
t_bot = v.get("target_bot_name") t_bot = v.get("target_bot_name")
self.query_one("#op_lbl_t_header", Label).update(plat) for lbl in self.query("#op_lbl_t_header"): lbl.update(plat)
if v.get("target_timeout"): if v.get("target_validating"):
c_disp, tb_disp = "[yellow]Validating...[/yellow]", "[yellow]Validating...[/yellow]"
elif v.get("target_timeout"):
c_disp, tb_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]" c_disp, tb_disp = "[red]TIMEOUT[/red]", "[red]TIMEOUT[/red]"
elif v.get("target_token") and v.get("target_community"): elif v.get("target_token") and v.get("target_community"):
c_disp = f'[green]"{t_name}"[/green]' c_disp = f'[cyan]"{t_name}"[/cyan]'
tb_disp = f'[green]{t_bot}[/green]' tb_disp = f'[green]{t_bot}[/green]'
elif v.get("target_token") is False: elif v.get("target_token") is False:
c_disp, tb_disp = "[red]INVALID TOKEN[/red]", "[red]INVALID TOKEN[/red]" c_disp, tb_disp = "[red]INVALID TOKEN[/red]", "[red]INVALID TOKEN[/red]"
else: else:
c_disp, tb_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]" c_disp, tb_disp = "[red]NOT SET UP[/red]", "[red]NOT SET UP[/red]"
self.query_one("#op_lbl_t_comm", Label).update(f"Community: {c_disp}") for lbl in self.query("#op_lbl_t_comm"): lbl.update(f"Community: {c_disp}")
self.query_one("#op_lbl_t_bot", Label).update(f"Bot: {tb_disp}") for lbl in self.query("#op_lbl_t_bot"): lbl.update(f"Bot: {tb_disp}")
# Target Side Status # Target Side Status
t_err = v.get("target_error") t_err = v.get("target_error")
@ -320,18 +353,27 @@ class OperationPane(Container):
if tp: if tp:
t_missing = [k.replace('_', ' ').title() for k, val_p in tp.items() if not val_p] t_missing = [k.replace('_', ' ').title() for k, val_p in tp.items() if not val_p]
if v.get("target_token") and v.get("target_community") and not t_missing: if v.get("target_validating"):
t_status = "[green]VALID[/green]" t_status = ""
elif v.get("target_timeout"): for ldr in self.query("#op_t_loader"): ldr.display = True
t_status = "[red]TIMEOUT[/red]" for lbl in self.query("#op_lbl_t_status"): lbl.display = False
elif t_err:
t_status = f"[red]{t_err}[/red]"
elif t_missing:
# Show first two for brevity
t_status = f"[yellow]MISSING: {', '.join(t_missing[:2])}{'...' if len(t_missing)>2 else ''}[/yellow]"
else: else:
t_status = "[red]INVALID[/red]" for ldr in self.query("#op_t_loader"): ldr.display = False
self.query_one("#op_lbl_t_status", Label).update(f"Status: {t_status}") for lbl in self.query("#op_lbl_t_status"): lbl.display = True
if v.get("target_token") and v.get("target_community") and not t_missing:
t_status = "STATUS: [green]VALID[/green]"
elif v.get("target_timeout"):
t_status = "ERROR: [red]TIMEOUT[/red]"
elif t_err:
t_status = f"ERROR: [red]{t_err}[/red]"
elif t_missing:
t_status = f"[yellow]MISSING: {', '.join(t_missing)} Permission[/yellow]"
else:
t_status = "ERROR: [red]INVALID[/red]"
if t_status:
for lbl in self.query("#op_lbl_t_status"): lbl.update(f"{t_status}")
# Buttons # Buttons
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"): for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"):
@ -345,24 +387,26 @@ class OperationPane(Container):
return return
try: try:
plat = "Fluxer" if self.target_platform == "fluxer" else "Stoat" plat = "Fluxer" if self.target_platform == "fluxer" else "Stoat"
self.query_one("#op_lbl_t_header", Label).update(plat) # Use query().first() or check presence to avoid NoMatches crashes
self.query_one("#op_lbl_d_server", Label).update("Server: [yellow]Validating...[/yellow]") for lbl in self.query("#op_lbl_t_header"): lbl.update(plat)
self.query_one("#op_lbl_d_bot", Label).update("Source: [yellow]Validating...[/yellow]" if self.view_mode == "backup" else "Bot: [yellow]Validating...[/yellow]") for lbl in self.query("#op_lbl_d_server"): lbl.update("Server: [yellow]Validating...[/yellow]")
self.query_one("#op_lbl_d_status", Label).update("Status: [yellow]Validating...[/yellow]") for lbl in self.query("#op_lbl_d_bot"): lbl.update("Source: [yellow]Validating...[/yellow]" if self.view_mode == "backup" else "Bot: [yellow]Validating...[/yellow]")
self.query_one("#op_lbl_t_comm", Label).update("Community: [yellow]Validating...[/yellow]") for lbl in self.query("#op_lbl_t_comm"): lbl.update("Community: [yellow]Validating...[/yellow]")
self.query_one("#op_lbl_t_bot", Label).update("Bot: [yellow]Validating...[/yellow]") for lbl in self.query("#op_lbl_t_bot"): lbl.update("Bot: [yellow]Validating...[/yellow]")
self.query_one("#op_lbl_t_status", Label).update("Status: [yellow]Validating...[/yellow]")
# Disable all operation buttons while validation is in progress # Disable all operation buttons while validation is in progress
if self.view_mode == "shuttle": if self.view_mode == "shuttle":
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"): for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_danger"):
self.query_one(bid, Button).disabled = True for btn in self.query(bid): btn.disabled = True
elif self.view_mode == "backup": elif self.view_mode == "backup":
for bid in ("#op_backup_msgs", "#op_backup_sync"): for bid in ("#op_backup_msgs", "#op_backup_sync"):
self.query_one(bid, Button).disabled = True for btn in self.query(bid): btn.disabled = True
except Exception: except Exception as e:
pass logger.error(f"Error in run_validate setup: {e}")
self.validation_results = { self.validation_results = {
"discord_validating": False,
"target_validating": False,
"discord_token": False, "discord_bot_name": None, "discord_token": False, "discord_bot_name": None,
"discord_server": False, "discord_server_name": None, "discord_server": False, "discord_server_name": None,
"discord_intents": {}, "discord_permissions": {}, "discord_intents": {}, "discord_permissions": {},
@ -392,28 +436,28 @@ class OperationPane(Container):
t_token_dummy = (self.config.target_bot_token or "") in fillers t_token_dummy = (self.config.target_bot_token or "") in fillers
t_server_dummy = (self.config.target_server_id or "") in fillers t_server_dummy = (self.config.target_server_id or "") in fillers
tasks = {} # Flag which operations are being validated
# We always validate Discord token if it's not a dummy, even if server ID is missing validating_discord = False
validating_target = False
if self.config.tool_mode == "backup_transfer" and self.view_mode == "shuttle": if self.config.tool_mode == "backup_transfer" and self.view_mode == "shuttle":
# Backup validation: only if we have a server ID to search for if not d_server_dummy: validating_discord = True
if not d_server_dummy:
tasks["discord"] = asyncio.create_task(self.engine.discord_reader.validate())
else: else:
if not d_token_dummy: if not d_token_dummy: validating_discord = True
tasks["discord"] = asyncio.create_task(self.engine.discord_reader.validate())
if self.view_mode == "shuttle" and not t_token_dummy: if self.view_mode == "shuttle" and not t_token_dummy:
tasks["target"] = asyncio.create_task(self.engine.writer.validate()) validating_target = True
all_tasks = list(tasks.values()) self.validation_results["discord_validating"] = validating_discord
try: self.validation_results["target_validating"] = validating_target
done = set()
if all_tasks: # Trigger the UI spinners instantly
done, _ = await asyncio.wait(all_tasks, timeout=10.0) self._update_info_labels()
dt = tasks.get("discord") async def check_discord():
if dt and dt in done: try:
res = dt.result() import asyncio
res = await asyncio.wait_for(self.engine.discord_reader.validate(), timeout=10.0)
self.validation_results["discord_token"] = res.get("token", False) self.validation_results["discord_token"] = res.get("token", False)
self.validation_results["discord_bot_name"] = res.get("bot_name") self.validation_results["discord_bot_name"] = res.get("bot_name")
self.validation_results["discord_server"] = res.get("server", False) self.validation_results["discord_server"] = res.get("server", False)
@ -421,59 +465,66 @@ class OperationPane(Container):
self.validation_results["discord_intents"] = res.get("intents", {}) self.validation_results["discord_intents"] = res.get("intents", {})
self.validation_results["discord_permissions"] = res.get("permissions", {}) self.validation_results["discord_permissions"] = res.get("permissions", {})
self.validation_results["discord_error"] = res.get("error_reason") self.validation_results["discord_error"] = res.get("error_reason")
elif dt and dt not in done: except asyncio.TimeoutError:
self.validation_results["discord_timeout"] = True self.validation_results["discord_timeout"] = True
dt.cancel() except asyncio.CancelledError:
pass
except Exception as e:
self.validation_results["discord_error"] = str(e)
finally:
self.validation_results["discord_validating"] = False
self._check_and_update()
tt = tasks.get("target") async def check_target():
if tt and tt in done: try:
res = tt.result() import asyncio
res = await asyncio.wait_for(self.engine.writer.validate(), timeout=10.0)
self.validation_results["target_token"] = res.get("token", False) self.validation_results["target_token"] = res.get("token", False)
self.validation_results["target_bot_name"] = res.get("bot_name") self.validation_results["target_bot_name"] = res.get("bot_name")
self.validation_results["target_community"] = res.get("community", False) self.validation_results["target_community"] = res.get("community", False)
self.validation_results["target_community_name"] = res.get("community_name") self.validation_results["target_community_name"] = res.get("community_name")
self.validation_results["target_permissions"] = res.get("permissions", {}) self.validation_results["target_permissions"] = res.get("permissions", {})
self.validation_results["target_error"] = res.get("error_reason") self.validation_results["target_error"] = res.get("error_reason")
elif tt and tt not in done: except asyncio.TimeoutError:
self.validation_results["target_timeout"] = True self.validation_results["target_timeout"] = True
tt.cancel() except asyncio.CancelledError:
pass
except Exception as e:
self.validation_results["target_error"] = str(e)
finally:
self.validation_results["target_validating"] = False
self._check_and_update()
discord_ok = self.validation_results.get("discord_token") and self.validation_results.get("discord_server") coros = []
if validating_discord: coros.append(check_discord())
if self.view_mode == "backup": if validating_target: coros.append(check_target())
self.tokens_valid = bool(discord_ok)
try:
if coros:
import asyncio
await asyncio.gather(*coros)
else:
self._check_and_update()
except asyncio.CancelledError:
pass
def _check_and_update(self) -> None:
"""Called safely on the main thread after any validation task finishes."""
v = self.validation_results
discord_ok = v.get("discord_token") and v.get("discord_server")
if self.view_mode == "backup":
self.tokens_valid = bool(discord_ok)
if self.tokens_valid:
info = self._get_backup_info() info = self._get_backup_info()
if info: if info:
self.validation_results["backup_info_text"] = f"Last backup: [cyan]{info}[/cyan]" self.validation_results["backup_info_text"] = f"Last backup: [cyan]{info}[/cyan]"
self.has_backup = True self.has_backup = True
else: else:
target_ok = self.validation_results.get("target_token") and self.validation_results.get("target_community") target_ok = v.get("target_token") and v.get("target_community")
self.tokens_valid = bool(discord_ok and target_ok) self.tokens_valid = bool(discord_ok and target_ok)
if self.tokens_valid and self.view_mode == "shuttle":
srv_id = self.config.target_server_id
srv_name = self.validation_results.get("target_community_name", "unknown")
if srv_id and srv_name:
safe = re.sub(r"[^a-zA-Z0-9_\-\.]", "_", srv_name)
self.engine.state.set_folder(str(srv_id), safe, base_dir=self._base_dir())
self.permissions_complete = True
if self.tokens_valid:
di = self.validation_results.get("discord_intents", {})
dp = self.validation_results.get("discord_permissions", {})
if not all([di.get("message_content"), dp.get("view_channel"), dp.get("read_message_history")]):
self.permissions_complete = False
if self.view_mode == "shuttle":
tp = self.validation_results.get("target_permissions", {})
if tp and not all(tp.values()):
self.permissions_complete = False
except Exception:
pass
finally:
for t in all_tasks:
if t and not t.done():
t.cancel()
self._update_info_labels() self._update_info_labels()