TheOrb/archive_bot/db.py

111 lines
3 KiB
Python

from __future__ import annotations
import json
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
DEFAULT_DB_PATH = Path("state/archive-bot.db")
def database_path() -> Path:
raw = os.getenv("ARCHIVE_BOT_DB_PATH", "").strip()
return Path(raw) if raw else DEFAULT_DB_PATH
def document_name(path: Path) -> str:
return path.as_posix()
def connect_db() -> sqlite3.Connection:
db_path = database_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
connection = sqlite3.connect(db_path)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
return connection
def initialize_database() -> None:
with connect_db() as connection:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS documents (
name TEXT PRIMARY KEY,
content TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
connection.execute(
"""
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
connection.execute(
"""
INSERT INTO metadata(key, value)
VALUES ('schema_version', '1')
ON CONFLICT(key) DO UPDATE SET value=excluded.value
"""
)
def load_document(path: Path) -> dict[str, Any] | None:
initialize_database()
with connect_db() as connection:
row = connection.execute(
"SELECT content FROM documents WHERE name = ?",
(document_name(path),),
).fetchone()
if row is None:
return None
try:
data = json.loads(str(row["content"]))
except json.JSONDecodeError:
return {}
return data if isinstance(data, dict) else {}
def save_document(path: Path, data: dict[str, Any]) -> None:
initialize_database()
payload = json.dumps(data, indent=2, sort_keys=True)
now = datetime.now(timezone.utc).isoformat()
with connect_db() as connection:
connection.execute(
"""
INSERT INTO documents(name, content, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(name) DO UPDATE
SET content = excluded.content,
updated_at = excluded.updated_at
""",
(document_name(path), payload, now),
)
connection.commit()
def migrate_json_document(path: Path) -> None:
existing = load_document(path)
if existing is not None:
return
if not path.exists():
return
try:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except (OSError, json.JSONDecodeError):
return
if isinstance(data, dict):
save_document(path, data)
def migrate_runtime_documents(paths: list[Path]) -> None:
initialize_database()
for path in paths:
migrate_json_document(path)