preserve reply structure

This commit is contained in:
rambros 2026-02-21 20:07:26 +05:30
parent 7762922d4c
commit 7442f3c0a0
5 changed files with 210 additions and 110 deletions

1
.gitignore vendored
View file

@ -20,6 +20,7 @@ wheels/
*.egg-info/ *.egg-info/
.installed.cfg .installed.cfg
*.egg *.egg
temp.txt
# Virtual Environment # Virtual Environment
venv/ venv/

View file

@ -55,10 +55,18 @@ class MigrationEngine:
await self.discord_reader.start() await self.discord_reader.start()
await self.fluxer_writer.start() await self.fluxer_writer.start()
async def start_fluxer_only(self):
"""Starts only the Fluxer writer (used for Danger Zone operations that don't need Discord)."""
await self.fluxer_writer.start()
async def close_connections(self): async def close_connections(self):
await self.discord_reader.close() await self.discord_reader.close()
await self.fluxer_writer.close() await self.fluxer_writer.close()
async def close_fluxer_only(self):
"""Closes only the Fluxer writer. Pair with start_fluxer_only()."""
await self.fluxer_writer.close()
async def sync_server_metadata(self, progress_callback: Callable[[str, str], Awaitable[None]], components: List[str] = ["name", "icon", "banner"]): async def sync_server_metadata(self, progress_callback: Callable[[str, str], Awaitable[None]], components: List[str] = ["name", "icon", "banner"]):
"""Syncs the server name, logo and banner.""" """Syncs the server name, logo and banner."""
metadata = await self.discord_reader.get_server_metadata() metadata = await self.discord_reader.get_server_metadata()
@ -193,15 +201,24 @@ class MigrationEngine:
logger.error(f"Failed to download attachment {att.filename}: {e}") logger.error(f"Failed to download attachment {att.filename}: {e}")
try: try:
await self.fluxer_writer.send_message( # Check if this message is a reply
reply_to_fluxer_id = None
if msg.reference and msg.reference.message_id:
reply_to_fluxer_id = self.state.get_fluxer_message_id(str(msg.reference.message_id))
fluxer_msg_id = await self.fluxer_writer.send_message(
channel_id=target_channel_id, channel_id=target_channel_id,
author_name=msg.author.name, author_name=msg.author.display_name,
author_avatar_url=str(msg.author.display_avatar.url), author_avatar_url=str(msg.author.display_avatar.url),
content=msg.content, content=msg.content,
timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"), timestamp=msg.created_at.strftime("%Y-%m-%d %H:%M:%S"),
files=files if files else None files=files if files else None,
reply_to_message_id=reply_to_fluxer_id
) )
if fluxer_msg_id:
self.state.set_message_mapping(str(msg.id), fluxer_msg_id)
self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at)) self.state.update_last_message_timestamp(str(source_channel_id), str(msg.created_at))
message_count += 1 message_count += 1
if progress_callback: if progress_callback:
@ -236,8 +253,12 @@ class MigrationEngine:
if progress_callback: await progress_callback(role.name, idx + 1, total) if progress_callback: await progress_callback(role.name, idx + 1, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
async def migrate_emojis(self, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, types_to_include: List[str] = ["Emoji", "Sticker"]): async def migrate_emojis(self, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, types_to_include: List[str] = ["Emoji", "Sticker"], force: bool = False):
"""Copies custom emojis and stickers.""" """Copies custom emojis and stickers.
Args:
force: If True, skip state cache and re-copy even if already migrated.
"""
objs = [] objs = []
if "Emoji" in types_to_include: if "Emoji" in types_to_include:
emojis = await self.discord_reader.get_emojis() emojis = await self.discord_reader.get_emojis()
@ -252,7 +273,7 @@ class MigrationEngine:
if not self.is_running: break if not self.is_running: break
state_key = f"{obj_type.lower()}_{obj.id}" state_key = f"{obj_type.lower()}_{obj.id}"
fluxer_id = self.state.get_fluxer_channel_id(state_key) fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key)
if not fluxer_id: if not fluxer_id:
try: try:
if obj_type == "Emoji": if obj_type == "Emoji":
@ -296,9 +317,9 @@ class MigrationEngine:
# ──────────────── DANGER ZONE ──────────────── # ──────────────── DANGER ZONE ────────────────
async def danger_remove_logo_and_banner(self) -> None: async def danger_remove_logo_and_banner(self) -> dict:
"""Removes the community logo and banner image.""" """Removes the community logo and banner image. Returns per-field status."""
await self.fluxer_writer.remove_community_logo_and_banner() return await self.fluxer_writer.remove_community_logo_and_banner()
async def danger_delete_all_channels(self, progress_callback=None) -> int: async def danger_delete_all_channels(self, progress_callback=None) -> int:
"""Deletes every channel and category in the Fluxer community.""" """Deletes every channel and category in the Fluxer community."""
@ -312,6 +333,7 @@ class MigrationEngine:
"""Deletes all deletable roles (skips managed/bot roles and @everyone).""" """Deletes all deletable roles (skips managed/bot roles and @everyone)."""
return await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback) return await self.fluxer_writer.delete_all_roles(progress_callback=progress_callback)
async def danger_delete_all_emojis_and_stickers(self, progress_callback=None) -> int: async def danger_delete_all_emojis_and_stickers(self, progress_callback=None) -> dict:
"""Deletes all custom emojis and stickers from the Fluxer community.""" """Deletes all custom emojis and stickers. Returns {"emojis": int, "stickers": int}."""
return await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback) return await self.fluxer_writer.delete_all_emojis_and_stickers(progress_callback=progress_callback)

