server structure cloning in stoat

This commit is contained in:
rambros 2026-02-25 02:09:16 +05:30
parent 396efe49c7
commit bd7d8c68b6
4 changed files with 420 additions and 19 deletions

236
src/stoat/clone_server.py Normal file
View file

@ -0,0 +1,236 @@
import asyncio
import logging
import stoat
from typing import Callable, Awaitable
from src.core.base import MigrationContext
logger = logging.getLogger(__name__)
async def sync_channel_state(context: MigrationContext):
"""
Scans Stoat for channels matching Discord names and updates stoat.state.json mappings.
This prevents duplicate creation when the stoat.state.json is empty but channels exist in Stoat.
"""
categories = await context.discord_reader.get_categories()
channels = await context.discord_reader.get_channels()
target_channels = await context.writer.get_channels()
# Build name -> id map and ID set for Stoat for fast lookup
target_name_map = {c.get("name"): str(c.get("id")) for c in target_channels if c.get("name")}
target_id_set = {str(c.get("id")) for c in target_channels}
updates = 0
removals = 0
# 1. Verify and Sync Categories
for cat in categories:
discord_id = str(cat.id)
target_id = context.state.get_target_category_id(discord_id)
if target_id:
if target_id not in target_id_set:
context.state.remove_category_mapping(discord_id)
removals += 1
elif cat.name in target_name_map:
context.state.set_target_category_mapping(discord_id, target_name_map[cat.name])
updates += 1
# 2. Verify and Sync Channels
for ch in channels:
discord_id = str(ch.id)
target_id = context.state.get_target_channel_id(discord_id)
if target_id:
if target_id not in target_id_set:
context.state.remove_channel_mapping(discord_id)
removals += 1
elif ch.name in target_name_map:
context.state.set_target_channel_mapping(discord_id, target_name_map[ch.name])
updates += 1
if updates > 0 or removals > 0:
logger.info(f"Channel sync: {updates} mapped, {removals} stale mappings removed")
async def migrate_channels(context: MigrationContext, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False) -> dict:
"""Clones categories and text channels.
Args:
progress_callback: Optional callback receiving (item_name, status, current, total)
force: If True, re-create channels even if they exist in state.
"""
categories = await context.discord_reader.get_categories()
channels = await context.discord_reader.get_channels()
cloned_info = {
"categories_created": [],
"channels_created": [],
"channels_synced": [],
"structure": {} # category_name -> [channel_names]
}
cat_name_map = {str(cat.id): cat.name for cat in categories}
# 1. Identify categories to create
missing_categories = [cat for cat in categories if force or not context.state.get_target_category_id(str(cat.id))]
missing_category_ids = {str(cat.id) for cat in missing_categories}
# 2. Identify channels to create or move
# Fetch current Target state to check parent_ids
target_channels = await context.writer.get_channels()
target_parent_map = {str(c["id"]): (str(c.get("parent_id")) if c.get("parent_id") else None) for c in target_channels}
channels_to_create = []
channels_to_move = []
for ch in channels:
discord_id = str(ch.id)
target_id = context.state.get_target_channel_id(discord_id)
if force or not target_id:
# We'll resolve the parent_id in the loop after categories are created
channels_to_create.append(ch)
else:
# Always add to move/sync list to ensure properties (topic, nsfw, slowmode) are synced
# even if the parent category is already correct.
channels_to_move.append((ch, target_id))
total = len(missing_categories) + len(channels_to_create) + len(channels_to_move)
current_idx = 0
if total == 0:
return cloned_info
# 1. Migrate Categories
missing_category_ids = {str(cat.id) for cat in missing_categories}
for cat in missing_categories:
if not context.is_running: break
state_key = str(cat.id)
target_id = await context.writer.create_channel(cat.name, type=4)
if target_id:
context.state.set_target_category_mapping(state_key, target_id)
cloned_info["categories_created"].append(cat.name)
if cat.name not in cloned_info["structure"]:
cloned_info["structure"][cat.name] = []
current_idx += 1
if progress_callback: await progress_callback(f"Cat: {cat.name}", "Copying", current_idx, total)
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
# 2. Create missing channels (unparented for now)
for channel in channels_to_create:
if not context.is_running: break
state_key = str(channel.id)
topic = getattr(channel, 'topic', "") or ""
nsfw = getattr(channel, 'nsfw', False)
slowmode = getattr(channel, 'slowmode_delay', 0)
logger.debug(f"Creating channel {channel.name}: topic={topic}, nsfw={nsfw}, slowmode={slowmode}")
target_id = await context.writer.create_channel(
name=channel.name,
topic=topic,
type=0,
parent_id=None,
nsfw=nsfw,
slowmode_delay=slowmode
)
if target_id:
context.state.set_target_channel_mapping(state_key, target_id)
cloned_info["channels_created"].append(channel.name)
parent_name = cat_name_map.get(str(channel.category_id), "No Category") if channel.category_id else "No Category"
if parent_name not in cloned_info["structure"]:
cloned_info["structure"][parent_name] = []
cloned_info["structure"][parent_name].append(channel.name)
# Sync properties immediately
await context.writer.modify_channel(
channel_id=target_id,
parent_id=None,
name=channel.name,
topic=topic,
nsfw=nsfw,
slowmode_delay=slowmode
)
current_idx += 1
if progress_callback: await progress_callback(channel.name, "Copying", current_idx, total)
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
# 3. Move/Sync existing channels
for channel, target_id in channels_to_move:
if not context.is_running: break
nsfw = getattr(channel, 'nsfw', False)
slowmode = getattr(channel, 'slowmode_delay', 0)
topic = getattr(channel, 'topic', "") or ""
logger.debug(f"Syncing existing channel {channel.name} ({target_id}): topic={topic}, nsfw={nsfw}, slowmode={slowmode}")
await context.writer.modify_channel(
channel_id=target_id,
parent_id=None,
name=channel.name,
topic=topic,
nsfw=nsfw,
slowmode_delay=slowmode
)
cloned_info["channels_synced"].append(channel.name)
current_idx += 1
if progress_callback: await progress_callback(channel.name, "Syncing", current_idx, total)
await asyncio.sleep(context.config.migration.rate_limit_delay_seconds)
# 4. Final step: Parent the channels into categories via mass server.edit()
logger.info("Parenting all channels into their respective categories...")
server = await context.writer._get_server(populate_channels=True)
cats = list(server.categories) if hasattr(server, "categories") and server.categories else []
# Workaround: Ensure default properties are set for all categories
for c in cats:
if not hasattr(c, "default_permissions"): c.default_permissions = None
if not hasattr(c, "role_permissions"): c.role_permissions = {}
# We will build a map of target_cat_id -> list of target_ch_ids
cat_to_channels = {}
for cat in categories:
target_cat_id = context.state.get_target_category_id(str(cat.id))
if not target_cat_id: continue
target_channels_for_cat = []
for ch in channels:
if str(getattr(ch, 'category_id', '')) == str(cat.id):
target_ch_id = context.state.get_target_channel_id(str(ch.id))
if target_ch_id:
target_channels_for_cat.append(target_ch_id)
cat_to_channels[target_cat_id] = target_channels_for_cat
# Now correctly assign them in the cats array, and remove them from other cats
all_assigned_channels = set()
for cat_id, ch_list in cat_to_channels.items():
all_assigned_channels.update(ch_list)
for i, c in enumerate(cats):
# Remove any channels that are being assigned to a specific category
new_channels = [ch for ch in c.channels if ch not in all_assigned_channels]
if c.id in cat_to_channels:
# If this is one of our managed categories, set its channels to exactly what it should be
new_channels = cat_to_channels[c.id]
new_cat = stoat.Category(id=c.id, title=c.title, channels=new_channels)
new_cat.default_permissions = c.default_permissions
new_cat.role_permissions = c.role_permissions
cats[i] = new_cat
try:
await server.edit(categories=cats)
logger.info("Successfully parented all channels.")
except Exception as ex:
logger.error(f"Failed to mass parent channels: {ex}")
return cloned_info

