fix: backup stats viewer for sqlite

This commit is contained in:
rambros 2026-03-13 16:52:52 +05:30
parent f402a36477
commit 2581714fe3
4 changed files with 142 additions and 85 deletions

View file

@ -3,10 +3,22 @@ import logging
import json import json
import threading import threading
from pathlib import Path from pathlib import Path
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional, Union
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def parse_snowflake(value: Any) -> Optional[int]:
"""Safely parses a Discord ID (Snowflake) from any input, handling 'None' strings."""
if value is None:
return None
s = str(value).strip()
if not s or s.lower() == "none" or s == "NULL":
return None
try:
return int(s)
except ValueError:
return None
class BackupDatabase: class BackupDatabase:
"""Manages the SQLite database for local Discord backups.""" """Manages the SQLite database for local Discord backups."""
@ -524,9 +536,8 @@ class BackupDatabase:
stats = {} stats = {}
for r in msg_rows: for r in msg_rows:
cid_raw = r["channel_id"] cid = parse_snowflake(r["channel_id"])
if cid_raw is None or cid_raw == "None": continue if cid is None: continue
cid = int(cid_raw)
stats[cid] = { stats[cid] = {
"message_count": r["msg_count"], "message_count": r["msg_count"],
"thread_count": 0, "thread_count": 0,
@ -535,17 +546,15 @@ class BackupDatabase:
} }
for r in thread_rows: for r in thread_rows:
cid_raw = r["parent_id"] cid = parse_snowflake(r["parent_id"])
if cid_raw is None or cid_raw == "None": continue if cid is None: continue
cid = int(cid_raw)
if cid not in stats: if cid not in stats:
stats[cid] = {"message_count": 0, "thread_count": 0, "attachment_count": 0, "total_size": 0} stats[cid] = {"message_count": 0, "thread_count": 0, "attachment_count": 0, "total_size": 0}
stats[cid]["thread_count"] = r["thread_count"] stats[cid]["thread_count"] = r["thread_count"]
for r in att_rows: for r in att_rows:
cid_raw = r["channel_id"] cid = parse_snowflake(r["channel_id"])
if cid_raw is None or cid_raw == "None": continue if cid is None: continue
cid = int(cid_raw)
if cid not in stats: if cid not in stats:
stats[cid] = {"message_count": 0, "thread_count": 0, "attachment_count": 0, "total_size": 0} stats[cid] = {"message_count": 0, "thread_count": 0, "attachment_count": 0, "total_size": 0}
stats[cid]["attachment_count"] = r["att_count"] stats[cid]["attachment_count"] = r["att_count"]

View file

