add resumability for waterfall mode

This commit is contained in:
rambros 2026-03-28 19:21:28 +05:30
parent 220f97aad4
commit 9074582a27
5 changed files with 269 additions and 69 deletions

View file

@ -451,34 +451,48 @@ class MigrationDatabase:
return dict(row) return dict(row)
return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0} return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0}
def get_global_min_last_message_id(self, mapped_channel_ids: List[str]) -> Optional[str]:
"""Returns the minimum last_msg_id across all mapped channels. If any mapped channel has NO last_msg_id, returns None.""" def get_global_min_last_message_id(self, all_mapped_ids: List[str]) -> Optional[int]:
if not mapped_channel_ids: """
Returns the minimum last_msg_id successfully migrated across all mapped channels/threads.
If any mapped entity has no progress record, it is treated as ID 0.
Returns None only if NO progress has been made across ANY entity.
"""
if not all_mapped_ids:
return None return None
conn = self._get_conn() conn = self._get_conn()
placeholders = ",".join(["?"] * len(mapped_channel_ids)) placeholders = ",".join(["?"] * len(all_mapped_ids))
rows = conn.execute(f"SELECT last_msg_id FROM channel_tracking WHERE channel_id IN ({placeholders})", mapped_channel_ids).fetchall()
# If the number of tracked channels is less than mapped, it means some mapped channels haven't started. # 1. Get last message IDs from channel tracking
if len(rows) < len(mapped_channel_ids): c_rows = conn.execute(f"SELECT channel_id, last_msg_id FROM channel_tracking WHERE channel_id IN ({placeholders})", all_mapped_ids).fetchall()
return None c_map = {r["channel_id"]: r["last_msg_id"] for r in c_rows}
# Parse all ids # 2. Get last message IDs from thread tracking
t_rows = conn.execute(f"SELECT thread_id, last_msg_id FROM thread_tracking WHERE thread_id IN ({placeholders})", all_mapped_ids).fetchall()
t_map = {r["thread_id"]: r["last_msg_id"] for r in t_rows}
# Combine maps
progress_map = {**c_map, **t_map}
# 3. Aggregate IDs
ids = [] ids = []
for r in rows: has_any_progress = False
val = r["last_msg_id"] for mid in all_mapped_ids:
if not val: last_id = progress_map.get(mid)
return None # One channel has no messages yet if not last_id:
try: ids.append(0) # Unmigrated entity
ids.append(int(val)) else:
except ValueError: try:
pass ids.append(int(last_id))
has_any_progress = True
except (ValueError, TypeError):
ids.append(0)
if not ids: if not has_any_progress:
return None return None
return str(min(ids)) return min(ids)
# Thread methods similar to channel methods # Thread methods similar to channel methods
def set_thread_message_mapping(self, channel_id: str, thread_id: str, source_id: str, target_id: str, timestamp: str = None): def set_thread_message_mapping(self, channel_id: str, thread_id: str, source_id: str, target_id: str, timestamp: str = None):
@ -525,6 +539,18 @@ class MigrationDatabase:
return dict(row) return dict(row)
return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0} return {"last_msg_id": None, "last_msg_ts": None, "msg_count": 0, "file_count": 0}
def get_all_channel_tracking_ids(self) -> Dict[str, str]:
"""Returns a map of channel_id -> last_msg_id for all tracked channels."""
conn = self._get_conn()
rows = conn.execute("SELECT channel_id, last_msg_id FROM channel_tracking WHERE last_msg_id IS NOT NULL").fetchall()
return {str(row["channel_id"]): str(row["last_msg_id"]) for row in rows}
def get_all_thread_tracking_ids(self) -> Dict[str, str]:
"""Returns a map of thread_id -> last_msg_id for all tracked threads."""
conn = self._get_conn()
rows = conn.execute("SELECT thread_id, last_msg_id FROM thread_tracking WHERE last_msg_id IS NOT NULL").fetchall()
return {str(row["thread_id"]): str(row["last_msg_id"]) for row in rows}
def clear_channel_data(self, channel_id: str): def clear_channel_data(self, channel_id: str):
"""Purge all mappings and tracking data for a specific channel and its threads.""" """Purge all mappings and tracking data for a specific channel and its threads."""
conn = self._get_conn() conn = self._get_conn()

View file

@ -38,6 +38,13 @@ class MigrationState:
if self.db: if self.db:
self.db.delete_server_mapping("channel", str(discord_id)) self.db.delete_server_mapping("channel", str(discord_id))
def remove_target_channel_mapping(self, discord_id: int | str):
if self.db:
self.db.delete_server_mapping("channel", str(discord_id))
def set_target_channel_id(self, discord_id: int | str, target_id: str, *args):
"""Alias for set_channel_mapping to handle legacy calls."""
self.set_channel_mapping(discord_id, target_id)
get_fluxer_channel_id = get_target_channel_id get_fluxer_channel_id = get_target_channel_id
set_target_channel_mapping = set_channel_mapping set_target_channel_mapping = set_channel_mapping
@ -58,6 +65,10 @@ class MigrationState:
if self.db: if self.db:
self.db.delete_server_mapping("category", str(discord_id)) self.db.delete_server_mapping("category", str(discord_id))
def set_target_category_id(self, discord_id: int | str, target_id: str, *args):
"""Alias for set_category_mapping to handle legacy calls."""
self.set_category_mapping(discord_id, target_id)
get_fluxer_category_id = get_category_mapping get_fluxer_category_id = get_category_mapping
get_target_category_id = get_category_mapping get_target_category_id = get_category_mapping
set_target_category_mapping = set_category_mapping set_target_category_mapping = set_category_mapping
@ -78,6 +89,10 @@ class MigrationState:
if self.db: if self.db:
self.db.delete_server_mapping("role", str(discord_id)) self.db.delete_server_mapping("role", str(discord_id))
def set_target_role_id(self, discord_id: int | str, target_id: str, *args):
"""Alias for set_role_mapping to handle legacy calls."""
self.set_role_mapping(discord_id, target_id)
get_fluxer_role_id = get_role_mapping get_fluxer_role_id = get_role_mapping
get_target_role_id = get_role_mapping get_target_role_id = get_role_mapping
set_target_role_mapping = set_role_mapping set_target_role_mapping = set_role_mapping
@ -210,17 +225,31 @@ class MigrationState:
if self._ensure_db(): if self._ensure_db():
self.db.update_channel_tracking(str(target_channel_id), last_msg_id=str(message_id)) self.db.update_channel_tracking(str(target_channel_id), last_msg_id=str(message_id))
def get_last_message_id(self, target_channel_id: str) -> str | None:
def get_global_min_last_message_id(self, all_mapped_ids: List[str]) -> int | None:
"""Returns the absolute minimum last_msg_id among the given list of mapped target IDs (channels and threads)."""
if self._ensure_db(): if self._ensure_db():
return self.db.get_channel_tracking(str(target_channel_id)).get("last_msg_id") return self.db.get_global_min_last_message_id(all_mapped_ids)
return None return None
def get_global_min_last_message_id(self, mapped_channel_ids: List[str]) -> str | None: def set_waterfall_last_id(self, last_id: str | int):
"""Returns the absolute minimum last_msg_id among the given list of mapped target channel IDs.""" if self.db:
if self._ensure_db(): self.db.set_metadata("waterfall_last_id", str(last_id))
return self.db.get_global_min_last_message_id(mapped_channel_ids)
def get_waterfall_last_id(self) -> int | None:
if self.db:
val = self.db.get_metadata("waterfall_last_id")
return int(val) if val else None
return None return None
def get_all_last_message_ids(self) -> Dict[str, str]:
"""Returns a combined map of channel_id/thread_id -> last_msg_id."""
if self._ensure_db():
c_map = self.db.get_all_channel_tracking_ids()
t_map = self.db.get_all_thread_tracking_ids()
return {**c_map, **t_map}
return {}
def get_thread_last_message_id(self, target_channel_id: str, thread_id: str) -> str | None: def get_thread_last_message_id(self, target_channel_id: str, thread_id: str) -> str | None:
if self._ensure_db(): if self._ensure_db():
return self.db.get_thread_tracking(str(target_channel_id), str(thread_id)).get("last_msg_id") return self.db.get_thread_tracking(str(target_channel_id), str(thread_id)).get("last_msg_id")

View file

