add rate limit notification

This commit is contained in:
rambros 2026-02-21 21:38:23 +05:30
parent a89347f8ee
commit d836f49f6e
3 changed files with 200 additions and 44 deletions

View file

@ -331,7 +331,7 @@ class MigrationEngine:
for idx, role in enumerate(roles): for idx, role in enumerate(roles):
if not self.is_running: break if not self.is_running: break
fluxer_id = self.state.get_fluxer_channel_id(f"role_{role.id}") # reusing mapping method fluxer_id = self.state.get_fluxer_role_id(str(role.id))
if not fluxer_id: if not fluxer_id:
fluxer_id = await self.fluxer_writer.create_role( fluxer_id = await self.fluxer_writer.create_role(
name=role.name, name=role.name,
@ -340,7 +340,7 @@ class MigrationEngine:
mentionable=role.mentionable mentionable=role.mentionable
) )
if fluxer_id: if fluxer_id:
self.state.set_channel_mapping(f"role_{role.id}", fluxer_id) self.state.set_role_mapping(str(role.id), fluxer_id)
if progress_callback: await progress_callback(role.name, idx + 1, total) if progress_callback: await progress_callback(role.name, idx + 1, total)
await asyncio.sleep(self.config.migration.rate_limit_delay_seconds) await asyncio.sleep(self.config.migration.rate_limit_delay_seconds)
@ -364,8 +364,11 @@ class MigrationEngine:
for idx, (obj, obj_type) in enumerate(objs): for idx, (obj, obj_type) in enumerate(objs):
if not self.is_running: break if not self.is_running: break
state_key = f"{obj_type.lower()}_{obj.id}" if obj_type == "Emoji":
fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key) fluxer_id = None if force else self.state.get_fluxer_emoji_id(str(obj.id))
else:
fluxer_id = None if force else self.state.get_fluxer_sticker_id(str(obj.id))
if not fluxer_id: if not fluxer_id:
try: try:
if obj_type == "Emoji": if obj_type == "Emoji":
@ -374,15 +377,16 @@ class MigrationEngine:
name=obj.name, name=obj.name,
image_bytes=img_data image_bytes=img_data
) )
if fluxer_id:
self.state.set_emoji_mapping(str(obj.id), fluxer_id)
else: else:
img_data = await self.discord_reader.download_sticker(obj) img_data = await self.discord_reader.download_sticker(obj)
fluxer_id = await self.fluxer_writer.create_sticker( fluxer_id = await self.fluxer_writer.create_sticker(
name=obj.name, name=obj.name,
image_bytes=img_data image_bytes=img_data
) )
if fluxer_id: if fluxer_id:
self.state.set_channel_mapping(state_key, fluxer_id) self.state.set_sticker_mapping(str(obj.id), fluxer_id)
except Exception as e: except Exception as e:
logger.error(f"Error downloading/uploading {obj_type.lower()} {obj.name}: {e}") logger.error(f"Error downloading/uploading {obj_type.lower()} {obj.name}: {e}")

View file

