migration auto test in tui

This commit is contained in:
rambros 2026-03-30 22:16:32 +05:30
parent 2ef141776c
commit bd80760667
6 changed files with 377 additions and 95 deletions

View file

@ -12,7 +12,7 @@ def setup_logging():
except Exception:
level = logging.INFO
handlers = [RotatingFileHandler('.reaper.log', mode='a', maxBytes=10*1024*1024, backupCount=3)]
handlers = [RotatingFileHandler('.reaper.log', mode='w', maxBytes=10*1024*1024, backupCount=3)]
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',

View file

@ -15,41 +15,52 @@ async def sync_channel_state(context: MigrationContext):
channels = await context.discord_reader.get_channels()
fluxer_channels = await context.fluxer_writer.get_channels()
# Build name -> id map and ID set for Fluxer for fast lookup
fluxer_name_map = {c.get("name"): str(c.get("id")) for c in fluxer_channels if c.get("name")}
fluxer_id_set = {str(c.get("id")) for c in fluxer_channels}
# Build maps for Fluxer lookup
# {name: id} for categories
fluxer_cats = {c.get("name"): str(c.get("id")) for c in fluxer_channels if c.get("type") == 4}
# {parent_id: {name: id}} for channels
fluxer_structure = {}
for c in fluxer_channels:
if c.get("type") == 4: continue
p_id = str(c.get("parent_id")) if c.get("parent_id") else "root"
if p_id not in fluxer_structure: fluxer_structure[p_id] = {}
fluxer_structure[p_id][c.get("name")] = str(c.get("id"))
fluxer_id_set = {str(c.get("id")) for c in fluxer_channels}
updates = 0
removals = 0
# 1. Verify and Sync Categories
# 1. Sync Categories
for cat in categories:
discord_id = str(cat.id)
fluxer_id = context.state.get_fluxer_category_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_id_set:
context.state.remove_category_mapping(discord_id)
removals += 1
elif cat.name in fluxer_name_map:
context.state.set_category_mapping(discord_id, fluxer_name_map[cat.name])
elif cat.name in fluxer_cats:
context.state.set_category_mapping(discord_id, fluxer_cats[cat.name])
updates += 1
# 2. Verify and Sync Channels
# 2. Sync Channels (parent-aware)
for ch in channels:
discord_id = str(ch.id)
fluxer_id = context.state.get_fluxer_channel_id(discord_id)
if fluxer_id:
if fluxer_id not in fluxer_id_set:
context.state.remove_channel_mapping(discord_id)
removals += 1
elif ch.name in fluxer_name_map:
context.state.set_channel_mapping(discord_id, fluxer_name_map[ch.name])
else:
# Try to match by name within the mapped parent category
p_discord_id = str(ch.category_id) if ch.category_id else "root"
p_fluxer_id = context.state.get_fluxer_category_id(p_discord_id) if p_discord_id != "root" else "root"
if p_fluxer_id in fluxer_structure and ch.name in fluxer_structure[p_fluxer_id]:
context.state.set_channel_mapping(discord_id, fluxer_structure[p_fluxer_id][ch.name])
updates += 1
if updates > 0 or removals > 0:
logger.info(f"Channel sync: {updates} mapped, {removals} stale mappings removed")
logger.info(f"Fluxer Channel sync: {updates} mapped, {removals} stale mappings removed")
async def migrate_channels(context: MigrationContext, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False) -> dict:

View file

@ -343,14 +343,18 @@ class FluxerWriter:
logger.debug(f"Fluxer: Sending message via bot for user '{author_name}'")
try:
kwargs = {
"channel_id": channel_id,
"content": final_bot_content,
"embeds": normalized_embeds
}
if fluxer_files:
kwargs["files"] = fluxer_files
if message_reference:
kwargs["message_reference"] = message_reference
msg_data = await asyncio.wait_for(
self.client.send_message(
channel_id=channel_id,
content=final_bot_content,
files=fluxer_files,
embeds=normalized_embeds,
message_reference=message_reference
),
self.client.send_message(**kwargs),
timeout=45.0
)
logger.debug(f"Fluxer: Bot send complete, msg_id={msg_data.get('id') if msg_data else 'None'}")
@ -374,19 +378,23 @@ class FluxerWriter:
fluxer_files = None
if files:
fluxer_files = [File(io.BytesIO(f["data"]), filename=f["filename"]) for f in files]
fluxer_files = [f if hasattr(f, "filename") else File(io.BytesIO(f["data"]), filename=f["filename"]) for f in files]
message_reference = None
if reply_to_message_id:
message_reference = {"message_id": str(reply_to_message_id), "channel_id": str(channel_id)}
try:
msg_data = await self.client.send_message(
channel_id=channel_id,
content=content,
files=fluxer_files,
message_reference=message_reference
)
kwargs = {
"channel_id": channel_id,
"content": content
}
if fluxer_files:
kwargs["files"] = fluxer_files
if message_reference:
kwargs["message_reference"] = message_reference
msg_data = await self.client.send_message(**kwargs)
return str(msg_data["id"]) if msg_data else None
except Exception as e:
print(f"Failed to send marker: {e}")

