server metadata cloning in stoat
This commit is contained in:
parent
17d18bdb1e
commit
396efe49c7
5 changed files with 405 additions and 57 deletions
|
|
@ -6,65 +6,58 @@ logger = logging.getLogger(__name__)
|
||||||
async def log_audit_event(context: MigrationContext, title: str, description: str, files: list[dict] | None = None) -> None:
|
async def log_audit_event(context: MigrationContext, title: str, description: str, files: list[dict] | None = None) -> None:
|
||||||
"""
|
"""
|
||||||
Logs an event by sending a summary to the `#reaper-logs` audit channel.
|
Logs an event by sending a summary to the `#reaper-logs` audit channel.
|
||||||
If the channel does not exist, it will dynamically create it and hide it from @everyone.
|
If the channel does not exist, it will dynamically create it.
|
||||||
"""
|
"""
|
||||||
# 1. Initialize or Validate channel
|
# 1. Initialize or Validate channel
|
||||||
channel_id = context.state.audit_log_channel
|
channel_id = context.state.audit_log_channel
|
||||||
|
|
||||||
if channel_id:
|
if channel_id:
|
||||||
# Verify it still exists in Fluxer to avoid 404 errors
|
|
||||||
try:
|
try:
|
||||||
fluxer_channels = await context.fluxer_writer.get_channels()
|
channels = await context.writer.get_channels()
|
||||||
channel_exists = any(str(ch.get("id")) == str(channel_id) for ch in fluxer_channels)
|
channel_exists = any(str(ch.get("id")) == str(channel_id) for ch in channels)
|
||||||
if not channel_exists:
|
if not channel_exists:
|
||||||
logger.info(f"Audit channel {channel_id} no longer exists in Fluxer. Resetting.")
|
|
||||||
context.state.audit_log_channel = None
|
context.state.audit_log_channel = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to verify audit channel existence: {e}")
|
logger.warning(f"Failed to verify audit channel existence: {e}")
|
||||||
|
|
||||||
if not context.state.audit_log_channel:
|
if not context.state.audit_log_channel:
|
||||||
logger.info("Audit log channel not found or invalid in state. Checking Fluxer community...")
|
|
||||||
try:
|
try:
|
||||||
# Check if it already exists in the community but isn't in state
|
channels = await context.writer.get_channels()
|
||||||
channels = await context.fluxer_writer.get_channels()
|
|
||||||
channel_id = None
|
channel_id = None
|
||||||
|
|
||||||
for ch in channels:
|
for ch in channels:
|
||||||
name = str(ch.get("name", "")).lower()
|
name = str(ch.get("name", "")).lower()
|
||||||
if name in ["reaper-logs", "reaper_logs"]:
|
if name in ["reaper-logs", "reaper_logs"]:
|
||||||
channel_id = str(ch.get("id"))
|
channel_id = str(ch.get("id"))
|
||||||
logger.info(f"Found existing audit channel: {channel_id}")
|
|
||||||
break
|
break
|
||||||
|
|
||||||
if not channel_id:
|
if not channel_id:
|
||||||
logger.info("Audit log channel not found. Creating #reaper-logs.")
|
channel_id = await context.writer.create_channel(
|
||||||
# Create channel
|
|
||||||
channel_id = await context.fluxer_writer.create_channel(
|
|
||||||
name="reaper-logs",
|
name="reaper-logs",
|
||||||
topic="Fluxer Reaper - Migration audit logs.",
|
topic="Discord Reaper - Migration audit logs.",
|
||||||
type=0
|
type=0
|
||||||
)
|
)
|
||||||
|
|
||||||
# Immediately lock down 'View Channel' (1024) for @everyone (community_id)
|
context.state.audit_log_channel = channel_id
|
||||||
await context.fluxer_writer.set_channel_permission(
|
|
||||||
|
# For Fluxer, we still do the lockdown as it's proven to work
|
||||||
|
if context.target_platform == "fluxer":
|
||||||
|
await context.writer.set_channel_permission(
|
||||||
channel_id=channel_id,
|
channel_id=channel_id,
|
||||||
overwrite_id=context.config.fluxer_community_id, # @everyone matches community ID
|
overwrite_id=context.writer.community_id,
|
||||||
allow=0,
|
allow=0,
|
||||||
deny=1024,
|
deny=1024,
|
||||||
is_role=True
|
is_role=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save permanently
|
|
||||||
context.state.audit_log_channel = channel_id
|
|
||||||
context.state.save_state()
|
context.state.save_state()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to setup audit log channel: {e}")
|
logger.error(f"Failed to setup audit log channel: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. Format and send the message natively through FluxerBot (avoiding impersonation webhook for admin logs)
|
# 2. Format and send the message
|
||||||
content = f"**[{title}]**\n{description}"
|
content = f"**[{title}]**\n{description}"
|
||||||
try:
|
try:
|
||||||
await context.fluxer_writer.send_marker(context.state.audit_log_channel, content, files=files)
|
await context.writer.send_marker(context.state.audit_log_channel, content, files=files)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to send audit log event: {e}")
|
logger.error(f"Failed to send audit log event: {e}")
|
||||||
|
|
|
||||||
157
src/stoat/roles_permissions.py
Normal file
157
src/stoat/roles_permissions.py
Normal file
|
|
@ -0,0 +1,157 @@
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from typing import Callable, Awaitable
|
||||||
|
|
||||||
|
from src.core.base import MigrationContext
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
async def sync_roles_state(context: MigrationContext):
|
||||||
|
"""
|
||||||
|
Scans Stoat for roles matching Discord names and updates stoat.state.json mappings.
|
||||||
|
"""
|
||||||
|
discord_roles = await context.discord_reader.get_roles()
|
||||||
|
server = await context.stoat_writer._get_server()
|
||||||
|
stoat_roles = list(server.roles.values())
|
||||||
|
|
||||||
|
# Build name -> id maps and ID sets for Stoat for fast lookup
|
||||||
|
stoat_role_map = {r.name: str(r.id) for r in stoat_roles if r.name}
|
||||||
|
stoat_role_ids = {str(r.id) for r in stoat_roles}
|
||||||
|
|
||||||
|
updates = 0
|
||||||
|
removals = 0
|
||||||
|
|
||||||
|
# Verify and Sync Roles
|
||||||
|
for role in discord_roles:
|
||||||
|
discord_id = str(role.id)
|
||||||
|
stoat_id = context.state.get_target_role_id(discord_id)
|
||||||
|
|
||||||
|
if stoat_id:
|
||||||
|
if stoat_id not in stoat_role_ids:
|
||||||
|
context.state.remove_role_mapping(discord_id)
|
||||||
|
removals += 1
|
||||||
|
elif role.name in stoat_role_map:
|
||||||
|
context.state.set_role_mapping(discord_id, stoat_role_map[role.name])
|
||||||
|
updates += 1
|
||||||
|
|
||||||
|
if updates > 0 or removals > 0:
|
||||||
|
logger.info(f"Role sync: {updates} mapped, {removals} stale mappings removed")
|
||||||
|
|
||||||
|
|
||||||
|
async def sync_permissions(context: MigrationContext, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None) -> dict:
|
||||||
|
"""Syncs category and channel role overrides/permissions."""
|
||||||
|
categories = await context.discord_reader.get_categories()
|
||||||
|
channels = await context.discord_reader.get_channels()
|
||||||
|
|
||||||
|
# Only sync for items that are already mapped
|
||||||
|
categories = [c for c in categories if context.state.get_target_category_id(str(c.id))]
|
||||||
|
channels = [c for c in channels if context.state.get_target_channel_id(str(c.id))]
|
||||||
|
|
||||||
|
synced_info = {
|
||||||
|
"categories_synced": [],
|
||||||
|
"channels_synced": [],
|
||||||
|
"structure": {} # category_name -> [channel_names]
|
||||||
|
}
|
||||||
|
|
||||||
|
total = len(categories) + len(channels)
|
||||||
|
current_idx = 0
|
||||||
|
|
||||||
|
if total == 0:
|
||||||
|
return synced_info
|
||||||
|
|
||||||
|
async def _sync_overwrites(discord_item, stoat_id):
|
||||||
|
"""Helper to sync role overwrites for a given channel or category."""
|
||||||
|
for target, overwrite in discord_item.overwrites.items():
|
||||||
|
if type(target).__name__ == "Role":
|
||||||
|
discord_role_id = str(target.id)
|
||||||
|
# Handle @everyone role special case
|
||||||
|
if discord_role_id == str(context.config.discord_server_id):
|
||||||
|
# In Stoat, @everyone is usually the community ID
|
||||||
|
stoat_role_id = str(context.stoat_writer.community_id)
|
||||||
|
is_role = False # Use set_default_permissions logic
|
||||||
|
else:
|
||||||
|
stoat_role_id = context.state.get_target_role_id(discord_role_id)
|
||||||
|
is_role = True
|
||||||
|
|
||||||
|
if not stoat_role_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
allow_val, deny_val = overwrite.pair()
|
||||||
|
await context.stoat_writer.set_channel_permission(
|
||||||
|
channel_id=stoat_id,
|
||||||
|
overwrite_id=stoat_role_id,
|
||||||
|
allow=allow_val.value,
|
||||||
|
deny=deny_val.value,
|
||||||
|
is_role=is_role
|
||||||
|
)
|
||||||
|
|
||||||
|
# Dictionary to map category names to their synced channels
|
||||||
|
cat_name_map = {str(cat.id): cat.name for cat in (await context.discord_reader.get_categories())}
|
||||||
|
|
||||||
|
# Sync Category Permissions (Role Overwrites)
|
||||||
|
for cat in categories:
|
||||||
|
if not context.is_running: break
|
||||||
|
stoat_id = context.state.get_target_category_id(str(cat.id))
|
||||||
|
if stoat_id:
|
||||||
|
try:
|
||||||
|
await _sync_overwrites(cat, stoat_id)
|
||||||
|
synced_info["categories_synced"].append(cat.name)
|
||||||
|
if cat.name not in synced_info["structure"]:
|
||||||
|
synced_info["structure"][cat.name] = []
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed syncing permissions for category {cat.name}: {e}")
|
||||||
|
|
||||||
|
current_idx += 1
|
||||||
|
if progress_callback: await progress_callback(f"Cat: {cat.name}", current_idx, total)
|
||||||
|
|
||||||
|
# Sync Channel Permissions
|
||||||
|
for channel in channels:
|
||||||
|
if not context.is_running: break
|
||||||
|
stoat_id = context.state.get_target_channel_id(str(channel.id))
|
||||||
|
if stoat_id:
|
||||||
|
try:
|
||||||
|
await _sync_overwrites(channel, stoat_id)
|
||||||
|
synced_info["channels_synced"].append(channel.name)
|
||||||
|
|
||||||
|
parent_name = cat_name_map.get(str(channel.category_id), "No Category") if channel.category_id else "No Category"
|
||||||
|
if parent_name not in synced_info["structure"]:
|
||||||
|
synced_info["structure"][parent_name] = []
|
||||||
|
synced_info["structure"][parent_name].append(channel.name)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed syncing permissions for channel {channel.name}: {e}")
|
||||||
|
|
||||||
|
current_idx += 1
|
||||||
|
if progress_callback: await progress_callback(channel.name, current_idx, total)
|
||||||
|
|
||||||
|
return synced_info
|
||||||
|
|
||||||
|
|
||||||
|
async def migrate_roles(context: MigrationContext, progress_callback: Callable[[str, int, int], Awaitable[None]] | None = None, force: bool = False) -> list[str]:
|
||||||
|
"""Copies roles and their baseline permissions. Returns a list of cloned role names."""
|
||||||
|
roles = await context.discord_reader.get_roles()
|
||||||
|
|
||||||
|
if not force:
|
||||||
|
roles = [r for r in roles if not context.state.get_target_role_id(str(r.id))]
|
||||||
|
|
||||||
|
total = len(roles)
|
||||||
|
cloned_role_names = []
|
||||||
|
|
||||||
|
if total == 0:
|
||||||
|
return cloned_role_names
|
||||||
|
|
||||||
|
for idx, role in enumerate(roles):
|
||||||
|
if not context.is_running: break
|
||||||
|
|
||||||
|
stoat_id = await context.stoat_writer.create_role(
|
||||||
|
name=role.name,
|
||||||
|
color=role.color.value,
|
||||||
|
hoist=role.hoist
|
||||||
|
)
|
||||||
|
if stoat_id:
|
||||||
|
context.state.set_role_mapping(str(role.id), stoat_id)
|
||||||
|
cloned_role_names.append(role.name)
|
||||||
|
|
||||||
|
if progress_callback: await progress_callback(role.name, idx + 1, total)
|
||||||
|
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
|
||||||
|
|
||||||
|
return cloned_role_names
|
||||||
55
src/stoat/server_metadata.py
Normal file
55
src/stoat/server_metadata.py
Normal file
|
|
@ -0,0 +1,55 @@
|
||||||
|
import logging
|
||||||
|
from typing import Callable, Awaitable, List
|
||||||
|
|
||||||
|
from src.core.base import MigrationContext
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
async def sync_server_metadata(context: MigrationContext, progress_callback: Callable[[str, str], Awaitable[None]], components: List[str] = ["name", "icon", "banner"]) -> dict:
|
||||||
|
"""Syncs the server name, logo and banner. Returns a dict of cloned attributes."""
|
||||||
|
metadata = await context.discord_reader.get_server_metadata()
|
||||||
|
cloned_data = {}
|
||||||
|
|
||||||
|
# 1. Sync Name
|
||||||
|
if "name" in components:
|
||||||
|
try:
|
||||||
|
name = metadata.get("name")
|
||||||
|
await context.writer.update_guild_metadata(name=name)
|
||||||
|
cloned_data["name"] = name
|
||||||
|
await progress_callback("Server Name", "DONE")
|
||||||
|
except Exception:
|
||||||
|
await progress_callback("Server Name", "ERROR")
|
||||||
|
|
||||||
|
# 2. Sync Icon
|
||||||
|
if "icon" in components:
|
||||||
|
try:
|
||||||
|
icon_bytes = None
|
||||||
|
if context.discord_reader.guild and context.discord_reader.guild.icon:
|
||||||
|
icon_bytes = await context.discord_reader.download_asset(context.discord_reader.guild.icon)
|
||||||
|
|
||||||
|
if icon_bytes:
|
||||||
|
await context.writer.update_guild_metadata(icon=icon_bytes)
|
||||||
|
cloned_data["icon"] = icon_bytes
|
||||||
|
await progress_callback("Server Icon", "DONE")
|
||||||
|
else:
|
||||||
|
await progress_callback("Server Icon", "SKIP")
|
||||||
|
except Exception:
|
||||||
|
await progress_callback("Server Icon", "ERROR")
|
||||||
|
|
||||||
|
# 3. Sync Banner
|
||||||
|
if "banner" in components:
|
||||||
|
try:
|
||||||
|
banner_bytes = None
|
||||||
|
if context.discord_reader.guild and context.discord_reader.guild.banner:
|
||||||
|
banner_bytes = await context.discord_reader.download_asset(context.discord_reader.guild.banner)
|
||||||
|
|
||||||
|
if banner_bytes:
|
||||||
|
await context.writer.update_guild_metadata(banner=banner_bytes)
|
||||||
|
cloned_data["banner"] = banner_bytes
|
||||||
|
await progress_callback("Server Banner", "DONE")
|
||||||
|
else:
|
||||||
|
await progress_callback("Server Banner", "SKIP")
|
||||||
|
except Exception:
|
||||||
|
await progress_callback("Server Banner", "ERROR")
|
||||||
|
|
||||||
|
return cloned_data
|
||||||
|
|
@ -11,13 +11,22 @@ class StoatWriter:
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.client = stoat.Client(token=self.token, bot=True)
|
self.client = stoat.Client(token=self.token, bot=True)
|
||||||
# We don't fetch the server here to avoid slowing down startup if not needed
|
|
||||||
# but we might want a helper to get it
|
|
||||||
self._server = None
|
self._server = None
|
||||||
|
self._me = None
|
||||||
|
try:
|
||||||
|
self._me = await self.client.fetch_user("@me")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to fetch bot user in StoatWriter: {e}")
|
||||||
|
|
||||||
async def _get_server(self):
|
@property
|
||||||
if not self._server:
|
def my_id(self):
|
||||||
self._server = await self.client.fetch_server(self.community_id)
|
return str(self._me.id) if self._me else None
|
||||||
|
|
||||||
|
async def _get_server(self, populate_channels=False):
|
||||||
|
# Always refetch if channels are requested to ensure we have them
|
||||||
|
# Stoat Server objects use __slots__, so we can't easily add our own tracking attributes.
|
||||||
|
if not self._server or populate_channels:
|
||||||
|
self._server = await self.client.fetch_server(self.community_id, populate_channels=populate_channels)
|
||||||
return self._server
|
return self._server
|
||||||
|
|
||||||
async def validate(self) -> dict:
|
async def validate(self) -> dict:
|
||||||
|
|
@ -91,10 +100,46 @@ class StoatWriter:
|
||||||
return results
|
return results
|
||||||
|
|
||||||
async def get_channels(self) -> List[Dict[str, Any]]:
|
async def get_channels(self) -> List[Dict[str, Any]]:
|
||||||
|
try:
|
||||||
|
server = await self._get_server(populate_channels=True)
|
||||||
|
channels = server.channels
|
||||||
|
results = []
|
||||||
|
for ch in channels:
|
||||||
|
# Map Stoat types to Fluxer/Discord integers for internal compatibility
|
||||||
|
# 0: Text, 2: Voice, 4: Category
|
||||||
|
ch_type = -1
|
||||||
|
if isinstance(ch, stoat.TextChannel):
|
||||||
|
ch_type = 0
|
||||||
|
elif isinstance(ch, stoat.VoiceChannel):
|
||||||
|
ch_type = 2
|
||||||
|
elif isinstance(ch, stoat.Category):
|
||||||
|
ch_type = 4
|
||||||
|
|
||||||
|
results.append({
|
||||||
|
"id": str(ch.id),
|
||||||
|
"name": getattr(ch, "title", ch.name),
|
||||||
|
"type": ch_type
|
||||||
|
})
|
||||||
|
return results
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to fetch Stoat channels: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def create_channel(self, name: str, **kwargs) -> str:
|
async def create_channel(self, name: str, type: int = 0, topic: str = "", parent_id: Optional[str] = None, **kwargs) -> str:
|
||||||
return "dummy_stoat_channel_id"
|
server = await self._get_server()
|
||||||
|
try:
|
||||||
|
if type == 4: # Category
|
||||||
|
cat = await server.create_category(title=name)
|
||||||
|
return str(cat.id)
|
||||||
|
else: # Text Channel
|
||||||
|
ch = await server.create_text_channel(name=name, description=topic)
|
||||||
|
# If parent_id is provided, Stoat might need a separate call to move it?
|
||||||
|
# Actually server.create_text_channel might take category?
|
||||||
|
# Let's check the signature again.
|
||||||
|
return str(ch.id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create Stoat channel {name}: {e}")
|
||||||
|
return ""
|
||||||
|
|
||||||
async def modify_channel(self, channel_id: str, **kwargs) -> bool:
|
async def modify_channel(self, channel_id: str, **kwargs) -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
@ -105,11 +150,38 @@ class StoatWriter:
|
||||||
async def send_message(self, **kwargs) -> Optional[str]:
|
async def send_message(self, **kwargs) -> Optional[str]:
|
||||||
return "dummy_stoat_message_id"
|
return "dummy_stoat_message_id"
|
||||||
|
|
||||||
async def send_marker(self, **kwargs) -> Optional[str]:
|
async def send_marker(self, channel_id: str, content: str, files: Optional[List[Dict[str, Any]]] = None) -> Optional[str]:
|
||||||
return "dummy_stoat_marker_id"
|
try:
|
||||||
|
channel = await self.client.fetch_channel(channel_id)
|
||||||
|
# Stoat channel.send takes content
|
||||||
|
# files support might be different, skipping for markers
|
||||||
|
msg = await channel.send(content=content)
|
||||||
|
return str(msg.id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to send Stoat marker to {channel_id}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
async def create_role(self, **kwargs) -> str:
|
|
||||||
return "dummy_stoat_role_id"
|
async def create_role(self, name: str, color: int = 0, hoist: bool = False, **kwargs) -> str:
|
||||||
|
server = await self._get_server()
|
||||||
|
try:
|
||||||
|
# Create role first (Stoat create_role only takes name and rank)
|
||||||
|
role = await server.create_role(name=name)
|
||||||
|
|
||||||
|
# Convert integer color to hex string if not 0
|
||||||
|
hex_color = None
|
||||||
|
if color != 0:
|
||||||
|
hex_color = f"#{color:06x}"
|
||||||
|
|
||||||
|
# Edit role to set color and hoist
|
||||||
|
await role.edit(
|
||||||
|
color=hex_color if hex_color is not None else stoat.UNDEFINED,
|
||||||
|
hoist=hoist
|
||||||
|
)
|
||||||
|
return str(role.id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create Stoat role {name}: {e}")
|
||||||
|
return ""
|
||||||
|
|
||||||
async def create_emoji(self, name: str, image_bytes: bytes, **kwargs) -> str:
|
async def create_emoji(self, name: str, image_bytes: bytes, **kwargs) -> str:
|
||||||
server = await self._get_server()
|
server = await self._get_server()
|
||||||
|
|
@ -135,17 +207,80 @@ class StoatWriter:
|
||||||
logger.error(f"Failed to update Stoat guild metadata: {e}")
|
logger.error(f"Failed to update Stoat guild metadata: {e}")
|
||||||
|
|
||||||
async def remove_community_logo_and_banner(self) -> dict:
|
async def remove_community_logo_and_banner(self) -> dict:
|
||||||
return {"icon": "SKIP", "banner": "SKIP"}
|
server = await self._get_server()
|
||||||
|
has_icon = bool(server.icon)
|
||||||
|
has_banner = bool(server.banner)
|
||||||
|
|
||||||
|
if has_icon:
|
||||||
|
try:
|
||||||
|
await server.edit(icon=None)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to remove Stoat community icon: {e}")
|
||||||
|
|
||||||
|
if has_banner:
|
||||||
|
try:
|
||||||
|
await server.edit(banner=None)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to remove Stoat community banner: {e}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"icon": "REMOVED" if has_icon else "SKIP",
|
||||||
|
"banner": "REMOVED" if has_banner else "SKIP",
|
||||||
|
}
|
||||||
|
|
||||||
async def delete_all_channels(self, **kwargs) -> int:
|
async def delete_all_channels(self, **kwargs) -> int:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
async def reset_channel_permissions(self, **kwargs) -> int:
|
async def set_channel_permission(self, channel_id: str, overwrite_id: str, allow: int, deny: int, is_role: bool = True):
|
||||||
return 0
|
try:
|
||||||
|
channel = await self.client.fetch_channel(channel_id)
|
||||||
|
|
||||||
|
# Stoat Permissions objects are created from raw integers
|
||||||
|
allow_perms = stoat.Permissions(allow)
|
||||||
|
deny_perms = stoat.Permissions(deny)
|
||||||
|
|
||||||
|
# If overwrite_id is the community_id, it refers to the default permissions (@everyone)
|
||||||
|
if str(overwrite_id) == self.community_id:
|
||||||
|
override = stoat.PermissionOverride(allow=allow_perms, deny=deny_perms)
|
||||||
|
await channel.set_default_permissions(override)
|
||||||
|
elif is_role:
|
||||||
|
# Stoat uses set_role_permissions(role_id, allow=..., deny=...)
|
||||||
|
await channel.set_role_permissions(overwrite_id, allow=allow_perms, deny=deny_perms)
|
||||||
|
else:
|
||||||
|
# This case might not be directly supported for individual users in the same way
|
||||||
|
# but we'll try something if needed.
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to set Stoat channel permission for {overwrite_id} on {channel_id}: {e}")
|
||||||
|
|
||||||
async def delete_all_roles(self, **kwargs) -> int:
|
async def delete_all_roles(self, **kwargs) -> int:
|
||||||
|
server = await self._get_server()
|
||||||
|
# Stoat roles are in server.roles (dict) or fetch_roles()
|
||||||
|
# Let's use fetch_roles if available or access .roles
|
||||||
|
try:
|
||||||
|
# server.roles is a dict of {id: Role}
|
||||||
|
roles = list(server.roles.values())
|
||||||
|
except Exception:
|
||||||
|
# Fallback
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
deleted = 0
|
||||||
|
for role in roles:
|
||||||
|
# Skip @everyone (usually has rank 0 or special flag?)
|
||||||
|
# In Stoat, @everyone is usually the guild ID.
|
||||||
|
if str(role.id) == self.community_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check if managed/bot role - Stoat Role doesn't have a clear .managed property
|
||||||
|
# but we can try to guess or just attempt delete.
|
||||||
|
try:
|
||||||
|
await role.delete()
|
||||||
|
deleted += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Skipping role {role.name} (likely managed or @everyone): {e}")
|
||||||
|
|
||||||
|
return deleted
|
||||||
|
|
||||||
async def delete_all_emojis_and_stickers(self, **kwargs) -> dict:
|
async def delete_all_emojis_and_stickers(self, **kwargs) -> dict:
|
||||||
server = await self._get_server()
|
server = await self._get_server()
|
||||||
emojis = await server.fetch_emojis()
|
emojis = await server.fetch_emojis()
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,12 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskPr
|
||||||
from src.core.configuration import load_config, save_config
|
from src.core.configuration import load_config, save_config
|
||||||
from src.core.base import MigrationContext
|
from src.core.base import MigrationContext
|
||||||
from src.fluxer.clone_server import sync_channel_state, migrate_channels
|
from src.fluxer.clone_server import sync_channel_state, migrate_channels
|
||||||
from src.fluxer.roles_permissions import sync_roles_state, sync_permissions, migrate_roles
|
import src.fluxer.roles_permissions as fluxer_roles
|
||||||
|
import src.stoat.roles_permissions as stoat_roles
|
||||||
import src.fluxer.emoji_stickers as fluxer_emoji_stickers
|
import src.fluxer.emoji_stickers as fluxer_emoji_stickers
|
||||||
import src.stoat.emoji_stickers as stoat_emoji_stickers
|
import src.stoat.emoji_stickers as stoat_emoji_stickers
|
||||||
from src.fluxer.server_metadata import sync_server_metadata
|
import src.fluxer.server_metadata as fluxer_metadata
|
||||||
|
import src.stoat.server_metadata as stoat_metadata
|
||||||
from src.fluxer.migrate_message import analyze_migration, migrate_messages
|
from src.fluxer.migrate_message import analyze_migration, migrate_messages
|
||||||
from src.fluxer.danger_zone import danger_remove_logo_and_banner, danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers
|
from src.fluxer.danger_zone import danger_remove_logo_and_banner, danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers
|
||||||
from src.core.audit import log_audit_event
|
from src.core.audit import log_audit_event
|
||||||
|
|
@ -574,6 +576,9 @@ class MigrationCLI:
|
||||||
console.print("(2) Sync Category Permissions & Channel Permissions")
|
console.print("(2) Sync Category Permissions & Channel Permissions")
|
||||||
console.print("(B) Back")
|
console.print("(B) Back")
|
||||||
|
|
||||||
|
# Select appropriate role module
|
||||||
|
roles_mod = fluxer_roles if self.engine.target_platform == "fluxer" else stoat_roles
|
||||||
|
|
||||||
choice = Prompt.ask("Select an option [1/2/B]", choices=["1", "2", "B", "b"], default="B", show_choices=False).upper()
|
choice = Prompt.ask("Select an option [1/2/B]", choices=["1", "2", "B", "b"], default="B", show_choices=False).upper()
|
||||||
|
|
||||||
if choice == "B":
|
if choice == "B":
|
||||||
|
|
@ -583,26 +588,26 @@ class MigrationCLI:
|
||||||
try:
|
try:
|
||||||
await self.engine.start_connections()
|
await self.engine.start_connections()
|
||||||
|
|
||||||
with console.status("[yellow]Checking Fluxer for existing roles...[/yellow]"):
|
with console.status(f"[yellow]Checking {self.engine.target_platform.capitalize()} for existing roles...[/yellow]"):
|
||||||
await sync_roles_state(self.engine)
|
await roles_mod.sync_roles_state(self.engine)
|
||||||
|
|
||||||
roles = await self.engine.discord_reader.get_roles()
|
roles = await self.engine.discord_reader.get_roles()
|
||||||
|
|
||||||
table = Table(show_header=True, header_style="bold #4641D9")
|
table = Table(show_header=True, header_style="bold #4641D9")
|
||||||
table.add_column("Discord Role")
|
table.add_column("Discord Role")
|
||||||
table.add_column("Status", justify="center")
|
table.add_column("Status", justify="center")
|
||||||
table.add_column("Fluxer ID", justify="right")
|
table.add_column(f"{self.engine.target_platform.capitalize()} ID", justify="right")
|
||||||
|
|
||||||
cached_count = 0
|
cached_count = 0
|
||||||
for r in roles:
|
for r in roles:
|
||||||
fluxer_id = self.engine.state.get_fluxer_role_id(str(r.id))
|
target_id = self.engine.state.get_target_role_id(str(r.id))
|
||||||
status = "[bold green]NEW[/bold green]"
|
status = "[bold green]NEW[/bold green]"
|
||||||
fid_str = "[dim]N/A[/dim]"
|
tid_str = "[dim]N/A[/dim]"
|
||||||
if fluxer_id:
|
if target_id:
|
||||||
status = "[dim]already copied[/dim]"
|
status = "[dim]already copied[/dim]"
|
||||||
fid_str = f"[cyan]{fluxer_id}[/cyan]"
|
tid_str = f"[cyan]{target_id}[/cyan]"
|
||||||
cached_count += 1
|
cached_count += 1
|
||||||
table.add_row(r.name, status, fid_str)
|
table.add_row(r.name, status, tid_str)
|
||||||
|
|
||||||
console.print(table)
|
console.print(table)
|
||||||
|
|
||||||
|
|
@ -642,7 +647,7 @@ class MigrationCLI:
|
||||||
progress.update(role_task, total=total, completed=current, description=f"[cyan]Syncing Role: {item_name}")
|
progress.update(role_task, total=total, completed=current, description=f"[cyan]Syncing Role: {item_name}")
|
||||||
|
|
||||||
self.engine.is_running = True
|
self.engine.is_running = True
|
||||||
cloned_roles = await migrate_roles(self.engine, progress_callback=update_progress, force=force)
|
cloned_roles = await roles_mod.migrate_roles(self.engine, progress_callback=update_progress, force=force)
|
||||||
|
|
||||||
console.print("[bold green]Role migration complete![/bold green]")
|
console.print("[bold green]Role migration complete![/bold green]")
|
||||||
if cloned_roles:
|
if cloned_roles:
|
||||||
|
|
@ -665,8 +670,8 @@ class MigrationCLI:
|
||||||
categories = await self.engine.discord_reader.get_categories()
|
categories = await self.engine.discord_reader.get_categories()
|
||||||
channels = await self.engine.discord_reader.get_channels()
|
channels = await self.engine.discord_reader.get_channels()
|
||||||
|
|
||||||
mapped_cats = sum(1 for c in categories if self.engine.state.get_fluxer_category_id(str(c.id)))
|
mapped_cats = sum(1 for c in categories if self.engine.state.get_target_category_id(str(c.id)))
|
||||||
mapped_chs = sum(1 for c in channels if self.engine.state.get_fluxer_channel_id(str(c.id)))
|
mapped_chs = sum(1 for c in channels if self.engine.state.get_target_channel_id(str(c.id)))
|
||||||
total_mapped = mapped_cats + mapped_chs
|
total_mapped = mapped_cats + mapped_chs
|
||||||
|
|
||||||
console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]")
|
console.print(f"\n[yellow]Ready to sync permissions for {total_mapped} items ({mapped_cats} categories, {mapped_chs} channels).[/yellow]")
|
||||||
|
|
@ -693,7 +698,7 @@ class MigrationCLI:
|
||||||
progress.update(perm_task, total=total, completed=current, description=f"[cyan]Syncing: {item_name}")
|
progress.update(perm_task, total=total, completed=current, description=f"[cyan]Syncing: {item_name}")
|
||||||
|
|
||||||
self.engine.is_running = True
|
self.engine.is_running = True
|
||||||
synced_info = await sync_permissions(self.engine, progress_callback=update_progress)
|
synced_info = await roles_mod.sync_permissions(self.engine, progress_callback=update_progress)
|
||||||
|
|
||||||
console.print("[bold green]Permission synchronization complete![/bold green]")
|
console.print("[bold green]Permission synchronization complete![/bold green]")
|
||||||
|
|
||||||
|
|
@ -899,7 +904,10 @@ class MigrationCLI:
|
||||||
color = "green" if status == "DONE" else "red" if status == "ERROR" else "yellow"
|
color = "green" if status == "DONE" else "red" if status == "ERROR" else "yellow"
|
||||||
console.print(f"{item} [[bold {color}]{status}[/bold {color}]]")
|
console.print(f"{item} [[bold {color}]{status}[/bold {color}]]")
|
||||||
|
|
||||||
cloned_data = await sync_server_metadata(self.engine, progress_callback, components=components)
|
# Select appropriate metadata module
|
||||||
|
metadata_mod = fluxer_metadata if self.engine.target_platform == "fluxer" else stoat_metadata
|
||||||
|
|
||||||
|
cloned_data = await metadata_mod.sync_server_metadata(self.engine, progress_callback, components=components)
|
||||||
console.print("[bold green]Server profile sync finished![/bold green]")
|
console.print("[bold green]Server profile sync finished![/bold green]")
|
||||||
|
|
||||||
audit_lines = ["Successfully synchronized Community profile:"]
|
audit_lines = ["Successfully synchronized Community profile:"]
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue