From 5b315ab2bfc08577c44f33c9f9171a2f2d0fd1e2 Mon Sep 17 00:00:00 2001 From: rambros Date: Mon, 30 Mar 2026 03:03:00 +0530 Subject: [PATCH] improve waterfall resume operation --- src/core/base.py | 27 +++++++++++++++++++++++++-- src/core/database.py | 9 +++++++++ src/core/state.py | 14 +++++--------- src/fluxer/migrate_message.py | 3 ++- src/stoat/migrate_message.py | 3 ++- src/ui/shuttle_ops.py | 15 +++++++++------ 6 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/core/base.py b/src/core/base.py index 81943f8..cbf6145 100644 --- a/src/core/base.py +++ b/src/core/base.py @@ -97,11 +97,17 @@ class MigrationContext: } # CONSISTENCY: Once target metadata is known, initialize the flat SQLite DB. - if results["target_community"] and results["target_community_name"]: + if results["target_community"]: tid = self.config.fluxer_server_id if self.target_platform == "fluxer" else self.config.stoat_server_id + + # Prefer the original discord community name for the DB file if available (e.g. from live load or backup) + db_name = results.get("discord_server_name") + if not db_name or db_name == "Not Found" or db_name == "Unknown": + db_name = results.get("target_community_name") or "Unknown" + self.ensure_state_initialized( str(tid or ""), - results["target_community_name"] + db_name ) return results @@ -120,6 +126,23 @@ class MigrationContext: return import re + import json + + # Override the target name explicitly with the original Discord source name if available. + # This fixes naming collisions and UI confusion like "Fluxer-123456.db" instead of "MyServer-123456.db" + try: + if hasattr(self.discord_reader, "guild") and getattr(self.discord_reader, "guild", None): + community_name = getattr(self.discord_reader, "guild").name + elif getattr(self, "source_mode", "live") == "backup" and hasattr(self.discord_reader, "backup_dir"): + b_dir = getattr(self.discord_reader, "backup_dir") + if b_dir and b_dir.exists(): + meta_file = b_dir / "metadata.json" + if meta_file.exists(): + data = json.loads(meta_file.read_text()) + community_name = data.get("name", community_name) + except Exception: + pass + clean_name = re.sub(r'[^\w\s-]', '', community_name).strip() clean_name = re.sub(r'[-\s]+', '_', clean_name) diff --git a/src/core/database.py b/src/core/database.py index 94ffbc0..a956224 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -560,6 +560,15 @@ class MigrationDatabase: conn.execute("DELETE FROM thread_tracking WHERE channel_id = ?", (str(channel_id),)) conn.commit() logger.info(f"Cleared all tracking and mapping data for channel: {channel_id}") + def clear_all_migration_data(self): + """Purge all mappings and tracking data for ALL channels and threads.""" + conn = self._get_conn() + conn.execute("DELETE FROM message_mappings") + conn.execute("DELETE FROM thread_mappings") + conn.execute("DELETE FROM channel_tracking") + conn.execute("DELETE FROM thread_tracking") + conn.commit() + logger.info("Cleared ALL tracking and message mapping data globally.") def close(self): if hasattr(self._local, "conn"): diff --git a/src/core/state.py b/src/core/state.py index 7388c40..997d310 100644 --- a/src/core/state.py +++ b/src/core/state.py @@ -238,16 +238,12 @@ class MigrationState: return self.db.get_global_min_last_message_id(all_mapped_ids) return None - def set_waterfall_last_id(self, last_id: str | int): - if self.db: - self.db.set_metadata("waterfall_last_id", str(last_id)) + + def clear_all_migration_data(self): + """Clears all message mapping and tracking state globally.""" + if self._ensure_db(): + self.db.clear_all_migration_data() - def get_waterfall_last_id(self) -> int | None: - if self.db: - val = self.db.get_metadata("waterfall_last_id") - return int(val) if val else None - return None - def get_all_last_message_ids(self) -> Dict[str, str]: """Returns a combined map of channel_id/thread_id -> last_msg_id.""" if self._ensure_db(): diff --git a/src/fluxer/migrate_message.py b/src/fluxer/migrate_message.py index cf141c0..c2b0aea 100644 --- a/src/fluxer/migrate_message.py +++ b/src/fluxer/migrate_message.py @@ -905,7 +905,8 @@ async def migrate_global_messages( if fluxer_msg_id: context.state.set_target_message_mapping(target_channel_id, msg.id, fluxer_msg_id) context.state.update_last_message_id(target_channel_id, msg.id) - context.state.set_waterfall_last_id(msg.id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) stats["attachments"] += len(files) if files else 0 stats["messages"] += 1 diff --git a/src/stoat/migrate_message.py b/src/stoat/migrate_message.py index 6861876..4a3d9a3 100644 --- a/src/stoat/migrate_message.py +++ b/src/stoat/migrate_message.py @@ -896,7 +896,8 @@ async def migrate_global_messages( if stoat_msg_id: context.state.set_target_message_mapping(target_channel_id, msg.id, stoat_msg_id) context.state.update_last_message_id(target_channel_id, msg.id) - context.state.set_waterfall_last_id(msg.id) + context.state.update_last_message_timestamp(target_channel_id, str(msg.created_at)) + context.state.increment_stats(target_channel_id, messages=1, files=len(files) if files else 0) stats["attachments"] += len(files) if files else 0 stats["messages"] += 1 diff --git a/src/ui/shuttle_ops.py b/src/ui/shuttle_ops.py index fc17583..2ca44a7 100644 --- a/src/ui/shuttle_ops.py +++ b/src/ui/shuttle_ops.py @@ -1551,10 +1551,8 @@ class OperationPane(Container): if filtered_tgt_ids: all_mapped_tgt_ids = filtered_tgt_ids - # 2.6 Resume Point: Prioritize Global waterfall tracker, fallback to channel minimums - min_last_id = self.engine.state.get_waterfall_last_id() - if min_last_id is None: - min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids) + # 2.6 Resume Point: Calculate from global channel minimums + min_last_id = self.engine.state.get_global_min_last_message_id(all_mapped_tgt_ids) modal.write(f"\n[bold cyan]Waterfall Migration Resume Point:[/bold cyan]") if min_last_id is not None: @@ -1566,7 +1564,7 @@ class OperationPane(Container): show_continue=min_last_id is not None, show_id=False, btn_start_label="Start From Beginning", - btn_start_tooltip="Safe, skips duplicates automatically", + btn_start_tooltip="Wipes migration progress and restarts from the beginning; may create duplicates", btn_start_variant="default" if min_last_id is not None else "primary", btn_continue_label=f"Continue from ID {min_last_id if min_last_id is not None else 0}" if min_last_id is not None else "Continue Migration", btn_continue_tooltip="Fastest" @@ -1582,7 +1580,11 @@ class OperationPane(Container): return after_id = None - if choice == "btn_continue" and min_last_id is not None: + if choice == "btn_start_first": + logger.info("Proceeding with 'Start from Beginning' (global clean sink).") + self.engine.state.clear_all_migration_data() + after_id = None + elif choice == "btn_continue" and min_last_id is not None: after_id = int(min_last_id) # Phase 3: Progress @@ -1599,6 +1601,7 @@ class OperationPane(Container): tid = self.config.fluxer_server_id self.engine.ensure_state_initialized(str(tid or ""), platform_name) + modal.show_stats() modal.write("Scanning global footprint for totals ...") stats_analysis = await migrate_mod.analyze_global_migration(self.engine, after_message_id=after_id) total_messages = stats_analysis["messages"]