View file

@ -16,41 +16,52 @@ async def sync_channel_state(context: MigrationContext):
channels = await context.discord_reader.get_channels()
target_channels = await context.writer.get_channels()
# Build name -> id map and ID set for Stoat for fast lookup
target_name_map = {c.get("name"): str(c.get("id")) for c in target_channels if c.get("name")}
target_id_set = {str(c.get("id")) for c in target_channels}
# Build maps for Stoat lookup
# {name: id} for categories (type 4)
target_cats = {c.get("name"): str(c.get("id")) for c in target_channels if c.get("type") == 4}
# {parent_id: {name: id}} for channels
target_structure = {}
for c in target_channels:
if c.get("type") == 4: continue
p_id = str(c.get("parent_id")) if c.get("parent_id") else "root"
if p_id not in target_structure: target_structure[p_id] = {}
target_structure[p_id][c.get("name")] = str(c.get("id"))
target_id_set = {str(c.get("id")) for c in target_channels}
updates = 0
removals = 0
# 1. Verify and Sync Categories
# 1. Sync Categories
for cat in categories:
discord_id = str(cat.id)
target_id = context.state.get_target_category_id(discord_id)
if target_id:
if target_id not in target_id_set:
context.state.remove_category_mapping(discord_id)
removals += 1
elif cat.name in target_name_map:
context.state.set_target_category_mapping(discord_id, target_name_map[cat.name])
elif cat.name in target_cats:
context.state.set_target_category_mapping(discord_id, target_cats[cat.name])
updates += 1
# 2. Verify and Sync Channels
# 2. Sync Channels (parent-aware)
for ch in channels:
discord_id = str(ch.id)
target_id = context.state.get_target_channel_id(discord_id)
if target_id:
if target_id not in target_id_set:
context.state.remove_channel_mapping(discord_id)
removals += 1
elif ch.name in target_name_map:
context.state.set_target_channel_mapping(discord_id, target_name_map[ch.name])
else:
# Try to match by name within the mapped parent category
p_discord_id = str(ch.category_id) if ch.category_id else "root"
p_target_id = context.state.get_target_category_id(p_discord_id) if p_discord_id != "root" else "root"
if p_target_id in target_structure and ch.name in target_structure[p_target_id]:
context.state.set_target_channel_mapping(discord_id, target_structure[p_target_id][ch.name])
updates += 1
if updates > 0 or removals > 0:
logger.info(f"Channel sync: {updates} mapped, {removals} stale mappings removed")
logger.info(f"Stoat Channel sync: {updates} mapped, {removals} stale mappings removed")
async def migrate_channels(context: MigrationContext, progress_callback: Callable[[str, str, int, int], Awaitable[None]] | None = None, force: bool = False) -> dict:
@ -107,7 +118,22 @@ async def migrate_channels(context: MigrationContext, progress_callback: Callabl
if total == 0:
return cloned_info
# 1. Create missing channels (unparented for now)
# 1. Create missing categories first
for cat in missing_categories:
if not context.is_running: break
state_key = str(cat.id)
target_id = await context.writer.create_channel(cat.name, type=4)
if target_id:
context.state.set_target_category_id(state_key, target_id)
cloned_info["categories_created"].append(cat.name)
if cat.name not in cloned_info["structure"]:
cloned_info["structure"][cat.name] = []
current_idx += 1
if progress_callback: await progress_callback(cat.name, "Copying", current_idx, total)
# 2. Create missing channels (now with parent_id available)
for channel in channels_to_create:
if not context.is_running: break
@ -119,7 +145,6 @@ async def migrate_channels(context: MigrationContext, progress_callback: Callabl
logger.debug(f"Creating channel {channel.name}: topic={topic}, nsfw={nsfw}, slowmode={slowmode}")
# Map Discord-specific types to target-supported types
# 5 (News) -> 0 (Text), and fallback any unknown non-voice types to text
raw_type = channel.type.value if hasattr(channel.type, 'value') else 0
if raw_type == context.discord_reader.CHANNEL_TYPE_VOICE.value:
ch_type = 2
@ -128,20 +153,22 @@ async def migrate_channels(context: MigrationContext, progress_callback: Callabl
ch_type = 0
is_voice = False
else:
# Fallback for Stage channels (13) etc. to Text for safety
ch_type = 0
is_voice = False
# Resolve parent category
parent_id = context.state.get_target_category_id(str(channel.category_id)) if channel.category_id else None
target_id = await context.writer.create_channel(
name=channel.name,
topic=topic if not is_voice else "",
type=ch_type,
parent_id=None,
parent_id=parent_id,
nsfw=nsfw if not is_voice else False,
slowmode_delay=slowmode if not is_voice else 0
)
if target_id:
context.state.set_target_channel_mapping(state_key, target_id)
context.state.set_target_channel_id(state_key, target_id)
cloned_info["channels_created"].append(channel.name)
parent_name = cat_name_map.get(str(channel.category_id), "No Category") if channel.category_id else "No Category"
@ -156,7 +183,8 @@ async def migrate_channels(context: MigrationContext, progress_callback: Callabl
name=channel.name,
topic=topic,
nsfw=nsfw,
slowmode_delay=slowmode
slowmode_delay=slowmode,
parent_id=parent_id
)
current_idx += 1
@ -172,32 +200,21 @@ async def migrate_channels(context: MigrationContext, progress_callback: Callabl
logger.debug(f"Syncing existing channel {channel.name} ({target_id}): topic={topic}, nsfw={nsfw}, slowmode={slowmode}")
parent_id = context.state.get_target_category_id(str(channel.category_id)) if channel.category_id else None
await context.writer.modify_channel(
channel_id=target_id,
name=channel.name,
topic=topic,
nsfw=nsfw,
slowmode_delay=slowmode
slowmode_delay=slowmode,
parent_id=parent_id
)
cloned_info["channels_synced"].append(channel.name)
cloned_info["cloned_info" if "cloned_info" in locals() else "channels_synced"].append(channel.name)
current_idx += 1
if progress_callback: await progress_callback(channel.name, "Syncing", current_idx, total)
# 3. Create missing categories
for cat in missing_categories:
if not context.is_running: break
state_key = str(cat.id)
target_id = await context.writer.create_channel(cat.name, type=4)
if target_id:
context.state.set_target_category_mapping(state_key, target_id)
cloned_info["categories_created"].append(cat.name)
if cat.name not in cloned_info["structure"]:
cloned_info["structure"][cat.name] = []
current_idx += 1
if progress_callback: await progress_callback(f"Cat: {cat.name}", "Copying", current_idx, total)
# 4. Final step: Parent the channels into categories via mass server.edit()

View file

@ -164,6 +164,9 @@ class OperationPane(Container):
yield Rule(id="footer_rule")
yield Button("Danger Zone ⚠", id="op_danger", variant="error", disabled=True, flat=True, tooltip="Dangerous operations:\ndelete channels, roles, emojis on target\n(use with caution)")
if self.cfg_name == "AutoTest":
yield Button("RUN AUTO TEST", id="op_autotest", variant="warning", flat="false", disabled=True, tooltip="Execute automated test sequence for the AutoTest profile")
def on_mount(self) -> None:
self._rebuild_engine()
self._update_info_labels()
@ -395,7 +398,7 @@ class OperationPane(Container):
lbl.update(f"{t_status}")
# Buttons
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger"):
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger", "#op_autotest"):
for btn in self.query(bid): btn.disabled = not self.tokens_valid
# ── validation ────────────────────────────────────────────────────────
@ -416,10 +419,10 @@ class OperationPane(Container):
# Disable all operation buttons while validation is in progress
if self.view_mode == "shuttle":
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger"):
for bid in ("#op_clone", "#op_sync", "#op_messages", "#op_waterfall", "#op_danger", "#op_autotest"):
for btn in self.query(bid): btn.disabled = True
elif self.view_mode == "backup":
for bid in ("#op_backup_msgs", "#op_backup_sync"):
for bid in ("#op_backup_msgs", "#op_backup_sync", "#op_autotest"):
for btn in self.query(bid): btn.disabled = True
except Exception as e:
logger.error(f"Error in run_validate setup: {e}")
@ -590,6 +593,136 @@ class OperationPane(Container):
from src.ui.backup_stats import BackupStatsScreen
target_dir = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}"
self.app.push_screen(BackupStatsScreen(self.cfg_name, target_dir))
elif bid == "op_autotest":
self.run_autotest_sequence()
@work(exclusive=True)
async def run_autotest_sequence(self) -> None:
"""Entry point for the AUTO TEST sequence."""
if not self.tokens_valid:
return
modal = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal)
await asyncio.sleep(0.1)
try:
if self.view_mode == "shuttle":
await self._run_migration_autotest_logic(modal)
elif self.view_mode == "backup":
await self._run_backup_autotest_logic(modal)
modal.phase_report("AUTO TEST Complete", show_back=False)
modal.write("[bold green]Full automated test sequence finished successfully![/bold green]")
except Exception as e:
logger.error(f"Auto-Test Error: {e}\n{traceback.format_exc()}")
modal.write(f"[bold red]Error: {e}[/bold red]")
modal.phase_report("Auto-Test", "error", show_back=False)
finally:
self.engine.is_running = False
await self.engine.close_connections()
async def _run_migration_autotest_logic(self, modal: ProgressScreen) -> None:
"""Executes the full migration test sequence."""
modal.set_status("AUTO TEST: Launching Migration Sequence...")
modal.write("[bold yellow]Starting Migration Auto-Test...[/bold yellow]")
migrate_mod = fluxer_migrate if self.target_platform == "fluxer" else stoat_migrate
# 1. Connect and initialize state
modal.set_status("Connecting and Initializing State...")
await self.engine.start_connections()
self.engine.is_running = True
# Initialize state database early to avoid Warnings
tid = self.config.fluxer_server_id if self.target_platform == "fluxer" else self.config.stoat_server_id
tgt_info = await self.engine.writer.validate()
self.engine.ensure_state_initialized(str(tid or ""), tgt_info.get("community_name", "Target"))
# 2. Danger Zone Clean
modal.write("\n[bold red]Phase 1: Danger Zone Cleanup[/bold red]")
await self._logic_dz_delete_channels(modal)
await self._logic_dz_reset_perms(modal)
await self._logic_dz_delete_roles(modal)
await self._logic_dz_delete_assets(modal)
# 3. Clone Roles & Permissions
modal.write("\n[bold cyan]Phase 2: Cloning Roles & Permissions[/bold cyan]")
await self._logic_clone_roles(modal, force=True)
# 4. Clone Assets (Logo, Banner, Emojis, Stickers)
modal.write("\n[bold cyan]Phase 3: Syncing Metadata & Assets[/bold cyan]")
await self._logic_copy_assets(modal, ["Emoji", "Sticker"], force=True)
await self._logic_sync_metadata(modal, ["name", "icon", "banner"])
# 5. Clone Template (Structure)
modal.write("\n[bold cyan]Phase 4: Cloning Server Structure (1/2)[/bold cyan]")
await self._logic_clone_channels(modal, force=True)
await self._logic_sync_permissions(modal)
# 6. Waterfall Migration (Only for backups)
if self.engine.source_mode == "backup" :
modal.write("\n[bold cyan]Phase 5: Waterfall Message Migration[/bold cyan]")
await self._logic_waterfall_migration(modal=modal, is_autotest=True)
else:
modal.write("\n[bold yellow]Phase 5: Skipping Waterfall (Live mode selected)[/bold yellow]")
# 7. Individual Channel Migration (Automated)
modal.write("\n[bold cyan]Phase 7: Individual Channel Migration[/bold cyan]")
await self._logic_autotest_migrate_all_channels(modal=modal)
async def _run_backup_autotest_logic(self, modal: ProgressScreen) -> None:
"""Executes the full backup test sequence."""
modal.set_status("AUTO TEST: Launching Backup Sequence...")
modal.write("[bold yellow]Starting Backup Auto-Test...[/bold yellow]")
# 1. Clear old backup
modal.set_status("Clearing old backup database...")
db_path = Path(self._base_dir()) / f"DISCORD_BACKUP-{self.config.discord_server_id}" / "backup.db"
if db_path.exists():
modal.write(f"[yellow]Deleting existing database: {db_path.name}[/yellow]")
db_path.unlink()
# 2. Setup exporter
await self.engine.discord_reader.start()
await self.exporter.setup()
self.exporter.is_running = True
# 3. Full Backup
modal.write("\n[bold cyan]Phase 1: Full Server Backup[/bold cyan]")
modal.show_stats()
await self.exporter.export_metadata()
await self.exporter.download_server_assets()
await self.exporter.export_channels_structure()
await self.exporter.export_roles()
await self.exporter.export_assets()
eligible_channels = [
c for c in await self.engine.discord_reader.get_channels()
if c.type in [
self.engine.discord_reader.CHANNEL_TYPE_TEXT,
self.engine.discord_reader.CHANNEL_TYPE_NEWS,
self.engine.discord_reader.CHANNEL_TYPE_FORUM
]
]
total_chans = len(eligible_channels)
for i, chan in enumerate(eligible_channels):
if not self.exporter.is_running: break
modal.set_item_status(f"Backing up ({i+1}/{total_chans}): #{chan.name}")
modal.set_progress(i, total_chans)
async def update_backup(name, count, author=None, preview=None, threads=0, files=0):
modal.update_stats(messages=str(count), threads=str(threads), files=str(files))
await self.exporter.export_channel_messages(
chan.id, progress_callback=update_backup, force=True
)
modal.write(f"[green]Completed: #{chan.name}[/green]")
modal.set_progress(total_chans, total_chans)
# ── (1) clone server template (combined) ─────────────────────────────
@ -1012,13 +1145,64 @@ class OperationPane(Container):
# ── (5) message migration ─────────────────────────────────────────────
@work(exclusive=True)
async def run_migrate_messages(self) -> None:
async def run_migrate_messages(self, modal: ProgressScreen | None = None) -> None:
await self._logic_migrate_messages(modal)
async def _logic_autotest_migrate_all_channels(self, modal: ProgressScreen) -> None:
"""Automated name-based channel migration for Auto-Test."""
migrate_mod = fluxer_migrate if self.target_platform == "fluxer" else stoat_migrate
# 1. Matching
modal.set_status("Auto-matching channels by name...")
await self._perform_auto_matching()
# 2. Get channels
d_channels = await self.engine.discord_reader.get_channels()
text_channels = [c for c in d_channels if c.type in [
self.engine.discord_reader.CHANNEL_TYPE_TEXT,
self.engine.discord_reader.CHANNEL_TYPE_NEWS
]]
modal.write(f"[bold cyan]Auto-Test: Found {len(text_channels)} channels to migrate.[/bold cyan]")
# 3. Migrate loop
for i, ch in enumerate(text_channels):
if not self.engine.is_running: break
tgt_id = self.engine.state.get_target_channel_id(str(ch.id))
if not tgt_id:
modal.write(f"[yellow]Skipping #{ch.name} (no target mapping found)[/yellow]")
continue
modal.write(f"\n[bold]Migrating #{ch.name} ({i+1}/{len(text_channels)}) -> Target ID {tgt_id}[/bold]")
# Analyze (silent)
stats = await migrate_mod.analyze_migration(self.engine, source_channel_id=ch.id, after_message_id=None)
ch_total = stats["messages"]
async def update_indiv(curr):
c = curr["messages"]
modal.set_progress(c, ch_total or 100)
modal.set_item_status(f"#{ch.name}: {c}/{ch_total} messages")
await migrate_mod.migrate_messages(
self.engine,
source_channel_id=ch.id,
target_channel_id=tgt_id,
after_message_id=None,
progress_callback=update_indiv
)
modal.write("\n[bold green]Automated channel migration complete.[/bold green]")
async def _logic_migrate_messages(self, modal: ProgressScreen | None = None, is_autotest: bool = False) -> None:
if not self.tokens_valid:
return
migrate_mod = fluxer_migrate if self.target_platform == "fluxer" else stoat_migrate
platform_name = self.target_platform.capitalize()
if not modal:
modal = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal)
await asyncio.sleep(0.1)
@ -1418,13 +1602,17 @@ class OperationPane(Container):
await self.engine.close_connections()
@work(exclusive=True)
async def run_waterfall_migration(self) -> None:
async def run_waterfall_migration(self, modal: ProgressScreen | None = None) -> None:
await self._logic_waterfall_migration(modal)
async def _logic_waterfall_migration(self, modal: ProgressScreen | None = None, is_autotest: bool = False) -> None:
if not self.tokens_valid:
return
migrate_mod = fluxer_migrate if self.target_platform == "fluxer" else stoat_migrate
platform_name = self.target_platform.capitalize()
if not modal:
modal = ProgressScreen(log_level=self.config.log_level)
self.app.push_screen(modal)
await asyncio.sleep(0.1)
@ -1434,6 +1622,9 @@ class OperationPane(Container):
modal.set_status("Connecting to Servers...")
await self.engine.start_connections()
# Ensure writer is validated before auto-matching (fixes NoneType role fetch error)
await self.engine.writer.validate()
modal.set_status("Synchronizing entity mappings...")
await self._perform_auto_matching()
@ -1460,6 +1651,10 @@ class OperationPane(Container):
prefix = "[bold cyan]📁[/bold cyan] " if mc.type == 4 else "[bold white]#[/bold white] "
modal.write(f" {prefix}{mc.name}")
if is_autotest:
choice = "btn_start_first"
modal.write("[bold cyan]Auto-Test: Automatically cloning missing channels.[/bold cyan]")
else:
choice = await modal.phase_wait_confirm(
show_continue=False,
show_id=True,
@ -1560,6 +1755,10 @@ class OperationPane(Container):
else:
modal.write("No previous migration state found. Starting from the beginning.")
if is_autotest:
choice = "btn_continue" if min_last_id is not None else "btn_start_first"
modal.write(f"[bold cyan]Auto-Test: Automatically choosing {choice.replace('btn_', '').replace('_', ' ')}.[/bold cyan]")
else:
choice = await modal.phase_wait_confirm(
show_continue=min_last_id is not None,
show_id=False,

View file

@ -8,7 +8,7 @@ from src.ui.mode_screen import ModeScreen
from src.core.configuration import AppConfig
import os
from textual.widgets import ListItem, ListView, Input, Button
from textual.widgets import ListItem, ListView, Input, Button, Label
@ -18,6 +18,10 @@ def mock_configs(tmp_path, log):
reaper_dir.mkdir()
(reaper_dir / "reaper_config.yaml").write_text("discord_bot_token: 'fake'\ndiscord_server_id: '123'\ntool_mode: 'backup_only'")
autotest_dir = tmp_path / "ReaperFiles-AutoTest"
autotest_dir.mkdir()
(autotest_dir / "reaper_config.yaml").write_text("discord_bot_token: 'fake'\ndiscord_server_id: '123'\ntool_mode: 'backup_transfer'")
old_cwd = os.getcwd()
os.chdir(tmp_path)
log(f"CWD changed to: {tmp_path}")
@ -115,3 +119,46 @@ async def test_ui_operation_trigger(mock_configs, log):
except Exception as e:
log(f"test_ui_operation_trigger FAILED: {e}")
raise
@pytest.mark.asyncio
async def test_ui_autotest_button(mock_configs, log):
"""Verify visibility and trigger of the AUTO TEST button."""
log("Running test_ui_autotest_button")
from src.ui.shuttle_ops import OperationPane
try:
with patch("src.ui.main_app.ConfigSelectionScreen.check_updates", AsyncMock()):
with patch.object(OperationPane, "run_validate", AsyncMock()):
app = ReaperApp()
async with app.run_test() as pilot:
await wait_for_screen(app, ConfigSelectionScreen)
# Find the AutoTest item in the list
lv = app.screen.query_one(ListView)
autotest_index = -1
for idx, item in enumerate(lv.children):
if item.name == "AutoTest":
autotest_index = idx
break
assert autotest_index != -1, "AutoTest profile not found in ListView"
target_item = lv.children[autotest_index]
await pilot.click(target_item)
assert await wait_for_screen(app, ModeScreen), "Timed out waiting for ModeScreen"
# Verify button is present
pane = app.screen.query_one(OperationPane)
btn = pane.query_one("#op_autotest", Button)
assert btn.display is True
assert "AUTO TEST" in str(btn.label)
# Mock the sequence and trigger it
with patch.object(OperationPane, "run_autotest_sequence", AsyncMock()) as mock_seq:
btn.disabled = False
await pilot.pause(0.1)
btn.focus()
await pilot.press("enter")
await pilot.pause(0.1)
assert mock_seq.called
log("test_ui_autotest_button PASSED")
except Exception as e:
log(f"test_ui_autotest_button FAILED: {e}")
raise