@ -10,6 +10,8 @@ class MigrationState:
# mappings: discord_id -> fluxer_id # mappings: discord_id -> fluxer_id
self.channel_map: Dict[str, str] = {} self.channel_map: Dict[str, str] = {}
self.role_map: Dict[str, str] = {} self.role_map: Dict[str, str] = {}
self.emoji_map: Dict[str, str] = {}
self.sticker_map: Dict[str, str] = {}
self.user_map: Dict[str, str] = {} self.user_map: Dict[str, str] = {}
self.message_map: Dict[str, str] = {} self.message_map: Dict[str, str] = {}
@ -24,17 +26,41 @@ class MigrationState:
data = json.load(f) data = json.load(f)
self.channel_map = data.get("channels", {}) self.channel_map = data.get("channels", {})
self.role_map = data.get("roles", {}) self.role_map = data.get("roles", {})
self.emoji_map = data.get("emojis", {})
self.sticker_map = data.get("stickers", {})
self.user_map = data.get("users", {}) self.user_map = data.get("users", {})
self.message_map = data.get("messages", {}) self.message_map = data.get("messages", {})
self.last_message_timestamps = data.get("last_message_timestamps", {}) self.last_message_timestamps = data.get("last_message_timestamps", {})
# Legacy Migration: Move role_, emoji_, sticker_ from channel_map to dedicated maps
migrated = False
legacy_keys = list(self.channel_map.keys())
for k in legacy_keys:
if k.startswith("role_"):
discord_id = k.replace("role_", "")
self.role_map[discord_id] = self.channel_map.pop(k)
migrated = True
elif k.startswith("emoji_"):
discord_id = k.replace("emoji_", "")
self.emoji_map[discord_id] = self.channel_map.pop(k)
migrated = True
elif k.startswith("sticker_"):
discord_id = k.replace("sticker_", "")
self.sticker_map[discord_id] = self.channel_map.pop(k)
migrated = True
if migrated:
self.save()
def save(self): def save(self):
data = { data = {
"channels": self.channel_map, "channels": self.channel_map,
"roles": self.role_map, "roles": self.role_map,
"emojis": self.emoji_map,
"stickers": self.sticker_map,
"users": self.user_map, "users": self.user_map,
"messages": self.message_map, "last_message_timestamps": self.last_message_timestamps,
"last_message_timestamps": self.last_message_timestamps "messages": self.message_map
} }
with open(self.state_file, "w", encoding="utf-8") as f: with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4) json.dump(data, f, indent=4)
@ -57,26 +83,45 @@ class MigrationState:
self.last_message_timestamps[str(channel_id)] = timestamp self.last_message_timestamps[str(channel_id)] = timestamp
self.save() self.save()
# --- Type Specific Getters/Setters ---
def set_role_mapping(self, discord_id: str, fluxer_id: str):
self.role_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_role_id(self, discord_id: str) -> str | None:
return self.role_map.get(str(discord_id))
def set_emoji_mapping(self, discord_id: str, fluxer_id: str):
self.emoji_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_emoji_id(self, discord_id: str) -> str | None:
return self.emoji_map.get(str(discord_id))
def set_sticker_mapping(self, discord_id: str, fluxer_id: str):
self.sticker_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_sticker_id(self, discord_id: str) -> str | None:
return self.sticker_map.get(str(discord_id))
# --- Danger Zone Clearing ---
def clear_channel_mappings(self): def clear_channel_mappings(self):
"""Clears all channel and category mappings (excludes roles/emojis/stickers).""" """Clears all channel and category mappings."""
to_remove = [k for k in self.channel_map.keys() if k.isdigit()] self.channel_map.clear()
for k in to_remove:
del self.channel_map[k]
self.save() self.save()
def clear_role_mappings(self): def clear_role_mappings(self):
"""Clears all role mappings.""" """Clears all role mappings."""
to_remove = [k for k in self.channel_map.keys() if k.startswith("role_")]
for k in to_remove:
del self.channel_map[k]
self.role_map.clear() self.role_map.clear()
self.save() self.save()
def clear_asset_mappings(self): def clear_asset_mappings(self):
"""Clears all emoji and sticker mappings.""" """Clears all emoji and sticker mappings."""
to_remove = [k for k in self.channel_map.keys() if k.startswith("emoji_") or k.startswith("sticker_")] self.emoji_map.clear()
for k in to_remove: self.sticker_map.clear()
del self.channel_map[k]
self.save() self.save()
def clear_message_history(self): def clear_message_history(self):

View file

