unified config

This commit is contained in:
rambros 2026-02-24 22:58:41 +05:30
parent a2a2d0dab5
commit 49752229bf
8 changed files with 263 additions and 128 deletions

6
.gitignore vendored
View file

@ -28,10 +28,8 @@ ENV/
# Configuration and Secrets
configs/
!fluxer.config.example.yaml
!stoat.config.example.yaml
fluxer.config.yaml
stoat.config.yaml
!config.example.yaml
config.yaml
# Logs and State
*.log

View file

@ -3,6 +3,9 @@ discord_server_id: 'DISCORD_SERVER_ID' # ID of the source Discord Server
fluxer_bot_token: FLUXER_BOT_TOKEN # Token used to connect the Fluxer Bot
fluxer_community_id: 'FLUXER_COMMUNITY_ID' # ID of the target Fluxer Community
fluxer_api_url: 'default' # URL of the Fluxer API (default is https://api.fluxer.app)
stoat_bot_token: STOAT_BOT_TOKEN # Token used to connect the Stoat Bot
stoat_server_id: 'STOAT_SERVER_ID' # ID of the target Stoat Server
stoat_api_url: 'default' # URL of the Stoat API (default is https://api.stoat.chat)
migration:
batch_size: 50
rate_limit_delay_seconds: 2

View file