View file

@ -11,6 +11,7 @@ class MigrationState:
self.channel_map: Dict[str, str] = {} self.channel_map: Dict[str, str] = {}
self.role_map: Dict[str, str] = {} self.role_map: Dict[str, str] = {}
self.user_map: Dict[str, str] = {} self.user_map: Dict[str, str] = {}
self.message_map: Dict[str, str] = {}
# tracking last message timestamp per channel to resume # tracking last message timestamp per channel to resume
self.last_message_timestamps: Dict[str, str] = {} self.last_message_timestamps: Dict[str, str] = {}
@ -24,6 +25,7 @@ class MigrationState:
self.channel_map = data.get("channels", {}) self.channel_map = data.get("channels", {})
self.role_map = data.get("roles", {}) self.role_map = data.get("roles", {})
self.user_map = data.get("users", {}) self.user_map = data.get("users", {})
self.message_map = data.get("messages", {})
self.last_message_timestamps = data.get("last_message_timestamps", {}) self.last_message_timestamps = data.get("last_message_timestamps", {})
def save(self): def save(self):
@ -31,6 +33,7 @@ class MigrationState:
"channels": self.channel_map, "channels": self.channel_map,
"roles": self.role_map, "roles": self.role_map,
"users": self.user_map, "users": self.user_map,
"messages": self.message_map,
"last_message_timestamps": self.last_message_timestamps "last_message_timestamps": self.last_message_timestamps
} }
with open(self.state_file, "w", encoding="utf-8") as f: with open(self.state_file, "w", encoding="utf-8") as f:
@ -42,6 +45,13 @@ class MigrationState:
def get_fluxer_channel_id(self, discord_id: str) -> str | None: def get_fluxer_channel_id(self, discord_id: str) -> str | None:
return self.channel_map.get(str(discord_id)) return self.channel_map.get(str(discord_id))
def set_message_mapping(self, discord_id: str, fluxer_id: str):
self.message_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_message_id(self, discord_id: str) -> str | None:
return self.message_map.get(str(discord_id))
def update_last_message_timestamp(self, channel_id: str, timestamp: str): def update_last_message_timestamp(self, channel_id: str, timestamp: str):
self.last_message_timestamps[str(channel_id)] = timestamp self.last_message_timestamps[str(channel_id)] = timestamp

View file

