disco-reaper/src/ui/modals.py
2026-03-08 03:42:39 +05:30

888 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Shared modals used by backup and shuttle operations.
"""
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical, VerticalScroll, Container, Center
from textual.widgets import Button, Label, Input, ProgressBar, RichLog, Rule, RadioButton, LoadingIndicator, Header, Footer, RadioSet, OptionList
from textual.widgets.option_list import Option
from textual.screen import ModalScreen, Screen
import time
import logging
import asyncio
from typing import Any, Optional, Union, List, Dict, Callable
class UILogHandler(logging.Handler):
"""Custom logging handler to send logs to the Textual UI RichLog."""
def __init__(self, callback):
super().__init__()
self.callback = callback
self.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s', '%H:%M:%S'))
def emit(self, record):
try:
msg = self.format(record)
self.callback(msg)
except Exception:
pass
# ---------------------------------------------------------------------------
# ProgressScreen unified full-screen progress / log / stats display
# ---------------------------------------------------------------------------
class ProgressScreen(Screen[None]):
"""Screen to display progress for any operation, with stats and logs."""
DEFAULT_CSS = """
ProgressScreen { align: center middle; }
#prog_outer { width: 100%; height: 100%; align: center top; }
#prog_dialog {
width: 80%;
height: 100%;
layout: vertical;
border: solid green;
padding: 1 2;
margin: 2 0;
background: $surface;
}
#prog_header { height: 3; margin-bottom: 1; dock: top; align: left middle; }
#prog_status { text-style: bold; width: 1fr; content-align: left middle; }
#prog_loader { width: 10; height: 1; margin: 0 1; }
#prog_timer { text-style: bold; width: 15; content-align: right middle; color: yellow; }
#prog_stats {
height: auto;
layout: horizontal;
border: solid cyan;
padding: 1;
margin-bottom: 1;
display: none;
}
.stat_label { width: 1fr; content-align: center middle; text-style: bold; }
#prog_log { height: 1fr; margin-bottom: 1; border: solid $primary; }
#live_log { height: 10; margin-bottom: 1; border: solid yellow; }
#prog_item_status { margin-bottom: 1; text-style: bold; color: cyan; width: 100%; text-align: center; }
#info_container { height: auto; layout: vertical; border: solid cyan; padding: 1; margin-bottom: 1; display: none; }
.info_label { text-style: bold; content-align: center middle; width: 100%; color: cyan; }
#prog_actions { height: auto; margin-top: 0; dock: bottom; margin-bottom: 0; layout: vertical; }
.action_row { height: auto; layout: horizontal; }
.action_row Button { width: 1fr; margin: 0 1; }
#prog_actions_row1, #prog_actions_row2 { display: none; }
#footer_rule { margin: 0; }
"""
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="prog_outer"):
with Container(id="prog_dialog"):
with Horizontal(id="prog_header"):
yield Label("Operation Status...", id="prog_status")
yield LoadingIndicator(id="prog_loader")
yield Label("00:00", id="prog_timer")
with Horizontal(id="prog_stats"):
yield Label("Messages: 0", id="stat_messages", classes="stat_label")
yield Label("Threads: 0", id="stat_threads", classes="stat_label")
yield Label("Files: 0", id="stat_files", classes="stat_label")
with Vertical(id="info_container"):
yield Label("", id="info_migration_status", classes="info_label")
yield Label("", id="info_new_items", classes="info_label")
yield Label("", id="prog_item_status")
yield RichLog(id="prog_log", highlight=True, markup=True)
yield RichLog(id="live_log", highlight=True, markup=True)
yield Rule(id="footer_rule")
with Vertical(id="prog_actions"):
with Horizontal(classes="action_row", id="prog_actions_row1"):
yield Button("Start from First", id="btn_start_first", disabled=True, variant="primary", tooltip="Start the operation from the beginning")
yield Button("Continue Migration", id="btn_continue", disabled=True, variant="success", tooltip="Resume the operation from the last saved state")
yield Button("Start from ID", id="btn_start_id", disabled=True, variant="warning", tooltip="Start or resume from a specific Discord Message ID")
with Horizontal(classes="action_row", id="prog_actions_row2"):
yield Button("Back", id="btn_back", disabled=False)
yield Button("Main Menu", id="btn_main_menu", disabled=False)
with Horizontal(classes="action_row", id="prog_actions_cancel"):
yield Button("Cancel", id="btn_cancel", variant="error", tooltip="Stop current operation")
yield Footer()
def __init__(self, log_level: str = "INFO", *args, **kwargs):
super().__init__(*args, **kwargs)
self.log_level = log_level.upper()
self.confirm_future = None
self.cancel_callback = None
self.start_time = time.time()
self.timer_event = self.set_interval(1.0, self.update_timer)
# Intercept Python logs and pipe to the #live_log
self.log_handler = UILogHandler(self.write_live)
# Set level based on config
level = getattr(logging, self.log_level, logging.INFO)
# Attach to root logger
root_logger = logging.getLogger()
root_logger.addHandler(self.log_handler)
root_logger.setLevel(level)
# Also let's capture discord.py logs specifically if they aren't propagating
discord_logger = logging.getLogger("discord")
discord_logger.addHandler(self.log_handler)
discord_logger.setLevel(level)
def on_unmount(self):
# Detach log handler when UI is cleanly removed
if hasattr(self, "log_handler"):
logging.getLogger().removeHandler(self.log_handler)
logging.getLogger("discord").removeHandler(self.log_handler)
def update_timer(self):
elapsed = int(time.time() - self.start_time)
mins, secs = divmod(elapsed, 60)
try:
self.query_one("#prog_timer", Label).update(f"Elapsed: {mins:02d}:{secs:02d}")
except Exception:
pass
def on_button_pressed(self, event: Button.Pressed):
btn_id = event.button.id
if self.confirm_future and not self.confirm_future.done():
self.confirm_future.set_result(btn_id)
return
# If Cancel is pressed during operation, invoke callback and stay on screen
if btn_id == "btn_cancel":
if self.cancel_callback:
self.cancel_callback()
# Show cancelling message and disable button
self.set_status("[bold red]Cancelling... waiting for tasks to finish...[/bold red]")
try:
event.button.disabled = True
event.button.label = "Stopping..."
except Exception:
pass
return
# If operation is done (report phase), just dismiss with the action
if btn_id in ["btn_back", "btn_main_menu"]:
if self.timer_event:
self.timer_event.stop()
self.dismiss(btn_id)
def write(self, message: str):
try:
self.query_one("#prog_log", RichLog).write(message)
except Exception:
pass
def write_live(self, message: str):
try:
self.query_one("#live_log", RichLog).write(message)
except Exception:
pass
write_to_log = write
def set_status(self, status: str):
try:
self.query_one("#prog_status", Label).update(status)
except Exception:
pass
def set_progress(self, current: int, total: int):
try:
# Keep loader visible during progress next to timer
self.query_one("#prog_loader", LoadingIndicator).display = True
# Ensure the container is visible
self.query_one("#info_container", Vertical).display = True
except Exception:
pass
def set_item_status(self, status: str):
try:
self.query_one("#prog_item_status", Label).update(status)
self.query_one("#info_container", Vertical).display = True
except Exception:
pass
def show_stats(self):
try:
self.query_one("#prog_stats", Horizontal).display = True
except Exception:
pass
def update_stats(self, **kwargs):
# kwargs can be messages, threads, files
for key, val in kwargs.items():
try:
self.query_one(f"#stat_{key}", Label).update(f"{key.capitalize()}: {val}")
except Exception:
pass
async def phase_wait_confirm(
self,
show_continue: bool = False,
show_id: bool = True,
btn_start_label: str = "Start from First",
btn_continue_label: str = "Continue Migration",
btn_id_label: str = "Start from ID",
btn_start_variant: str = "primary",
btn_start_tooltip: str | None = None,
btn_continue_tooltip: str | None = None,
btn_id_tooltip: str | None = None
):
"""Phase 2: Wait for user confirmation after analysis."""
try: self.query_one("#prog_loader", LoadingIndicator).display = False
except Exception: pass
try: self.query_one("#prog_timer", Label).display = False
except Exception: pass
# Update button labels, variants and tooltips
try:
btn_start = self.query_one("#btn_start_first", Button)
btn_start.label = btn_start_label
btn_start.variant = btn_start_variant
if btn_start_tooltip:
btn_start.tooltip = btn_start_tooltip
except Exception: pass
try:
btn_cont = self.query_one("#btn_continue", Button)
btn_cont.label = btn_continue_label
if btn_continue_tooltip:
btn_cont.tooltip = btn_continue_tooltip
except Exception: pass
try:
btn_id = self.query_one("#btn_start_id", Button)
btn_id.label = btn_id_label
if btn_id_tooltip:
btn_id.tooltip = btn_id_tooltip
except Exception: pass
# Show confirmation buttons
try: self.query_one("#prog_actions_row1", Horizontal).display = True
except Exception: pass
try: self.query_one("#prog_actions_row2", Horizontal).display = True
except Exception: pass
try: self.query_one("#prog_actions_cancel", Horizontal).display = False
except Exception: pass
try: self.query_one("#btn_start_first", Button).disabled = False
except Exception: pass
try:
btn_id = self.query_one("#btn_start_id", Button)
btn_id.disabled = not show_id
btn_id.display = show_id
except Exception: pass
try:
if show_continue:
self.query_one("#btn_continue", Button).disabled = False
self.query_one("#btn_continue", Button).display = True
else:
self.query_one("#btn_continue", Button).display = False
except Exception:
pass
loop = asyncio.get_running_loop()
self.confirm_future = loop.create_future()
# Wait until the user clicks one of the buttons
choice = await self.confirm_future
return choice
def phase_progress(self):
"""Phase 3: The actual operation begins. Only Cancel visible."""
try: self.query_one("#prog_actions_row1", Horizontal).display = False
except Exception: pass
try: self.query_one("#prog_actions_row2", Horizontal).display = False
except Exception: pass
# Show Cancel button
try: self.query_one("#prog_actions_cancel", Horizontal).display = True
except Exception: pass
try: self.query_one("#prog_loader", LoadingIndicator).display = True
except Exception: pass
# Show timer and reset it for the operation
try: self.query_one("#prog_timer", Label).display = True
except Exception: pass
self.start_time = time.time()
try: self.query_one("#info_migration_status", Label).display = False
except Exception: pass
try: self.query_one("#info_new_items", Label).display = False
except Exception: pass
def phase_report(self, operation_name: str, status: str = "complete", show_back: bool = True):
"""Phase 4: Operation is done. Show Back + Main Menu.
status can be: 'complete', 'stopped', 'error'
"""
try:
if self.timer_event:
self.timer_event.stop()
except Exception:
pass
# Format status title with color
status_map = {
"complete": ("[bold green]", "Complete"),
"stopped": ("[bold yellow]", "Stopped"),
"error": ("[bold red]", "Error"),
}
color, label = status_map.get(status, ("[bold]", status.capitalize()))
self.set_status(f"{color}{operation_name} {label}![/{color.split(']')[0].lstrip('[')}]")
# Hide loader
try: self.query_one("#prog_loader", LoadingIndicator).display = False
except Exception: pass
# Hide progress bar (no need to show 100% bar)
# Hide Cancel, show Back + Main Menu
try: self.query_one("#prog_actions_cancel", Horizontal).display = False
except Exception: pass
try: self.query_one("#prog_actions_row1", Horizontal).display = False
except Exception: pass
try: self.query_one("#prog_actions_row2", Horizontal).display = True
except Exception: pass
try:
back_btn = self.query_one("#btn_back", Button)
back_btn.disabled = not show_back
back_btn.display = show_back
except Exception:
pass
def show_info(self, migration_status: str, items_status: str):
try:
info = self.query_one("#info_container", Vertical)
info.display = True
self.query_one("#info_migration_status", Label).update(migration_status)
self.query_one("#info_new_items", Label).update(items_status)
except Exception:
pass
def allow_close(self):
"""Legacy fallback: show Back + Main Menu buttons for early exit."""
self.phase_report("Operation", "error")
# ---------------------------------------------------------------------------
# SubMenuModal generic labelled-button list
# ---------------------------------------------------------------------------
class SubMenuModal(ModalScreen[str]):
"""A generic sub-menu modal that presents a list of labelled buttons."""
DEFAULT_CSS = """
SubMenuModal { align: center middle; }
#footer_rule { margin: 0; }
"""
def __init__(self, title: str, options: list[tuple[str, str, str]]):
"""options: list of (button_id, label, variant)"""
super().__init__()
self._title = title
self._options = options
def compose(self) -> ComposeResult:
with Vertical(id="submenu_dialog"):
yield Label(self._title, id="submenu_title")
for btn_id, label, variant in self._options:
yield Button(label, id=btn_id, variant=variant)
yield Rule(id="footer_rule")
yield Button("Cancel", id="btn_cancel_sub")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn_cancel_sub":
self.dismiss(None)
else:
self.dismiss(event.button.id)
# ---------------------------------------------------------------------------
# OptionSelectModal radio-button selection list
# ---------------------------------------------------------------------------
class OptionSelectModal(ModalScreen[list[str]]):
"""A modal that presents a list of options using RadioButtons (for multi-select) and a Proceed button."""
DEFAULT_CSS = """
OptionSelectModal { align: center middle; }
#opt_dialog {
width: 60;
height: 80%;
border: solid green;
background: $surface;
padding: 1 2;
}
#opt_title { text-style: bold; margin-bottom: 1; text-align: center; width: 100%; }
#opt_scroll { margin-bottom: 1; height: 1fr; overflow-y: auto; }
#opt_buttons { height: auto; }
#opt_buttons Button { width: 1fr; margin: 0 1; }
#opt_batch_buttons { height: auto; margin-bottom: 1; }
#opt_batch_buttons Button { width: 1fr; margin: 0 1; }
RadioButton { background: transparent; }
RadioButton:focus { background: $accent 20%; }
#footer_rule { margin: 0; }
"""
def __init__(self, title: str, options: list[tuple[str, str]]):
"""options: list of (option_id, label)"""
super().__init__()
self._title = title
self._options = options
def compose(self) -> ComposeResult:
with Vertical(id="opt_dialog"):
yield Label(self._title, id="opt_title")
with Horizontal(id="opt_batch_buttons"):
yield Button("Select All", id="btn_opt_all", flat=True, tooltip="Select all available options")
yield Button("Deselect All", id="btn_opt_none", flat=True, tooltip="Deselect all options")
with Vertical(id="opt_scroll"):
for opt_id, label in self._options:
yield RadioButton(label, id=f"opt_{opt_id}")
yield Rule(id="footer_rule")
with Horizontal(id="opt_buttons"):
yield Button("Proceed", variant="success", id="btn_opt_ok", tooltip="Proceed with the selected options")
yield Button("Back", id="btn_opt_back", tooltip="Return to the previous menu")
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn_opt_back":
self.dismiss(None)
elif event.button.id == "btn_opt_ok":
selected = []
for rb in self.query(RadioButton):
if rb.value:
selected.append(rb.id.split("_", 1)[1])
if selected:
self.dismiss(selected)
else:
self.app.notify("Please select at least one option.", severity="warning")
elif event.button.id == "btn_opt_all":
for rb in self.query(RadioButton):
rb.value = True
elif event.button.id == "btn_opt_none":
for rb in self.query(RadioButton):
rb.value = False
# ---------------------------------------------------------------------------
# ChannelPickerModal single-channel selection (shuttle)
# ---------------------------------------------------------------------------
class ChannelPickerScreen(Screen[tuple]):
"""Screen listing Discord channels (left) and Target platforms channels (right) for dual selection."""
DEFAULT_CSS = """
ChannelPickerScreen { align: center middle; }
#cp_outer { width: 100%; height: 100%; align: center top; }
#chanpick_dialog {
width: 80%;
height: 100%;
layout: vertical;
border: solid green;
padding: 1 2;
margin: 1 0;
background: $surface;
}
#chanpick_title { text-style: bold; margin-bottom: 1; content-align: center middle; width: 100%; }
#chanpick_split {
height: 1fr;
layout: horizontal;
margin-bottom: 1;
}
.split_pane {
width: 1fr;
height: 1fr;
border: solid $primary;
margin: 0 1;
padding: 0 1;
}
.split_pane:blur {
border: solid $primary; /* Prevent border color change on blur */
}
.pane_title { text-style: bold; margin-bottom: 1; color: cyan; margin-left: 1; }
OptionList {
background: $surface;
border: none;
height: 1fr;
}
OptionList:blur .option-list--option-highlighted {
background: $primary 100%; /* Brighter blurred highlight */
}
OptionList .option-list--option-highlighted {
background: $primary 100%;
text-style: bold;
}
.category_header {
margin-top: 1;
background: $primary 10%;
text-style: bold;
padding-left: 1;
color: cyan;
}
#footer_rule { margin: 0; }
#chanpick_buttons { height: auto; margin-top: 0; margin-bottom: 0; }
#chanpick_buttons Button { width: 1fr; margin: 0 1; }
"""
def __init__(self, src_channels: list, src_cat_map: dict, tgt_channels: list, tgt_cat_map: dict, tgt_name: str = "Fluxer"):
super().__init__()
self.src_channels = src_channels
self.src_cat_map = src_cat_map
self.tgt_channels = tgt_channels
self.tgt_cat_map = tgt_cat_map
self.tgt_name = tgt_name
def _render_pane(self, channels, categories, pane_id, prefix):
cat_grouped: dict[int | None, list] = {}
seen_ids = set()
for c in channels:
cat_id = getattr(c, "category_id", None) if not isinstance(c, dict) else c.get("parent_id")
cid = c.get("id") if isinstance(c, dict) else c.id
if cid in seen_ids:
continue
seen_ids.add(cid)
cat_grouped.setdefault(cat_id, []).append(c)
options = []
for cat_id in sorted(cat_grouped, key=lambda k: categories.get(k, "") if k else ""):
if cat_id is not None and cat_id in categories:
# Category header as a bold, non-selectable option
options.append(Option(f"[bold cyan]{categories[cat_id]}[/bold cyan]", id=f"header_{cat_id}", disabled=True))
for c in cat_grouped[cat_id]:
if isinstance(c, dict):
name = c.get("name", "Unnamed")
cid = c.get("id")
else:
name = c.name
cid = c.id
options.append(Option(name, id=f"{prefix}_{cid}"))
yield OptionList(*options, id=f"{prefix}_list", classes="split_pane")
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="cp_outer"):
with Container(id="chanpick_dialog"):
yield Label("Select Source and Target Channels", id="chanpick_title")
with Horizontal(id="chanpick_split"):
with Vertical():
yield Label("Source: Discord", classes="pane_title")
yield from self._render_pane(self.src_channels, self.src_cat_map, "pane_src", "src")
with Vertical():
yield Label(f"Target: {self.tgt_name}", classes="pane_title")
yield from self._render_pane(self.tgt_channels, self.tgt_cat_map, "pane_tgt", "tgt")
yield Rule(id="footer_rule")
with Horizontal(id="chanpick_buttons"):
yield Button("Select", variant="success", id="btn_pick_ok", tooltip="Confirm selection and start migration")
yield Button("Back", id="btn_pick_back", tooltip="Cancel selection")
yield Footer()
def on_mount(self) -> None:
"""Set initial highlights after mounting."""
for lid in ["#src_list", "#tgt_list"]:
lst = self.query_one(lid, OptionList)
if lst.option_count > 0:
# Find first selectable (non-disabled) option
for i in range(lst.option_count):
opt = lst.get_option_at_index(i)
if not opt.disabled:
lst.highlighted = i
break
def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
"""Handle selection in either list."""
if event.option_list.id == "src_list":
# QoL: Auto-select target if name matches
src_name = str(event.option.prompt).strip().lower()
tgt_list = self.query_one("#tgt_list", OptionList)
for i in range(tgt_list.option_count):
opt = tgt_list.get_option_at_index(i)
if opt.id and opt.id.startswith("tgt_"):
if str(opt.prompt).strip().lower() == src_name:
tgt_list.highlighted = i
tgt_list.scroll_to_highlight()
break
def on_button_pressed(self, event: Button.Pressed):
if event.button.id == "btn_pick_back":
self.dismiss(None)
elif event.button.id == "btn_pick_ok":
src_list = self.query_one("#src_list", OptionList)
tgt_list = self.query_one("#tgt_list", OptionList)
src_val = None
tgt_val = None
if src_list.highlighted is not None:
opt = src_list.get_option_at_index(src_list.highlighted)
if opt.id and opt.id.startswith("src_"):
src_val = opt.id.split("_", 1)[1]
if tgt_list.highlighted is not None:
opt = tgt_list.get_option_at_index(tgt_list.highlighted)
if opt.id and opt.id.startswith("tgt_"):
tgt_val = opt.id.split("_", 1)[1]
if src_val and tgt_val:
self.dismiss((int(src_val), tgt_val)) # Source is guaranteed Discord ID (int), Target could be UUID/string
else:
self.notify("Please select one channel from both lists.", severity="warning")
# ---------------------------------------------------------------------------
# ChannelSelectModal multi-channel selection with sync/force (backup)
# ---------------------------------------------------------------------------
class ChannelSelectScreen(Screen[dict]):
"""Screen for selecting channels using a simple checkbox list."""
DEFAULT_CSS = """
ChannelSelectScreen { align: center middle; }
#cs_outer { width: 100%; height: 100%; align: center top; }
#channel_dialog {
width: 80%;
height: 100%;
layout: vertical;
border: solid green;
padding: 1 2;
margin: 2 0;
background: $surface;
}
#cs_header { height: auto; margin-bottom: 1; }
#chan_title { text-style: bold; }
#chan_warning { padding-left: 1; color: yellow; text-style: bold; }
#channel_list_scroll {
height: 1fr;
border: solid $primary;
margin-bottom: 1;
padding: 0 1;
}
.category_header {
margin-top: 1;
background: $primary 10%;
text-style: bold;
padding-left: 1;
}
.label_warning {
padding-top: 1;
padding-right: 1;
color: yellow;
}
#select_all_buttons { height: auto; margin-bottom: 1; }
#select_all_buttons Button { width: auto; margin-right: 1; }
#confirm_buttons { height: auto; margin-top: 0; dock: bottom; margin-bottom: 0; }
#confirm_buttons Button { width: 1fr; margin: 0 1; }
#footer_rule { margin: 0; }
"""
def __init__(self, channels: list, categories: dict, backed_up_ids: set, any_found: bool):
super().__init__()
# Group channels by category
self.channels_by_category = {}
for c in channels:
cat_id = getattr(c, 'category_id', None)
if cat_id not in self.channels_by_category:
self.channels_by_category[cat_id] = []
self.channels_by_category[cat_id].append(c)
self.categories = categories # id -> name
self.backed_up_ids = backed_up_ids
self.any_found = any_found
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="cs_outer"):
with Container(id="channel_dialog"):
with Horizontal(id="cs_header"):
yield Label("Select Channels to Backup", id="chan_title")
if self.any_found:
yield Label(" (Existing backups found)", id="chan_warning")
with VerticalScroll(id="channel_list_scroll"):
cat_ids = sorted(
[k for k in self.channels_by_category.keys() if k is not None],
key=lambda k: self.categories.get(k, ""),
)
# No category channels
if None in self.channels_by_category:
for c in sorted(self.channels_by_category[None], key=lambda x: x.position if hasattr(x, 'position') else 0):
label = f"{c.name}"
color = "green" if c.id in self.backed_up_ids else "white"
yield RadioButton(f"[{color}]{label}[/]", value=False, id=f"chan_{c.id}")
for cat_id in cat_ids:
cat_name = self.categories.get(cat_id, "Unknown Category")
yield Label(f"[cyan]{cat_name}[/cyan]", classes="category_header")
for c in sorted(self.channels_by_category[cat_id], key=lambda x: x.position if hasattr(x, 'position') else 0):
label = f"{c.name}"
color = "green" if c.id in self.backed_up_ids else "white"
yield RadioButton(f"[{color}]{label}[/]", value=False, id=f"chan_{c.id}")
if self.any_found:
yield Label("", classes="label_warning")
yield Label("Note: Channels shown in green have existing backups", classes="label_warning")
with Horizontal(id="select_all_buttons"):
yield Button("Select All", id="btn_all", tooltip="Select all channels for backup")
yield Button("Deselect All", id="btn_none", tooltip="Deselect all channels")
yield Rule(id="footer_rule")
with Horizontal(id="confirm_buttons"):
if self.any_found:
yield Button("Sync", variant="success", id="btn_sync", tooltip="Backup new channels\n& update existing backups")
yield Button("Force Overwrite", variant="warning", id="btn_force", tooltip="Overwrite existing backups\nwith fresh data")
else:
yield Button("Backup", variant="success", id="btn_backup", tooltip="Start backing up selected channels")
yield Button("Back", id="btn_cancel_chan", tooltip="Cancel and go back")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_all":
for cb in self.query(RadioButton):
cb.value = True
elif event.button.id == "btn_none":
for cb in self.query(RadioButton):
cb.value = False
elif event.button.id in ["btn_sync", "btn_force", "btn_backup"]:
selected = []
for cb in self.query(RadioButton):
if cb.value and cb.id and cb.id.startswith("chan_"):
chan_id = int(cb.id.split("_")[1])
selected.append(chan_id)
if not selected:
return
force = event.button.id == "btn_force"
self.dismiss({"channels": selected, "force": force})
elif event.button.id == "btn_cancel_chan":
self.dismiss(None)
# ---------------------------------------------------------------------------
# MessageIDInputModal Input a Discord Message ID and verify it
# ---------------------------------------------------------------------------
class MessageIDInputModal(ModalScreen[int | None]):
"""Modal to input a message ID, verify it exists, and then confirm start."""
DEFAULT_CSS = """
MessageIDInputModal { align: center middle; }
#msg_id_dialog {
width: 80%;
height: auto;
border: solid yellow;
padding: 1 2;
background: $surface;
}
#msg_preview_container {
border: solid $primary;
padding: 1 2;
margin: 1 0;
height: auto;
min-height: 5;
}
#msg_id_buttons {
height: auto;
dock: bottom;
margin-top: 0;
}
#msg_id_buttons Button { width: 1fr; margin: 0 1; }
"""
def __init__(self, reader, channel_id: int):
super().__init__()
self.reader = reader
self.channel_id = channel_id
self.verified_id: int | None = None
def compose(self) -> ComposeResult:
with Container(id="msg_id_dialog"):
yield Label("[bold yellow]Start from specific Message ID[/bold yellow]")
yield Input(placeholder="Enter Discord Message ID (e.g., 123456789012345678)", id="input_msg_id", type="number")
with Container(id="msg_preview_container"):
yield Label("Enter an ID and click Verify to preview.", id="lbl_msg_preview")
with Horizontal(id="msg_id_buttons"):
yield Button("Verify", variant="primary", id="btn_verify_start", disabled=True, tooltip="Check if this message ID exists in the channel")
yield Button("Back", variant="warning", id="btn_cancel_msg_id", tooltip="Cancel and go back")
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "input_msg_id":
btn = self.query_one("#btn_verify_start", Button)
self.verified_id = None
btn.label = "Verify"
btn.variant = "primary"
val = event.input.value.strip()
if val and val.isdigit():
btn.disabled = False
else:
btn.disabled = True
preview = self.query_one("#lbl_msg_preview", Label)
preview.update("Click Verify to fetch message.")
async def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_cancel_msg_id":
self.dismiss(None)
elif event.button.id == "btn_verify_start":
# If already verified, we are starting
if self.verified_id is not None:
self.dismiss(self.verified_id)
return
# Otherwise we verify
btn = event.button
inp = self.query_one("#input_msg_id", Input)
preview = self.query_one("#lbl_msg_preview", Label)
msg_id_str = inp.value.strip()
try:
msg_id = int(msg_id_str)
except ValueError:
preview.update("[bold red]Invalid ID format. Must be numeric.[/bold red]")
return
btn.disabled = True
preview.update("[cyan]Fetching message...[/cyan]")
try:
msg = await self.reader.get_message(self.channel_id, msg_id)
if not msg:
preview.update("[bold red]Message not found in this channel.[/bold red]")
else:
self.verified_id = msg_id
content = msg.content or (f"[dim]({len(msg.attachments)} attachments)[/dim]" if msg.attachments else "[dim](no content)[/dim]")
preview.update(f"[bold green]Message Found![/bold green]\n\n[bold]{msg.author.display_name}:[/bold] {content[:300]}")
btn.label = "Start Migration"
btn.variant = "success"
except Exception as e:
preview.update(f"[bold red]Error fetching message:[/bold red] {e}")
finally:
btn.disabled = False