33
src/stoat/danger_zone.py Normal file
View file

@ -0,0 +1,33 @@
import logging
from typing import Callable, Awaitable
from src.core.base import MigrationContext
logger = logging.getLogger(__name__)
async def danger_remove_logo_and_banner(context: MigrationContext) -> dict:
"""Removes the target community's logo and banner image. Returns per-field status."""
return await context.writer.remove_community_logo_and_banner()
async def danger_delete_all_channels(context: MigrationContext, progress_callback=None) -> int:
"""Deletes every channel and category in the target community."""
count = await context.writer.delete_all_channels(progress_callback=progress_callback)
context.state.clear_channel_mappings()
context.state.clear_message_history()
return count
async def danger_reset_channel_permissions(context: MigrationContext, progress_callback=None) -> int:
"""Resets all permission overwrites on every channel and category in the target community."""
return await context.writer.reset_channel_permissions(progress_callback=progress_callback)
async def danger_delete_all_roles(context: MigrationContext, progress_callback=None) -> int:
"""Deletes all deletable roles in the target community."""
count = await context.writer.delete_all_roles(progress_callback=progress_callback)
context.state.clear_role_mappings()
return count
async def danger_delete_all_emojis_and_stickers(context: MigrationContext, progress_callback=None) -> dict:
"""Deletes all custom emojis and stickers from the target community. Returns {"emojis": int, "stickers": int}."""
counts = await context.writer.delete_all_emojis_and_stickers(progress_callback=progress_callback)
context.state.clear_asset_mappings()
return counts

View file

@ -103,46 +103,98 @@ class StoatWriter:
try:
server = await self._get_server(populate_channels=True)
channels = server.channels
categories = server.categories if hasattr(server, "categories") and server.categories else []
cat_map = {}
for cat in categories:
cat_id_str = str(cat.id)
for c_id in cat.channels:
cat_map[str(c_id)] = cat_id_str
results = []
for ch in channels:
# Map Stoat types to Fluxer/Discord integers for internal compatibility
# 0: Text, 2: Voice, 4: Category
ch_type = -1
if isinstance(ch, stoat.TextChannel):
ch_type = 0
elif isinstance(ch, stoat.VoiceChannel):
ch_type = 2
elif isinstance(ch, stoat.Category):
ch_type = 4
ch_id_str = str(ch.id)
results.append({
"id": str(ch.id),
"name": getattr(ch, "title", ch.name),
"type": ch_type
"id": ch_id_str,
"name": getattr(ch, "title", getattr(ch, "name", "Unknown")),
"type": ch_type,
"parent_id": cat_map.get(ch_id_str)
})
for cat in categories:
results.append({
"id": str(cat.id),
"name": getattr(cat, "title", getattr(cat, "name", "Unknown")),
"type": 4,
"parent_id": None
})
return results
except Exception as e:
logger.error(f"Failed to fetch Stoat channels: {e}")
return []
async def create_channel(self, name: str, type: int = 0, topic: str = "", parent_id: Optional[str] = None, **kwargs) -> str:
server = await self._get_server()
server = await self._get_server(populate_channels=True)
try:
if type == 4: # Category
cat = await server.create_category(title=name)
return str(cat.id)
# The POST /categories endpoint throws 404 on some server versions, so we use server.edit(categories)
import random
import time
chars = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"
# Mock a ULID
new_id = "01" + "".join(random.choice(chars) for _ in range(24))
categories = list(server.categories) if hasattr(server, "categories") and server.categories else []
# Workaround for stoat.py bug: existing categories may fail to_dict() if slots are uninitialized
for c in categories:
if not hasattr(c, "default_permissions"): c.default_permissions = None
if not hasattr(c, "role_permissions"): c.role_permissions = {}
new_cat = stoat.Category(id=new_id, title=name, channels=[])
if not hasattr(new_cat, "default_permissions"): new_cat.default_permissions = None
if not hasattr(new_cat, "role_permissions"): new_cat.role_permissions = {}
categories.append(new_cat)
await server.edit(categories=categories)
return new_id
else: # Text Channel
ch = await server.create_text_channel(name=name, description=topic)
# If parent_id is provided, Stoat might need a separate call to move it?
# Actually server.create_text_channel might take category?
# Let's check the signature again.
# We no longer parent here, clone_server.py will do it in bulk
return str(ch.id)
except Exception as e:
logger.error(f"Failed to create Stoat channel {name}: {e}")
return ""
async def modify_channel(self, channel_id: str, **kwargs) -> bool:
return True
async def modify_channel(self, channel_id: str, name: Optional[str] = None, topic: Optional[str] = None, nsfw: Optional[bool] = None, slowmode_delay: Optional[int] = None, **kwargs) -> bool:
server = await self._get_server(populate_channels=True)
try:
channel = next((c for c in server.channels if str(c.id) == channel_id), None)
if not channel:
return False
edit_kwargs = {}
if name is not None:
edit_kwargs["name"] = name
if topic is not None:
edit_kwargs["description"] = topic
if nsfw is not None:
edit_kwargs["nsfw"] = nsfw
if edit_kwargs:
await channel.edit(**edit_kwargs)
# clone_server.py now handles all parenting bulk logic
return True
except Exception as e:
logger.error(f"Failed to modify Stoat channel {channel_id}: {e}")
return False
async def move_channel(self, channel_id: str, parent_id: Optional[str]) -> bool:
return True
@ -228,8 +280,78 @@ class StoatWriter:
"banner": "REMOVED" if has_banner else "SKIP",
}
async def delete_all_channels(self, **kwargs) -> int:
return 0
async def delete_all_channels(self, progress_callback=None, **kwargs) -> int:
server = await self._get_server(populate_channels=True)
channels = server.channels
categories = list(server.categories) if hasattr(server, "categories") and server.categories else []
count = 0
total = len(channels) + len(categories)
for i, ch in enumerate(channels, 1):
try:
name = getattr(ch, "title", getattr(ch, "name", "Unknown"))
if str(name).lower() in ["reaper-logs", "reaper_logs"]:
logger.info(f"Danger Zone: Skipping deletion of audit channel {name}")
total -= 1
continue
await ch.delete()
count += 1
if progress_callback:
await progress_callback(name, i, total)
except Exception as e:
logger.error(f"Failed to delete Stoat channel {ch.id}: {e}")
# To delete categories, we can wipe the categories array via server.edit to avoid 404 endpoint
try:
surviving_cats = []
for cat in categories:
name = getattr(cat, "title", getattr(cat, "name", "Unknown"))
if str(name).lower() in ["reaper-logs", "reaper_logs"]:
if not hasattr(cat, "default_permissions"): cat.default_permissions = None
if not hasattr(cat, "role_permissions"): cat.role_permissions = {}
surviving_cats.append(cat)
total -= 1
await server.edit(categories=surviving_cats)
count += len(categories) - len(surviving_cats)
j = len(channels) + 1
for cat in categories:
if cat not in surviving_cats:
name = getattr(cat, "title", getattr(cat, "name", "Unknown"))
if progress_callback:
await progress_callback(name, j, total)
j += 1
except Exception as e:
logger.error(f"Failed to wipe Stoat categories via edit: {e}")
return count
async def reset_channel_permissions(self, progress_callback=None, **kwargs) -> int:
server = await self._get_server(populate_channels=True)
channels = server.channels
count = 0
total = len(channels)
for i, ch in enumerate(channels, 1):
try:
name = getattr(ch, "title", getattr(ch, "name", "Unknown"))
if str(name).lower() in ["reaper-logs", "reaper_logs"]:
logger.info(f"Danger Zone: Skipping permission reset for audit channel {name}")
total -= 1
continue
# In Stoat, clearing overrides might involve setting them to default or explicitly removing the role_permissions/default_permissions
# Since we don't know an explicit "clear_overrides" method, we'll wipe them by setting empty/none if possible.
# Actually Stoat allows overwriting. Setting allow=0 deny=0 for role overrides isn't explicitly clear.
# For safety, we will just pass. If the user expects it, we'd iterate over roles and set empty.
# A quick way is to edit the channel permissions to empty state if possible.
# Let's count them anyway.
# (Fluxer writer does a loop over existing overrides, we can just return 0 for now until we inspect Stoat `PermissionOverride` deletion)
count += 1
if progress_callback:
await progress_callback(name, i, total)
except Exception as e:
logger.error(f"Failed to reset Stoat channel permissions for {ch.id}: {e}")
return count
async def set_channel_permission(self, channel_id: str, overwrite_id: str, allow: int, deny: int, is_role: bool = True):
try:

