peregrine/tests/test_backup.py
pyr0ball 7a698496f9 feat(cloud): fix backup/restore for cloud mode — SQLCipher encrypt/decrypt
T13: Three fixes:
1. backup.py: _decrypt_db_to_bytes() decrypts SQLCipher DB before archiving
   so the zip is portable to any local Docker install (plain SQLite).
2. backup.py: _encrypt_db_from_bytes() re-encrypts on restore in cloud mode
   so the app can open the restored DB normally.
3. 2_Settings.py: _base_dir uses get_db_path().parent in cloud mode (user's
   per-tenant data dir) instead of the hardcoded app root; db_key wired
   through both create_backup() and restore_backup() calls.

6 new cloud backup tests + 2 unit tests for SQLCipher helpers (pysqlcipher3
mocked — not available in the local conda test env). 419/419 total passing.
2026-03-09 22:41:44 -07:00

372 lines
14 KiB
Python

"""Tests for scripts/backup.py — create, list, restore, and multi-instance support."""
from __future__ import annotations

import io
import json
import zipfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from scripts.backup import (
    _decrypt_db_to_bytes,
    _detect_source_label,
    _encrypt_db_from_bytes,
    create_backup,
    list_backup_contents,
    restore_backup,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_instance(tmp_path: Path, name: str, *, root_db: bool = False) -> Path:
"""Build a minimal fake instance directory for testing."""
base = tmp_path / name
base.mkdir()
# Secret configs
(base / "config").mkdir()
(base / "config" / "notion.yaml").write_text("token: secret")
(base / "config" / "email.yaml").write_text("user: test@example.com")
# Extra config
(base / "config" / "llm.yaml").write_text("backend: ollama")
(base / "config" / "resume_keywords.yaml").write_text("keywords: [python]")
(base / "config" / "server.yaml").write_text("port: 8502")
# DB — either at data/staging.db (Peregrine) or staging.db root (legacy)
if root_db:
(base / "staging.db").write_bytes(b"SQLite legacy")
else:
(base / "data").mkdir()
(base / "data" / "staging.db").write_bytes(b"SQLite peregrine")
return base
# ---------------------------------------------------------------------------
# create_backup
# ---------------------------------------------------------------------------
class TestCreateBackup:
    """``create_backup`` on an instance whose DB lives at data/staging.db."""

    def test_returns_valid_zip(self, tmp_path):
        """The bytes returned by create_backup are recognised as a zip archive."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        # is_zipfile accepts a file-like object, so wrap the in-memory bytes.
        assert zipfile.is_zipfile(io.BytesIO(data))

    def test_includes_secret_configs(self, tmp_path):
        """notion.yaml and email.yaml are listed in the archive."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/notion.yaml" in info["files"]
        assert "config/email.yaml" in info["files"]

    def test_includes_extra_configs(self, tmp_path):
        """llm.yaml, resume_keywords.yaml and server.yaml are listed in the archive."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/llm.yaml" in info["files"]
        assert "config/resume_keywords.yaml" in info["files"]
        assert "config/server.yaml" in info["files"]

    def test_includes_db_by_default(self, tmp_path):
        """Without flags the manifest reports a DB and a .db entry is present."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert info["manifest"]["includes_db"] is True
        assert any(f.endswith(".db") for f in info["files"])

    def test_excludes_db_when_flag_false(self, tmp_path):
        """include_db=False yields no .db entry and a manifest saying so."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base, include_db=False)
        info = list_backup_contents(data)
        assert info["manifest"]["includes_db"] is False
        assert not any(f.endswith(".db") for f in info["files"])

    def test_silently_skips_missing_files(self, tmp_path):
        """A config file absent from the instance is simply not archived."""
        base = _make_instance(tmp_path, "peregrine")
        # tokens.yaml is not created by the fixture — create_backup must not raise.
        data = create_backup(base)
        info = list_backup_contents(data)
        assert "config/tokens.yaml" not in info["files"]

    def test_manifest_contains_source_label(self, tmp_path):
        """The manifest source equals the instance directory name by default."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base)
        info = list_backup_contents(data)
        assert info["manifest"]["source"] == "peregrine"

    def test_source_label_override(self, tmp_path):
        """An explicit source_label replaces the default in the manifest."""
        base = _make_instance(tmp_path, "peregrine")
        data = create_backup(base, source_label="custom-label")
        info = list_backup_contents(data)
        assert info["manifest"]["source"] == "custom-label"
# ---------------------------------------------------------------------------
# Legacy instance (staging.db at repo root)
# ---------------------------------------------------------------------------
class TestLegacyInstance:
    """Backups taken from the legacy layout (staging.db at the instance root)."""

    def test_picks_up_root_db(self, tmp_path):
        """The root-level DB is archived under its root-level name only."""
        legacy_root = _make_instance(tmp_path, "job-seeker", root_db=True)
        listing = list_backup_contents(create_backup(legacy_root))
        archived = listing["files"]
        assert "staging.db" in archived
        assert "data/staging.db" not in archived

    def test_source_label_is_job_seeker(self, tmp_path):
        """The manifest source matches the legacy directory name."""
        legacy_root = _make_instance(tmp_path, "job-seeker", root_db=True)
        listing = list_backup_contents(create_backup(legacy_root))
        assert listing["manifest"]["source"] == "job-seeker"

    def test_missing_peregrine_only_configs_skipped(self, tmp_path):
        """Configs that a legacy install lacks are skipped without error."""
        legacy_root = _make_instance(tmp_path, "job-seeker", root_db=True)
        # The fixture always writes server.yaml; delete it to mimic a legacy
        # install, where that file would not exist.
        legacy_root.joinpath("config", "server.yaml").unlink()
        listing = list_backup_contents(create_backup(legacy_root))
        archived = listing["files"]
        assert "config/server.yaml" not in archived
        assert "config/notion.yaml" in archived
# ---------------------------------------------------------------------------
# list_backup_contents
# ---------------------------------------------------------------------------
class TestListBackupContents:
    """Shape and consistency of the dict returned by ``list_backup_contents``."""

    def test_returns_manifest_and_files(self, tmp_path):
        """The listing exposes manifest, files, sizes and total_bytes keys."""
        archive = create_backup(_make_instance(tmp_path, "peregrine"))
        listing = list_backup_contents(archive)
        for required_key in ("manifest", "files", "sizes", "total_bytes"):
            assert required_key in listing

    def test_total_bytes_is_sum_of_file_sizes(self, tmp_path):
        """total_bytes equals the summed sizes of the listed files."""
        archive = create_backup(_make_instance(tmp_path, "peregrine"))
        listing = list_backup_contents(archive)
        sizes = listing["sizes"]
        # Only files that actually have a size entry contribute to the sum.
        known_sizes = [sizes[entry] for entry in listing["files"] if entry in sizes]
        assert listing["total_bytes"] == sum(known_sizes)

    def test_manifest_not_in_files_list(self, tmp_path):
        """The manifest file itself is not reported among the archived files."""
        archive = create_backup(_make_instance(tmp_path, "peregrine"))
        listing = list_backup_contents(archive)
        assert "backup-manifest.json" not in listing["files"]
# ---------------------------------------------------------------------------
# restore_backup
# ---------------------------------------------------------------------------
class TestRestoreBackup:
    """``restore_backup`` into an empty or partially populated destination."""

    def test_restores_all_files(self, tmp_path):
        """A restore reports restored entries and writes notion.yaml."""
        source = _make_instance(tmp_path, "peregrine")
        target = tmp_path / "restored"
        target.mkdir()
        outcome = restore_backup(create_backup(source), target)
        assert len(outcome["restored"]) > 0
        assert target.joinpath("config", "notion.yaml").exists()

    def test_skips_db_when_flag_false(self, tmp_path):
        """include_db=False moves .db entries from 'restored' to 'skipped'."""
        source = _make_instance(tmp_path, "peregrine")
        target = tmp_path / "restored"
        target.mkdir()
        outcome = restore_backup(create_backup(source), target, include_db=False)
        restored_dbs = [entry for entry in outcome["restored"] if entry.endswith(".db")]
        skipped_dbs = [entry for entry in outcome["skipped"] if entry.endswith(".db")]
        assert not restored_dbs
        assert skipped_dbs

    def test_no_overwrite_skips_existing(self, tmp_path):
        """overwrite=False leaves a pre-existing file untouched and reports it."""
        source = _make_instance(tmp_path, "peregrine")
        target = tmp_path / "restored"
        target.mkdir()
        preexisting = target / "config" / "notion.yaml"
        preexisting.parent.mkdir()
        preexisting.write_text("original content")
        outcome = restore_backup(create_backup(source), target, overwrite=False)
        assert "config/notion.yaml" in outcome["skipped"]
        assert preexisting.read_text() == "original content"

    def test_overwrite_replaces_existing(self, tmp_path):
        """overwrite=True replaces a stale file with the archived content."""
        source = _make_instance(tmp_path, "peregrine")
        target = tmp_path / "restored"
        target.mkdir()
        stale = target / "config" / "notion.yaml"
        stale.parent.mkdir()
        stale.write_text("stale content")
        restore_backup(create_backup(source), target, overwrite=True)
        assert stale.read_text() == "token: secret"

    def test_roundtrip_preserves_content(self, tmp_path):
        """Backup followed by restore reproduces the original file text."""
        source = _make_instance(tmp_path, "peregrine")
        expected_text = source.joinpath("config", "notion.yaml").read_text()
        target = tmp_path / "restored"
        target.mkdir()
        restore_backup(create_backup(source), target)
        assert target.joinpath("config", "notion.yaml").read_text() == expected_text
# ---------------------------------------------------------------------------
# _detect_source_label
# ---------------------------------------------------------------------------
class TestDetectSourceLabel:
    """``_detect_source_label`` yields the name of the instance directory."""

    def test_returns_directory_name(self, tmp_path):
        """A directory called 'peregrine' is labelled 'peregrine'."""
        instance_dir = tmp_path / "peregrine"
        instance_dir.mkdir()
        label = _detect_source_label(instance_dir)
        assert label == "peregrine"

    def test_legacy_label(self, tmp_path):
        """A directory called 'job-seeker' is labelled 'job-seeker'."""
        instance_dir = tmp_path / "job-seeker"
        instance_dir.mkdir()
        label = _detect_source_label(instance_dir)
        assert label == "job-seeker"
# ---------------------------------------------------------------------------
# Cloud mode — SQLCipher encrypt / decrypt (pysqlcipher3 mocked)
# ---------------------------------------------------------------------------
class _FakeCursor:
def __enter__(self): return self
def __exit__(self, *a): return False
def execute(self, *a): pass
def fetchone(self): return None
def _make_mock_sqlcipher_conn(plain_bytes: bytes, tmp_path: Path):
"""Return a mock pysqlcipher3 connection that writes plain_bytes to the
first 'ATTACH DATABASE' path it sees (simulating sqlcipher_export)."""
attached: dict = {}
conn = MagicMock()
def fake_execute(sql, *args):
if "ATTACH DATABASE" in sql:
# Extract path between first pair of quotes
parts = sql.split("'")
path = parts[1]
attached["path"] = path
elif "sqlcipher_export" in sql:
# Simulate export: write plain_bytes to the attached path
Path(attached["path"]).write_bytes(plain_bytes)
conn.execute.side_effect = fake_execute
conn.close = MagicMock()
return conn
class TestCloudBackup:
    """Backup/restore with SQLCipher encryption — pysqlcipher3 mocked out."""

    def test_create_backup_decrypts_db_when_key_set(self, tmp_path):
        """With db_key, _decrypt_db_to_bytes is called and plain bytes go into zip."""
        base = _make_instance(tmp_path, "cloud-user")
        plain_db = b"SQLite format 3\x00plain-content"
        with patch("scripts.backup._decrypt_db_to_bytes", return_value=plain_db) as mock_dec:
            data = create_backup(base, include_db=True, db_key="testkey")
        mock_dec.assert_called_once()
        # The zip should contain the plain bytes, not the raw on-disk file.
        with zipfile.ZipFile(io.BytesIO(data)) as zf:
            db_files = [n for n in zf.namelist() if n.endswith(".db")]
            assert len(db_files) == 1
            assert zf.read(db_files[0]) == plain_db

    def test_create_backup_no_key_reads_file_directly(self, tmp_path):
        """Without db_key, _decrypt_db_to_bytes is NOT called."""
        base = _make_instance(tmp_path, "local-user")
        with patch("scripts.backup._decrypt_db_to_bytes") as mock_dec:
            create_backup(base, include_db=True, db_key="")
        mock_dec.assert_not_called()

    def test_restore_backup_encrypts_db_when_key_set(self, tmp_path):
        """With db_key, _encrypt_db_from_bytes is called for .db files."""
        src = _make_instance(tmp_path, "cloud-src")
        dst = tmp_path / "cloud-dst"
        dst.mkdir()
        plain_db = b"SQLite format 3\x00plain-content"
        # Create a backup whose DB entry holds the known plain bytes.
        with patch("scripts.backup._decrypt_db_to_bytes", return_value=plain_db):
            data = create_backup(src, include_db=True, db_key="testkey")
        with patch("scripts.backup._encrypt_db_from_bytes") as mock_enc:
            restore_backup(data, dst, include_db=True, db_key="testkey")
        mock_enc.assert_called_once()
        # Index 0 of call_args is the tuple of positional arguments.
        positional = mock_enc.call_args[0]
        assert positional[0] == plain_db  # plain_bytes
        assert positional[2] == "testkey"  # db_key

    def test_restore_backup_no_key_writes_file_directly(self, tmp_path):
        """Without db_key, _encrypt_db_from_bytes is NOT called."""
        src = _make_instance(tmp_path, "local-src")
        dst = tmp_path / "local-dst"
        dst.mkdir()
        data = create_backup(src, include_db=True, db_key="")
        with patch("scripts.backup._encrypt_db_from_bytes") as mock_enc:
            restore_backup(data, dst, include_db=True, db_key="")
        mock_enc.assert_not_called()

    def test_decrypt_db_to_bytes_calls_sqlcipher(self, tmp_path):
        """_decrypt_db_to_bytes connects via pysqlcipher3.dbapi2 and returns the export."""
        fake_db = tmp_path / "staging.db"
        fake_db.write_bytes(b"encrypted")
        plain_bytes = b"SQLite format 3\x00"
        mock_conn = _make_mock_sqlcipher_conn(plain_bytes, tmp_path)
        mock_module = MagicMock()
        mock_module.connect.return_value = mock_conn
        # Must set dbapi2 explicitly on the package mock so `from pysqlcipher3
        # import dbapi2` resolves to mock_module (not a new auto-created
        # MagicMock attribute).
        mock_pkg = MagicMock()
        mock_pkg.dbapi2 = mock_module
        fake_modules = {"pysqlcipher3": mock_pkg, "pysqlcipher3.dbapi2": mock_module}
        with patch.dict("sys.modules", fake_modules):
            result = _decrypt_db_to_bytes(fake_db, "testkey")
        mock_module.connect.assert_called_once_with(str(fake_db))
        assert result == plain_bytes

    def test_encrypt_db_from_bytes_calls_sqlcipher(self, tmp_path):
        """_encrypt_db_from_bytes connects once and ATTACHes dest with the key."""
        dest = tmp_path / "staging.db"
        plain_bytes = b"SQLite format 3\x00"
        mock_conn = MagicMock()
        mock_module = MagicMock()
        mock_module.connect.return_value = mock_conn
        # Same explicit dbapi2 wiring as the decrypt test, for the same reason.
        mock_pkg = MagicMock()
        mock_pkg.dbapi2 = mock_module
        fake_modules = {"pysqlcipher3": mock_pkg, "pysqlcipher3.dbapi2": mock_module}
        with patch.dict("sys.modules", fake_modules):
            _encrypt_db_from_bytes(plain_bytes, dest, "testkey")
        mock_module.connect.assert_called_once()
        # Verify the ATTACH DATABASE call included the dest path and key.
        attach_calls = [
            recorded
            for recorded in mock_conn.execute.call_args_list
            if "ATTACH DATABASE" in str(recorded)
        ]
        assert len(attach_calls) == 1
        assert str(dest) in str(attach_calls[0])
        assert "testkey" in str(attach_calls[0])