@ -5,25 +5,45 @@ from pathlib import Path
from src.ui.app import run_cli
from src.core.configuration import load_config
def select_config():
def setup_logging():
try:
config = load_config()
log_level_str = config.migration.log_level.upper()
level = getattr(logging, log_level_str, logging.INFO)
except Exception:
level = logging.INFO
handlers = [logging.FileHandler('migration.log', mode='a')]
if level == logging.DEBUG:
handlers.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def select_platform(config):
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
console = Console()
fluxer_exists = Path("fluxer.config.yaml").exists()
stoat_exists = Path("stoat.config.yaml").exists()
fillers = ["YOUR_FLUXER_TOKEN", "FLUXER_BOT_TOKEN", "YOUR_STOAT_TOKEN", "STOAT_BOT_TOKEN", ""]
fluxer_set = config.fluxer_bot_token not in fillers
stoat_set = config.stoat_bot_token not in fillers
if fluxer_exists and not stoat_exists:
return "fluxer.config.yaml"
elif stoat_exists and not fluxer_exists:
return "stoat.config.yaml"
elif fluxer_exists and stoat_exists:
if fluxer_set and not stoat_set:
return "fluxer"
elif stoat_set and not fluxer_set:
return "stoat"
elif fluxer_set and stoat_set:
console.print(Panel.fit(
"[bold]Both Fluxer and Stoat configurations found.[/bold]\n"
"Which one do you want to use?",
title="[bold cyan]Configuration Selection[/bold cyan]"
title="[bold cyan]Platform Selection[/bold cyan]"
))
console.print("(1) [bold blue]Fluxer[/bold blue]")
console.print("(2) [bold red]Stoat[/bold red]")
@ -32,12 +52,13 @@ def select_config():
choice = Prompt.ask("Select an option [[bold cyan]1/2/Q[/bold cyan]]", choices=["1", "2", "Q", "q"], show_choices=False).upper()
if choice == "1":
return "fluxer.config.yaml"
return "fluxer"
elif choice == "2":
return "stoat.config.yaml"
return "stoat"
else:
sys.exit(0)
else:
# Both are fillers
console.print(Panel.fit(
"[bold]First setup, Tool configuration[/bold]\n"
"Which platform do you want to migrate to?",
@ -50,32 +71,12 @@ def select_config():
choice = Prompt.ask("Select an option [[bold cyan]1/2/Q[/bold cyan]]", choices=["1", "2", "Q", "q"], show_choices=False).upper()
if choice == "1":
return "fluxer.config.yaml"
return "fluxer"
elif choice == "2":
return "stoat.config.yaml"
return "stoat"
else:
sys.exit(0)
def setup_logging(config_path):
try:
config = load_config(config_path)
log_level_str = config.migration.log_level.upper()
level = getattr(logging, log_level_str, logging.INFO)
except Exception:
level = logging.INFO
platform = config_path.split('.')[0]
handlers = [logging.FileHandler(f'{platform}.migration.log', mode='a')]
if level == logging.DEBUG:
handlers.append(logging.StreamHandler(sys.stdout))
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=level,
handlers=handlers
)
def relaunch_in_terminal():
"""Detects if running without a terminal on Linux and relaunches in one."""
import os
@ -140,10 +141,11 @@ def relaunch_in_terminal():
def main():
relaunch_in_terminal()
config_path = select_config()
setup_logging(config_path)
config = load_config()
setup_logging()
platform = select_platform(config)
try:
asyncio.run(run_cli(config_path))
asyncio.run(run_cli(target_platform=platform))
except KeyboardInterrupt:
print("\nOperation terminated by user.")
sys.exit(0)

View file

@ -5,14 +5,16 @@ from src.core.configuration import AppConfig
from src.core.state import MigrationState
from src.core.discord_reader import DiscordReader
from src.fluxer.writer import FluxerWriter
from src.stoat.writer import StoatWriter
logger = logging.getLogger(__name__)
class MigrationContext:
"""Holds state and connections for reading from Discord and writing to Fluxer."""
def __init__(self, config: AppConfig):
def __init__(self, config: AppConfig, target_platform: str = "fluxer"):
self.config = config
self.target_platform = target_platform
self.state = MigrationState()
self.discord_reader = DiscordReader(
@ -25,6 +27,14 @@ class MigrationContext:
community_id=config.fluxer_community_id
)
self.stoat_writer = StoatWriter(
token=config.stoat_bot_token,
community_id=config.stoat_server_id
)
self.writer = self.fluxer_writer if target_platform == "fluxer" else self.stoat_writer
self.is_running = False
async def validate_all(self) -> Dict[str, Any]:
@ -56,11 +66,11 @@ class MigrationContext:
async def start_connections(self):
await self.discord_reader.start()
await self.fluxer_writer.start()
await self.writer.start()
async def start_fluxer_only(self):
"""Starts only the Fluxer writer (used for Danger Zone operations that don't need Discord)."""
await self.fluxer_writer.start()
async def start_target_only(self):
"""Starts only the target platform writer (used for Danger Zone operations that don't need Discord)."""
await self.writer.start()
async def close_connections(self):
try:
@ -68,16 +78,16 @@ class MigrationContext:
except Exception as e:
logger.debug(f"Error closing Discord reader: {e}")
try:
await self.fluxer_writer.close()
await self.writer.close()
except Exception as e:
logger.debug(f"Error closing Fluxer writer: {e}")
logger.debug(f"Error closing target writer: {e}")
async def close_fluxer_only(self):
"""Closes only the Fluxer writer. Pair with start_fluxer_only()."""
async def close_target_only(self):
"""Closes only the target platform writer. Pair with start_target_only()."""
try:
await self.fluxer_writer.close()
await self.writer.close()
except Exception as e:
logger.debug(f"Error closing Fluxer writer: {e}")
logger.debug(f"Error closing target writer: {e}")
def stop(self):

View file

@ -9,22 +9,28 @@ class MigrationSettings(BaseModel):
class AppConfig(BaseModel):
discord_bot_token: str
fluxer_bot_token: str
discord_server_id: str
fluxer_bot_token: str
fluxer_community_id: str
fluxer_api_url: str = Field(default="default")
stoat_bot_token: str
stoat_server_id: str
stoat_api_url: str = Field(default="default")
migration: MigrationSettings = Field(default_factory=MigrationSettings)
def load_config(config_path: str | Path = "fluxer.config.yaml") -> AppConfig:
def load_config(config_path: str | Path = "config.yaml") -> AppConfig:
path = Path(config_path)
if not path.exists():
# Create dummy config if missing
config = AppConfig(
discord_bot_token="YOUR_DISCORD_TOKEN",
fluxer_bot_token="YOUR_FLUXER_TOKEN",
discord_server_id="000000000000000000",
fluxer_bot_token="YOUR_FLUXER_TOKEN",
fluxer_community_id="000000000000000000",
fluxer_api_url="default"
fluxer_api_url="default",
stoat_bot_token="YOUR_STOAT_TOKEN",
stoat_server_id="000000000000000000",
stoat_api_url="default"
)
save_config(config, path)
print(f"Created default configuration: {config_path}")
@ -38,7 +44,7 @@ def load_config(config_path: str | Path = "fluxer.config.yaml") -> AppConfig:
return AppConfig(**data)
def save_config(config: AppConfig, config_path: str | Path = "fluxer.config.yaml"):
def save_config(config: AppConfig, config_path: str | Path = "config.yaml"):
path = Path(config_path)
# Dump model to dictionary
data = config.model_dump()

View file

@ -1,4 +1,5 @@
import logging
from typing import Optional, List, Dict, Any
logger = logging.getLogger(__name__)
@ -16,8 +17,59 @@ class StoatWriter:
"community": True,
"bot_name": "Stoat Dummy",
"community_name": "Stoat Community Dummy",
"permissions": {}
"permissions": {
"manage_channels": True,
"manage_messages": True,
"manage_roles": True,
"manage_emojis_stickers": True,
"manage_webhooks": True
}
}
async def get_channels(self) -> List[Dict[str, Any]]:
return []
async def create_channel(self, name: str, **kwargs) -> str:
return "dummy_stoat_channel_id"
async def modify_channel(self, channel_id: str, **kwargs) -> bool:
return True
async def move_channel(self, channel_id: str, parent_id: Optional[str]) -> bool:
return True
async def send_message(self, **kwargs) -> Optional[str]:
return "dummy_stoat_message_id"
async def send_marker(self, **kwargs) -> Optional[str]:
return "dummy_stoat_marker_id"
async def create_role(self, **kwargs) -> str:
return "dummy_stoat_role_id"
async def create_emoji(self, **kwargs) -> str:
return "dummy_stoat_emoji_id"
async def create_sticker(self, **kwargs) -> str:
return "dummy_stoat_sticker_id"
async def update_guild_metadata(self, **kwargs) -> None:
pass
async def remove_community_logo_and_banner(self) -> dict:
return {"icon": "SKIP", "banner": "SKIP"}
async def delete_all_channels(self, **kwargs) -> int:
return 0
async def reset_channel_permissions(self, **kwargs) -> int:
return 0
async def delete_all_roles(self, **kwargs) -> int:
return 0
async def delete_all_emojis_and_stickers(self, **kwargs) -> dict:
return {"emojis": 0, "stickers": 0}
async def close(self):
pass