@ -1,7 +1,10 @@
import asyncio import asyncio
import logging
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
from fluxer import Bot, Webhook from fluxer import Bot, Webhook
logger = logging.getLogger(__name__)
class FluxerWriter: class FluxerWriter:
def __init__(self, token: str, community_id: str): def __init__(self, token: str, community_id: str):
self.token = token self.token = token
@ -121,10 +124,11 @@ class FluxerWriter:
assert self.client is not None assert self.client is not None
return await self.client.get_guild_channels(self.community_id) return await self.client.get_guild_channels(self.community_id)
async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, author_avatar_url: Optional[str] = None, files: Optional[List[Dict[str, Any]]] = None) -> None: async def send_message(self, channel_id: str, author_name: str, content: str, timestamp: str, author_avatar_url: Optional[str] = None, files: Optional[List[Dict[str, Any]]] = None, reply_to_message_id: Optional[str] = None) -> Optional[str]:
""" """
Sends a message to the target channel. Sends a message to the target channel.
Uses a webhook to mimic the original author if possible. Uses a webhook to mimic the original author if possible.
Returns the ID of the sent message if available.
""" """
assert self.client is not None assert self.client is not None
@ -138,38 +142,45 @@ class FluxerWriter:
# Use webhook for avatar/username spoofing # Use webhook for avatar/username spoofing
webhook = await self._get_or_create_webhook(channel_id) webhook = await self._get_or_create_webhook(channel_id)
# Use webhook for avatar/username spoofing # Prepare content with subtext timestamp
webhook = await self._get_or_create_webhook(channel_id) # -# is Fluxer/Discord's subtext markdown: small, muted grey text
prefix = f"-# {timestamp}\n"
# Prepare content with timestamp (crucial for migration)
# We still add the timestamp to the message body so it's searchable and preserved
prefix = f"**[{timestamp}]**:\n"
final_content = prefix + content if content else prefix final_content = prefix + content if content else prefix
try: try:
# Current limitation: fluxer.py execute_webhook doesn't support 'files' yet. # Current limitation: fluxer.py execute_webhook doesn't support 'files' or 'message_reference' yet.
# So if we have files, we MUST use the bot's direct send method. # So if we have files OR a reply, we MUST use the bot's direct send method.
if webhook and not files: if webhook and not files and not reply_to_message_id:
await webhook.send( msg = await webhook.send(
content=final_content, content=final_content,
username=f"{author_name} (via Discord)", username=f"{author_name} (via Discord)",
avatar_url=author_avatar_url avatar_url=author_avatar_url,
wait=True
) )
return str(msg.id) if msg else None
else: else:
# Use bot direct message (supports files) # Use bot direct message (supports files and message_reference)
# We add the author name to the prefix since bot name won't match # We add the author name to the prefix since bot name won't match
bot_prefix = f"**[{timestamp}] {author_name}**:\n" bot_prefix = f"-# {timestamp} · {author_name}\n"
bot_content = bot_prefix + content if content else bot_prefix bot_content = bot_prefix + content if content else bot_prefix
await self.client.send_message(
message_reference = None
if reply_to_message_id:
message_reference = {"message_id": str(reply_to_message_id), "channel_id": str(channel_id)}
msg_data = await self.client.send_message(
channel_id=channel_id, channel_id=channel_id,
content=bot_content, content=bot_content,
files=files files=files,
message_reference=message_reference
) )
return str(msg_data["id"]) if msg_data else None
except Exception as e: except Exception as e:
err_msg = f"Failed to copy message: {e}" err_msg = f"Failed to copy message: {e}"
if hasattr(e, 'errors') and e.errors: if hasattr(e, 'errors') and e.errors:
err_msg += f" - Details: {e.errors}" err_msg += f" - Details: {e.errors}"
print(err_msg) print(err_msg)
return None
async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str: async def create_role(self, name: str, color: int, hoist: bool, mentionable: bool) -> str:
@ -206,7 +217,7 @@ class FluxerWriter:
) )
return str(emoji["id"]) return str(emoji["id"])
except Exception as e: except Exception as e:
print(f"Failed to copy emoji {name}: {e}") logger.error(f"Failed to copy emoji '{name}': {e}", exc_info=True)
return "" return ""
async def create_sticker(self, name: str, image_bytes: bytes) -> str: async def create_sticker(self, name: str, image_bytes: bytes) -> str:
@ -223,7 +234,7 @@ class FluxerWriter:
) )
return str(sticker["id"]) return str(sticker["id"])
except Exception as e: except Exception as e:
print(f"Failed to copy sticker {name}: {e}") logger.error(f"Failed to copy sticker '{name}': {e}", exc_info=True)
return "" return ""
async def update_guild_metadata(self, name: Optional[str] = None, icon: Optional[bytes] = None, banner: Optional[bytes] = None) -> None: async def update_guild_metadata(self, name: Optional[str] = None, icon: Optional[bytes] = None, banner: Optional[bytes] = None) -> None:
@ -256,19 +267,50 @@ class FluxerWriter:
except Exception as e: except Exception as e:
print(f"Failed to update community metadata: {e}") print(f"Failed to update community metadata: {e}")
async def remove_community_logo_and_banner(self) -> None: async def remove_community_logo_and_banner(self) -> dict:
""" """
Removes the community logo (icon) and banner by setting them to None. Removes the community logo (icon) and banner.
Fetches the current guild state first so it can report whether each
field was actually set (REMOVED) or already empty (SKIP).
Correct API calls per Fluxer contract:
await http.modify_guild(guild_id, icon=None)
await http.modify_guild(guild_id, banner=None)
Returns:
{"icon": "REMOVED"|"SKIP", "banner": "REMOVED"|"SKIP"}
""" """
assert self.client is not None assert self.client is not None
try:
await self.client.modify_guild( # 1. Check current state
guild_id=self.community_id, guild = await self.client.get_guild(self.community_id)
icon=None, has_icon = bool(guild.get("icon"))
banner=None has_banner = bool(guild.get("banner"))
)
except Exception as e: # 2. Remove icon if set
print(f"Failed to remove community logo/banner: {e}") if has_icon:
try:
await self.client.modify_guild(
guild_id=self.community_id,
icon=None
)
except Exception as e:
print(f"Failed to remove community icon: {e}")
# 3. Remove banner if set
if has_banner:
try:
await self.client.modify_guild(
guild_id=self.community_id,
banner=None
)
except Exception as e:
print(f"Failed to remove community banner: {e}")
return {
"icon": "REMOVED" if has_icon else "SKIP",
"banner": "REMOVED" if has_banner else "SKIP",
}
async def delete_all_channels(self, progress_callback=None) -> int: async def delete_all_channels(self, progress_callback=None) -> int:
""" """
@ -356,24 +398,25 @@ class FluxerWriter:
print(f"Failed to delete role {role.get('name')}: {e}") print(f"Failed to delete role {role.get('name')}: {e}")
return deleted return deleted
async def delete_all_emojis_and_stickers(self, progress_callback=None) -> int: async def delete_all_emojis_and_stickers(self, progress_callback=None) -> dict:
""" """
Deletes all custom emojis and stickers in the Fluxer community. Deletes all custom emojis and stickers in the Fluxer community.
Returns the total count of deleted items. Returns {"emojis": int, "stickers": int} with independent counts.
""" """
assert self.client is not None assert self.client is not None
deleted = 0 emoji_deleted = 0
sticker_deleted = 0
# Delete emojis # Delete emojis
try: try:
emojis = await self.client.get_guild_emojis(self.community_id) emojis = await self.client.get_guild_emojis(self.community_id)
emoji_total = len(emojis) emoji_total = len(emojis)
for idx, emoji in enumerate(emojis): for emoji in emojis:
try: try:
await self.client.delete_guild_emoji(self.community_id, emoji["id"]) await self.client.delete_guild_emoji(self.community_id, emoji["id"])
deleted += 1 emoji_deleted += 1
if progress_callback: if progress_callback:
await progress_callback(emoji.get("name", "Unknown"), "Emoji", deleted, emoji_total) await progress_callback(emoji.get("name", "Unknown"), "Emoji", emoji_deleted, emoji_total)
except Exception as e: except Exception as e:
print(f"Failed to delete emoji {emoji.get('name')}: {e}") print(f"Failed to delete emoji {emoji.get('name')}: {e}")
except Exception as e: except Exception as e:
@ -383,18 +426,19 @@ class FluxerWriter:
try: try:
stickers = await self.client.get_guild_stickers(self.community_id) stickers = await self.client.get_guild_stickers(self.community_id)
sticker_total = len(stickers) sticker_total = len(stickers)
for idx, sticker in enumerate(stickers): for sticker in stickers:
try: try:
await self.client.delete_guild_sticker(self.community_id, sticker["id"]) await self.client.delete_guild_sticker(self.community_id, sticker["id"])
deleted += 1 sticker_deleted += 1
if progress_callback: if progress_callback:
await progress_callback(sticker.get("name", "Unknown"), "Sticker", deleted, sticker_total) await progress_callback(sticker.get("name", "Unknown"), "Sticker", sticker_deleted, sticker_total)
except Exception as e: except Exception as e:
print(f"Failed to delete sticker {sticker.get('name')}: {e}") print(f"Failed to delete sticker {sticker.get('name')}: {e}")
except Exception as e: except Exception as e:
print(f"Failed to fetch stickers: {e}") print(f"Failed to fetch stickers: {e}")
return deleted return {"emojis": emoji_deleted, "stickers": sticker_deleted}
async def close(self): async def close(self):
"""Cleanly close connection and stop bot task.""" """Cleanly close connection and stop bot task."""