@ -1,5 +1,7 @@
import sys import sys
import asyncio import asyncio
import logging
import re
from rich.console import Console from rich.console import Console
from rich.prompt import Prompt, Confirm from rich.prompt import Prompt, Confirm
from rich.panel import Panel from rich.panel import Panel
@ -7,6 +9,39 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskPr
from src.config import load_config, save_config from src.config import load_config, save_config
from src.core.engine import MigrationEngine from src.core.engine import MigrationEngine
class RateLimitHandler(logging.Handler):
"""Intersects library logs to print clean rate limit messages."""
def __init__(self):
super().__init__()
self._last_print = ""
def emit(self, record):
try:
msg = record.getMessage()
# Detect rate limit messages from discord.py or fluxer.py
if "retry" in msg.lower() and ("rate limit" in msg.lower() or "429" in msg):
# Extract seconds using regex: supports "retry in 5.50s" and "Retrying in 5.50 seconds"
match = re.search(r"in ([\d.]+)\s*(?:seconds?|s)", msg, re.IGNORECASE)
if match:
seconds = match.group(1)
platform = "discord" if "discord" in record.name.lower() else "fluxer"
# Format the message
new_msg = f"{platform} API rate limit: will retry after {seconds}"
# Avoid spamming the exact same message if nothing changed
if new_msg == self._last_print:
return
self._last_print = new_msg
# Use rich console to print on the same line.
# end="\r" works with rich's internal live-update handling.
# We add some padding to clear old text.
console.print(f"{new_msg} ", end="\r")
except Exception:
pass
console = Console() console = Console()
class MigrationCLI: class MigrationCLI:
@ -23,6 +58,11 @@ class MigrationCLI:
self.progress_callback_task = None self.progress_callback_task = None
self.tokens_valid = False self.tokens_valid = False
# Register rate limit interceptor for both libraries
rl_handler = RateLimitHandler()
logging.getLogger("discord").addHandler(rl_handler)
logging.getLogger("fluxer").addHandler(rl_handler)
async def validate_config(self): async def validate_config(self):
self.validation_results = { self.validation_results = {
"discord_token": False, "discord_bot_name": None, "discord_token": False, "discord_bot_name": None,
@ -249,16 +289,20 @@ class MigrationCLI:
all_ids = [str(cat.id) for cat in categories] + [str(ch.id) for ch in channels] all_ids = [str(cat.id) for cat in categories] + [str(ch.id) for ch in channels]
cached_count = sum(1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k)) cached_count = sum(1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k))
# Prompt for confirmation
force = False force = False
if cached_count > 0: if cached_count > 0:
console.print(f"[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) already in state.json cache.[/yellow]") console.print(f"[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) already in state.json cache.[/yellow]")
force = Confirm.ask("Force re-clone anyway?", default=False) force = Confirm.ask("Force re-clone anyway?", default=False)
elif not Confirm.ask("Are you sure you want to clone channels and categories?"): if not force:
if not Confirm.ask("Continue with only missing items?", default=True):
await self.engine.close_connections() await self.engine.close_connections()
return return
else:
if cached_count == 0 or force: if not Confirm.ask("Clone channels and categories?", default=True):
if not force and not Confirm.ask("Are you sure you want to clone channels and categories?"): await self.engine.close_connections()
return
if not Confirm.ask("Are you sure?", default=True):
await self.engine.close_connections() await self.engine.close_connections()
return return
@ -365,24 +409,25 @@ class MigrationCLI:
console.print(f"\n[bold]Custom emojis found: {len(emojis)}[/bold]") console.print(f"\n[bold]Custom emojis found: {len(emojis)}[/bold]")
for e in emojis: for e in emojis:
already = self.engine.state.get_fluxer_channel_id(f"emoji_{e.id}") already = self.engine.state.get_fluxer_emoji_id(str(e.id))
tag = " [dim](already copied)[/dim]" if already else "" tag = " [dim](already copied)[/dim]" if already else ""
console.print(f" - Emoji: {e.name}{tag}") console.print(f" - Emoji: {e.name}{tag}")
console.print(f"[bold]Custom stickers found: {len(stickers)}[/bold]") console.print(f"[bold]Custom stickers found: {len(stickers)}[/bold]")
for s in stickers: for s in stickers:
already = self.engine.state.get_fluxer_channel_id(f"sticker_{s.id}") already = self.engine.state.get_fluxer_sticker_id(str(s.id))
tag = " [dim](already copied)[/dim]" if already else "" tag = " [dim](already copied)[/dim]" if already else ""
console.print(f" - Sticker: {s.name}{tag}") console.print(f" - Sticker: {s.name}{tag}")
# Warn if everything is already in the state cache # Warn if everything is already in the state cache
all_ids = ([f"emoji_{e.id}" for e in emojis] + force = False
[f"sticker_{s.id}" for s in stickers]) cached_emojis = sum(1 for e in emojis if self.engine.state.get_fluxer_emoji_id(str(e.id)))
cached_count = sum( cached_stickers = sum(1 for s in stickers if self.engine.state.get_fluxer_sticker_id(str(s.id)))
1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k) total_items = len(emojis) + len(stickers)
) cached_count = cached_emojis + cached_stickers
if cached_count > 0: if cached_count > 0:
console.print(f"\n[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) marked as already copied in state.json.[/yellow]") console.print(f"\n[yellow]\u26a0 {cached_count}/{total_items} item(s) marked as already copied in state.json.[/yellow]")
console.print("[yellow] If the target community was reset, choose Force Re-copy.[/yellow]") console.print("[yellow] If the target community was reset, choose Force Re-copy.[/yellow]")
console.print("\n(1) Copy Emojis only") console.print("\n(1) Copy Emojis only")
@ -404,14 +449,11 @@ class MigrationCLI:
types_to_include = ["Emoji", "Sticker"] types_to_include = ["Emoji", "Sticker"]
# Ask about force re-copy only if there are cached items in scope # Ask about force re-copy only if there are cached items in scope
in_scope_keys = [] cached_in_scope = 0
if "Emoji" in types_to_include: if "Emoji" in types_to_include:
in_scope_keys += [f"emoji_{e.id}" for e in emojis] cached_in_scope += sum(1 for e in emojis if self.engine.state.get_fluxer_emoji_id(str(e.id)))
if "Sticker" in types_to_include: if "Sticker" in types_to_include:
in_scope_keys += [f"sticker_{s.id}" for s in stickers] cached_in_scope += sum(1 for s in stickers if self.engine.state.get_fluxer_sticker_id(str(s.id)))
cached_in_scope = sum(
1 for k in in_scope_keys if self.engine.state.get_fluxer_channel_id(k)
)
force = False force = False
if cached_in_scope > 0: if cached_in_scope > 0:
@ -521,8 +563,15 @@ class MigrationCLI:
for i, ch in enumerate(d_channels): for i, ch in enumerate(d_channels):
console.print(f"({i+1}) {ch.name}") console.print(f"({i+1}) {ch.name}")
d_choices = [str(i+1) for i in range(len(d_channels))] console.print("(B) Back")
d_choice = Prompt.ask("Select Discord Channel", choices=d_choices) d_choices = [str(i+1) for i in range(len(d_channels))] + ["B", "b"]
prompt_msg = f"Select Discord Channel [[bold cyan](1-{len(d_channels)})/B[/bold cyan]]"
d_choice = Prompt.ask(prompt_msg, choices=d_choices, show_choices=False).upper()
if d_choice == "B":
return
source_channel = d_channels[int(d_choice) - 1] source_channel = d_channels[int(d_choice) - 1]
# 2. Select Target Fluxer Channel # 2. Select Target Fluxer Channel
@ -531,12 +580,70 @@ class MigrationCLI:
console.print("[yellow]No channels found in Fluxer community.[/yellow]") console.print("[yellow]No channels found in Fluxer community.[/yellow]")
return return
# Determine recommended channel
recommended_channel = None
# Priority 1: Check state.json mapping
mapped_id = self.engine.state.get_fluxer_channel_id(str(source_channel.id))
if mapped_id:
recommended_channel = next((c for c in f_channels if str(c.get('id', '')) == mapped_id), None)
# Priority 2: Check name match
if not recommended_channel:
recommended_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
console.print("\n[bold]Select Target Fluxer Channel:[/bold]") console.print("\n[bold]Select Target Fluxer Channel:[/bold]")
for i, ch in enumerate(f_channels): for i, ch in enumerate(f_channels):
console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}") console.print(f"({i+1}) {ch.get('name', 'Unnamed Channel')}")
f_choices = [str(i+1) for i in range(len(f_channels))] f_choices = [str(i+1) for i in range(len(f_channels))] + ["B", "b", "N", "n"]
f_choice = Prompt.ask("Select Fluxer Channel", choices=f_choices)
if recommended_channel:
console.print(f"(Y) [bold green]{recommended_channel.get('name')}[/bold green] (auto-matched)")
f_choices += ["Y", "y"]
console.print("(N) Create new channel")
console.print("(B) Back")
# Build simplified prompt string
prompt_parts = [f"(1-{len(f_channels)})", "B", "N"]
if recommended_channel:
prompt_parts.append("Y")
prompt_msg = f"Select Fluxer Channel [[bold cyan]{'/'.join(prompt_parts)}[/bold cyan]]"
f_choice = Prompt.ask(prompt_msg, choices=f_choices, show_choices=False, default="Y" if recommended_channel else None).upper()
if f_choice == "B":
return
if f_choice == "Y" and recommended_channel:
target_channel = recommended_channel
elif f_choice == "N":
# Check if a channel with this name already exists
target_channel = next((c for c in f_channels if c.get('name') == source_channel.name), None)
if not target_channel:
with console.status(f"[yellow]Creating Fluxer channel #{source_channel.name}...[/yellow]"):
parent_id = None
if source_channel.category_id:
parent_id = self.engine.state.get_fluxer_channel_id(str(source_channel.category_id))
topic = getattr(source_channel, 'topic', "") or ""
new_id = await self.engine.fluxer_writer.create_channel(
name=source_channel.name,
topic=topic,
type=0,
parent_id=parent_id
)
if new_id:
self.engine.state.set_channel_mapping(str(source_channel.id), new_id)
# Refresh list to get the channel object
f_channels = await self.engine.fluxer_writer.get_channels()
target_channel = next((c for c in f_channels if str(c.get('id')) == new_id), None)
else:
console.print("[bold red]Failed to create channel.[/bold red]")
return
else:
target_channel = f_channels[int(f_choice) - 1] target_channel = f_channels[int(f_choice) - 1]
# 3. Handle Starting Message # 3. Handle Starting Message