View file

@ -55,15 +55,16 @@ console = Console()
class MigrationCLI:
"""Standard CLI app to manage the Discord to Fluxer migration."""
def __init__(self, config_path="fluxer.config.yaml"):
def __init__(self, target_platform="fluxer", config_path="config.yaml"):
self.config_path = config_path
self.target_platform = target_platform
try:
self.config = load_config(self.config_path)
self.engine = MigrationContext(self.config, self.target_platform)
except Exception as e:
console.print(f"[bold red]Failed to load config: {e}[/bold red]")
sys.exit(1)
self.engine = MigrationContext(self.config)
self.progress_callback_task = None
self.tokens_valid = False
@ -80,25 +81,34 @@ class MigrationCLI:
"fluxer_token": False, "fluxer_bot_name": None,
"fluxer_community": False, "fluxer_community_name": None,
"fluxer_permissions": {},
"discord_timeout": False, "fluxer_timeout": False
"stoat_token": False, "stoat_bot_name": None,
"stoat_server": False, "stoat_server_name": None,
"stoat_permissions": {},
"discord_timeout": False, "fluxer_timeout": False, "stoat_timeout": False
}
self.tokens_valid = False
d_token = self.config.discord_bot_token
f_token = self.config.fluxer_bot_token
s_token = self.config.stoat_bot_token
discord_dummy = d_token in ["YOUR_DISCORD_TOKEN", "DISCORD_BOT_TOKEN", ""]
fluxer_dummy = f_token in ["YOUR_FLUXER_TOKEN", "FLUXER_BOT_TOKEN", "YOUR_STOAT_TOKEN", "STOAT_BOT_TOKEN", ""]
stoat_dummy = s_token in ["YOUR_STOAT_TOKEN", "STOAT_BOT_TOKEN", "YOUR_FLUXER_TOKEN", "FLUXER_BOT_TOKEN", ""]
discord_task = None
if not discord_dummy:
discord_task = asyncio.create_task(self.engine.discord_reader.validate())
fluxer_task = None
if not fluxer_dummy:
fluxer_task = asyncio.create_task(self.engine.fluxer_writer.validate())
if not fluxer_dummy and self.target_platform == "fluxer":
fluxer_task = asyncio.create_task(self.engine.writer.validate())
tasks_to_wait = [t for t in [discord_task, fluxer_task] if t is not None]
stoat_task = None
if not stoat_dummy and self.target_platform == "stoat":
stoat_task = asyncio.create_task(self.engine.writer.validate())
tasks_to_wait = [t for t in [discord_task, fluxer_task, stoat_task] if t is not None]
try:
with console.status("[yellow]Validating tokens...[/yellow]"):
@ -139,8 +149,9 @@ class MigrationCLI:
discord_task.cancel()
# Process Fluxer Result
if self.target_platform == "fluxer":
if fluxer_dummy:
console.print("[bold yellow]Target Platform setup incomplete (Using default token).[/bold yellow]")
console.print("[dim yellow]Fluxer platform setup incomplete (Using default token).[/dim yellow]")
elif fluxer_task in done:
try:
res = fluxer_task.result()
@ -153,9 +164,7 @@ class MigrationCLI:
elif not res.get("community"):
console.print("[bold red]Fluxer Community ID validation failed (Invalid/Inaccessible ID).[/bold red]")
else:
# Store permissions
self.validation_results["fluxer_permissions"] = res.get("permissions", {})
except Exception as e:
console.print(f"[bold red]Fluxer validation failed with error: {e}[/bold red]")
else:
@ -163,13 +172,39 @@ class MigrationCLI:
self.validation_results["fluxer_timeout"] = True
fluxer_task.cancel()
# Process Stoat Result
if self.target_platform == "stoat":
if stoat_dummy:
console.print("[dim yellow]Stoat platform setup incomplete (Using default token).[/dim yellow]")
elif stoat_task in done:
try:
res = stoat_task.result()
self.validation_results["stoat_token"] = res.get("token", False)
self.validation_results["stoat_bot_name"] = res.get("bot_name")
self.validation_results["stoat_server"] = res.get("community", False)
self.validation_results["stoat_server_name"] = res.get("community_name")
if not res.get("token"):
console.print("[bold red]Stoat Token validation failed (Invalid Token).[/bold red]")
elif not res.get("community"):
console.print("[bold red]Stoat Server ID validation failed (Invalid/Inaccessible ID).[/bold red]")
else:
self.validation_results["stoat_permissions"] = res.get("permissions", {})
except Exception as e:
console.print(f"[bold red]Stoat validation failed with error: {e}[/bold red]")
else:
console.print("[bold red]Stoat bot token validation timed out after 10 seconds.[/bold red]")
self.validation_results["stoat_timeout"] = True
stoat_task.cancel()
# Only tokens and server/community existence are strictly required for 'tokens_valid'
self.tokens_valid = (
self.validation_results.get("discord_token") and
self.validation_results.get("discord_server") and
self.validation_results.get("fluxer_token") and
self.validation_results.get("fluxer_community")
)
discord_valid = self.validation_results.get("discord_token") and self.validation_results.get("discord_server")
if self.target_platform == "fluxer":
target_valid = self.validation_results.get("fluxer_token") and self.validation_results.get("fluxer_community")
else:
target_valid = self.validation_results.get("stoat_token") and self.validation_results.get("stoat_server")
self.tokens_valid = discord_valid and target_valid
# Check if all permissions are actually granted
self.permissions_complete = True
@ -189,7 +224,7 @@ class MigrationCLI:
console.print(f"[bold red]Validation system failure: {e}[/bold red]")
finally:
# Ensure tasks are cleaned up
for t in [discord_task, fluxer_task]:
for t in [discord_task, fluxer_task, stoat_task]:
if t is not None and not t.done(): t.cancel()
async def run(self):
@ -202,7 +237,10 @@ class MigrationCLI:
"fluxer_token": False, "fluxer_bot_name": None,
"fluxer_community": False, "fluxer_community_name": None,
"fluxer_permissions": {},
"discord_timeout": False, "fluxer_timeout": False
"stoat_token": False, "stoat_bot_name": None,
"stoat_server": False, "stoat_server_name": None,
"stoat_permissions": {},
"discord_timeout": False, "fluxer_timeout": False, "stoat_timeout": False
}
self.tokens_valid = False
self.permissions_complete = False
@ -219,8 +257,15 @@ class MigrationCLI:
f_name = self.validation_results.get("fluxer_community_name")
f_display = f"[bold green]\"{f_name}\"[/bold green]" if f_name else ("[bold yellow]TIMEOUT ERROR[/bold yellow]" if self.validation_results.get("fluxer_timeout") else "[bold red]NOT SET UP[/bold red]")
s_name = self.validation_results.get("stoat_server_name")
s_display = f"[bold green]\"{s_name}\"[/bold green]" if s_name else ("[bold yellow]TIMEOUT ERROR[/bold yellow]" if self.validation_results.get("stoat_timeout") else "[bold red]NOT SET UP[/bold red]")
console.print(f"[bold cyan]Discord Server:[/bold cyan] {d_display}")
if self.target_platform == "fluxer":
console.print(f"[bold #4641D9]Fluxer Community:[/bold #4641D9] {f_display}")
else:
console.print(f"[bold #FF8C00]Stoat Server:[/bold #FF8C00] {s_display}")
console.print("[bold]Main Menu[/bold]")
console.print("(1) Clone Server Template (Channels & Categories)")
console.print("(2) Copy Roles & Permissions")
@ -283,14 +328,22 @@ class MigrationCLI:
f"• [bold]Intents:[/bold] Server Members, {fmt('Message Content', d_intents.get('message_content'))}\n"
f"• [bold]Permissions:[/bold] {fmt('View Channels', d_perms.get('view_channel'))}, {fmt('Read Message History', d_perms.get('read_message_history'))}"
)
if self.target_platform == "fluxer":
perm_table.add_row(
"[bold #4641D9]Fluxer Bot[/bold #4641D9]",
f"• [bold]Permissions:[/bold] {fmt('Manage Channels', f_perms.get('manage_channels'))}, {fmt('Manage Messages', f_perms.get('manage_messages'))},\n"
f" {fmt('Manage Roles', f_perms.get('manage_roles'))}, {fmt('Manage Emojis/Stickers', f_perms.get('manage_emojis_stickers'))}, {fmt('Manage Webhooks', f_perms.get('manage_webhooks'))}"
)
else:
s_perms = self.validation_results.get("stoat_permissions", {})
perm_table.add_row(
"[bold #FF8C00]Stoat Bot[/bold #FF8C00]",
f"• [bold]Permissions:[/bold] {fmt('Manage Channels', s_perms.get('manage_channels'))}, {fmt('Manage Messages', s_perms.get('manage_messages'))},\n"
f" {fmt('Manage Roles', s_perms.get('manage_roles'))}, {fmt('Manage Emojis/Stickers', s_perms.get('manage_emojis_stickers'))}, {fmt('Manage Webhooks', s_perms.get('manage_webhooks'))}"
)
console.print("\n") # Add spacing before panel
console.print(Panel(perm_table, title="[bold]Required Bot Permissions[/bold]", expand=False, border_style="dim"))
console.print(Panel(perm_table, title=f"[bold]Required Bot Permissions (Target: {self.target_platform.capitalize()})[/bold]", expand=False, border_style="dim"))
console.print("\n[bold]Configuration Status:[/bold]")
@ -302,8 +355,13 @@ class MigrationCLI:
console.print(f"Discord Bot Token {get_status_str(self.validation_results.get('discord_token', False), self.validation_results.get('discord_bot_name'))}")
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False), self.validation_results.get('discord_server_name'))}")
if self.target_platform == "fluxer":
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False), self.validation_results.get('fluxer_bot_name'))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False), self.validation_results.get('fluxer_community_name'))}")
else:
console.print(f"Stoat Bot Token {get_status_str(self.validation_results.get('stoat_token', False), self.validation_results.get('stoat_bot_name'))}")
console.print(f"Stoat Server ID {get_status_str(self.validation_results.get('stoat_server', False), self.validation_results.get('stoat_server_name'))}")
console.print("\n(1) Edit tokens")
console.print("(2) Edit API url (for self hosted instances)")
@ -322,23 +380,36 @@ class MigrationCLI:
d_token = Prompt.ask("Discord Bot Token", default=self.config.discord_bot_token)
d_server = Prompt.ask("Discord Server ID", default=self.config.discord_server_id)
if self.target_platform == "fluxer":
f_token = Prompt.ask("Fluxer Bot Token", default=self.config.fluxer_bot_token)
f_comm = Prompt.ask("Fluxer Community ID", default=self.config.fluxer_community_id)
s_token = self.config.stoat_bot_token
s_comm = self.config.stoat_server_id
else:
s_token = Prompt.ask("Stoat Bot Token", default=self.config.stoat_bot_token)
s_comm = Prompt.ask("Stoat Server ID", default=self.config.stoat_server_id)
f_token = self.config.fluxer_bot_token
f_comm = self.config.fluxer_community_id
# Only rewrite if changed
if (d_token != self.config.discord_bot_token or
f_token != self.config.fluxer_bot_token or
d_server != self.config.discord_server_id or
f_comm != self.config.fluxer_community_id):
f_comm != self.config.fluxer_community_id or
s_token != self.config.stoat_bot_token or
s_comm != self.config.stoat_server_id):
self.config.discord_bot_token = d_token
self.config.fluxer_bot_token = f_token
self.config.discord_server_id = d_server
self.config.fluxer_community_id = f_comm
self.config.stoat_bot_token = s_token
self.config.stoat_server_id = s_comm
save_config(self.config, self.config_path)
# Recreate engine with new config
self.engine = MigrationContext(self.config)
self.engine = MigrationContext(self.config, self.target_platform)
# Re-validate
console.print("[yellow]Validating new configuration...[/yellow]")
@ -348,6 +419,8 @@ class MigrationCLI:
console.print(f"Discord Server ID {get_status_str(self.validation_results.get('discord_server', False))}")
console.print(f"Fluxer Bot Token {get_status_str(self.validation_results.get('fluxer_token', False))}")
console.print(f"Fluxer Community ID {get_status_str(self.validation_results.get('fluxer_community', False))}")
console.print(f"Stoat Bot Token {get_status_str(self.validation_results.get('stoat_token', False))}")
console.print(f"Stoat Server ID {get_status_str(self.validation_results.get('stoat_server', False))}")
console.print(f"[bold green]Configuration updated and saved to {self.config_path}![/bold green]")
else:
@ -884,7 +957,7 @@ class MigrationCLI:
# 2. Select Target Fluxer Channel
with console.status("[yellow]Fetching Fluxer channels...[/yellow]"):
f_channels = await self.engine.fluxer_writer.get_channels()
f_channels = await self.engine.writer.get_channels()
if not f_channels:
console.print("[yellow]No channels found in Fluxer community.[/yellow]")
return
@ -941,7 +1014,7 @@ class MigrationCLI:
nsfw = getattr(source_channel, 'nsfw', False)
slowmode = getattr(source_channel, 'slowmode_delay', 0)
new_id = await self.engine.fluxer_writer.create_channel(
new_id = await self.engine.writer.create_channel(
name=source_channel.name,
topic=topic,
type=0,
@ -952,7 +1025,7 @@ class MigrationCLI:
if new_id:
self.engine.state.set_channel_mapping(str(source_channel.id), new_id)
# Refresh list to get the channel object
f_channels = await self.engine.fluxer_writer.get_channels()
f_channels = await self.engine.writer.get_channels()
target_channel = next((c for c in f_channels if str(c.get('id')) == new_id), None)
else:
console.print("[bold red]Failed to create channel.[/bold red]")
@ -1201,7 +1274,7 @@ class MigrationCLI:
if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
await self.engine.start_target_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
@ -1222,7 +1295,7 @@ class MigrationCLI:
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
await self.engine.close_target_only()
# ---- (2) Reset Channel & Category Permissions ----
elif choice == "2":
@ -1234,7 +1307,7 @@ class MigrationCLI:
if not Confirm.ask("[bold red]Last chance \u2013 all custom permission overrides will be wiped. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
await self.engine.start_target_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
@ -1255,7 +1328,7 @@ class MigrationCLI:
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
await self.engine.close_target_only()
# ---- (3) Delete all Roles ----
elif choice == "3":
@ -1268,7 +1341,7 @@ class MigrationCLI:
if not Confirm.ask("[bold red]Last chance \u2013 confirm permanent role deletion?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
await self.engine.start_target_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
@ -1289,7 +1362,7 @@ class MigrationCLI:
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
await self.engine.close_target_only()
# ---- (4) Delete all Emojis & Stickers ----
elif choice == "4":
@ -1301,7 +1374,7 @@ class MigrationCLI:
if not Confirm.ask("[bold red]Last chance \u2013 this cannot be undone. Continue?[/bold red]"):
return
try:
await self.engine.start_fluxer_only()
await self.engine.start_target_only()
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
@ -1323,10 +1396,10 @@ class MigrationCLI:
except Exception as e:
console.print(f"[bold red]Error: {e}[/bold red]")
finally:
await self.engine.close_fluxer_only()
await self.engine.close_target_only()
async def run_cli(config_path="fluxer.config.yaml"):
cli = MigrationCLI(config_path)
async def run_cli(target_platform="fluxer", config_path="config.yaml"):
cli = MigrationCLI(target_platform, config_path)
await cli.run()

View file

@ -1,9 +0,0 @@
discord_bot_token: DISCORD_BOT_TOKEN # Token used to connect the Discord Bot
discord_server_id: 'DISCORD_SERVER_ID' # ID of the source Discord Server
stoat_bot_token: STOAT_BOT_TOKEN # Token used to connect the Stoat Bot
stoat_server_id: 'STOAT_SERVER_ID' # ID of the target Stoat Server
stoat_api_url: 'default' # URL of the Stoat API (default is https://api.stoat.chat)
migration:
batch_size: 50
rate_limit_delay_seconds: 2
log_level: INFO