Implement auto updater

This commit is contained in:
rambros 2026-03-22 23:49:58 +05:30
parent 694ee1d67a
commit c22856a324
4 changed files with 387 additions and 2 deletions

View file

@ -88,12 +88,30 @@ def setup_ssl():
os.environ["REQUESTS_CA_BUNDLE"] = path os.environ["REQUESTS_CA_BUNDLE"] = path
break break
def cleanup_old_update():
"""Removes the .old executable left behind by a Windows update."""
import os
import sys
if sys.platform != "win32":
return
current_exe = sys.executable if getattr(sys, 'frozen', False) else sys.argv[0]
old_exe = current_exe + ".old"
if os.path.exists(old_exe):
try:
os.remove(old_exe)
except Exception:
pass
def main(): def main():
import os import os
# Ensure screenshots directory is configured (but not created yet) # Ensure screenshots directory is configured (but not created yet)
shot_path = os.path.abspath("screenshots") shot_path = os.path.abspath("screenshots")
os.environ["TEXTUAL_SCREENSHOT_LOCATION"] = shot_path os.environ["TEXTUAL_SCREENSHOT_LOCATION"] = shot_path
cleanup_old_update()
relaunch_in_terminal() relaunch_in_terminal()
setup_ssl() setup_ssl()
setup_logging() setup_logging()

214
src/core/updater.py Normal file
View file