@ -12,7 +12,7 @@ from datetime import datetime, timezone
from enum import IntEnum from enum import IntEnum
from pathlib import Path from pathlib import Path
from typing import AsyncGenerator, Dict, Any, List, Optional from typing import AsyncGenerator, Dict, Any, List, Optional
from src.core.backup_database import BackupDatabase from src.core.backup_database import BackupDatabase, parse_snowflake
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -252,11 +252,11 @@ class BackupRole:
self.id = 0 self.id = 0
self.name = "Unknown" self.name = "Unknown"
return return
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data["name"] self.name = data["name"]
self.color = BackupColor(int(data.get("color", 0))) self.color = BackupColor(parse_snowflake(data.get("color", 0)) or 0)
self.position = data.get("position", 0) self.position = data.get("position", 0)
self.permissions = BackupPermissions(int(data.get("permissions", 0))) self.permissions = BackupPermissions(parse_snowflake(data.get("permissions", 0)) or 0)
self.hoist = bool(data.get("hoist", False)) self.hoist = bool(data.get("hoist", False))
self.managed = False self.managed = False
self.mentionable = bool(data.get("mentionable", True)) self.mentionable = bool(data.get("mentionable", True))
@ -285,9 +285,9 @@ def _parse_overwrites(raw_list: list | Any) -> dict:
if not isinstance(entry, dict): if not isinstance(entry, dict):
continue continue
try: try:
target = BackupOverwriteTarget(int(entry["id"])) target = BackupOverwriteTarget(parse_snowflake(entry["id"]))
ow = BackupPermissionOverwrite(allow=int(entry.get("allow", 0)), ow = BackupPermissionOverwrite(allow=parse_snowflake(entry.get("allow", 0)) or 0,
deny=int(entry.get("deny", 0))) deny=parse_snowflake(entry.get("deny", 0)) or 0)
result[target] = ow result[target] = ow
except (KeyError, ValueError, TypeError): except (KeyError, ValueError, TypeError):
continue continue
@ -301,7 +301,7 @@ class BackupCategory:
def __init__(self, data: dict): def __init__(self, data: dict):
try: try:
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
except (ValueError, TypeError): except (ValueError, TypeError):
self.id = 0 # 'uncategorized' sentinel self.id = 0 # 'uncategorized' sentinel
self.name = data["name"] self.name = data["name"]
@ -330,17 +330,17 @@ class BackupChannel:
"thread": ChannelType.public_thread, "thread": ChannelType.public_thread,
} }
def __init__(self, data: dict, category_id: int | None = None, guild: "BackupGuild|None" = None): def __init__(self, data: dict, category_id: int | None = None, guild: "BackupGuild|None" = None):
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data["name"] self.name = data["name"]
try: try:
self.type = ChannelType(int(data.get("type", 0))) self.type = ChannelType(parse_snowflake(data.get("type", 0)) or 0)
except ValueError: except ValueError:
self.type = ChannelType.text self.type = ChannelType.text
self.position = data.get("position", 0) self.position = data.get("position", 0)
self.topic = data.get("topic") self.topic = data.get("topic")
self.nsfw = bool(data.get("nsfw", False)) self.nsfw = bool(data.get("nsfw", False))
cid = data.get("category_id") cid = data.get("category_id")
self.category_id = int(cid) if cid and cid != "None" else category_id self.category_id = parse_snowflake(cid) if cid else category_id
self.parent_id = self.category_id self.parent_id = self.category_id
self.guild = guild self.guild = guild
@ -389,7 +389,7 @@ class BackupMember:
self.id = 0 self.id = 0
self.name = "Unknown" self.name = "Unknown"
return return
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data.get("username", "Unknown") self.name = data.get("username", "Unknown")
self.display_name = data.get("display_name") or self.name self.display_name = data.get("display_name") or self.name
self.global_name = self.display_name self.global_name = self.display_name
@ -447,7 +447,7 @@ class BackupAttachment:
self.id = 0 self.id = 0
self.filename = "unknown" self.filename = "unknown"
return return
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.filename = data.get("filename", "unknown") self.filename = data.get("filename", "unknown")
self.size = data.get("size", 0) self.size = data.get("size", 0)
self.url = data.get("url", "") self.url = data.get("url", "")
@ -487,7 +487,7 @@ class BackupEmoji:
self.id = 0 self.id = 0
self.name = "Unknown" self.name = "Unknown"
return return
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data["name"] self.name = data["name"]
self.animated = data.get("mime_type") == "image/gif" self.animated = data.get("mime_type") == "image/gif"
filename = data.get("filename", "") filename = data.get("filename", "")
@ -514,7 +514,7 @@ class BackupSticker:
self.id = 0 self.id = 0
self.name = "Sticker" self.name = "Sticker"
return return
self.id = int(data.get("id") or data.get("sticker_id", 0)) self.id = parse_snowflake(data.get("id") or data.get("sticker_id", 0)) or 0
self.name = data.get("name", "Sticker") self.name = data.get("name", "Sticker")
# Determine format # Determine format
@ -580,7 +580,7 @@ class BackupReaction:
if ":" in emoji_raw and not emoji_raw.startswith("<"): if ":" in emoji_raw and not emoji_raw.startswith("<"):
parts = emoji_raw.split(":", 1) parts = emoji_raw.split(":", 1)
try: try:
self.emoji = BackupPartialEmoji(name=parts[0], id=int(parts[1])) self.emoji = BackupPartialEmoji(name=parts[0], id=parse_snowflake(parts[1]))
except (ValueError, IndexError): except (ValueError, IndexError):
self.emoji = BackupPartialEmoji(name=emoji_raw) self.emoji = BackupPartialEmoji(name=emoji_raw)
else: else:
@ -594,12 +594,12 @@ class BackupTag:
"""Minimal stand-in for discord.ForumTag.""" """Minimal stand-in for discord.ForumTag."""
__slots__ = ("id", "name", "moderated", "emoji") __slots__ = ("id", "name", "moderated", "emoji")
def __init__(self, data: dict): def __init__(self, data: dict):
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data["name"] self.name = data["name"]
self.moderated = bool(data.get("moderated", False)) self.moderated = bool(data.get("moderated", False))
emoji_id = data.get("emoji_id") emoji_id = data.get("emoji_id")
emoji_name = data.get("emoji_name") emoji_name = data.get("emoji_name")
self.emoji = BackupPartialEmoji(name=emoji_name, id=int(emoji_id) if emoji_id else None) if emoji_name else None self.emoji = BackupPartialEmoji(name=emoji_name, id=parse_snowflake(emoji_id)) if emoji_name else None
def __repr__(self) -> str: def __repr__(self) -> str:
return f"BackupTag(id={self.id}, name='{self.name}')" return f"BackupTag(id={self.id}, name='{self.name}')"
@ -611,8 +611,8 @@ class BackupMessageReference:
__slots__ = ("message_id", "channel_id") __slots__ = ("message_id", "channel_id")
def __init__(self, data: dict): def __init__(self, data: dict):
self.message_id = int(data["messageId"]) self.message_id = parse_snowflake(data["messageId"])
self.channel_id = int(data["channelId"]) self.channel_id = parse_snowflake(data["channelId"])
class BackupThread: class BackupThread:
@ -626,10 +626,13 @@ class BackupThread:
self.id = 0 self.id = 0
self.name = "" self.name = ""
return return
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data.get("name", "") self.name = data.get("name", "")
try: try:
self.type = ChannelType(int(data.get("type", 11))) if data.get("type") is not None:
self.type = ChannelType(parse_snowflake(data.get("type", 11)) or 11)
else:
self.type = ChannelType.public_thread
except ValueError: except ValueError:
self.type = ChannelType.public_thread self.type = ChannelType.public_thread
self.message_count = data.get("message_count", 0) self.message_count = data.get("message_count", 0)
@ -637,7 +640,7 @@ class BackupThread:
self.auto_archive_duration = data.get("auto_archive_duration", 1440) self.auto_archive_duration = data.get("auto_archive_duration", 1440)
self.locked = data.get("locked", False) self.locked = data.get("locked", False)
pid = data.get("parent_id") pid = data.get("parent_id")
self.parent_id = int(pid) if pid and pid != "None" else parent_id self.parent_id = parse_snowflake(pid) if pid else parent_id
# Parse applied tags (JSON IDs) # Parse applied tags (JSON IDs)
self.applied_tags = [] self.applied_tags = []
@ -645,7 +648,7 @@ class BackupThread:
if raw_tags: if raw_tags:
try: try:
tag_ids = json.loads(raw_tags) if isinstance(raw_tags, str) else raw_tags tag_ids = json.loads(raw_tags) if isinstance(raw_tags, str) else raw_tags
self.applied_tags = [int(tid) for tid in tag_ids] self.applied_tags = [parse_snowflake(tid) for tid in tag_ids if parse_snowflake(tid)]
except Exception: pass except Exception: pass
@ -671,9 +674,12 @@ class BackupMessage:
channel: Optional[Any] = None, channel: Optional[Any] = None,
backup_root: Path | None = None, backup_root: Path | None = None,
media_pool: dict | None = None): media_pool: dict | None = None):
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
try: try:
self.type = MessageType(int(data.get("type", 0))) if data.get("type") is not None:
self.type = MessageType(parse_snowflake(data.get("type", 0)) or 0)
else:
self.type = MessageType.default
except ValueError: except ValueError:
self.type = MessageType.default self.type = MessageType.default
self.pinned = bool(data.get("is_pinned", False)) self.pinned = bool(data.get("is_pinned", False))
@ -682,9 +688,9 @@ class BackupMessage:
self.guild = guild self.guild = guild
self.channel = channel self.channel = channel
cid = data.get("channel_id") cid = data.get("channel_id")
self.channel_id = int(cid) if cid and cid != "None" else (channel.id if channel else None) self.channel_id = parse_snowflake(cid) if cid else (channel.id if channel else None)
# Mentions # Mentions (simplified)
self.mentions = [] self.mentions = []
self.role_mentions = [] self.role_mentions = []
self.channel_mentions = [] self.channel_mentions = []
@ -710,7 +716,7 @@ class BackupMessage:
else: else:
self.created_at = datetime.now(timezone.utc) self.created_at = datetime.now(timezone.utc)
# Attachments (parsed from DB or passed in) # Attachments
self.attachments = [] self.attachments = []
raw_atts = data.get("attachments", []) raw_atts = data.get("attachments", [])
if isinstance(raw_atts, str): if isinstance(raw_atts, str):
@ -725,6 +731,13 @@ class BackupMessage:
# Embeds # Embeds
self.embeds = [] self.embeds = []
raw_embeds = data.get("embeds", [])
if isinstance(raw_embeds, str):
try:
raw_embeds = json.loads(raw_embeds)
except Exception:
raw_embeds = []
for e in raw_embeds: for e in raw_embeds:
if isinstance(e, dict): if isinstance(e, dict):
self.embeds.append(BackupEmbed(e)) self.embeds.append(BackupEmbed(e))
@ -732,10 +745,38 @@ class BackupMessage:
# Stickers # Stickers
self.stickers = [] self.stickers = []
raw_stickers = data.get("stickers", []) raw_stickers = data.get("stickers", [])
if isinstance(raw_stickers, str):
try:
raw_stickers = json.loads(raw_stickers)
except Exception:
raw_stickers = []
for s in raw_stickers: for s in raw_stickers:
if isinstance(s, dict): if isinstance(s, dict):
self.stickers.append(BackupSticker(s, backup_root=backup_root, media_pool=media_pool)) self.stickers.append(BackupSticker(s, backup_root=backup_root, media_pool=media_pool))
# Reactions
self.reactions = []
raw_reactions = data.get("reactions", [])
if isinstance(raw_reactions, str):
try:
self.reactions = json.loads(raw_reactions)
except Exception:
self.reactions = []
elif isinstance(raw_reactions, list):
self.reactions = raw_reactions
# Reference (replies/forwards)
self.reference = None
if data.get("message_reference"):
self.reference = type("Ref", (), {"message_id": parse_snowflake(data["message_reference"]), "channel_id": self.channel_id})()
self.thread = None
self.flags = type("Flags", (), {"value": 0})()
def __repr__(self) -> str:
return f"BackupMessage(id={self.id}, author={self.author})"
class BackupEmbed: class BackupEmbed:
"""Minimal stand-in for discord.Embed.""" """Minimal stand-in for discord.Embed."""
__slots__ = ("title", "description", "url", "color", "timestamp", __slots__ = ("title", "description", "url", "color", "timestamp",
@ -748,8 +789,8 @@ class BackupEmbed:
self.color = data.get("color") self.color = data.get("color")
self.timestamp = data.get("timestamp") self.timestamp = data.get("timestamp")
self.thumbnail = type("Thumbnail", (), {"url": data["thumbnail"]["url"]})() if data.get("thumbnail") else None self.thumbnail = type("Thumbnail", (), {"url": data["thumbnail"]["url"]})() if data.get("thumbnail") and "url" in data["thumbnail"] else None
self.image = type("Image", (), {"url": data["image"]["url"]})() if data.get("image") else None self.image = type("Image", (), {"url": data["image"]["url"]})() if data.get("image") and "url" in data["image"] else None
author = data.get("author") author = data.get("author")
self.author = type("Author", (), { self.author = type("Author", (), {
@ -766,6 +807,7 @@ class BackupEmbed:
self.fields = [BackupEmbedField(f) for f in data.get("fields", [])] self.fields = [BackupEmbedField(f) for f in data.get("fields", [])]
class BackupEmbedField: class BackupEmbedField:
"""Minimal stand-in for embed fields.""" """Minimal stand-in for embed fields."""
__slots__ = ("name", "value", "inline") __slots__ = ("name", "value", "inline")
@ -773,31 +815,6 @@ class BackupEmbedField:
self.name = data.get("name") self.name = data.get("name")
self.value = data.get("value") self.value = data.get("value")
self.inline = bool(data.get("inline", False)) self.inline = bool(data.get("inline", False))
# Legacy extra_data support removed as requested
self.stickers = []
# Reactions
self.reactions = []
raw_reactions = data.get("reactions", [])
if isinstance(raw_reactions, list):
self.reactions = raw_reactions
elif isinstance(raw_reactions, str):
try:
self.reactions = json.loads(raw_reactions)
except Exception: pass
# Reference (replies/forwards)
self.reference = None
if data.get("message_reference"):
self.reference = type("Ref", (), {"message_id": int(data["message_reference"]), "channel_id": self.channel_id})()
self.thread = None
self.flags = type("Flags", (), {"value": 0})()
def __repr__(self) -> str:
return f"BackupMessage(id={self.id}, author={self.author})"
class BackupGuild: class BackupGuild:
@ -806,7 +823,7 @@ class BackupGuild:
__slots__ = ("id", "name", "icon", "banner", "_reader") __slots__ = ("id", "name", "icon", "banner", "_reader")
def __init__(self, data: dict, backup_path: Path, reader: "BackupReader" = None): def __init__(self, data: dict, backup_path: Path, reader: "BackupReader" = None):
self.id = int(data["id"]) self.id = parse_snowflake(data["id"])
self.name = data["name"] self.name = data["name"]
self._reader = reader self._reader = reader
@ -846,17 +863,17 @@ class BackupGuild:
def get_member(self, user_id: int) -> "BackupMember | None": def get_member(self, user_id: int) -> "BackupMember | None":
if self._reader: if self._reader:
return self._reader._member_map.get(int(user_id)) return self._reader._member_map.get(parse_snowflake(user_id))
return None return None
def get_role(self, role_id: int) -> "BackupRole | None": def get_role(self, role_id: int) -> "BackupRole | None":
if self._reader: if self._reader:
return next((r for r in self._reader._roles if r.id == int(role_id)), None) return next((r for r in self._reader._roles if r.id == parse_snowflake(role_id)), None)
return None return None
def get_channel(self, channel_id: int) -> "BackupChannel | None": def get_channel(self, channel_id: int) -> "BackupChannel | None":
if self._reader: if self._reader:
return next((c for c in self._reader._channels if c.id == int(channel_id)), None) return next((c for c in self._reader._channels if c.id == parse_snowflake(channel_id)), None)
return None return None
def __repr__(self) -> str: def __repr__(self) -> str:
@ -1053,7 +1070,9 @@ class BackupReader:
user_role_ids = set() user_role_ids = set()
for rid in (u.get("roles") or []): for rid in (u.get("roles") or []):
try: try:
user_role_ids.add(int(rid)) rid_parsed = parse_snowflake(rid)
if rid_parsed:
user_role_ids.add(rid_parsed)
except (ValueError, TypeError): except (ValueError, TypeError):
continue continue
role_objs = [r for r in self.roles if r.id in user_role_ids] role_objs = [r for r in self.roles if r.id in user_role_ids]
@ -1126,7 +1145,7 @@ class BackupReader:
conn = sqlite3.connect(self.db.db_path) conn = sqlite3.connect(self.db.db_path)
rows = conn.execute("SELECT DISTINCT channel_id FROM messages").fetchall() rows = conn.execute("SELECT DISTINCT channel_id FROM messages").fetchall()
conn.close() conn.close()
return [int(r[0]) for r in rows] return [parse_snowflake(r[0]) for r in rows if parse_snowflake(r[0])]
async def get_channel(self, channel_id: int) -> BackupChannel | BackupThread | None: async def get_channel(self, channel_id: int) -> BackupChannel | BackupThread | None:
for c in self.channels: for c in self.channels:
@ -1165,7 +1184,7 @@ class BackupReader:
# Try to fetch from DB # Try to fetch from DB
user_data = self.db.get_user(str(user_id)) if self.db else None user_data = self.db.get_user(str(user_id)) if self.db else None
if user_data: if user_data:
user_role_ids = {int(rid) for rid in (user_data.get("roles") or [])} user_role_ids = {parse_snowflake(rid) for rid in (user_data.get("roles") or []) if parse_snowflake(rid)}
role_objs = [r for r in self.roles if r.id in user_role_ids] role_objs = [r for r in self.roles if r.id in user_role_ids]
member = BackupMember(user_data, role_objects=role_objs, backup_path=self.backup_path) member = BackupMember(user_data, role_objects=role_objs, backup_path=self.backup_path)
self._members.append(member) self._members.append(member)
@ -1181,12 +1200,12 @@ class BackupReader:
return stub return stub
def _hydrate_message(self, msg_data: dict) -> BackupMessage: def _hydrate_message(self, msg_data: dict) -> BackupMessage:
user_id = int(msg_data.get("author_id", 0)) user_id = parse_snowflake(msg_data.get("author_id", 0)) or 0
author = self._resolve_author(user_id) author = self._resolve_author(user_id)
self._ensure_media_pool_loaded() self._ensure_media_pool_loaded()
channel_id = int(msg_data["channel_id"]) channel_id = parse_snowflake(msg_data["channel_id"])
channel = next((c for c in self.channels if c.id == channel_id), None) channel = next((c for c in self.channels if c.id == channel_id), None)
return BackupMessage( return BackupMessage(

View file

@ -639,6 +639,7 @@ class DiscordExporter:
logger.error(f"Failed to fetch threads for {channel.name}: {e}") logger.error(f"Failed to fetch threads for {channel.name}: {e}")
is_forum = isinstance(channel, discord.ForumChannel) is_forum = isinstance(channel, discord.ForumChannel)
logger.debug(f"Exporting threads for channel '{channel.name}' ({channel.id}) [Type: {type(channel)}] [Is Forum: {is_forum}]")
if all_threads and self.db: if all_threads and self.db:
thread_meta = [] thread_meta = []
@ -658,17 +659,31 @@ class DiscordExporter:
# Attempt 1: Standard attribute # Attempt 1: Standard attribute
applied_tags = [str(tag.id) for tag in t.applied_tags] applied_tags = [str(tag.id) for tag in t.applied_tags]
# Attempt 2: If still empty and it's a forum thread, it might not be loaded # Attempt 2: Internal list of IDs if available (sometimes populated when property is empty)
if not applied_tags and hasattr(t, "_applied_tags"):
raw_ids = getattr(t, "_applied_tags", [])
if raw_ids:
logger.info(f"Thread '{t.name}' ({t.id}) found raw tags in _applied_tags: {raw_ids}")
applied_tags = [str(tid) for tid in raw_ids]
# Attempt 3: If still empty and it's a forum thread, try to fetch it specifically
if not applied_tags and is_forum: if not applied_tags and is_forum:
try: try:
# We can try to fetch the thread specifically to get tags # We can try to fetch the thread specifically to get tags
# But we only do this if we really have to
# (Discord sometimes doesn't include tags in bulk guild.active_threads) # (Discord sometimes doesn't include tags in bulk guild.active_threads)
fetched_t = await self.reader.client.fetch_channel(t.id) fetched_t = await self.reader.client.fetch_channel(t.id)
# Check both property and internal list on fetched object
if hasattr(fetched_t, "applied_tags"): if hasattr(fetched_t, "applied_tags"):
applied_tags = [str(tag.id) for tag in fetched_t.applied_tags] applied_tags = [str(tag.id) for tag in fetched_t.applied_tags]
except Exception: if not applied_tags and hasattr(fetched_t, "_applied_tags"):
raw_ids = getattr(fetched_t, "_applied_tags", [])
applied_tags = [str(tid) for tid in raw_ids]
except Exception as e:
logger.debug(f"Failed to fetch thread {t.id} for tags: {e}")
pass pass
if not applied_tags and is_forum:
logger.warning(f"Thread '{t.name}' ({t.id}) is in forum '{channel.name}' but NO tags found (tried all methods)")
thread_meta.append({ thread_meta.append({
"id": str(t.id), "id": str(t.id),

View file

@ -142,6 +142,7 @@ class BackupStatsScreen(Screen[None]):
height: auto; height: auto;
border: solid $accent; border: solid $accent;
background: $boost; background: $boost;
color: $text;
} }
#bs_actions { #bs_actions {
@ -225,7 +226,8 @@ class BackupStatsScreen(Screen[None]):
self.stats_tree.show_root = False self.stats_tree.show_root = False
# Add a header row to the tree root, purely for visual columns # Add a header row to the tree root, purely for visual columns
header_text = self._format_tree_row("NAME", "MESSAGES", "THREADS", "FILES", "SIZE") # Using depth=4 for header to compensate for root node toggle position delta
header_text = self._format_tree_row("NAME", "MESSAGES", "THREADS", "FILES", "SIZE", depth=-1)
header_text.stylize("bold") header_text.stylize("bold")
self.stats_tree.root.set_label(header_text) self.stats_tree.root.set_label(header_text)
self.stats_tree.show_root = True self.stats_tree.show_root = True
@ -247,9 +249,16 @@ class BackupStatsScreen(Screen[None]):
return f"{size_bytes / (1024 * 1024):.2f} MB" return f"{size_bytes / (1024 * 1024):.2f} MB"
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB" return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
def _format_tree_row(self, name: str, msgs, threads, files, size) -> Text: def _format_tree_row(self, name: str, msgs, threads, files, size, depth=0) -> Text:
"""Pads and aligns columns for the tree view to simulate a table.""" """Pads and aligns columns for the tree view to simulate a table."""
col_name = str(name)[:30].ljust(35) # Textual Tree indents child nodes. Standard indent is 4 characters.
# To maintain vertical alignment of values, we subtract the indentation from the name column width.
indent_compensation = depth * 2
# Name column: base width is 24.
name_col_width = max(6, 24 - indent_compensation)
col_name = str(name)[:name_col_width].ljust(name_col_width)
col_msg = str(msgs).rjust(12) col_msg = str(msgs).rjust(12)
col_thd = str(threads).rjust(12) col_thd = str(threads).rjust(12)
col_file = str(files).rjust(12) col_file = str(files).rjust(12)
@ -331,8 +340,13 @@ class BackupStatsScreen(Screen[None]):
self.query_one("#bs_val_size", Label).update(f"{self._format_size(total_size)}") self.query_one("#bs_val_size", Label).update(f"{self._format_size(total_size)}")
self.query_one("#bs_val_coverage", Label).update(f"{backed_up_channels} / {total_channels}") self.query_one("#bs_val_coverage", Label).update(f"{backed_up_channels} / {total_channels}")
# 5. Build Tree # 5. Build Tree - Sort categories to show Uncategorized first, then by name
for cat_id, info in cat_map.items(): sorted_items = sorted(
cat_map.items(),
key=lambda x: (0 if x[0] is None else 1, x[1]["cat"].name if x[0] is not None else "")
)
for cat_id, info in sorted_items:
cat = info["cat"] cat = info["cat"]
chans = info["chans"] chans = info["chans"]
if not chans: continue if not chans: continue
@ -364,13 +378,13 @@ class BackupStatsScreen(Screen[None]):
c_files += stats["attachment_count"] c_files += stats["attachment_count"]
c_size += stats["total_size"] c_size += stats["total_size"]
cat_lbl = self._format_tree_row(cat_name, c_msgs, c_thds, c_files, self._format_size(c_size)) cat_lbl = self._format_tree_row(cat_name, c_msgs, c_thds, c_files, self._format_size(c_size), depth=1)
cat_lbl.stylize("bold yellow") cat_lbl.stylize("bold yellow")
node = self.stats_tree.root.add(cat_lbl, expand=True) node = self.stats_tree.root.add(cat_lbl, expand=True)
for ch_data in chan_nodes_data: for ch_data in chan_nodes_data:
size_str = self._format_size(ch_data['size']) if ch_data['is_backed_up'] else "NA" size_str = self._format_size(ch_data['size']) if ch_data['is_backed_up'] else "NA"
ch_lbl = self._format_tree_row(f" {ch_data['name']}", ch_data['msgs'], ch_data['threads'], ch_data['files'], size_str) ch_lbl = self._format_tree_row(ch_data['name'], ch_data['msgs'], ch_data['threads'], ch_data['files'], size_str, depth=2)
if ch_data['is_backed_up']: if ch_data['is_backed_up']:
ch_lbl.stylize("bold white") ch_lbl.stylize("bold white")