update state handling for deduplicated syncing

This commit is contained in:
rambros 2026-02-21 22:40:10 +05:30
parent d836f49f6e
commit 667cb4e902
4 changed files with 213 additions and 38 deletions

View file

@ -1,5 +1,5 @@
discord.py>=2.3.2
git+https://github.com/akarealemil/fluxer.py
fluxer.py
rich>=13.7.0
PyYAML>=6.0.1
pydantic>=2.5.3 # Good for configuration validation and mapping

View file

@ -110,6 +110,99 @@ class MigrationEngine:
except Exception:
await progress_callback("Server Banner", "ERROR")
async def sync_channel_state(self):
"""
Scans Fluxer for channels matching Discord names and updates state.json mappings.
This prevents duplicate creation when the state.json is empty but channels exist in Fluxer.
"""
categories = await self.discord_reader.get_categories()
channels = await self.discord_reader.get_channels()
fluxer_channels = await self.fluxer_writer.get_channels()
# Build name -> id map and ID set for Fluxer for fast lookup
fluxer_name_map = {c.get("name"): str(c.get("id")) for c in fluxer_channels if c.get("name")}
fluxer_id_set = {str(c.get("id")) for c in fluxer_channels}
updates = 0
removals = 0
# 1. Verify and Sync Categories
for cat in categories:
discord_id = str(cat.id)
fluxer_id = self.state.get_fluxer_category_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_id_set:
self.state.remove_category_mapping(discord_id)
removals += 1
elif cat.name in fluxer_name_map:
self.state.set_category_mapping(discord_id, fluxer_name_map[cat.name])
updates += 1
# 2. Verify and Sync Channels
for ch in channels:
discord_id = str(ch.id)
fluxer_id = self.state.get_fluxer_channel_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_id_set:
self.state.remove_channel_mapping(discord_id)
removals += 1
elif ch.name in fluxer_name_map:
self.state.set_channel_mapping(discord_id, fluxer_name_map[ch.name])
updates += 1
if updates > 0 or removals > 0:
logger.info(f"Channel sync: {updates} mapped, {removals} stale mappings removed")
async def sync_assets_state(self):
"""
Scans Fluxer for emojis and stickers matching Discord names and updates state.json mappings.
"""
discord_emojis = await self.discord_reader.get_emojis()
discord_stickers = await self.discord_reader.get_stickers()
fluxer_emojis = await self.fluxer_writer.client.get_guild_emojis(self.config.fluxer_community_id)
fluxer_stickers = await self.fluxer_writer.client.get_guild_stickers(self.config.fluxer_community_id)
# Build name -> id maps and ID sets for Fluxer for fast lookup
fluxer_emoji_map = {e.get("name"): str(e.get("id")) for e in fluxer_emojis if e.get("name")}
fluxer_sticker_map = {s.get("name"): str(s.get("id")) for s in fluxer_stickers if s.get("name")}
fluxer_emoji_ids = {str(e.get("id")) for e in fluxer_emojis}
fluxer_sticker_ids = {str(s.get("id")) for s in fluxer_stickers}
updates = 0
removals = 0
# 1. Verify and Sync Emojis
for emoji in discord_emojis:
discord_id = str(emoji.id)
fluxer_id = self.state.get_fluxer_emoji_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_emoji_ids:
self.state.remove_emoji_mapping(discord_id)
removals += 1
elif emoji.name in fluxer_emoji_map:
self.state.set_emoji_mapping(discord_id, fluxer_emoji_map[emoji.name])
updates += 1
# 2. Verify and Sync Stickers
for sticker in discord_stickers:
discord_id = str(sticker.id)
fluxer_id = self.state.get_fluxer_sticker_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_sticker_ids:
self.state.remove_sticker_mapping(discord_id)
removals += 1
elif sticker.name in fluxer_sticker_map:
self.state.set_sticker_mapping(discord_id, fluxer_sticker_map[sticker.name])
updates += 1
if updates > 0 or removals > 0:
logger.info(f"Asset sync: {updates} mapped, {removals} stale mappings removed")
async def migrate_channels(self, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False):
"""Clones categories and text channels.
@ -128,13 +221,13 @@ class MigrationEngine:
if not self.is_running: break
state_key = str(cat.id)
fluxer_id = None if force else self.state.get_fluxer_channel_id(state_key)
fluxer_id = None if force else self.state.get_fluxer_category_id(state_key)
status = "Copying"
if not fluxer_id:
# 4 corresponds to Category type in Discord/Fluxer typically
fluxer_id = await self.fluxer_writer.create_channel(cat.name, type=4)
self.state.set_channel_mapping(state_key, fluxer_id)
self.state.set_category_mapping(state_key, fluxer_id)
else:
status = "Skipping"
@ -152,7 +245,7 @@ class MigrationEngine:
if not fluxer_id:
topic = channel.topic if channel.topic else ""
parent_id = self.state.get_fluxer_channel_id(str(channel.category_id)) if channel.category_id else None
parent_id = self.state.get_fluxer_category_id(str(channel.category_id)) if channel.category_id else None
fluxer_id = await self.fluxer_writer.create_channel(
name=channel.name,

View file

@ -9,10 +9,10 @@ class MigrationState:
self.state_file = Path(state_file)
# mappings: discord_id -> fluxer_id
self.channel_map: Dict[str, str] = {}
self.category_map: Dict[str, str] = {}
self.role_map: Dict[str, str] = {}
self.emoji_map: Dict[str, str] = {}
self.sticker_map: Dict[str, str] = {}
self.user_map: Dict[str, str] = {}
self.message_map: Dict[str, str] = {}
# tracking last message timestamp per channel to resume
@ -25,10 +25,10 @@ class MigrationState:
with open(self.state_file, "r", encoding="utf-8") as f:
data = json.load(f)
self.channel_map = data.get("channels", {})
self.category_map = data.get("categories", {})
self.role_map = data.get("roles", {})
self.emoji_map = data.get("emojis", {})
self.sticker_map = data.get("stickers", {})
self.user_map = data.get("users", {})
self.message_map = data.get("messages", {})
self.last_message_timestamps = data.get("last_message_timestamps", {})
@ -55,10 +55,10 @@ class MigrationState:
def save(self):
data = {
"channels": self.channel_map,
"categories": self.category_map,
"roles": self.role_map,
"emojis": self.emoji_map,
"stickers": self.sticker_map,
"users": self.user_map,
"last_message_timestamps": self.last_message_timestamps,
"messages": self.message_map
}
@ -72,6 +72,21 @@ class MigrationState:
def get_fluxer_channel_id(self, discord_id: str) -> str | None:
return self.channel_map.get(str(discord_id))
def remove_channel_mapping(self, discord_id: str):
self.channel_map.pop(str(discord_id), None)
self.save()
def set_category_mapping(self, discord_id: str, fluxer_id: str):
self.category_map[str(discord_id)] = str(fluxer_id)
self.save()
def get_fluxer_category_id(self, discord_id: str) -> str | None:
return self.category_map.get(str(discord_id))
def remove_category_mapping(self, discord_id: str):
self.category_map.pop(str(discord_id), None)
self.save()
def set_message_mapping(self, discord_id: str, fluxer_id: str):
self.message_map[str(discord_id)] = str(fluxer_id)
self.save()
@ -99,6 +114,10 @@ class MigrationState:
def get_fluxer_emoji_id(self, discord_id: str) -> str | None:
return self.emoji_map.get(str(discord_id))
def remove_emoji_mapping(self, discord_id: str):
self.emoji_map.pop(str(discord_id), None)
self.save()
def set_sticker_mapping(self, discord_id: str, fluxer_id: str):
self.sticker_map[str(discord_id)] = str(fluxer_id)
self.save()
@ -106,11 +125,16 @@ class MigrationState:
def get_fluxer_sticker_id(self, discord_id: str) -> str | None:
return self.sticker_map.get(str(discord_id))
def remove_sticker_mapping(self, discord_id: str):
self.sticker_map.pop(str(discord_id), None)
self.save()
# --- Danger Zone Clearing ---
def clear_channel_mappings(self):
"""Clears all channel and category mappings."""
self.channel_map.clear()
self.category_map.clear()
self.save()
def clear_role_mappings(self):

View file

@ -239,6 +239,8 @@ class MigrationCLI:
channels = []
try:
await self.engine.start_connections()
with console.status("[yellow]Syncing Fluxer channel state...[/yellow]"):
await self.engine.sync_channel_state()
categories = await self.engine.discord_reader.get_categories()
channels = await self.engine.discord_reader.get_channels()
except Exception as e:
@ -286,18 +288,62 @@ class MigrationCLI:
console.print("")
# Check for existing mappings to determine if we should suggest a force re-copy
all_ids = [str(cat.id) for cat in categories] + [str(ch.id) for ch in channels]
cached_count = sum(1 for k in all_ids if self.engine.state.get_fluxer_channel_id(k))
cached_count = sum(1 for cat in categories if self.engine.state.get_fluxer_category_id(str(cat.id)))
cached_count += sum(1 for ch in channels if self.engine.state.get_fluxer_channel_id(str(ch.id)))
all_ids_len = len(categories) + len(channels)
# Prompt for confirmation
force = False
if cached_count > 0:
console.print(f"[yellow]\u26a0 {cached_count}/{len(all_ids)} item(s) already in state.json cache.[/yellow]")
force = Confirm.ask("Force re-clone anyway?", default=False)
if not force:
if not Confirm.ask("Continue with only missing items?", default=True):
console.print(f"[yellow]\u26a0 {cached_count}/{all_ids_len} item(s) already in state.json cache.[/yellow]")
# List missing items
missing_categories = [cat for cat in categories if not self.engine.state.get_fluxer_category_id(str(cat.id))]
missing_channels = [ch for ch in channels if not self.engine.state.get_fluxer_channel_id(str(ch.id))]
if missing_categories or missing_channels:
console.print("\n[bold red]The following channels/categories are missing in your fluxer server:[/bold red]")
# Group missing channels by their categories
missing_by_cat = {}
missing_uncategorized = []
for ch in missing_channels:
cat_id = str(ch.category_id) if ch.category_id else None
if cat_id:
if cat_id not in missing_by_cat: missing_by_cat[cat_id] = []
missing_by_cat[cat_id].append(ch)
else:
missing_uncategorized.append(ch)
# Iterate through all categories to print missing ones or categories with missing children
for cat in categories:
cat_id_str = str(cat.id)
is_cat_missing = not self.engine.state.get_fluxer_category_id(cat_id_str)
child_missing_channels = missing_by_cat.get(cat_id_str, [])
if is_cat_missing or child_missing_channels:
footer = " [dim](Category itself is missing)[/dim]" if is_cat_missing else ""
console.print(f"[bold yellow]{cat.name}[/bold yellow]{footer}")
for ch in child_missing_channels:
print_channel(ch)
if missing_uncategorized:
console.print(f"[bold yellow]Uncategorized[/bold yellow]")
for ch in missing_uncategorized:
print_channel(ch)
console.print("")
console.print("[bold green](Y) Continue with only missing items[/bold green]")
console.print("[bold red](F) Force re-clone, creates duplicate channels![/bold red]")
console.print("[bold yellow](B) Back[/bold yellow]")
choice = Prompt.ask("Select an option", choices=["Y", "F", "B"], default="Y").upper()
if choice == "B":
await self.engine.close_connections()
return
elif choice == "F":
force = True
# if 'Y', force remains False and we continue
else:
if not Confirm.ask("Clone channels and categories?", default=True):
await self.engine.close_connections()
@ -404,6 +450,10 @@ class MigrationCLI:
console.print("\n[yellow]Fetching emojis and stickers...[/yellow]")
try:
await self.engine.start_connections()
with console.status("[yellow]Checking Fluxer for existing emojis and stickers...[/yellow]"):
await self.engine.sync_assets_state()
emojis = await self.engine.discord_reader.get_emojis()
stickers = await self.engine.discord_reader.get_stickers()
@ -426,13 +476,11 @@ class MigrationCLI:
total_items = len(emojis) + len(stickers)
cached_count = cached_emojis + cached_stickers
if cached_count > 0:
console.print(f"\n[yellow]\u26a0 {cached_count}/{total_items} item(s) marked as already copied in state.json.[/yellow]")
console.print("[yellow] If the target community was reset, choose Force Re-copy.[/yellow]")
console.print("\n(1) Copy Emojis only")
console.print("(2) Copy Stickers only")
console.print("(3) Copy Emojis and Stickers")
console.print("\n(1) Sync Emojis only")
console.print("(2) Sync Stickers only")
console.print("(3) Sync Emojis and Stickers")
console.print("(B) Back")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "B", "b"], default="B").upper()
@ -457,10 +505,18 @@ class MigrationCLI:
force = False
if cached_in_scope > 0:
force = Confirm.ask(
f"[yellow]{cached_in_scope} item(s) already in state cache. Force re-copy anyway?[/yellow]",
default=False
)
console.print(f"\n[yellow]{cached_in_scope} item(s) already in state cache.[/yellow]")
console.print("[bold green](Y) Copy missing items only[/bold green]")
console.print("[bold red](F) Force Overwrite[/bold red]")
console.print("[bold yellow](B) Back[/bold yellow]")
choice = Prompt.ask("Select an option", choices=["Y", "F", "B"], default="Y").upper()
if choice == "B":
return
elif choice == "F":
force = True
# if 'Y', force remains False and we continue
console.print("\n[bold green]Starting Migration...[/bold green]")
with Progress(
@ -548,13 +604,10 @@ class MigrationCLI:
return
try:
# Note: We don't call start_connections here because engine methods handles it
# or we handle it manually if we need to fetch data before migration.
# Actually, to get channels, we need connections.
with console.status("[yellow]Fetching Discord channels...[/yellow]"):
await self.engine.start_connections()
# 1. Select Source Discord Channel
d_channels = await self.engine.discord_reader.get_channels()
if not d_channels:
console.print("[yellow]No text channels found in Discord server.[/yellow]")
return
@ -575,6 +628,7 @@ class MigrationCLI:
source_channel = d_channels[int(d_choice) - 1]
# 2. Select Target Fluxer Channel
with console.status("[yellow]Fetching Fluxer channels...[/yellow]"):
f_channels = await self.engine.fluxer_writer.get_channels()
if not f_channels:
console.print("[yellow]No channels found in Fluxer community.[/yellow]")
@ -626,7 +680,7 @@ class MigrationCLI:
with console.status(f"[yellow]Creating Fluxer channel #{source_channel.name}...[/yellow]"):
parent_id = None
if source_channel.category_id:
parent_id = self.engine.state.get_fluxer_channel_id(str(source_channel.category_id))
parent_id = self.engine.state.get_fluxer_category_id(str(source_channel.category_id))
topic = getattr(source_channel, 'topic', "") or ""
new_id = await self.engine.fluxer_writer.create_channel(
@ -647,6 +701,7 @@ class MigrationCLI:
target_channel = f_channels[int(f_choice) - 1]
# 3. Handle Starting Message
with console.status("[yellow]Fetching first message...[/yellow]"):
first_msg = await self.engine.discord_reader.get_first_message(source_channel.id)
after_id = None
@ -658,12 +713,15 @@ class MigrationCLI:
console.print(f"Author: [blue]{first_msg.author.name}[/blue]")
console.print(f"Content Preview: {first_msg.content[:100]}...")
prompt_text = "Start migration from this oldest message? (Y for Yes, S for Specific Link, B to Back)"
start_mode = Prompt.ask(prompt_text, choices=["Y", "S", "B"], default="Y").upper()
console.print("\n(Y) [green]Yes, start from oldest message[/green]")
console.print("(M) Provide Specific message link or ID")
console.print("(B) Back")
start_mode = Prompt.ask("Start migration", choices=["Y", "M", "B"], default="Y").upper()
if start_mode == "B":
return
elif start_mode == "S":
elif start_mode == "M":
custom_link = Prompt.ask("Enter Discord Message Link")
try:
# Extract message ID from end of link