@ -543,8 +543,11 @@ async def migrate_messages(
except Exception as e: except Exception as e:
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
# Check for existing mapping to avoid duplicates when resuming
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
continue
try: try:
# Check if this message is a reply
reply_to_fluxer_id = None reply_to_fluxer_id = None
if msg.reference and msg.reference.message_id: if msg.reference and msg.reference.message_id:
reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id)) reply_to_fluxer_id = context.state.get_fluxer_message_id(target_channel_id, str(msg.reference.message_id))
@ -676,10 +679,27 @@ async def analyze_global_migration(context: MigrationContext, after_message_id:
# In global mode, thread messages are returned natively in timestamp order by global fetch if they're in the DB # In global mode, thread messages are returned natively in timestamp order by global fetch if they're in the DB
# However we just count them if the fetcher yields them. # However we just count them if the fetcher yields them.
# Fetch global progress map to skip migrated messages efficiently
progress_map = context.state.get_all_last_message_ids()
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id): async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running: if not context.is_running:
break break
# Determine target channel to check for existing mapping
if not msg.channel:
continue
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id:
continue
# Efficient skip: if message ID is <= last migrated ID for this channel/thread
# This is the primary resume mechanism: wait until we pass the last migrated ID for this channel
last_id = progress_map.get(str(msg.channel.id))
if last_id and msg.id <= int(last_id):
continue
if msg.type not in [ if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT, context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY, context.discord_reader.MESSAGE_TYPE_REPLY,
@ -738,6 +758,9 @@ async def migrate_global_messages(
db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {} db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {}
target_server_id = getattr(context.fluxer_writer, "server_id", None) target_server_id = getattr(context.fluxer_writer, "server_id", None)
# Fetch global progress map to skip migrated messages efficiently
progress_map = context.state.get_all_last_message_ids()
try: try:
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id): async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running: if not context.is_running:
@ -757,9 +780,17 @@ async def migrate_global_messages(
continue continue
# Determine target channel # Determine target channel
if not msg.channel:
continue
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id)) target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id: if not target_channel_id:
logger.debug(f"Skipping msg {msg.id}: channel {msg.channel.id} not mapped.") continue
# Efficient skip: if message ID is <= last migrated ID for this channel/thread
# This ensures we only resume a channel once we reach its last known progress point
last_id = progress_map.get(str(target_channel_id))
if last_id and msg.id <= int(last_id):
continue continue
# If it's a thread message, we need to handle it based on if it's the thread starter or a reply # If it's a thread message, we need to handle it based on if it's the thread starter or a reply
@ -874,6 +905,7 @@ async def migrate_global_messages(
if fluxer_msg_id: if fluxer_msg_id:
context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id) context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id)
context.state.update_last_message_id(target_channel_id, msg.id) context.state.update_last_message_id(target_channel_id, msg.id)
context.state.set_waterfall_last_id(msg.id)
stats["attachments"] += len(files) if files else 0 stats["attachments"] += len(files) if files else 0
stats["messages"] += 1 stats["messages"] += 1

View file