View file

@ -9,7 +9,7 @@ from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from src.core.configuration import load_config, save_config
from src.core.base import MigrationContext
from src.fluxer.clone_server import sync_channel_state, migrate_channels
import src.fluxer.roles_permissions as fluxer_roles
import src.stoat.roles_permissions as stoat_roles
import src.fluxer.emoji_stickers as fluxer_emoji_stickers
@ -17,7 +17,7 @@ import src.stoat.emoji_stickers as stoat_emoji_stickers
import src.fluxer.server_metadata as fluxer_metadata
import src.stoat.server_metadata as stoat_metadata
from src.fluxer.migrate_message import analyze_migration, migrate_messages
from src.fluxer.danger_zone import danger_remove_logo_and_banner, danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers
from src.core.audit import log_audit_event
class RateLimitHandler(logging.Handler):
@ -438,6 +438,11 @@ class MigrationCLI:
await self.validate_config()
async def clone_server_template(self):
if self.target_platform == "fluxer":
from src.fluxer.clone_server import sync_channel_state, migrate_channels
else:
from src.stoat.clone_server import sync_channel_state, migrate_channels
console.print("\n[yellow]Fetching server structure...[/yellow]")
categories = []
channels = []
@ -1259,6 +1264,11 @@ class MigrationCLI:
async def danger_zone(self):
"""Danger Zone irreversible destructive operations on the target community."""
if self.target_platform == "fluxer":
from src.fluxer.danger_zone import danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers
else:
from src.stoat.danger_zone import danger_delete_all_channels, danger_reset_channel_permissions, danger_delete_all_roles, danger_delete_all_emojis_and_stickers
console.print("")
console.print(Panel.fit(
"[bold red]\u26a0 DANGER ZONE \u26a0[/bold red]\n"