@ -0,0 +1,214 @@
import logging
import asyncio
import aiohttp
import sys
import os
import stat
from typing import Dict, Any, Optional
from pathlib import Path
from src.core.utils import get_app_version
logger = logging.getLogger(__name__)
REPO_OWNER = "rambros3d"
REPO_NAME = "disco-reaper"
API_URL = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/releases"
def get_current_version() -> str:
"""Returns the current version string, e.g., '1.0.0'. Strips 'Reaper-' and 'v'."""
raw = get_app_version()
# E.g., raw is 'Reaper-v1.0.0' or 'Reaper-1.0.0' or 'Reaper-Unknown'
if raw.startswith("Reaper-"):
raw = raw[7:]
if raw.startswith("v"):
raw = raw[1:]
return raw
def parse_version(version_str: str) -> tuple:
"""Parses a version string into a tuple of ints for comparison. e.g., '1.0.0' -> (1,0,0)"""
try:
# Strip 'v' and any prerelease tags like '-beta' for simple int comparison
clean = version_str.lstrip('v').split('-')[0]
return tuple(map(int, clean.split('.')))
except Exception:
return (0, 0, 0)
async def check_for_updates() -> Optional[Dict[str, Any]]:
"""
Fetches the latest release from GitHub.
Returns a dict with update info if a newer version is available, else None.
Dict structure: {"version": "v1.2.0", "url": "...", "body": "...", "asset_url": "...", "asset_name": "..."}
"""
# Only offer updates if we are running from a frozen PyInstaller bundle
# We don't want to overwrite the dev environment python script
if not getattr(sys, 'frozen', False):
logger.info("Auto-updater: Running from source, updates disabled.")
return None
current_ver_str = get_current_version()
if "Unknown" in current_ver_str or "git" in current_ver_str:
logger.info(f"Auto-updater: Current version '{current_ver_str}' is unstable/dev. Checking disabled.")
return None
try:
async with aiohttp.ClientSession() as session:
async with session.get(API_URL, headers={"Accept": "application/vnd.github.v3+json"}) as resp:
if resp.status == 200:
releases = await resp.json()
if not isinstance(releases, list) or not releases:
return None
data = releases[0]
latest_tag = data.get("tag_name", "")
latest_ver = parse_version(latest_tag)
current_ver = parse_version(current_ver_str)
if latest_ver > current_ver:
logger.info(f"Auto-updater: New version found: {latest_tag} (Current: {current_ver_str})")
# Find the correct asset for this OS
expected_asset_name = ""
if sys.platform == "win32":
expected_asset_name = "disco-reaper-windows.zip"
elif sys.platform == "darwin":
expected_asset_name = "disco-reaper-macos.zip"
else:
expected_asset_name = "disco-reaper-linux.zip"
asset_url = None
for asset in data.get("assets", []):
if asset.get("name") == expected_asset_name:
asset_url = asset.get("browser_download_url")
break
if not asset_url:
logger.error(f"Auto-updater: Could not find asset '{expected_asset_name}' in release.")
return None
return {
"version": latest_tag,
"url": data.get("html_url"),
"body": data.get("body", "No release notes provided."),
"asset_url": asset_url,
"asset_name": expected_asset_name,
"prerelease": data.get("prerelease", False)
}
else:
logger.info(f"Auto-updater: Up to date (Current: {current_ver_str}, Latest: {latest_tag})")
else:
logger.warning(f"Auto-updater: Failed to fetch latest release. Status: {resp.status}")
except Exception as e:
logger.error(f"Auto-updater: Error checking for updates: {e}")
return None
async def download_and_extract_update(asset_url: str, progress_callback=None) -> Optional[Path]:
"""
Downloads the zip asset from GitHub and extracts the executable to a temporary path.
Returns the path to the downloaded executable.
"""
import tempfile
import zipfile
try:
temp_dir = Path(tempfile.mkdtemp(prefix="discoreaper_update_"))
zip_path = temp_dir / "update.zip"
async with aiohttp.ClientSession() as session:
async with session.get(asset_url) as resp:
if resp.status != 200:
logger.error(f"Auto-updater: Download failed with status {resp.status}")
return None
total_size = int(resp.headers.get('content-length', 0))
downloaded_size = 0
with open(zip_path, 'wb') as f:
async for chunk in resp.content.iter_chunked(8192):
f.write(chunk)
downloaded_size += len(chunk)
if progress_callback and total_size > 0:
progress_callback(downloaded_size, total_size)
# Extract the zip
# The zip typically contains a 'REAPER' folder with 'DiscoReaper' or 'DiscoReaper.exe' inside
executable_name = "DiscoReaper.exe" if sys.platform == "win32" else "DiscoReaper"
found_exe_path = None
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# We don't want to blindly extract everything everywhere. Just find the executable.
for file_info in zip_ref.infolist():
if file_info.filename.endswith(executable_name):
# We found the executable. Extract it directly to the temp dir.
# We read and write it so we flatten any directory structure inside the zip
with zip_ref.open(file_info) as source, open(temp_dir / executable_name, "wb") as target:
target.write(source.read())
found_exe_path = temp_dir / executable_name
break
if not found_exe_path or not found_exe_path.exists():
logger.error("Auto-updater: Could not find the executable inside the downloaded zip.")
return None
return found_exe_path
except Exception as e:
logger.error(f"Auto-updater: Error during download/extract: {e}")
return None
def apply_update_and_restart(new_exe_path: Path):
"""
Replaces the current executable with the new one and restarts.
Handles Windows file-locking quirks and cross-device filesystem moves.
"""
import shutil
import subprocess
try:
current_exe = Path(sys.executable) if getattr(sys, 'frozen', False) else Path(sys.argv[0])
current_exe = current_exe.resolve()
logger.info(f"Applying update: replacing {current_exe} with {new_exe_path}")
# Make the new binary executable (mainly for Linux/macOS)
if sys.platform != "win32":
st = os.stat(new_exe_path)
os.chmod(new_exe_path, st.st_mode | stat.S_IEXEC)
if sys.platform == "win32":
# On Windows, we can't overwrite a running .exe, but we CAN rename it.
old_exe = current_exe.parent / (current_exe.name + ".old")
if old_exe.exists():
try:
old_exe.unlink()
except Exception:
pass
os.rename(current_exe, old_exe)
shutil.move(str(new_exe_path), str(current_exe))
logger.info("Update applied. Restarting process...")
subprocess.Popen([str(current_exe)] + sys.argv[1:])
sys.exit(0)
else:
# On Linux/macOS, replace it.
# We must unlink (delete) the existing file first to avoid "Text file busy"
# if we are crossing filesystems (shutil.move falls back to copy & overwrite).
if current_exe.exists():
try:
logger.info(f"Unlinking {current_exe} to avoid 'Text file busy'")
current_exe.unlink()
except Exception as e:
logger.warning(f"Could not unlink current exe: {e}. Attempting move anyway.")
shutil.move(str(new_exe_path), str(current_exe))
st = os.stat(current_exe)
os.chmod(current_exe, st.st_mode | stat.S_IEXEC)
logger.info("Update applied. Restarting via execv...")
# Restart
os.execv(str(current_exe), [str(current_exe)] + sys.argv[1:])
except Exception as e:
logger.error(f"Auto-updater: Failed to apply update: {e}")
raise

