From c22856a324b39817e794b8f125d01c3895ad2fc2 Mon Sep 17 00:00:00 2001 From: rambros Date: Sun, 22 Mar 2026 23:49:58 +0530 Subject: [PATCH] Implement auto updater --- disco-reaper.py | 18 ++++ src/core/updater.py | 214 ++++++++++++++++++++++++++++++++++++++++++++ src/ui/main_app.py | 40 ++++++++- src/ui/modals.py | 117 ++++++++++++++++++++++++ 4 files changed, 387 insertions(+), 2 deletions(-) create mode 100644 src/core/updater.py diff --git a/disco-reaper.py b/disco-reaper.py index f773d6c..6b8b233 100644 --- a/disco-reaper.py +++ b/disco-reaper.py @@ -88,12 +88,30 @@ def setup_ssl(): os.environ["REQUESTS_CA_BUNDLE"] = path break +def cleanup_old_update(): + """Removes the .old executable left behind by a Windows update.""" + import os + import sys + + if sys.platform != "win32": + return + + current_exe = sys.executable if getattr(sys, 'frozen', False) else sys.argv[0] + old_exe = current_exe + ".old" + + if os.path.exists(old_exe): + try: + os.remove(old_exe) + except Exception: + pass + def main(): import os # Ensure screenshots directory is configured (but not created yet) shot_path = os.path.abspath("screenshots") os.environ["TEXTUAL_SCREENSHOT_LOCATION"] = shot_path + cleanup_old_update() relaunch_in_terminal() setup_ssl() setup_logging() diff --git a/src/core/updater.py b/src/core/updater.py new file mode 100644 index 0000000..671c43f --- /dev/null +++ b/src/core/updater.py @@ -0,0 +1,214 @@ +import logging +import asyncio +import aiohttp +import sys +import os +import stat +from typing import Dict, Any, Optional +from pathlib import Path +from src.core.utils import get_app_version + +logger = logging.getLogger(__name__) + +REPO_OWNER = "rambros3d" +REPO_NAME = "disco-reaper" +API_URL = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/releases" + +def get_current_version() -> str: + """Returns the current version string, e.g., '1.0.0'. Strips 'Reaper-' and 'v'.""" + raw = get_app_version() + # E.g., raw is 'Reaper-v1.0.0' or 'Reaper-1.0.0' or 'Reaper-Unknown' + if raw.startswith("Reaper-"): + raw = raw[7:] + if raw.startswith("v"): + raw = raw[1:] + return raw + +def parse_version(version_str: str) -> tuple: + """Parses a version string into a tuple of ints for comparison. e.g., '1.0.0' -> (1,0,0)""" + try: + # Strip 'v' and any prerelease tags like '-beta' for simple int comparison + clean = version_str.lstrip('v').split('-')[0] + return tuple(map(int, clean.split('.'))) + except Exception: + return (0, 0, 0) + +async def check_for_updates() -> Optional[Dict[str, Any]]: + """ + Fetches the latest release from GitHub. + Returns a dict with update info if a newer version is available, else None. + Dict structure: {"version": "v1.2.0", "url": "...", "body": "...", "asset_url": "...", "asset_name": "..."} + """ + # Only offer updates if we are running from a frozen PyInstaller bundle + # We don't want to overwrite the dev environment python script + if not getattr(sys, 'frozen', False): + logger.info("Auto-updater: Running from source, updates disabled.") + return None + + current_ver_str = get_current_version() + if "Unknown" in current_ver_str or "git" in current_ver_str: + logger.info(f"Auto-updater: Current version '{current_ver_str}' is unstable/dev. Checking disabled.") + return None + + try: + async with aiohttp.ClientSession() as session: + async with session.get(API_URL, headers={"Accept": "application/vnd.github.v3+json"}) as resp: + if resp.status == 200: + releases = await resp.json() + if not isinstance(releases, list) or not releases: + return None + data = releases[0] + latest_tag = data.get("tag_name", "") + + latest_ver = parse_version(latest_tag) + current_ver = parse_version(current_ver_str) + + if latest_ver > current_ver: + logger.info(f"Auto-updater: New version found: {latest_tag} (Current: {current_ver_str})") + + # Find the correct asset for this OS + expected_asset_name = "" + if sys.platform == "win32": + expected_asset_name = "disco-reaper-windows.zip" + elif sys.platform == "darwin": + expected_asset_name = "disco-reaper-macos.zip" + else: + expected_asset_name = "disco-reaper-linux.zip" + + asset_url = None + for asset in data.get("assets", []): + if asset.get("name") == expected_asset_name: + asset_url = asset.get("browser_download_url") + break + + if not asset_url: + logger.error(f"Auto-updater: Could not find asset '{expected_asset_name}' in release.") + return None + + return { + "version": latest_tag, + "url": data.get("html_url"), + "body": data.get("body", "No release notes provided."), + "asset_url": asset_url, + "asset_name": expected_asset_name, + "prerelease": data.get("prerelease", False) + } + else: + logger.info(f"Auto-updater: Up to date (Current: {current_ver_str}, Latest: {latest_tag})") + else: + logger.warning(f"Auto-updater: Failed to fetch latest release. Status: {resp.status}") + except Exception as e: + logger.error(f"Auto-updater: Error checking for updates: {e}") + + return None + +async def download_and_extract_update(asset_url: str, progress_callback=None) -> Optional[Path]: + """ + Downloads the zip asset from GitHub and extracts the executable to a temporary path. + Returns the path to the downloaded executable. + """ + import tempfile + import zipfile + + try: + temp_dir = Path(tempfile.mkdtemp(prefix="discoreaper_update_")) + zip_path = temp_dir / "update.zip" + + async with aiohttp.ClientSession() as session: + async with session.get(asset_url) as resp: + if resp.status != 200: + logger.error(f"Auto-updater: Download failed with status {resp.status}") + return None + + total_size = int(resp.headers.get('content-length', 0)) + downloaded_size = 0 + + with open(zip_path, 'wb') as f: + async for chunk in resp.content.iter_chunked(8192): + f.write(chunk) + downloaded_size += len(chunk) + if progress_callback and total_size > 0: + progress_callback(downloaded_size, total_size) + + # Extract the zip + # The zip typically contains a 'REAPER' folder with 'DiscoReaper' or 'DiscoReaper.exe' inside + executable_name = "DiscoReaper.exe" if sys.platform == "win32" else "DiscoReaper" + found_exe_path = None + + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + # We don't want to blindly extract everything everywhere. Just find the executable. + for file_info in zip_ref.infolist(): + if file_info.filename.endswith(executable_name): + # We found the executable. Extract it directly to the temp dir. + # We read and write it so we flatten any directory structure inside the zip + with zip_ref.open(file_info) as source, open(temp_dir / executable_name, "wb") as target: + target.write(source.read()) + found_exe_path = temp_dir / executable_name + break + + if not found_exe_path or not found_exe_path.exists(): + logger.error("Auto-updater: Could not find the executable inside the downloaded zip.") + return None + + return found_exe_path + + except Exception as e: + logger.error(f"Auto-updater: Error during download/extract: {e}") + return None + +def apply_update_and_restart(new_exe_path: Path): + """ + Replaces the current executable with the new one and restarts. + Handles Windows file-locking quirks and cross-device filesystem moves. + """ + import shutil + import subprocess + + try: + current_exe = Path(sys.executable) if getattr(sys, 'frozen', False) else Path(sys.argv[0]) + current_exe = current_exe.resolve() + + logger.info(f"Applying update: replacing {current_exe} with {new_exe_path}") + + # Make the new binary executable (mainly for Linux/macOS) + if sys.platform != "win32": + st = os.stat(new_exe_path) + os.chmod(new_exe_path, st.st_mode | stat.S_IEXEC) + + if sys.platform == "win32": + # On Windows, we can't overwrite a running .exe, but we CAN rename it. + old_exe = current_exe.parent / (current_exe.name + ".old") + if old_exe.exists(): + try: + old_exe.unlink() + except Exception: + pass + os.rename(current_exe, old_exe) + shutil.move(str(new_exe_path), str(current_exe)) + + logger.info("Update applied. Restarting process...") + subprocess.Popen([str(current_exe)] + sys.argv[1:]) + sys.exit(0) + + else: + # On Linux/macOS, replace it. + # We must unlink (delete) the existing file first to avoid "Text file busy" + # if we are crossing filesystems (shutil.move falls back to copy & overwrite). + if current_exe.exists(): + try: + logger.info(f"Unlinking {current_exe} to avoid 'Text file busy'") + current_exe.unlink() + except Exception as e: + logger.warning(f"Could not unlink current exe: {e}. Attempting move anyway.") + + shutil.move(str(new_exe_path), str(current_exe)) + st = os.stat(current_exe) + os.chmod(current_exe, st.st_mode | stat.S_IEXEC) + + logger.info("Update applied. Restarting via execv...") + # Restart + os.execv(str(current_exe), [str(current_exe)] + sys.argv[1:]) + + except Exception as e: + logger.error(f"Auto-updater: Failed to apply update: {e}") + raise diff --git a/src/ui/main_app.py b/src/ui/main_app.py index 3708d8c..8e05059 100644 --- a/src/ui/main_app.py +++ b/src/ui/main_app.py @@ -127,7 +127,9 @@ class ConfigSelectionScreen(Screen): } #config_sel_actions { height: auto; margin-top: 0; } #config_sel_actions Button { width: 1fr; margin: 0 1; } - #btn_about { margin-top: 2; border: none; } + #bottom_actions_row { height: auto; align: center middle; margin-top: 2; } + #btn_update_app { display: none; margin-bottom: 1; width: 40; border: none; height: 1; } + #btn_about { border: none; width: 40; height: 1; } """ def compose(self) -> ComposeResult: @@ -141,12 +143,15 @@ class ConfigSelectionScreen(Screen): yield Button("New Config", id="btn_new_config", variant="success", tooltip="Create a new configuration folder") yield Button("Exit", id="btn_exit", variant="error") with Center(): - yield Button("Info", id="btn_about", tooltip="Show app information") + with Vertical(id="bottom_actions_row"): + yield Button("Update Available", id="btn_update_app", variant="warning", tooltip="A new version is available!") + yield Button("Info", id="btn_about", tooltip="Show app information") yield Footer() yield Footnote() yield RamDisplay() def on_mount(self) -> None: + self.run_worker(self.check_updates()) configs = self.refresh_configs() if not configs: def on_first_info_dismiss(res): @@ -154,6 +159,23 @@ class ConfigSelectionScreen(Screen): self.action_new_config() self.app.push_screen(FirstInfoModal(), on_first_info_dismiss) + async def check_updates(self): + from src.core.updater import check_for_updates + update_info = await check_for_updates() + if update_info: + self.update_info = update_info + try: + btn = self.query_one("#btn_update_app", Button) + btn.display = True + if update_info.get("prerelease", False): + btn.label = f"Beta Version Available ({update_info['version']})" + btn.variant = "warning" + else: + btn.label = f"Update Available ({update_info['version']})" + btn.variant = "success" + except Exception: + pass + def on_screen_resume(self) -> None: self.refresh_configs() @@ -192,6 +214,20 @@ class ConfigSelectionScreen(Screen): self.action_new_config() elif event.button.id == "btn_about": self.app.push_screen(FirstInfoModal()) + elif event.button.id == "btn_update_app": + if hasattr(self, 'update_info') and self.update_info: + from src.ui.modals import UpdateModalScreen, UpdateProgressScreen + def on_update_confirm(do_update): + if do_update: + self.app.push_screen(UpdateProgressScreen(asset_url=self.update_info['asset_url'])) + self.app.push_screen( + UpdateModalScreen( + version=self.update_info['version'], + notes=self.update_info['body'], + prerelease=self.update_info.get('prerelease', False) + ), + on_update_confirm + ) elif event.button.id == "btn_exit": self.app.exit() diff --git a/src/ui/modals.py b/src/ui/modals.py index 7317069..b1d118d 100644 --- a/src/ui/modals.py +++ b/src/ui/modals.py @@ -1115,3 +1115,120 @@ class ChannelIDInputModal(ModalScreen[dict | None]): else: preview.update(f"[bold red]No channel found with ID: {chan_id_str}[/bold red]\n[dim]Make sure the ID belongs to a channel in the target community.[/dim]") yield RamDisplay() + +# --------------------------------------------------------------------------- +# UpdateModalScreen & UpdateProgressScreen +# --------------------------------------------------------------------------- + +class UpdateModalScreen(ModalScreen[bool]): + """Modal to display release notes and confirm update.""" + DEFAULT_CSS = """ + UpdateModalScreen { align: center middle; } + #update_dialog { + width: 60%; height: auto; max-height: 60%; + border: solid $accent; background: $surface; padding: 1 2; layout: vertical; + } + #update_title { text-style: bold; color: $accent; margin-bottom: 1; content-align: center middle; width: 100%; } + #update_notes_scroll { height: 1fr; margin-bottom: 1; border: solid $primary; padding: 1; } + #update_buttons { height: auto; dock: bottom; } + #update_buttons Button { width: 1fr; margin: 0 1; } + """ + + def __init__(self, version: str, notes: str, prerelease: bool = False): + super().__init__() + self.version_tag = version + self.notes = notes + self.prerelease = prerelease + + def compose(self) -> ComposeResult: + from textual.widgets import Markdown + with Container(id="update_dialog"): + titlePrefix = "[orange]Beta Available[/orange]" if self.prerelease else "[green]Update Available[/green]" + yield Label(f"{titlePrefix}: {self.version_tag}", id="update_title") + with VerticalScroll(id="update_notes_scroll"): + yield Markdown(self.notes) + with Horizontal(id="update_buttons"): + yield Button("Auto Update", variant="success", id="btn_do_update", tooltip="Download and install the update. The app will restart.") + yield Button("Not Now", id="btn_cancel_update") + yield RamDisplay() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "btn_cancel_update": + self.dismiss(False) + elif event.button.id == "btn_do_update": + self.dismiss(True) + +class UpdateProgressScreen(ModalScreen[None]): + """Screen to show download progress of the update.""" + DEFAULT_CSS = """ + UpdateProgressScreen { align: center middle; } + #up_prog_dialog { + width: 60; height: auto; + border: solid green; background: $surface; padding: 1 2; layout: vertical; + } + #up_prog_title { text-style: bold; margin-bottom: 1; content-align: center middle; width: 100%; } + #up_prog_bar { margin-bottom: 1; } + #up_prog_status { content-align: center middle; width: 100%; margin-bottom: 1; } + #up_prog_btn { display: none; margin-top: 1; width: 100%; } + """ + + def __init__(self, asset_url: str): + super().__init__() + self.asset_url = asset_url + + def compose(self) -> ComposeResult: + with Container(id="up_prog_dialog"): + yield Label("Downloading Update...", id="up_prog_title") + yield ProgressBar(id="up_prog_bar", show_eta=False) + yield Label("Starting download...", id="up_prog_status") + yield Button("Close", id="up_prog_btn", variant="error") + yield RamDisplay() + + async def on_mount(self) -> None: + self.run_worker(self.do_download()) + + async def do_download(self): + from src.core.updater import download_and_extract_update, apply_update_and_restart + import logging + logger = logging.getLogger(__name__) + + try: + bar = self.query_one("#up_prog_bar", ProgressBar) + lbl = self.query_one("#up_prog_status", Label) + + def prog_cb(downloaded, total): + try: + # We are in the main thread (async worker). Update UI directly. + bar.update(total=total, progress=downloaded) + + mb_dl = downloaded / (1024*1024) + mb_tot = total / (1024*1024) + lbl.update(f"{mb_dl:.1f} MB / {mb_tot:.1f} MB") + except Exception as e: + logger.error(f"Error in download progress callback: {e}") + + logger.info(f"Starting update download from: {self.asset_url}") + new_exe_path = await download_and_extract_update(self.asset_url, progress_callback=prog_cb) + + if new_exe_path: + logger.info(f"Update downloaded to {new_exe_path}. Applying...") + lbl.update("[bold green]Download complete! Applying update...[/bold green]") + # We need to give the UI a moment before restarting + await asyncio.sleep(1) + apply_update_and_restart(new_exe_path) + else: + logger.error("Download failed - no path returned from updater.") + lbl.update("[bold red]Applying update failed! Check logs.[/bold red]") + self.query_one("#up_prog_title", Label).update("Update Failed") + self.query_one("#up_prog_btn", Button).display = True + except Exception as e: + logger.exception(f"Exception in do_download worker: {e}") + try: + self.query_one("#up_prog_status", Label).update(f"[bold red]Error:[/bold red] {e}") + self.query_one("#up_prog_btn", Button).display = True + except Exception: + pass + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "up_prog_btn": + self.dismiss(None)