@ -547,6 +547,10 @@ async def migrate_messages(
logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}") logger.error(f"Failed to download sticker {getattr(s, 'name', 'unknown')}: {e}")
try: try:
# Check for existing mapping to avoid duplicates when resuming
if context.state.get_target_message_id(target_channel_id, str(msg.id)):
continue
# Check if this message is a reply # Check if this message is a reply
reply_to_stoat_id = None reply_to_stoat_id = None
if msg.reference and msg.reference.message_id: if msg.reference and msg.reference.message_id:
@ -678,10 +682,27 @@ async def analyze_global_migration(context: MigrationContext, after_message_id:
""" """
stats = {"messages": 0, "threads": 0, "attachments": 0} stats = {"messages": 0, "threads": 0, "attachments": 0}
# Fetch global progress map to skip migrated messages efficiently
progress_map = context.state.get_all_last_message_ids()
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id): async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
if not context.is_running: if not context.is_running:
break break
# Determine target channel to check for existing mapping
if not msg.channel:
continue
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id:
continue
# Efficient skip: if message ID is <= last migrated ID for this channel/thread
# This is the primary resume mechanism: wait until we pass the last migrated ID for this channel
last_id = progress_map.get(str(msg.channel.id))
if last_id and msg.id <= int(last_id):
continue
if msg.type not in [ if msg.type not in [
context.discord_reader.MESSAGE_TYPE_DEFAULT, context.discord_reader.MESSAGE_TYPE_DEFAULT,
context.discord_reader.MESSAGE_TYPE_REPLY, context.discord_reader.MESSAGE_TYPE_REPLY,
@ -732,7 +753,8 @@ async def migrate_global_messages(
emoji_map = context.state.emoji_map emoji_map = context.state.emoji_map
db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {} db_media = context.discord_reader.db.get_all_media() if context.discord_reader.db else {}
target_server_id = getattr(context.stoat_writer, "community_id", None) # Fetch global progress map to skip migrated messages efficiently
progress_map = context.state.get_all_last_message_ids()
try: try:
async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id): async for msg in context.discord_reader.fetch_global_message_history(after_id=after_message_id):
@ -752,9 +774,18 @@ async def migrate_global_messages(
]: ]:
continue continue
# Determine target channel
if not msg.channel:
continue
target_channel_id = context.state.get_target_channel_id(str(msg.channel.id)) target_channel_id = context.state.get_target_channel_id(str(msg.channel.id))
if not target_channel_id: if not target_channel_id:
logger.debug(f"Skipping msg {msg.id}: channel {msg.channel.id} not mapped.") continue
# Efficient skip: if message ID is <= last migrated ID for this channel/thread
# This ensures we only resume a channel once we reach its last known progress point
last_id = progress_map.get(str(target_channel_id))
if last_id and msg.id <= int(last_id):
continue continue
parent_target_id = None parent_target_id = None
@ -858,6 +889,7 @@ async def migrate_global_messages(
if stoat_msg_id: if stoat_msg_id:
context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id) context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id)
context.state.update_last_message_id(target_channel_id, msg.id) context.state.update_last_message_id(target_channel_id, msg.id)
context.state.set_waterfall_last_id(msg.id)
stats["attachments"] += len(files) if files else 0 stats["attachments"] += len(files) if files else 0
stats["messages"] += 1 stats["messages"] += 1

View file