View file

@ -348,22 +348,36 @@ class MigrationCLI:
await self.engine.start_connections() await self.engine.start_connections()
emojis = await self.engine.discord_reader.get_emojis() emojis = await self.engine.discord_reader.get_emojis()
stickers = await self.engine.discord_reader.get_stickers() stickers = await self.engine.discord_reader.get_stickers()
console.print(f"\n[bold]Custom emojis found: {len(emojis)}[/bold]") console.print(f"\n[bold]Custom emojis found: {len(emojis)}[/bold]")
for e in emojis: for e in emojis:
console.print(f" - Emoji: {e.name}") already = self.engine.state.get_fluxer_channel_id(f"emoji_{e.id}")
tag = " [dim](already copied)[/dim]" if already else ""
console.print(f" - Emoji: {e.name}{tag}")
console.print(f"[bold]Custom stickers found: {len(stickers)}[/bold]") console.print(f"[bold]Custom stickers found: {len(stickers)}[/bold]")
for s in stickers: for s in stickers:
console.print(f" - Sticker: {s.name}") already = self.engine.state.get_fluxer_channel_id(f"sticker_{s.id}")
tag = " [dim](already copied)[/dim]" if already else ""
console.print(f" - Sticker: {s.name}{tag}")
# Warn if everything is already in the state cache
all_ids = ([f"emoji_{e.id}" for e in emojis] +
[f"sticker_{s.id}" for s in stickers])
cached_count = sum(
1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k)
)
if cached_count > 0:
console.print(f"\n[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) marked as already copied in state.json.[/yellow]")
console.print("[yellow] If the target community was reset, choose Force Re-copy.[/yellow]")
console.print("\n(1) Copy Emojis only") console.print("\n(1) Copy Emojis only")
console.print("(2) Copy Stickers only") console.print("(2) Copy Stickers only")
console.print("(3) Copy Emojis and Stickers") console.print("(3) Copy Emojis and Stickers")
console.print("(B) Back") console.print("(B) Back")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "B", "b"], default="B").upper() choice = Prompt.ask("Select an option", choices=["1", "2", "3", "B", "b"], default="B").upper()
if choice == "B": if choice == "B":
return return
@ -375,6 +389,23 @@ class MigrationCLI:
elif choice == "3": elif choice == "3":
types_to_include = ["Emoji", "Sticker"] types_to_include = ["Emoji", "Sticker"]
# Ask about force re-copy only if there are cached items in scope
in_scope_keys = []
if "Emoji" in types_to_include:
in_scope_keys += [f"emoji_{e.id}" for e in emojis]
if "Sticker" in types_to_include:
in_scope_keys += [f"sticker_{s.id}" for s in stickers]
cached_in_scope = sum(
1 for k in in_scope_keys if self.engine.state.get_fluxer_channel_id(k)
)
force = False
if cached_in_scope > 0:
force = Confirm.ask(
f"[yellow]{cached_in_scope} item(s) already in state cache. Force re-copy anyway?[/yellow]",
default=False
)
console.print("\n[bold green]Starting Migration...[/bold green]") console.print("\n[bold green]Starting Migration...[/bold green]")
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
@ -383,21 +414,26 @@ class MigrationCLI:
TaskProgressColumn(), TaskProgressColumn(),
console=console console=console
) as progress: ) as progress:
emoji_task = progress.add_task("[cyan]Copying Assets...", total=100) emoji_task = progress.add_task("[cyan]Copying Assets...", total=100)
async def update_progress(item_name: str, item_type: str, current: int, total: int): async def update_progress(item_name: str, item_type: str, current: int, total: int):
progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying {item_type}: {item_name}") progress.update(emoji_task, total=total, completed=current, description=f"[cyan]Copying {item_type}: {item_name}")
self.engine.is_running = True self.engine.is_running = True
await self.engine.migrate_emojis(progress_callback=update_progress, types_to_include=types_to_include) await self.engine.migrate_emojis(
progress_callback=update_progress,
types_to_include=types_to_include,
force=force
)
console.print("[bold green]Migration complete![/bold green]") console.print("[bold green]Migration complete![/bold green]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]") console.print(f"[bold red]Error during emoji migration: {str(e)}[/bold red]")
finally: finally:
await self.engine.close_connections() await self.engine.close_connections()
self.engine.is_running = False self.engine.is_running = False
async def sync_server_metadata(self): async def sync_server_metadata(self):
@ -564,55 +600,36 @@ class MigrationCLI:
"""Danger Zone irreversible destructive operations on the Fluxer community.""" """Danger Zone irreversible destructive operations on the Fluxer community."""
console.print("") console.print("")
console.print(Panel.fit( console.print(Panel.fit(
"[bold red]⚠ DANGER ZONE ⚠[/bold red]\n" "[bold red]\u26a0 DANGER ZONE \u26a0[/bold red]\n"
"[yellow]These actions are PERMANENT and IRREVERSIBLE on your Fluxer community.[/yellow]\n" "[yellow]These actions are PERMANENT and IRREVERSIBLE on your Fluxer community.[/yellow]\n"
"[yellow]Always double-check before confirming.[/yellow]", "[yellow]Always double-check before confirming.[/yellow]",
style="bold red" style="bold red"
)) ))
console.print("(1) Remove Community Logo and Banner") console.print("(1) Delete all Channels & Categories")
console.print("(2) Delete all Channels & Categories") console.print("(2) Reset Channel & Category Permissions")
console.print("(3) Reset Channel & Category Permissions") console.print("(3) Delete all Roles [dim](bot role is protected)[/dim]")
console.print("(4) Delete all Roles [dim](bot role is protected)[/dim]") console.print("(4) Delete all custom Emojis & Stickers")
console.print("(5) Delete all custom Emojis & Stickers")
console.print("(B) Back") console.print("(B) Back")
choice = Prompt.ask( choice = Prompt.ask(
"Select a Danger Zone option", "Select a Danger Zone option",
choices=["1", "2", "3", "4", "5", "B", "b"], choices=["1", "2", "3", "4", "B", "b"],
default="B" default="B"
).upper() ).upper()
if choice == "B": if choice == "B":
return return
# ---- (1) Remove Logo & Banner ---- # ---- (1) Delete all Channels & Categories ----
if choice == "1": if choice == "1":
console.print("")
console.print("[bold red]This will REMOVE the community logo and banner image.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return
if not Confirm.ask("[bold red]Last chance confirm permanent removal?[/bold red]"):
return
try:
await self.engine.start_connections()
with console.status("[red]Removing logo and banner...[/red]"):
await self.engine.danger_remove_logo_and_banner()
console.print("[bold green]Community logo and banner removed.[/bold green]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_connections()
# ---- (2) Delete all Channels & Categories ----
elif choice == "2":
console.print("") console.print("")
console.print("[bold red]This will DELETE every channel and category in the Fluxer community.[/bold red]") console.print("[bold red]This will DELETE every channel and category in the Fluxer community.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return return
if not Confirm.ask("[bold red]Last chance this cannot be undone. Continue?[/bold red]"): if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return return
try: try:
await self.engine.start_connections() await self.engine.start_fluxer_only()
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
@ -627,22 +644,23 @@ class MigrationCLI:
description=f"[red]Deleting: {name}") description=f"[red]Deleting: {name}")
count = await self.engine.danger_delete_all_channels(progress_callback=on_channel_deleted) count = await self.engine.danger_delete_all_channels(progress_callback=on_channel_deleted)
console.print(f"[bold green]Done. {count} channels/categories deleted.[/bold green]") console.print(f"[bold green]{count} channels/categories deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]") console.print(f"[bold red]Error: {e}[/bold red]")
finally: finally:
await self.engine.close_connections() await self.engine.close_fluxer_only()
# ---- (3) Reset Channel & Category Permissions ---- # ---- (2) Reset Channel & Category Permissions ----
elif choice == "3": elif choice == "2":
console.print("") console.print("")
console.print("[bold red]This will RESET all permission overwrites on every channel and category.[/bold red]") console.print("[bold red]This will RESET all permission overwrites on every channel and category.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return return
if not Confirm.ask("[bold red]Last chance all custom permission overrides will be wiped. Continue?[/bold red]"): if not Confirm.ask("[bold red]Last chance \u2013 all custom permission overrides will be wiped. Continue?[/bold red]"):
return return
try: try:
await self.engine.start_connections() await self.engine.start_fluxer_only()
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
@ -657,23 +675,24 @@ class MigrationCLI:
description=f"[red]Resetting: {name}") description=f"[red]Resetting: {name}")
count = await self.engine.danger_reset_channel_permissions(progress_callback=on_perm_reset) count = await self.engine.danger_reset_channel_permissions(progress_callback=on_perm_reset)
console.print(f"[bold green]Done. Permissions reset on {count} channels/categories.[/bold green]") console.print(f"[bold green]Permissions reset on {count} channels/categories.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]") console.print(f"[bold red]Error: {e}[/bold red]")
finally: finally:
await self.engine.close_connections() await self.engine.close_fluxer_only()
# ---- (4) Delete all Roles ---- # ---- (3) Delete all Roles ----
elif choice == "4": elif choice == "3":
console.print("") console.print("")
console.print("[bold red]This will DELETE all roles in the Fluxer community.[/bold red]") console.print("[bold red]This will DELETE all roles in the Fluxer community.[/bold red]")
console.print("[dim]Managed roles (including the bot's own role) and @everyone are automatically protected.[/dim]") console.print("[dim]Managed roles (including the bot's own role) and @everyone are automatically protected.[/dim]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return return
if not Confirm.ask("[bold red]Last chance confirm permanent role deletion?[/bold red]"): if not Confirm.ask("[bold red]Last chance \u2013 confirm permanent role deletion?[/bold red]"):
return return
try: try:
await self.engine.start_connections() await self.engine.start_fluxer_only()
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
@ -688,22 +707,23 @@ class MigrationCLI:
description=f"[red]Deleting role: {name}") description=f"[red]Deleting role: {name}")
count = await self.engine.danger_delete_all_roles(progress_callback=on_role_deleted) count = await self.engine.danger_delete_all_roles(progress_callback=on_role_deleted)
console.print(f"[bold green]Done. {count} roles deleted.[/bold green]") console.print(f"[bold green]{count} roles deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]") console.print(f"[bold red]Error: {e}[/bold red]")
finally: finally:
await self.engine.close_connections() await self.engine.close_fluxer_only()
# ---- (5) Delete all Emojis & Stickers ---- # ---- (4) Delete all Emojis & Stickers ----
elif choice == "5": elif choice == "4":
console.print("") console.print("")
console.print("[bold red]This will DELETE all custom emojis and stickers in the Fluxer community.[/bold red]") console.print("[bold red]This will DELETE all custom emojis and stickers in the Fluxer community.[/bold red]")
if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"): if not Confirm.ask("[bold red]Are you absolutely sure?[/bold red]"):
return return
if not Confirm.ask("[bold red]Last chance this cannot be undone. Continue?[/bold red]"): if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return return
try: try:
await self.engine.start_connections() await self.engine.start_fluxer_only()
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
TextColumn("[progress.description]{task.description}"), TextColumn("[progress.description]{task.description}"),
@ -717,12 +737,15 @@ class MigrationCLI:
progress.update(asset_task, total=total, completed=current, progress.update(asset_task, total=total, completed=current,
description=f"[red]Deleting {asset_type}: {name}") description=f"[red]Deleting {asset_type}: {name}")
count = await self.engine.danger_delete_all_emojis_and_stickers(progress_callback=on_asset_deleted) counts = await self.engine.danger_delete_all_emojis_and_stickers(progress_callback=on_asset_deleted)
console.print(f"[bold green]Done. {count} emojis/stickers deleted.[/bold green]") console.print(f"[bold green]{counts.get('emojis', 0)} emojis deleted.[/bold green]")
console.print(f"[bold green]{counts.get('stickers', 0)} stickers deleted.[/bold green]")
console.print("[bold green]Done.[/bold green]")
except Exception as e: except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]") console.print(f"[bold red]Error: {e}[/bold red]")
finally: finally:
await self.engine.close_connections() await self.engine.close_fluxer_only()
async def run_cli(): async def run_cli():