View file

@ -127,7 +127,9 @@ class ConfigSelectionScreen(Screen):
} }
#config_sel_actions { height: auto; margin-top: 0; } #config_sel_actions { height: auto; margin-top: 0; }
#config_sel_actions Button { width: 1fr; margin: 0 1; } #config_sel_actions Button { width: 1fr; margin: 0 1; }
#btn_about { margin-top: 2; border: none; } #bottom_actions_row { height: auto; align: center middle; margin-top: 2; }
#btn_update_app { display: none; margin-bottom: 1; width: 40; border: none; height: 1; }
#btn_about { border: none; width: 40; height: 1; }
""" """
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
@ -141,12 +143,15 @@ class ConfigSelectionScreen(Screen):
yield Button("New Config", id="btn_new_config", variant="success", tooltip="Create a new configuration folder") yield Button("New Config", id="btn_new_config", variant="success", tooltip="Create a new configuration folder")
yield Button("Exit", id="btn_exit", variant="error") yield Button("Exit", id="btn_exit", variant="error")
with Center(): with Center():
yield Button("Info", id="btn_about", tooltip="Show app information") with Vertical(id="bottom_actions_row"):
yield Button("Update Available", id="btn_update_app", variant="warning", tooltip="A new version is available!")
yield Button("Info", id="btn_about", tooltip="Show app information")
yield Footer() yield Footer()
yield Footnote() yield Footnote()
yield RamDisplay() yield RamDisplay()
def on_mount(self) -> None: def on_mount(self) -> None:
self.run_worker(self.check_updates())
configs = self.refresh_configs() configs = self.refresh_configs()
if not configs: if not configs:
def on_first_info_dismiss(res): def on_first_info_dismiss(res):
@ -154,6 +159,23 @@ class ConfigSelectionScreen(Screen):
self.action_new_config() self.action_new_config()
self.app.push_screen(FirstInfoModal(), on_first_info_dismiss) self.app.push_screen(FirstInfoModal(), on_first_info_dismiss)
async def check_updates(self):
from src.core.updater import check_for_updates
update_info = await check_for_updates()
if update_info:
self.update_info = update_info
try:
btn = self.query_one("#btn_update_app", Button)
btn.display = True
if update_info.get("prerelease", False):
btn.label = f"Beta Version Available ({update_info['version']})"
btn.variant = "warning"
else:
btn.label = f"Update Available ({update_info['version']})"
btn.variant = "success"
except Exception:
pass
def on_screen_resume(self) -> None: def on_screen_resume(self) -> None:
self.refresh_configs() self.refresh_configs()
@ -192,6 +214,20 @@ class ConfigSelectionScreen(Screen):
self.action_new_config() self.action_new_config()
elif event.button.id == "btn_about": elif event.button.id == "btn_about":
self.app.push_screen(FirstInfoModal()) self.app.push_screen(FirstInfoModal())
elif event.button.id == "btn_update_app":
if hasattr(self, 'update_info') and self.update_info:
from src.ui.modals import UpdateModalScreen, UpdateProgressScreen
def on_update_confirm(do_update):
if do_update:
self.app.push_screen(UpdateProgressScreen(asset_url=self.update_info['asset_url']))
self.app.push_screen(
UpdateModalScreen(
version=self.update_info['version'],
notes=self.update_info['body'],
prerelease=self.update_info.get('prerelease', False)
),
on_update_confirm
)
elif event.button.id == "btn_exit": elif event.button.id == "btn_exit":
self.app.exit() self.app.exit()

View file

