- Add app/db/ abstraction layer: Backend enum, DbConn wrapper, dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id()
- Auto-detect backend from DATABASE_URL; SQLite remains default when unset — no config change for local deployments
- Add tenant_id column to all three logical DBs (main, context, incidents); idempotent ALTER TABLE migration runs before schema scripts on existing DBs
- All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '') for backward compat with pre-namespacing rows
- Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds) and optional external Postgres support via DATABASE_URL override
- Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs
- Fix SSH glean path in pipeline.py to use ensure_schema + get_conn (was still using raw sqlite3.connect + old _SCHEMA without tenant_id)
- Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search
- Update all tests to use ensure_*_schema fixtures; add row_factory where needed
- 394/394 tests passing

Closes: #42
Closes: #50
240 lines
9.3 KiB
Python
240 lines
9.3 KiB
Python
"""Tests for fingerprint-based incremental glean skipping (issue #30).
|
|
|
|
Verifies that _glean_files() (and its public wrappers) skip local files whose
|
|
mtime+size fingerprint has not changed since the last glean, and that force=True
|
|
bypasses that check.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from app.glean.pipeline import (
|
|
_fingerprint,
|
|
_fp_unchanged,
|
|
_save_fingerprint,
|
|
ensure_schema,
|
|
glean_dir,
|
|
glean_file,
|
|
)
|
|
from app.glean.base import now_iso
|
|
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture()
def db_path(tmp_path: Path) -> Path:
    """Return the path of a per-test database initialised by ensure_schema()."""
    database = tmp_path / "test.db"
    ensure_schema(database)
    return database
|
|
|
|
|
|
@pytest.fixture()
def log_file(tmp_path: Path) -> Path:
    """Create a one-line plaintext log file under tmp_path and return its path."""
    target = tmp_path / "test.log"
    target.write_text("May 24 10:00:00 heimdall kernel: test message\n")
    return target
|
|
|
|
|
|
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
|
|
|
|
class TestFingerprintHelpers:
    """Unit tests for _fingerprint(), _fp_unchanged() and _save_fingerprint().

    Each test that opens a SQLite connection releases it in a ``finally``
    clause, so a failing assertion (or a raising helper) cannot leak the
    handle into later tests.
    """

    def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
        """_fingerprint() reports the same mtime and size as Path.stat()."""
        mtime, size = _fingerprint(log_file)
        st = log_file.stat()
        assert mtime == st.st_mtime
        assert size == st.st_size

    def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
        """A file with no stored fingerprint is not reported as unchanged."""
        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            mtime, size = _fingerprint(log_file)
            assert _fp_unchanged(conn, log_file, mtime, size) is False
        finally:
            conn.close()

    def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
        """Saving the current fingerprint makes the file count as unchanged."""
        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            mtime, size = _fingerprint(log_file)
            _save_fingerprint(conn, log_file, mtime, size, now_iso())
            conn.commit()
            assert _fp_unchanged(conn, log_file, mtime, size) is True
        finally:
            conn.close()

    def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
        """A different size invalidates the stored fingerprint."""
        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            mtime, size = _fingerprint(log_file)
            _save_fingerprint(conn, log_file, mtime, size, now_iso())
            conn.commit()
            # Simulate size change (new content appended)
            assert _fp_unchanged(conn, log_file, mtime, size + 1) is False
        finally:
            conn.close()

    def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
        """A different mtime invalidates the stored fingerprint."""
        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            mtime, size = _fingerprint(log_file)
            _save_fingerprint(conn, log_file, mtime, size, now_iso())
            conn.commit()
            assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False
        finally:
            conn.close()

    def test_save_fingerprint_upserts(self, db_path: Path, log_file: Path) -> None:
        """Second save with different values replaces the first (UPSERT semantics)."""
        # No row_factory here: the default tuple rows allow the direct
        # comparison against a plain tuple below.
        conn = sqlite3.connect(str(db_path))
        try:
            _save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
            conn.commit()
            _save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
            conn.commit()
            row = conn.execute(
                "SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
                (str(log_file),),
            ).fetchone()
            assert row == (2000.0, 200)
        finally:
            conn.close()
|
|
|
|
|
|
# ── Integration: glean_file skipping ─────────────────────────────────────────
|
|
|
|
class TestGleanFileFingerprint:
    """Integration tests: glean_file() records and honours file fingerprints."""

    # Timestamp far in the past, written into gleaned_at so that a re-save is
    # detectable without sleeping or relying on clock resolution.
    _BACKDATED = "1970-01-01T00:00:00Z"

    @staticmethod
    def _stored_fingerprint(db_path: Path, path: Path) -> tuple | None:
        """Return the stored (mtime, size, gleaned_at) row for *path*, or None.

        The connection is always closed, even if the query raises.
        """
        conn = sqlite3.connect(str(db_path))
        try:
            return conn.execute(
                "SELECT mtime, size, gleaned_at FROM glean_fingerprints WHERE path = ?",
                (str(path),),
            ).fetchone()
        finally:
            conn.close()

    @classmethod
    def _backdate_gleaned_at(cls, db_path: Path, path: Path) -> None:
        """Overwrite gleaned_at for *path* with the backdated sentinel value."""
        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.execute(
                "UPDATE glean_fingerprints SET gleaned_at = ? WHERE path = ?",
                (cls._BACKDATED, str(path)),
            )
            conn.commit()
            # The first glean must already have stored a fingerprint row.
            assert cursor.rowcount >= 1, "expected an existing fingerprint row"
        finally:
            conn.close()

    def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
        """The first glean stores the file's current mtime and size."""
        glean_file(log_file, db_path)

        row = self._stored_fingerprint(db_path, log_file)
        assert row is not None
        mtime, size = _fingerprint(log_file)
        assert row[:2] == (mtime, size)

    def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
        """Re-gleaning an untouched file yields no new entries."""
        stats_first = glean_file(log_file, db_path)
        count_first = sum(stats_first.values())

        # Re-glean without touching the file — should produce 0 new entries.
        stats_second = glean_file(log_file, db_path)
        count_second = sum(stats_second.values())

        assert count_first >= 1, "First glean should find at least one entry"
        assert count_second == 0, "Second glean should skip unchanged file"

    def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
        """A file that grew is gleaned again and its fingerprint is refreshed."""
        glean_file(log_file, db_path)
        size_before = log_file.stat().st_size

        # Append a new line and update mtime by rewriting.
        original = log_file.read_text()
        log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")
        assert log_file.stat().st_size > size_before

        glean_file(log_file, db_path)

        # The returned stats are not asserted on: INSERT OR IGNORE means the
        # original entry won't re-count, so the count alone cannot show that
        # the glean ran.  The refreshed fingerprint size below can: had the
        # file been skipped, the stored size would still be size_before.
        row = self._stored_fingerprint(db_path, log_file)
        assert row is not None
        assert row[1] == log_file.stat().st_size

    def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
        """force=True re-gleans an unchanged file and re-saves its fingerprint."""
        glean_file(log_file, db_path)

        # Without force: skipped.
        stats_no_force = glean_file(log_file, db_path)
        assert sum(stats_no_force.values()) == 0

        # With force: glean runs (INSERT OR IGNORE means count may be 0, so
        # we verify the fingerprint was re-saved).  gleaned_at is backdated
        # first, which makes the check independent of clock resolution.
        self._backdate_gleaned_at(db_path, log_file)
        glean_file(log_file, db_path, force=True)

        row = self._stored_fingerprint(db_path, log_file)
        assert row is not None
        assert row[2] > self._BACKDATED, "force=True should update gleaned_at timestamp"
