auto test scripts
This commit is contained in:
parent
2ddc4424cd
commit
2ef141776c
8 changed files with 650 additions and 2 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -41,8 +41,8 @@ reaper_config.yaml
|
||||||
*.log
|
*.log
|
||||||
|
|
||||||
# Temporary Test Scripts
|
# Temporary Test Scripts
|
||||||
|
#test_*.py
|
||||||
tmp/
|
tmp/
|
||||||
test_*.py
|
|
||||||
test_release.zip
|
test_release.zip
|
||||||
test_release/
|
test_release/
|
||||||
DiscoReaper
|
DiscoReaper
|
||||||
|
|
|
||||||
|
|
@ -9,3 +9,6 @@ lottie # Lottie file manipulation and conversion
|
||||||
Pillow # Image processing (required for GIF rendering)
|
Pillow # Image processing (required for GIF rendering)
|
||||||
cairosvg # SVG rendering (required for Lottie conversion)
|
cairosvg # SVG rendering (required for Lottie conversion)
|
||||||
psutil # System information (CPU, RAM, etc.)
|
psutil # System information (CPU, RAM, etc.)
|
||||||
|
pytest # Testing framework
|
||||||
|
pytest-asyncio # Async testing for pytest
|
||||||
|
pytest-mock # Mocking for pytest
|
||||||
21
runtests.sh
Executable file
21
runtests.sh
Executable file
|
|
@ -0,0 +1,21 @@
|
||||||
|
#!/bin/bash
|
||||||
|
# Convenient script to run the Discord Reaper test suite.
|
||||||
|
# Automatically handles VENV activation and PYTHONPATH.
|
||||||
|
|
||||||
|
# Change to the project root directory (where the script is located)
|
||||||
|
cd "$(dirname "$0")"
|
||||||
|
|
||||||
|
# Activate the virtual environment if it exists
|
||||||
|
if [ -d "venv" ]; then
|
||||||
|
source venv/bin/activate
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Set PYTHONPATH to the current directory so src is discoverable
|
||||||
|
export PYTHONPATH=.
|
||||||
|
|
||||||
|
# Run pytest with any arguments passed to this script, defaulting to tests/
|
||||||
|
if [ $# -eq 0 ]; then
|
||||||
|
pytest -v -s -p no:warnings tests/
|
||||||
|
else
|
||||||
|
pytest -s "$@"
|
||||||
|
fi
|
||||||
170
tests/conftest.py
Normal file
170
tests/conftest.py
Normal file
|
|
@ -0,0 +1,170 @@
|
||||||
|
import pytest
|
||||||
|
import shutil
|
||||||
|
import yaml
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
from src.core.backup_database import BackupDatabase
|
||||||
|
from src.core.configuration import AppConfig
|
||||||
|
|
||||||
|
LOG_FILE = (Path(__file__).parent.parent / ".reaper_tests.log").resolve()
|
||||||
|
|
||||||
|
def _log(msg: str):
|
||||||
|
"""Write a message to the shared test log file."""
|
||||||
|
with open(LOG_FILE, "a") as f:
|
||||||
|
f.write(f"{msg}\n")
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def log():
|
||||||
|
"""Fixture to provide the logging function to tests."""
|
||||||
|
return _log
|
||||||
|
|
||||||
|
def pytest_configure(config):
|
||||||
|
"""Register global warning filters and marks."""
|
||||||
|
# Register marks if needed
|
||||||
|
config.addinivalue_line("markers", "asyncio: mark test as asyncio")
|
||||||
|
|
||||||
|
# Silence benign async mock and ResourceWarnings globally
|
||||||
|
config.addinivalue_line("filterwarnings", "ignore::RuntimeWarning")
|
||||||
|
config.addinivalue_line("filterwarnings", "ignore::ResourceWarning")
|
||||||
|
|
||||||
|
def pytest_sessionstart(session):
|
||||||
|
"""Clear the log file at the beginning of the test session."""
|
||||||
|
with open(LOG_FILE, "w") as f:
|
||||||
|
f.write("--- Reaper Test Session Started ---\n")
|
||||||
|
|
||||||
|
def pytest_report_header(config):
|
||||||
|
"""Print data source status to the console header."""
|
||||||
|
test_data_dir = (Path(__file__).parent.parent / "ReaperFiles-AutoTest").resolve()
|
||||||
|
if test_data_dir.exists():
|
||||||
|
return f"[DATA_SOURCE] Automated Test Directory: FOUND ({test_data_dir.name})"
|
||||||
|
else:
|
||||||
|
return "[DATA_SOURCE] Automated Test Directory: NOT FOUND (Falling back to MOCKS)"
|
||||||
|
|
||||||
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
||||||
|
def pytest_runtest_makereport(item, call):
|
||||||
|
"""Hook to catch test results and log them to .reaper_tests.log."""
|
||||||
|
outcome = yield
|
||||||
|
rep = outcome.get_result()
|
||||||
|
|
||||||
|
# We only log on the 'call' phase (the actual test run)
|
||||||
|
if rep.when == "call":
|
||||||
|
status = rep.outcome.upper()
|
||||||
|
_log(f"RESULT: {item.nodeid} -> {status}")
|
||||||
|
if rep.failed:
|
||||||
|
_log(f"--- FAILURE DETAILS for {item.name} ---\n{rep.longreprtext}\n---------------------")
|
||||||
|
elif not rep.passed:
|
||||||
|
# Catch setup/teardown failures
|
||||||
|
_log(f"ERROR in {item.nodeid} [{rep.when}]: {rep.outcome.upper()}")
|
||||||
|
if rep.failed:
|
||||||
|
_log(f"--- ERROR DETAILS ---\n{rep.longreprtext}\n---------------------")
|
||||||
|
|
||||||
|
def pytest_warning_recorded(warning_message, when, nodeid, location):
|
||||||
|
"""Hook to catch and log warnings to .reaper_tests.log."""
|
||||||
|
msg = f"WARNING: {warning_message.message}"
|
||||||
|
if nodeid:
|
||||||
|
msg = f"WARNING in {nodeid} [{when}]: {warning_message.message}"
|
||||||
|
_log(msg)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_data_dir():
|
||||||
|
return (Path(__file__).parent.parent / "ReaperFiles-AutoTest").resolve()
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def reaper_config(test_data_dir):
|
||||||
|
config_path = test_data_dir / "reaper_config.yaml"
|
||||||
|
if config_path.exists():
|
||||||
|
with open(config_path, "r") as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
return data
|
||||||
|
# Fallback mock config
|
||||||
|
return {
|
||||||
|
"discord_bot_token": "mock_discord_token",
|
||||||
|
"discord_server_id": "123456789012345678",
|
||||||
|
"tool_mode": "backup_transfer",
|
||||||
|
"target_platform": "fluxer",
|
||||||
|
"fluxer_bot_token": "mock_fluxer_token",
|
||||||
|
"fluxer_server_id": "987654321098765432",
|
||||||
|
"stoat_bot_token": "mock_stoat_token",
|
||||||
|
"stoat_server_id": "MOCK_STOAT_COMMUNITY",
|
||||||
|
"anonymize_users": False,
|
||||||
|
"log_level": "DEBUG"
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_db(test_data_dir, tmp_path, reaper_config):
|
||||||
|
# If the real test data exists, use it
|
||||||
|
if test_data_dir.exists():
|
||||||
|
target_id = reaper_config.get("fluxer_server_id") if reaper_config.get("target_platform") == "fluxer" else reaper_config.get("stoat_server_id")
|
||||||
|
db_files = list(test_data_dir.glob(f"*-{target_id}.db"))
|
||||||
|
if not db_files:
|
||||||
|
db_files = list(test_data_dir.glob("*.db"))
|
||||||
|
|
||||||
|
if db_files:
|
||||||
|
original_db = db_files[0]
|
||||||
|
temp_db_path = tmp_path / "test_migration.db"
|
||||||
|
print(f"[DATA_SOURCE] USE_SAMPLE_DB: {original_db.name}")
|
||||||
|
_log(f"[DATA_SOURCE] USE_SAMPLE_DB: {original_db.name}")
|
||||||
|
shutil.copy(original_db, temp_db_path)
|
||||||
|
return temp_db_path
|
||||||
|
|
||||||
|
# Fallback: Create an empty mock database with the required schema
|
||||||
|
temp_db_path = tmp_path / "mock_migration.db"
|
||||||
|
print("[DATA_SOURCE] USE_MOCK_DB: Fallback to empty schema")
|
||||||
|
_log("[DATA_SOURCE] USE_MOCK_DB: Fallback to empty schema")
|
||||||
|
db = BackupDatabase(temp_db_path)
|
||||||
|
# The BackupDatabase constructor already initializes the schema
|
||||||
|
return temp_db_path
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def backup_db(temp_db):
|
||||||
|
db = BackupDatabase(temp_db)
|
||||||
|
yield db
|
||||||
|
# No explicit close needed for now as it's a persistent connection
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def backup_reader(test_data_dir, reaper_config, tmp_path):
|
||||||
|
sid = reaper_config.get("discord_server_id")
|
||||||
|
backup_path = test_data_dir / f"DISCORD_BACKUP-{sid}"
|
||||||
|
|
||||||
|
if not test_data_dir.exists() or not backup_path.exists():
|
||||||
|
# Fallback: create mock backup structure
|
||||||
|
mock_path = tmp_path / f"DISCORD_BACKUP-{sid}"
|
||||||
|
print(f"[DATA_SOURCE] USE_MOCK_BACKUP: {mock_path.name}")
|
||||||
|
_log(f"[DATA_SOURCE] USE_MOCK_BACKUP: {mock_path.name}")
|
||||||
|
mock_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
db_path = mock_path / "backup.db"
|
||||||
|
db = BackupDatabase(db_path)
|
||||||
|
# Populate with minimal mock data for BackupReader to work
|
||||||
|
db._conn.execute("INSERT OR IGNORE INTO guild_profile (id, name) VALUES (?, ?)", (int(sid), "Mock Guild"))
|
||||||
|
db._conn.execute("INSERT OR IGNORE INTO channels (id, name, type) VALUES (?, ?, ?)", (123, "mock-channel", 0))
|
||||||
|
db._conn.commit()
|
||||||
|
from src.core.backup_reader import BackupReader
|
||||||
|
return BackupReader(mock_path)
|
||||||
|
|
||||||
|
print(f"[DATA_SOURCE] USE_SAMPLE_BACKUP: {backup_path.name}")
|
||||||
|
_log(f"[DATA_SOURCE] USE_SAMPLE_BACKUP: {backup_path.name}")
|
||||||
|
from src.core.backup_reader import BackupReader
|
||||||
|
return BackupReader(backup_path)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_discord_reader():
|
||||||
|
reader = MagicMock()
|
||||||
|
reader.guild = MagicMock()
|
||||||
|
reader.fetch_message_history = AsyncMock()
|
||||||
|
reader.download_attachment = AsyncMock(return_value=b"fake_data")
|
||||||
|
reader.download_sticker = AsyncMock(return_value=b"fake_sticker_data")
|
||||||
|
return reader
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_fluxer_writer():
|
||||||
|
writer = MagicMock()
|
||||||
|
writer.send_message = AsyncMock(return_value="fluxer_msg_123")
|
||||||
|
writer.send_marker = AsyncMock()
|
||||||
|
return writer
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_stoat_writer():
|
||||||
|
writer = MagicMock()
|
||||||
|
writer.send_message = AsyncMock(return_value="stoat_msg_123")
|
||||||
|
writer.send_marker = AsyncMock()
|
||||||
|
return writer
|
||||||
139
tests/test_database.py
Normal file
139
tests/test_database.py
Normal file
|
|
@ -0,0 +1,139 @@
|
||||||
|
import pytest
|
||||||
|
from src.core.backup_database import BackupDatabase
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def db():
|
||||||
|
from tests.conftest import _log
|
||||||
|
msg = "[DATA_SOURCE] UNIT_TEST: Using in-memory database"
|
||||||
|
print(msg)
|
||||||
|
_log(msg)
|
||||||
|
return BackupDatabase(":memory:")
|
||||||
|
|
||||||
|
|
||||||
|
def test_db_initialization(db):
|
||||||
|
res = db._conn.execute(
|
||||||
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='guild_profile'"
|
||||||
|
).fetchone()
|
||||||
|
assert res is not None
|
||||||
|
assert res["name"] == "guild_profile"
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_get_guild_profile(db):
|
||||||
|
profile = {
|
||||||
|
"id": "123456789",
|
||||||
|
"name": "Test Server",
|
||||||
|
"description": "Testing",
|
||||||
|
"owner_id": "987654321",
|
||||||
|
"ignore_channels": ["111", "222"],
|
||||||
|
}
|
||||||
|
db.set_guild_profile(profile)
|
||||||
|
got = db.get_guild_profile()
|
||||||
|
assert got["name"] == "Test Server"
|
||||||
|
assert got["id"] == 123456789
|
||||||
|
assert got["ignore_channels"] == ["111", "222"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_get_roles(db):
|
||||||
|
roles = [
|
||||||
|
{"id": "1", "name": "Admin", "color": 0xFF0000, "position": 1, "permissions": 8, "hoist": True, "mentionable": True},
|
||||||
|
{"id": "2", "name": "User", "color": 0x0000FF, "position": 2, "permissions": 0, "hoist": False, "mentionable": False},
|
||||||
|
]
|
||||||
|
db.save_roles(roles)
|
||||||
|
got = db.get_all_roles()
|
||||||
|
assert len(got) == 2
|
||||||
|
assert {r["name"] for r in got} == {"Admin", "User"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_get_channels(db):
|
||||||
|
channels = [
|
||||||
|
{"id": 10, "name": "general", "type": 0, "position": 0, "category_id": None,
|
||||||
|
"topic": "Talk", "nsfw": 0, "bitrate": None, "slowmode_delay": None},
|
||||||
|
{"id": 20, "name": "voice", "type": 2, "position": 1, "category_id": None,
|
||||||
|
"topic": None, "nsfw": 0, "bitrate": 64000, "slowmode_delay": None},
|
||||||
|
]
|
||||||
|
db.save_channels(channels)
|
||||||
|
got = db.get_all_channels()
|
||||||
|
assert len(got) == 2
|
||||||
|
assert {c["name"] for c in got} == {"general", "voice"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_messages_and_attachments(db):
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"id": 101, "channel_id": 10, "author_id": 999, "content": "Hello",
|
||||||
|
"timestamp": "2023-01-01T00:00:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None,
|
||||||
|
"attachments": [
|
||||||
|
{"id": 1, "filename": "file.png", "size": 100,
|
||||||
|
"url": "http://cdn.test/file.png", "content_type": "image/png", "local_hash": "abc"}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
db.save_messages_batch(messages)
|
||||||
|
msg = db._conn.execute("SELECT content FROM messages WHERE id=101").fetchone()
|
||||||
|
assert msg["content"] == "Hello"
|
||||||
|
att = db._conn.execute("SELECT filename FROM attachments WHERE message_id=101").fetchone()
|
||||||
|
assert att["filename"] == "file.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_message_id(db):
|
||||||
|
msgs = [
|
||||||
|
{"id": 200, "channel_id": 10, "author_id": 1, "content": "a",
|
||||||
|
"timestamp": "2023-01-01T00:00:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None},
|
||||||
|
{"id": 201, "channel_id": 10, "author_id": 1, "content": "b",
|
||||||
|
"timestamp": "2023-01-01T00:01:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None},
|
||||||
|
]
|
||||||
|
db.save_messages_batch(msgs)
|
||||||
|
assert db.get_last_message_id("10") == 201
|
||||||
|
|
||||||
|
|
||||||
|
def test_stats_by_channel(db):
|
||||||
|
msgs = [
|
||||||
|
{"id": 101, "channel_id": 10, "author_id": 1, "content": "Hi",
|
||||||
|
"timestamp": "2023-01-01T00:00:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None},
|
||||||
|
{"id": 102, "channel_id": 10, "author_id": 1, "content": "Bye",
|
||||||
|
"timestamp": "2023-01-01T00:01:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None},
|
||||||
|
]
|
||||||
|
db.save_messages_batch(msgs)
|
||||||
|
stats = db.get_stats_by_channel()
|
||||||
|
assert stats[10]["message_count"] == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_threads(db):
|
||||||
|
threads = [
|
||||||
|
{"id": 300, "name": "thread-1", "type": 11, "parent_id": 10,
|
||||||
|
"message_count": 5, "member_count": 2, "archived": 0,
|
||||||
|
"archive_timestamp": None, "auto_archive_duration": 1440,
|
||||||
|
"locked": 0, "applied_tags": None},
|
||||||
|
]
|
||||||
|
db.save_threads(threads)
|
||||||
|
got = db.get_threads_by_parent("10")
|
||||||
|
assert len(got) == 1
|
||||||
|
assert got[0]["name"] == "thread-1"
|
||||||
|
|
||||||
|
|
||||||
|
def test_media_pool(db):
|
||||||
|
db.add_media_to_pool("hash123", "/path/file.png", 512, "image/png", "http://cdn.test/file.png")
|
||||||
|
db._conn.commit()
|
||||||
|
entry = db.get_media_by_hash("hash123")
|
||||||
|
assert entry is not None
|
||||||
|
assert entry["local_path"] == "/path/file.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_messages_paged_after_id(db):
|
||||||
|
msgs = [
|
||||||
|
{"id": i, "channel_id": 10, "author_id": 1, "content": f"msg{i}",
|
||||||
|
"timestamp": f"2023-01-01T00:0{i}:00Z", "type": 0,
|
||||||
|
"message_reference": None, "is_pinned": 0, "extra_data": None}
|
||||||
|
for i in range(5)
|
||||||
|
]
|
||||||
|
db.save_messages_batch(msgs)
|
||||||
|
page = db.get_messages_paged("10", limit=3, offset=0)
|
||||||
|
assert len(page) == 3
|
||||||
|
page_after = db.get_messages_paged("10", limit=10, after_id="2")
|
||||||
|
assert all(m["id"] > 2 for m in page_after)
|
||||||
138
tests/test_migration.py
Normal file
138
tests/test_migration.py
Normal file
|
|
@ -0,0 +1,138 @@
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, AsyncMock, patch
|
||||||
|
from src.core.base import MigrationContext
|
||||||
|
from src.core.configuration import AppConfig
|
||||||
|
from src.core.backup_reader import ChannelType
|
||||||
|
from src.fluxer.migrate_message import migrate_messages as fluxer_migrate, _process_and_send_message as fluxer_send
|
||||||
|
from src.stoat.migrate_message import migrate_messages as stoat_migrate, _process_and_send_message as stoat_send
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# --- Platform Detection (Same as e2e_simulation) ---
|
||||||
|
def get_platforms():
|
||||||
|
"""Determine which platforms to test based on config, or use defaults."""
|
||||||
|
config_path = (Path(__file__).parent.parent / "ReaperFiles-AutoTest/reaper_config.yaml").resolve()
|
||||||
|
if not config_path.exists():
|
||||||
|
return ["fluxer", "stoat"]
|
||||||
|
with open(config_path, "r") as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
platforms = []
|
||||||
|
if data.get("fluxer_bot_token"): platforms.append("fluxer")
|
||||||
|
if data.get("stoat_bot_token"): platforms.append("stoat")
|
||||||
|
return platforms if platforms else ["fluxer", "stoat"]
|
||||||
|
|
||||||
|
# --- Unit Tests (Transformation Logic) ---
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_context(mock_discord_reader, mock_fluxer_writer, mock_stoat_writer):
|
||||||
|
context = MagicMock(spec=MigrationContext)
|
||||||
|
context.discord_reader = mock_discord_reader
|
||||||
|
context.fluxer_writer = mock_fluxer_writer
|
||||||
|
context.stoat_writer = mock_stoat_writer
|
||||||
|
context.state = MagicMock()
|
||||||
|
context.state.get_user_alias.return_value = "TestAlias"
|
||||||
|
context.state.emoji_map = {}
|
||||||
|
context.state.channel_map = {}
|
||||||
|
context.is_running = True
|
||||||
|
return context
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_message():
|
||||||
|
msg = MagicMock()
|
||||||
|
msg.id = 111
|
||||||
|
msg.author.id = 222
|
||||||
|
msg.author.display_name = "Author"
|
||||||
|
msg.content = "Test content"
|
||||||
|
msg.attachments = []
|
||||||
|
msg.embeds = []
|
||||||
|
msg.stickers = []
|
||||||
|
msg.created_at.timestamp.return_value = 1600000000.0
|
||||||
|
msg.flags.forwarded = False
|
||||||
|
msg.reference = None
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_migration_transform_fluxer(mock_context, mock_message):
|
||||||
|
stats = {"messages": 0, "attachments": 0}
|
||||||
|
result = await fluxer_send(context=mock_context, msg=mock_message, target_channel_id="c1", stats=stats)
|
||||||
|
assert result == "fluxer_msg_123"
|
||||||
|
assert stats["messages"] == 1
|
||||||
|
assert mock_context.fluxer_writer.send_message.called
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_migration_transform_stoat(mock_context, mock_message):
|
||||||
|
stats = {"messages": 0, "attachments": 0}
|
||||||
|
result = await stoat_send(context=mock_context, msg=mock_message, target_channel_id="c1", stats=stats)
|
||||||
|
assert result == "stoat_msg_123"
|
||||||
|
assert stats["messages"] == 1
|
||||||
|
assert mock_context.stoat_writer.send_message.called
|
||||||
|
|
||||||
|
# --- Integration Tests (Backup Reader) ---
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_backup_reader_interaction(backup_reader, reaper_config):
|
||||||
|
await backup_reader.start()
|
||||||
|
assert backup_reader.guild is not None
|
||||||
|
# Verify ID from config (or mock default)
|
||||||
|
assert str(backup_reader.guild.id) == reaper_config.get("discord_server_id")
|
||||||
|
|
||||||
|
channels = await backup_reader.fetch_channels()
|
||||||
|
assert len(channels) > 0
|
||||||
|
|
||||||
|
# --- E2E Simulation ---
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.parametrize("platform", get_platforms())
|
||||||
|
async def test_migration_e2e_loop(reaper_config, test_data_dir, tmp_path, platform, request):
|
||||||
|
config = AppConfig(
|
||||||
|
discord_bot_token=reaper_config["discord_bot_token"],
|
||||||
|
discord_server_id=reaper_config["discord_server_id"],
|
||||||
|
target_platform=platform,
|
||||||
|
fluxer_bot_token=reaper_config.get("fluxer_bot_token"),
|
||||||
|
fluxer_server_id=reaper_config.get("fluxer_server_id"),
|
||||||
|
stoat_bot_token=reaper_config.get("stoat_bot_token"),
|
||||||
|
stoat_server_id=reaper_config.get("stoat_server_id"),
|
||||||
|
anonymize_users=reaper_config["anonymize_users"]
|
||||||
|
)
|
||||||
|
|
||||||
|
if test_data_dir.exists():
|
||||||
|
base_dir = test_data_dir
|
||||||
|
msg = f"[DATA_SOURCE] E2E_SIMULATION: Using sample data from {base_dir.name}"
|
||||||
|
else:
|
||||||
|
base_dir = tmp_path
|
||||||
|
msg = f"[DATA_SOURCE] E2E_SIMULATION: Fallback to mock data in {base_dir}"
|
||||||
|
|
||||||
|
from tests.conftest import _log
|
||||||
|
print(msg)
|
||||||
|
_log(msg)
|
||||||
|
|
||||||
|
context = MigrationContext(config, source_mode="backup", base_dir=str(base_dir))
|
||||||
|
|
||||||
|
mock_writer = request.getfixturevalue(f"mock_{platform}_writer")
|
||||||
|
if platform == "fluxer":
|
||||||
|
context.fluxer_writer = mock_writer
|
||||||
|
migrate_func = fluxer_migrate
|
||||||
|
else:
|
||||||
|
context.stoat_writer = mock_writer
|
||||||
|
migrate_func = stoat_migrate
|
||||||
|
|
||||||
|
context.is_running = True
|
||||||
|
from src.core.database import MigrationDatabase
|
||||||
|
context.state.db = MigrationDatabase(tmp_path / f"e2e_{platform}.db", platform=platform)
|
||||||
|
|
||||||
|
await context.discord_reader.start()
|
||||||
|
channels = await context.discord_reader.fetch_channels()
|
||||||
|
text_channels = [c for c in channels if c.type == ChannelType.text]
|
||||||
|
|
||||||
|
if text_channels:
|
||||||
|
source_channel_id = text_channels[0].id
|
||||||
|
target_channel_id = "999" if platform == "fluxer" else "stoat123"
|
||||||
|
|
||||||
|
mock_writer.send_message.side_effect = lambda **kwargs: "ok"
|
||||||
|
|
||||||
|
# Test just the first available channel to keep it fast
|
||||||
|
stats = await migrate_func(context=context, source_channel_id=source_channel_id, target_channel_id=target_channel_id)
|
||||||
|
assert stats["messages"] >= 0
|
||||||
|
|
||||||
|
await context.discord_reader.close()
|
||||||
117
tests/test_ui.py
Normal file
117
tests/test_ui.py
Normal file
|
|
@ -0,0 +1,117 @@
|
||||||
|
import pytest
|
||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, AsyncMock, patch
|
||||||
|
from textual.app import App
|
||||||
|
from src.ui.main_app import ReaperApp, ConfigSelectionScreen, ConfigScreen
|
||||||
|
from src.ui.mode_screen import ModeScreen
|
||||||
|
from src.core.configuration import AppConfig
|
||||||
|
|
||||||
|
import os
|
||||||
|
from textual.widgets import ListItem, ListView, Input, Button
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_configs(tmp_path, log):
|
||||||
|
reaper_dir = tmp_path / "ReaperFiles-TestConfig"
|
||||||
|
reaper_dir.mkdir()
|
||||||
|
(reaper_dir / "reaper_config.yaml").write_text("discord_bot_token: 'fake'\ndiscord_server_id: '123'\ntool_mode: 'backup_only'")
|
||||||
|
|
||||||
|
old_cwd = os.getcwd()
|
||||||
|
os.chdir(tmp_path)
|
||||||
|
log(f"CWD changed to: {tmp_path}")
|
||||||
|
yield tmp_path
|
||||||
|
os.chdir(old_cwd)
|
||||||
|
|
||||||
|
async def wait_for_screen(app, screen_class, timeout=5.0):
|
||||||
|
import time
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < timeout:
|
||||||
|
if isinstance(app.screen, screen_class):
|
||||||
|
return True
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
return False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ui_minimal_launch(mock_configs, log):
|
||||||
|
"""Verify app launch and screen transition to ModeScreen."""
|
||||||
|
log("Running test_ui_minimal_launch")
|
||||||
|
try:
|
||||||
|
# Reverting to AsyncMock to avoid WorkerError.
|
||||||
|
# RuntimeWarnings are now handled globally in conftest.py.
|
||||||
|
with patch("src.ui.main_app.ConfigSelectionScreen.check_updates", AsyncMock()):
|
||||||
|
app = ReaperApp()
|
||||||
|
async with app.run_test() as pilot:
|
||||||
|
await wait_for_screen(app, ConfigSelectionScreen)
|
||||||
|
await pilot.click(ListItem)
|
||||||
|
await wait_for_screen(app, ModeScreen)
|
||||||
|
assert isinstance(app.screen, ModeScreen)
|
||||||
|
log("test_ui_minimal_launch PASSED")
|
||||||
|
except Exception as e:
|
||||||
|
log(f"test_ui_minimal_launch FAILED: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ui_config_wizard_save(mock_configs, log):
|
||||||
|
"""Verify configuration editing and saving."""
|
||||||
|
log("Running test_ui_config_wizard_save")
|
||||||
|
try:
|
||||||
|
with patch("src.ui.main_app.ConfigSelectionScreen.check_updates", AsyncMock()):
|
||||||
|
app = ReaperApp()
|
||||||
|
async with app.run_test() as pilot:
|
||||||
|
await wait_for_screen(app, ConfigSelectionScreen)
|
||||||
|
await pilot.click(ListItem)
|
||||||
|
await wait_for_screen(app, ModeScreen)
|
||||||
|
await pilot.click("#btn_config")
|
||||||
|
await wait_for_screen(app, ConfigScreen)
|
||||||
|
|
||||||
|
screen = app.screen
|
||||||
|
inp = screen.query_one("#inp_discord_token", Input)
|
||||||
|
inp.value = "new_fake_token"
|
||||||
|
|
||||||
|
with patch("src.ui.main_app.save_config") as mock_save:
|
||||||
|
await pilot.click("#btn_save")
|
||||||
|
await pilot.pause(0.2)
|
||||||
|
assert mock_save.called
|
||||||
|
log("test_ui_config_wizard_save PASSED")
|
||||||
|
except Exception as e:
|
||||||
|
log(f"test_ui_config_wizard_save FAILED: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ui_operation_trigger(mock_configs, log):
|
||||||
|
"""Verify that an operation can be triggered."""
|
||||||
|
log("Running test_ui_operation_trigger")
|
||||||
|
from src.ui.shuttle_ops import OperationPane
|
||||||
|
from src.ui.modals import ChannelPickerScreen, ProgressScreen
|
||||||
|
try:
|
||||||
|
with patch("src.ui.main_app.ConfigSelectionScreen.check_updates", AsyncMock()):
|
||||||
|
# run_validate is decorated with @work in shuttle_ops.py, so it MUST be a coroutine (AsyncMock)
|
||||||
|
with patch.object(OperationPane, "run_validate", AsyncMock()):
|
||||||
|
app = ReaperApp()
|
||||||
|
async with app.run_test() as pilot:
|
||||||
|
await wait_for_screen(app, ConfigSelectionScreen)
|
||||||
|
await pilot.click(ListItem)
|
||||||
|
await wait_for_screen(app, ModeScreen)
|
||||||
|
|
||||||
|
pane = app.screen.query_one(OperationPane)
|
||||||
|
pane.tokens_valid = True
|
||||||
|
pane.src_channels = [{"id": 1, "name": "t"}]
|
||||||
|
pane.src_cat_map = {None: "D"}
|
||||||
|
pane.tgt_channels = [{"id": 2, "name": "t"}]
|
||||||
|
pane.tgt_cat_map = {None: "D"}
|
||||||
|
pane.all_tgt_channels = pane.tgt_channels
|
||||||
|
|
||||||
|
btn = pane.query_one("#op_backup_msgs", Button)
|
||||||
|
btn.disabled = False
|
||||||
|
await pilot.pause(0.2)
|
||||||
|
btn.focus()
|
||||||
|
await pilot.press("enter")
|
||||||
|
|
||||||
|
await wait_for_screen(app, (ChannelPickerScreen, ProgressScreen))
|
||||||
|
assert isinstance(app.screen, (ChannelPickerScreen, ProgressScreen))
|
||||||
|
log("test_ui_operation_trigger PASSED")
|
||||||
|
except Exception as e:
|
||||||
|
log(f"test_ui_operation_trigger FAILED: {e}")
|
||||||
|
raise
|
||||||
60
tests/test_utils.py
Normal file
60
tests/test_utils.py
Normal file
|
|
@ -0,0 +1,60 @@
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
from src.core.utils import parse_snowflake, resolve_discord_links
|
||||||
|
|
||||||
|
def test_parse_snowflake_valid():
|
||||||
|
assert parse_snowflake("12345") == 12345
|
||||||
|
assert parse_snowflake(12345) == 12345
|
||||||
|
assert parse_snowflake(" 67890 ") == 67890
|
||||||
|
|
||||||
|
def test_parse_snowflake_invalid():
|
||||||
|
assert parse_snowflake(None) is None
|
||||||
|
assert parse_snowflake("") is None
|
||||||
|
assert parse_snowflake("none") is None
|
||||||
|
assert parse_snowflake("NULL") is None
|
||||||
|
assert parse_snowflake("not_a_number") is None
|
||||||
|
|
||||||
|
def test_resolve_discord_links_no_content():
|
||||||
|
assert resolve_discord_links("", None, "fluxer", "target_id") == ""
|
||||||
|
assert resolve_discord_links(None, None, "fluxer", "target_id") is None
|
||||||
|
|
||||||
|
def test_resolve_discord_links_no_mapping():
|
||||||
|
mock_state = MagicMock()
|
||||||
|
mock_state.get_target_channel_id.return_value = None
|
||||||
|
mock_state.get_target_category_id.return_value = None
|
||||||
|
mock_state.find_message_mapping.return_value = (None, None)
|
||||||
|
|
||||||
|
content = "Check this: https://discord.com/channels/1/2/3"
|
||||||
|
resolved = resolve_discord_links(content, mock_state, "fluxer", "target_server")
|
||||||
|
assert "[`discord-message`](<https://discord.com/channels/1/2/3>)" in resolved
|
||||||
|
|
||||||
|
def test_resolve_discord_links_channel_mapping():
|
||||||
|
mock_state = MagicMock()
|
||||||
|
mock_state.get_target_channel_id.return_value = "target_chan_456"
|
||||||
|
mock_state.find_message_mapping.return_value = (None, None)
|
||||||
|
|
||||||
|
content = "Go to https://discord.com/channels/123/456"
|
||||||
|
|
||||||
|
# Test Fluxer
|
||||||
|
resolved_fluxer = resolve_discord_links(content, mock_state, "fluxer", "target_server")
|
||||||
|
assert "https://fluxer.app/channels/target_server/target_chan_456" in resolved_fluxer
|
||||||
|
|
||||||
|
# Test Stoat
|
||||||
|
resolved_stoat = resolve_discord_links(content, mock_state, "stoat", "target_server")
|
||||||
|
assert "https://stoat.chat/server/target_server/channel/target_chan_456" in resolved_stoat
|
||||||
|
|
||||||
|
def test_resolve_discord_links_message_mapping():
|
||||||
|
mock_state = MagicMock()
|
||||||
|
mock_state.find_message_mapping.return_value = ("target_chan_456", "target_msg_789")
|
||||||
|
|
||||||
|
content = "Look at this: https://discord.com/channels/123/456/789"
|
||||||
|
|
||||||
|
# Test Fluxer
|
||||||
|
resolved_fluxer = resolve_discord_links(content, mock_state, "fluxer", "target_server")
|
||||||
|
assert "https://fluxer.app/channels/target_server/target_chan_456/target_msg_789" in resolved_fluxer
|
||||||
|
|
||||||
|
def test_resolve_discord_links_skips_wrapped():
|
||||||
|
mock_state = MagicMock()
|
||||||
|
content = "Already wrapped: [link](https://discord.com/channels/1/2/3)"
|
||||||
|
resolved = resolve_discord_links(content, mock_state, "fluxer", "target_server")
|
||||||
|
assert resolved == content
|
||||||
Loading…
Add table
Reference in a new issue