@ -1115,3 +1115,120 @@ class ChannelIDInputModal(ModalScreen[dict | None]):
else: else:
preview.update(f"[bold red]No channel found with ID: {chan_id_str}[/bold red]\n[dim]Make sure the ID belongs to a channel in the target community.[/dim]") preview.update(f"[bold red]No channel found with ID: {chan_id_str}[/bold red]\n[dim]Make sure the ID belongs to a channel in the target community.[/dim]")
yield RamDisplay() yield RamDisplay()
# ---------------------------------------------------------------------------
# UpdateModalScreen & UpdateProgressScreen
# ---------------------------------------------------------------------------
class UpdateModalScreen(ModalScreen[bool]):
"""Modal to display release notes and confirm update."""
DEFAULT_CSS = """
UpdateModalScreen { align: center middle; }
#update_dialog {
width: 60%; height: auto; max-height: 60%;
border: solid $accent; background: $surface; padding: 1 2; layout: vertical;
}
#update_title { text-style: bold; color: $accent; margin-bottom: 1; content-align: center middle; width: 100%; }
#update_notes_scroll { height: 1fr; margin-bottom: 1; border: solid $primary; padding: 1; }
#update_buttons { height: auto; dock: bottom; }
#update_buttons Button { width: 1fr; margin: 0 1; }
"""
def __init__(self, version: str, notes: str, prerelease: bool = False):
super().__init__()
self.version_tag = version
self.notes = notes
self.prerelease = prerelease
def compose(self) -> ComposeResult:
from textual.widgets import Markdown
with Container(id="update_dialog"):
titlePrefix = "[orange]Beta Available[/orange]" if self.prerelease else "[green]Update Available[/green]"
yield Label(f"{titlePrefix}: {self.version_tag}", id="update_title")
with VerticalScroll(id="update_notes_scroll"):
yield Markdown(self.notes)
with Horizontal(id="update_buttons"):
yield Button("Auto Update", variant="success", id="btn_do_update", tooltip="Download and install the update. The app will restart.")
yield Button("Not Now", id="btn_cancel_update")
yield RamDisplay()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn_cancel_update":
self.dismiss(False)
elif event.button.id == "btn_do_update":
self.dismiss(True)
class UpdateProgressScreen(ModalScreen[None]):
"""Screen to show download progress of the update."""
DEFAULT_CSS = """
UpdateProgressScreen { align: center middle; }
#up_prog_dialog {
width: 60; height: auto;
border: solid green; background: $surface; padding: 1 2; layout: vertical;
}
#up_prog_title { text-style: bold; margin-bottom: 1; content-align: center middle; width: 100%; }
#up_prog_bar { margin-bottom: 1; }
#up_prog_status { content-align: center middle; width: 100%; margin-bottom: 1; }
#up_prog_btn { display: none; margin-top: 1; width: 100%; }
"""
def __init__(self, asset_url: str):
super().__init__()
self.asset_url = asset_url
def compose(self) -> ComposeResult:
with Container(id="up_prog_dialog"):
yield Label("Downloading Update...", id="up_prog_title")
yield ProgressBar(id="up_prog_bar", show_eta=False)
yield Label("Starting download...", id="up_prog_status")
yield Button("Close", id="up_prog_btn", variant="error")
yield RamDisplay()
async def on_mount(self) -> None:
self.run_worker(self.do_download())
async def do_download(self):
from src.core.updater import download_and_extract_update, apply_update_and_restart
import logging
logger = logging.getLogger(__name__)
try:
bar = self.query_one("#up_prog_bar", ProgressBar)
lbl = self.query_one("#up_prog_status", Label)
def prog_cb(downloaded, total):
try:
# We are in the main thread (async worker). Update UI directly.
bar.update(total=total, progress=downloaded)
mb_dl = downloaded / (1024*1024)
mb_tot = total / (1024*1024)
lbl.update(f"{mb_dl:.1f} MB / {mb_tot:.1f} MB")
except Exception as e:
logger.error(f"Error in download progress callback: {e}")
logger.info(f"Starting update download from: {self.asset_url}")
new_exe_path = await download_and_extract_update(self.asset_url, progress_callback=prog_cb)
if new_exe_path:
logger.info(f"Update downloaded to {new_exe_path}. Applying...")
lbl.update("[bold green]Download complete! Applying update...[/bold green]")
# We need to give the UI a moment before restarting
await asyncio.sleep(1)
apply_update_and_restart(new_exe_path)
else:
logger.error("Download failed - no path returned from updater.")
lbl.update("[bold red]Applying update failed! Check logs.[/bold red]")
self.query_one("#up_prog_title", Label).update("Update Failed")
self.query_one("#up_prog_btn", Button).display = True
except Exception as e:
logger.exception(f"Exception in do_download worker: {e}")
try:
self.query_one("#up_prog_status", Label).update(f"[bold red]Error:[/bold red] {e}")
self.query_one("#up_prog_btn", Button).display = True
except Exception:
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "up_prog_btn":
self.dismiss(None)