@ -724,7 +724,6 @@ class OperationPane(Container):
return return
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
self.app.switch_screen("config_selection")
return return
force_mode = (choice == "btn_start_id") force_mode = (choice == "btn_start_id")
@ -806,7 +805,6 @@ class OperationPane(Container):
return return
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
self.app.switch_screen("config_selection")
return return
force_mode = (choice == "btn_start_id") force_mode = (choice == "btn_start_id")
@ -1251,7 +1249,6 @@ class OperationPane(Container):
continue # Return to channel picker continue # Return to channel picker
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
self.app.switch_screen("config_selection")
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_connections() await self.engine.close_connections()
return return
@ -1415,7 +1412,6 @@ class OperationPane(Container):
else: else:
modal.write(f"[bold red]Error: {err}[/bold red]") modal.write(f"[bold red]Error: {err}[/bold red]")
modal.phase_report("Message Migration", "error", show_back=False) modal.phase_report("Message Migration", "error", show_back=False)
import traceback
logger.error(f"Migration Error: {traceback.format_exc()}") logger.error(f"Migration Error: {traceback.format_exc()}")
finally: finally:
self.engine.is_running = False self.engine.is_running = False
@ -1442,82 +1438,137 @@ class OperationPane(Container):
await self._perform_auto_matching() await self._perform_auto_matching()
# 1. Missing channels check # 1. Missing channels check
full_d = await self.engine.discord_reader.get_channels() if hasattr(self.engine.discord_reader, "get_all_channels"):
if hasattr(self.engine.discord_reader, "get_backed_up_channel_ids"): full_d = await self.engine.discord_reader.get_all_channels()
valid_ids = await self.engine.discord_reader.get_backed_up_channel_ids() # Include TEXT (0), CATEGORY (4), and NEWS (5)
d_channels = [c for c in full_d if c.id in valid_ids and c.type in [0, 5]] d_channels = [c for c in full_d if c.type in [0, 4, 5]]
else: else:
d_channels = [c for c in full_d if c.type in [0, 5]] full_d = await self.engine.discord_reader.get_channels()
d_channels = [c for c in full_d if c.type in [0, 4, 5]]
missing_channels = [] missing_channels = []
for d in d_channels: for d in d_channels:
tgt_id = self.engine.state.get_target_channel_id(str(d.id)) if d.type == 4:
tgt_id = self.engine.state.get_target_category_id(str(d.id))
else:
tgt_id = self.engine.state.get_target_channel_id(str(d.id))
if not tgt_id: if not tgt_id:
missing_channels.append(d) missing_channels.append(d)
if missing_channels: if missing_channels:
modal.write(f"\n[bold yellow]Found {len(missing_channels)} channels with backups but no target mapping.[/bold yellow]") modal.write(f"\n[bold yellow]Found {len(missing_channels)} backed-up channels/categories missing from target platform:[/bold yellow]")
modal.write("[dim]Do you want to automatically create these missing channels now?[/dim]") for mc in missing_channels:
prefix = "[bold cyan]📁[/bold cyan] " if mc.type == 4 else "[bold white]#[/bold white] "
modal.write(f" {prefix}{mc.name}")
choice = await modal.phase_wait_confirm( choice = await modal.phase_wait_confirm(
show_continue=False, show_continue=False,
show_id=True, show_id=True,
btn_start_label=f"Yes, Create {len(missing_channels)} Missing Channels", btn_start_label="Clone missing channels",
btn_id_label="No, Skip Them", btn_id_label="Skip missing channels",
btn_start_variant="primary", btn_start_variant="primary",
btn_start_tooltip="Create channels and map them", btn_start_tooltip=f"Automatically create {len(missing_channels)} entities on target",
btn_id_tooltip="Skip them (Warning: may cause broken mentions)" btn_id_tooltip="Start migration without these channels"
) )
if choice == "btn_back": if choice == "btn_back":
modal.dismiss() modal.dismiss()
await self.engine.close_connections()
return return
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
self.app.switch_screen("config_selection") await self.engine.close_connections()
return return
if choice == "btn_start_first": if choice == "btn_start_first":
modal.set_status("Creating missing channels...") modal.set_status("Cloning missing categories and channels...")
# Sort so categories (type 4) come first
missing_channels.sort(key=lambda x: 0 if x.type == 4 else 1)
for mc in missing_channels: for mc in missing_channels:
try: try:
modal.write(f"Creating channel '#{mc.name}'...") parent_target_id = None
new_id = await self.engine.writer.create_channel(name=mc.name) if mc.type == 4:
# Link them modal.write(f"Creating category '[bold cyan]{mc.name}[/bold cyan]'...")
self.engine.state.set_target_channel_id(str(mc.id), new_id, self.engine.platform) new_id = await self.engine.writer.create_channel(name=mc.name, type=4)
modal.write(f"[green]Created {mc.name} ({new_id})[/green]") self.engine.state.set_target_category_mapping(str(mc.id), new_id)
modal.write(f"[green]Created Category {mc.name} ({new_id})[/green]")
else:
if hasattr(mc, 'category_id') and mc.category_id:
parent_target_id = self.engine.state.get_target_category_id(str(mc.category_id))
modal.write(f"Creating channel '#{mc.name}'...")
new_id = await self.engine.writer.create_channel(name=mc.name, parent_id=parent_target_id)
self.engine.state.set_target_channel_id(str(mc.id), new_id, self.engine.target_platform)
modal.write(f"[green]Created Channel {mc.name} ({new_id})[/green]")
except Exception as e: except Exception as e:
logger.error(f"Failed to create {mc.name}: {e}\n{traceback.format_exc()}")
modal.write(f"[red]Failed to create {mc.name}: {e}[/red]") modal.write(f"[red]Failed to create {mc.name}: {e}[/red]")
elif choice == "btn_id":
# Skip missing channels: remove them from the active list
missing_ids = {str(c.id) for c in missing_channels}
d_channels = [c for c in d_channels if str(c.id) not in missing_ids]
# 2. Resumption check # 2. Resumption check
all_mapped_tgt_ids = [] all_mapped_tgt_ids = []
# Check regular channels # Check regular text channels (exclude categories for resume check)
for did in [str(c.id) for c in d_channels]: for c in d_channels:
if c.type == 4: continue
did = str(c.id)
tid = self.engine.state.get_target_channel_id(did) tid = self.engine.state.get_target_channel_id(did)
if tid: all_mapped_tgt_ids.append(tid) if tid: all_mapped_tgt_ids.append(tid)
# Also check threads # Also check threads (filtering to only include those belonging to active channels)
active_channel_ids = {str(c.id) for c in d_channels}
if hasattr(self.engine.discord_reader, "get_active_threads"): if hasattr(self.engine.discord_reader, "get_active_threads"):
threads = await self.engine.discord_reader.get_active_threads() threads = await self.engine.discord_reader.get_active_threads()
for t in threads: for t in threads:
pid = str(getattr(t, 'parent_id', getattr(t, 'channel_id', None)))
if pid not in active_channel_ids: continue
tid = self.engine.state.get_target_channel_id(str(t.id)) tid = self.engine.state.get_target_channel_id(str(t.id))
if tid: all_mapped_tgt_ids.append(tid) if tid: all_mapped_tgt_ids.append(tid)
min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids) # 2.5 Filter by actual content (Only for BackupReader)
# If a channel has NO messages in the backup, it will always be at 0 progress.
# We exclude those from the global MIN calculation to avoid pulling it to 0.
if hasattr(self.engine.discord_reader, "get_backed_up_channel_ids"):
backed_up_src_ids = await self.engine.discord_reader.get_backed_up_channel_ids()
backed_up_src_ids_str = {str(sid) for sid in backed_up_src_ids}
filtered_tgt_ids = []
# Find which target IDs belong to source channels that HAVE messages
for c in d_channels: # (d_channels is already filtered for skipped)
if str(c.id) in backed_up_src_ids_str:
tid = self.engine.state.get_target_channel_id(str(c.id))
if tid: filtered_tgt_ids.append(tid)
# Also check threads
if hasattr(self.engine.discord_reader, "threads"):
for t in self.engine.discord_reader.threads:
if str(t.id) in backed_up_src_ids_str:
tid = self.engine.state.get_target_channel_id(str(t.id))
if tid: filtered_tgt_ids.append(tid)
if filtered_tgt_ids:
all_mapped_tgt_ids = filtered_tgt_ids
# 2.6 Resume Point: Prioritize Global waterfall tracker, fallback to channel minimums
min_last_id = self.engine.state.get_waterfall_last_id()
if min_last_id is None:
min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids)
modal.write(f"\n[bold cyan]Waterfall Migration Resume Point:[/bold cyan]") modal.write(f"\n[bold cyan]Waterfall Migration Resume Point:[/bold cyan]")
if min_last_id: if min_last_id is not None:
modal.write(f"Minimum unmigrated message ID found: [green]{min_last_id}[/green]") modal.write(f"Minimum unmigrated message ID found: [green]{min_last_id}[/green]")
else: else:
modal.write("No previous migration state found. Starting from the beginning.") modal.write("No previous migration state found. Starting from the beginning.")
choice = await modal.phase_wait_confirm( choice = await modal.phase_wait_confirm(
show_continue=bool(min_last_id), show_continue=min_last_id is not None,
show_id=False, show_id=False,
btn_start_label="Start From Beginning", btn_start_label="Start From Beginning",
btn_start_tooltip="Safe, skips duplicates automatically", btn_start_tooltip="Safe, skips duplicates automatically",
btn_start_variant="default" if min_last_id else "primary", btn_start_variant="default" if min_last_id is not None else "primary",
btn_continue_label=f"Continue from ID {min_last_id}" if min_last_id else "Continue Migration", btn_continue_label=f"Continue from ID {min_last_id if min_last_id is not None else 0}" if min_last_id is not None else "Continue Migration",
btn_continue_tooltip="Fastest" btn_continue_tooltip="Fastest"
) )
@ -1528,11 +1579,10 @@ class OperationPane(Container):
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
await self.engine.close_connections() await self.engine.close_connections()
self.app.switch_screen("config_selection")
return return
after_id = None after_id = None
if choice == "btn_continue" and min_last_id: if choice == "btn_continue" and min_last_id is not None:
after_id = int(min_last_id) after_id = int(min_last_id)
# Phase 3: Progress # Phase 3: Progress
@ -1594,7 +1644,6 @@ class OperationPane(Container):
modal.write(f"[bold red]Error: {err}[/bold red]") modal.write(f"[bold red]Error: {err}[/bold red]")
modal.phase_report("Waterfall Migration", "error", show_back=False) modal.phase_report("Waterfall Migration", "error", show_back=False)
import traceback
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
finally: finally:
self.engine.is_running = False self.engine.is_running = False
@ -1682,7 +1731,6 @@ class OperationPane(Container):
return return
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal.dismiss() modal.dismiss()
self.app.switch_screen("config_selection")
return return
modal.cancel_callback = lambda: setattr(self.engine, "is_running", False) modal.cancel_callback = lambda: setattr(self.engine, "is_running", False)
@ -1841,6 +1889,39 @@ class OperationPane(Container):
logger.warning(f"Auto-matching: failed to fetch target data: {e}") logger.warning(f"Auto-matching: failed to fetch target data: {e}")
return # Cannot match without target data return # Cannot match without target data
# 1.5 Cleanup deleted entities from mapping database
# This prevents "Ghost" mappings to channels/roles that were deleted on target
valid_chan_ids = {str(c.get("id")) for c in target_chans_raw}
valid_cat_ids = {str(c.get("id")) for c in target_chans_raw if c.get("type") == 4}
valid_role_ids = set(target_roles_map.values())
valid_emoji_ids = set(target_emojis_map.values())
# Channels
for src_id, tgt_id in self.engine.state.channel_map.items():
if str(tgt_id) not in valid_chan_ids:
logger.info(f"Auto-matching: clearing deleted channel mapping {src_id} -> {tgt_id}")
self.engine.state.remove_target_channel_mapping(src_id)
# Categories
for src_id, tgt_id in self.engine.state.category_map.items():
if str(tgt_id) not in valid_cat_ids:
logger.info(f"Auto-matching: clearing deleted category mapping {src_id} -> {tgt_id}")
self.engine.state.remove_category_mapping(src_id)
# Roles
for src_id, tgt_id in self.engine.state.role_map.items():
if str(tgt_id) not in valid_role_ids:
logger.info(f"Auto-matching: clearing deleted role mapping {src_id} -> {tgt_id}")
self.engine.state.remove_role_mapping(src_id)
# Emojis
for src_id, tgt_id in self.engine.state.emoji_map.items():
if str(tgt_id) not in valid_emoji_ids:
# Emojis might be URLs in some platforms, but we check if they are IDs first
if isinstance(tgt_id, str) and tgt_id.isdigit():
logger.info(f"Auto-matching: clearing deleted emoji mapping {src_id} -> {tgt_id}")
self.engine.state.remove_emoji_mapping(src_id)
# 2. Match entities # 2. Match entities
try: try:
# Roles # Roles
@ -1884,6 +1965,7 @@ class OperationPane(Container):
logger.info(f"Auto-matched Sticker: {s.name} -> {target_stickers_map[name_l]}") logger.info(f"Auto-matched Sticker: {s.name} -> {target_stickers_map[name_l]}")
self.engine.state.set_target_sticker_mapping(s.id, target_stickers_map[name_l]) self.engine.state.set_target_sticker_mapping(s.id, target_stickers_map[name_l])
except Exception as e: except Exception as e:
logger.error(f"Auto-matching error: {e}\n{traceback.format_exc()}")
logger.warning(f"Auto-matching error: {e}") logger.warning(f"Auto-matching error: {e}")
return { return {
@ -2148,7 +2230,6 @@ class OperationPane(Container):
after_id = verified_id after_id = verified_id
elif choice == "btn_main_menu": elif choice == "btn_main_menu":
modal_prog.dismiss() modal_prog.dismiss()
self.app.switch_screen("config_selection")
return return
# If we are here, proceeding either via Start First or Start from ID (after_id) # If we are here, proceeding either via Start First or Start from ID (after_id)
@ -2259,7 +2340,7 @@ class OperationPane(Container):
self.engine.is_running = False self.engine.is_running = False
await self.engine.close_connections() await self.engine.close_connections()
if choice == "btn_main_menu": if choice == "btn_main_menu":
self.app.switch_screen("config_selection") pass
return return
modal_prog.cancel_callback = lambda: setattr(self.engine, "is_running", False) modal_prog.cancel_callback = lambda: setattr(self.engine, "is_running", False)