|
|
|
|
|
|
# ── Integration: glean_dir skipping ──────────────────────────────────────────
|
|
|
|
class TestGleanDirFingerprint:
    """Integration tests: glean_dir() honours stored fingerprints per file."""

    # Timestamp far in the past, written into gleaned_at so that a re-save is
    # detectable without sleeping or relying on clock resolution.
    _BACKDATED = "1970-01-01T00:00:00Z"

    # NOTE(review): the db_path fixture places test.db inside tmp_path, the
    # same directory these tests glean.  Presumably glean_dir() ignores that
    # file — confirm against the pipeline implementation.

    @staticmethod
    def _stored_gleaned_at(db_path: Path, path: Path) -> str | None:
        """Return the stored gleaned_at value for *path*, or None if no row exists.

        The connection is always closed, even if the query raises.
        """
        conn = sqlite3.connect(str(db_path))
        try:
            row = conn.execute(
                "SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
                (str(path),),
            ).fetchone()
        finally:
            conn.close()
        return None if row is None else row[0]

    @classmethod
    def _backdate_gleaned_at(cls, db_path: Path, path: Path) -> None:
        """Overwrite gleaned_at for *path* with the backdated sentinel value."""
        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.execute(
                "UPDATE glean_fingerprints SET gleaned_at = ? WHERE path = ?",
                (cls._BACKDATED, str(path)),
            )
            conn.commit()
            # The first glean must already have stored a fingerprint row.
            assert cursor.rowcount >= 1, "expected an existing fingerprint row"
        finally:
            conn.close()

    def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
        """A second glean_dir() over untouched files yields no new entries."""
        log1 = tmp_path / "a.log"
        log2 = tmp_path / "b.log"
        log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
        log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")

        glean_dir(tmp_path, db_path)

        stats_second = glean_dir(tmp_path, db_path)
        assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"

    def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
        """force=True re-gleans unchanged files and re-saves their fingerprints."""
        log1 = tmp_path / "a.log"
        log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")

        glean_dir(tmp_path, db_path)

        # force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
        # gleaned_at is backdated first, which makes the check independent of
        # clock resolution.
        self._backdate_gleaned_at(db_path, log1)
        glean_dir(tmp_path, db_path, force=True)

        ts_after = self._stored_gleaned_at(db_path, log1)
        assert ts_after is not None
        assert ts_after > self._BACKDATED
|
|
|
|
|
|
# ── Schema: ensure fingerprints table created ─────────────────────────────────
|
|
|
|
class TestEnsureSchema:
    """Schema checks for the table that backs fingerprint storage."""

    def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
        """ensure_schema() on a new database file creates glean_fingerprints."""
        fresh_db = tmp_path / "fresh.db"
        ensure_schema(fresh_db)

        connection = sqlite3.connect(str(fresh_db))
        cursor = connection.execute("SELECT name FROM sqlite_master WHERE type='table'")
        table_names = set()
        for record in cursor.fetchall():
            table_names.add(record[0])
        connection.close()

        assert "glean_fingerprints" in table_names

    def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
        """Calling ensure_schema twice on the same DB must not raise."""
        fresh_db = tmp_path / "fresh.db"
        for _ in range(2):  # second pass — should be a no-op
            ensure_schema(fresh_db)
|