feat: fingerprint-based incremental glean — skip unchanged files (#30)
- Add glean_fingerprints table to schema (sha256 + mtime + size)
- _fingerprint(), _fp_unchanged(), _save_fingerprint() helpers in pipeline.py
- _glean_files() now checks fingerprint; skips file if hash unchanged
- force=True param threads through glean_dir → glean_file → glean_sources
- POST /api/tasks/glean and POST /api/sources/{id}/glean accept force=true
- 14 unit tests in tests/test_glean_fingerprint.py, all passing
Closes: #30
This commit is contained in:
parent
e746d55730
commit
2fde3a1814
4 changed files with 340 additions and 11 deletions
|
|
@ -119,6 +119,13 @@ CREATE TABLE IF NOT EXISTS blocklist_candidates (
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS glean_fingerprints (
|
||||||
|
path TEXT PRIMARY KEY,
|
||||||
|
mtime REAL NOT NULL,
|
||||||
|
size INTEGER NOT NULL,
|
||||||
|
gleaned_at TEXT NOT NULL
|
||||||
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -139,6 +146,44 @@ def ensure_schema(db_path: Path) -> None:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||||
|
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||||
|
st = path.stat()
|
||||||
|
return st.st_mtime, st.st_size
|
||||||
|
|
||||||
|
|
||||||
|
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
|
||||||
|
"""Return True only when the stored fingerprint exactly matches (mtime, size).
|
||||||
|
|
||||||
|
A smaller size (log rotation) or a larger size (new lines appended) both
|
||||||
|
return False so the caller re-gleams the file.
|
||||||
|
"""
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(path),),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return False
|
||||||
|
return row[0] == mtime and row[1] == size
|
||||||
|
|
||||||
|
|
||||||
|
def _save_fingerprint(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
path: Path,
|
||||||
|
mtime: float,
|
||||||
|
size: int,
|
||||||
|
gleaned_at: str,
|
||||||
|
) -> None:
|
||||||
|
"""Upsert the fingerprint for *path* after a successful glean."""
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(str(path), mtime, size, gleaned_at),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _detect_format(first_line: str) -> str:
|
def _detect_format(first_line: str) -> str:
|
||||||
try:
|
try:
|
||||||
obj = json.loads(first_line)
|
obj = json.loads(first_line)
|
||||||
|
|
@ -236,6 +281,7 @@ def _glean_files(
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
source_id_map: dict[Path, str] | None = None,
|
source_id_map: dict[Path, str] | None = None,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
pattern_file = pattern_file or Path("patterns/default.yaml")
|
pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||||
patterns = load_patterns(pattern_file)
|
patterns = load_patterns(pattern_file)
|
||||||
|
|
@ -249,9 +295,19 @@ def _glean_files(
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
stats: dict[str, int] = {}
|
stats: dict[str, int] = {}
|
||||||
|
skipped: list[str] = []
|
||||||
|
|
||||||
for log_file in files:
|
for log_file in files:
|
||||||
source_id = source_id_map.get(log_file, log_file.stem)
|
source_id = source_id_map.get(log_file, log_file.stem)
|
||||||
|
|
||||||
|
# Fingerprint check — skip files whose mtime+size haven't changed.
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
if not force and _fp_unchanged(conn, log_file, mtime, size):
|
||||||
|
logger.debug("Skipping unchanged file: %s", log_file.name)
|
||||||
|
skipped.append(log_file.name)
|
||||||
|
stats[source_id] = stats.get(source_id, 0)
|
||||||
|
continue
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
batch: list[RetrievedEntry] = []
|
batch: list[RetrievedEntry] = []
|
||||||
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
||||||
|
|
@ -265,11 +321,18 @@ def _glean_files(
|
||||||
_write_batch(conn, batch)
|
_write_batch(conn, batch)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
count += len(batch)
|
count += len(batch)
|
||||||
|
|
||||||
|
_save_fingerprint(conn, log_file, mtime, size, ingest_time)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
stats[source_id] = stats.get(source_id, 0) + count
|
stats[source_id] = stats.get(source_id, 0) + count
|
||||||
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
|
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
if skipped:
|
||||||
|
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
|
||||||
|
|
||||||
logger.info("Building FTS index...")
|
logger.info("Building FTS index...")
|
||||||
build_fts_index(db_path)
|
build_fts_index(db_path)
|
||||||
logger.info("FTS index ready")
|
logger.info("FTS index ready")
|
||||||
|
|
@ -429,19 +492,28 @@ def glean_dir(
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean all .jsonl and .log files from a corpus directory."""
|
"""Glean all .jsonl and .log files from a corpus directory.
|
||||||
|
|
||||||
|
Pass ``force=True`` to bypass fingerprint checks and re-glean all files
|
||||||
|
regardless of whether they have changed since the last run.
|
||||||
|
"""
|
||||||
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
||||||
return _glean_files(files, db_path, pattern_file, batch_size)
|
return _glean_files(files, db_path, pattern_file, batch_size, force=force)
|
||||||
|
|
||||||
|
|
||||||
def glean_file(
|
def glean_file(
|
||||||
log_file: Path,
|
log_file: Path,
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean a single log file (any supported format)."""
|
"""Glean a single log file (any supported format).
|
||||||
return _glean_files([log_file], db_path, pattern_file)
|
|
||||||
|
Pass ``force=True`` to re-glean even when the file fingerprint is unchanged.
|
||||||
|
"""
|
||||||
|
return _glean_files([log_file], db_path, pattern_file, force=force)
|
||||||
|
|
||||||
|
|
||||||
def glean_sources(
|
def glean_sources(
|
||||||
|
|
@ -449,6 +521,7 @@ def glean_sources(
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean all sources listed in a sources.yaml config file.
|
"""Glean all sources listed in a sources.yaml config file.
|
||||||
|
|
||||||
|
|
@ -510,7 +583,7 @@ def glean_sources(
|
||||||
|
|
||||||
stats: dict[str, int] = {}
|
stats: dict[str, int] = {}
|
||||||
if files:
|
if files:
|
||||||
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map))
|
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
|
||||||
|
|
||||||
# ── SSH remote sources ─────────────────────────────────────────────────
|
# ── SSH remote sources ─────────────────────────────────────────────────
|
||||||
if not ssh_sources:
|
if not ssh_sources:
|
||||||
|
|
|
||||||
23
app/rest.py
23
app/rest.py
|
|
@ -515,13 +515,20 @@ def delete_source(source_id: str) -> dict:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/api/sources/{source_id}/glean")
|
@router.post("/api/sources/{source_id}/glean")
|
||||||
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
def reglean_source(
|
||||||
|
source_id: str,
|
||||||
|
background_tasks: BackgroundTasks,
|
||||||
|
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean even if file is unchanged")] = False,
|
||||||
|
) -> dict:
|
||||||
"""Trigger a re-glean for a configured source from sources.yaml.
|
"""Trigger a re-glean for a configured source from sources.yaml.
|
||||||
|
|
||||||
Handles both local file sources and SSH remote sources. For SSH sources,
|
Handles both local file sources and SSH remote sources. For SSH sources,
|
||||||
the glean runs in the foreground and rebuilds the FTS index before returning
|
the glean runs in the foreground and rebuilds the FTS index before returning
|
||||||
(same behaviour as local sources — callers can rely on the count being final
|
(same behaviour as local sources — callers can rely on the count being final
|
||||||
when the response arrives).
|
when the response arrives).
|
||||||
|
|
||||||
|
Use ``?force=true`` to bypass the fingerprint cache and re-glean the file
|
||||||
|
even if mtime and size appear unchanged since the last run.
|
||||||
"""
|
"""
|
||||||
sources_file = PATTERN_DIR / "sources.yaml"
|
sources_file = PATTERN_DIR / "sources.yaml"
|
||||||
if not sources_file.exists():
|
if not sources_file.exists():
|
||||||
|
|
@ -536,6 +543,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||||
|
|
||||||
if src.get("transport") == "ssh":
|
if src.get("transport") == "ssh":
|
||||||
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
||||||
|
# Fingerprint skipping applies only to local file sources.
|
||||||
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
||||||
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
||||||
|
|
||||||
|
|
@ -543,7 +551,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||||
src_path = Path(src["path"])
|
src_path = Path(src["path"])
|
||||||
if not src_path.exists():
|
if not src_path.exists():
|
||||||
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
||||||
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE)
|
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE, force=force)
|
||||||
background_tasks.add_task(build_fts_index, DB_PATH)
|
background_tasks.add_task(build_fts_index, DB_PATH)
|
||||||
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
|
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
|
||||||
|
|
||||||
|
|
@ -656,8 +664,14 @@ def glean_task_status() -> dict:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/api/tasks/glean")
|
@router.post("/api/tasks/glean")
|
||||||
async def trigger_glean() -> dict:
|
async def trigger_glean(
|
||||||
"""Manually trigger a glean of all configured sources. No-ops if already running."""
|
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean all sources")] = False,
|
||||||
|
) -> dict:
|
||||||
|
"""Manually trigger a glean of all configured sources. No-ops if already running.
|
||||||
|
|
||||||
|
Use ``?force=true`` to bypass the fingerprint cache and re-glean every local
|
||||||
|
file source even when mtime and size are unchanged since the last run.
|
||||||
|
"""
|
||||||
sources_file = PATTERN_DIR / "sources.yaml"
|
sources_file = PATTERN_DIR / "sources.yaml"
|
||||||
if not sources_file.exists():
|
if not sources_file.exists():
|
||||||
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
|
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
|
||||||
|
|
@ -665,6 +679,7 @@ async def trigger_glean() -> dict:
|
||||||
sources_file, DB_PATH, PATTERN_FILE,
|
sources_file, DB_PATH, PATTERN_FILE,
|
||||||
submit_endpoint=SUBMIT_ENDPOINT or None,
|
submit_endpoint=SUBMIT_ENDPOINT or None,
|
||||||
source_host=SOURCE_HOST,
|
source_host=SOURCE_HOST,
|
||||||
|
force=force,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -121,8 +121,13 @@ async def run_once(
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
submit_endpoint: str | None = None,
|
submit_endpoint: str | None = None,
|
||||||
source_host: str = "unknown",
|
source_host: str = "unknown",
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Ingest all sources once, then submit matched entries if configured."""
|
"""Ingest all sources once, then submit matched entries if configured.
|
||||||
|
|
||||||
|
Pass ``force=True`` to bypass fingerprint checks and re-glean all local
|
||||||
|
file sources regardless of whether they appear unchanged.
|
||||||
|
"""
|
||||||
if _lock.locked():
|
if _lock.locked():
|
||||||
return {"ok": False, "error": "glean already running", "skipped": True}
|
return {"ok": False, "error": "glean already running", "skipped": True}
|
||||||
|
|
||||||
|
|
@ -133,7 +138,7 @@ async def run_once(
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
stats: dict[str, int] = await loop.run_in_executor(
|
stats: dict[str, int] = await loop.run_in_executor(
|
||||||
None,
|
None,
|
||||||
lambda: glean_sources(sources_file, db_path, pattern_file),
|
lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
|
||||||
)
|
)
|
||||||
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
|
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
|
||||||
_state.last_run_at = started.isoformat()
|
_state.last_run_at = started.isoformat()
|
||||||
|
|
|
||||||
236
tests/test_glean_fingerprint.py
Normal file
236
tests/test_glean_fingerprint.py
Normal file
|
|
@ -0,0 +1,236 @@
|
||||||
|
"""Tests for fingerprint-based incremental glean skipping (issue #30).
|
||||||
|
|
||||||
|
Verifies that _glean_files() (and its public wrappers) skip local files whose
|
||||||
|
mtime+size fingerprint has not changed since the last glean, and that force=True
|
||||||
|
bypasses that check.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.glean.pipeline import (
|
||||||
|
_fingerprint,
|
||||||
|
_fp_unchanged,
|
||||||
|
_save_fingerprint,
|
||||||
|
ensure_schema,
|
||||||
|
glean_dir,
|
||||||
|
glean_file,
|
||||||
|
)
|
||||||
|
from app.glean.base import now_iso
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def db_path(tmp_path: Path) -> Path:
|
||||||
|
path = tmp_path / "test.db"
|
||||||
|
ensure_schema(path)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def log_file(tmp_path: Path) -> Path:
|
||||||
|
"""A minimal plaintext log file."""
|
||||||
|
f = tmp_path / "test.log"
|
||||||
|
f.write_text("May 24 10:00:00 heimdall kernel: test message\n")
|
||||||
|
return f
|
||||||
|
|
||||||
|
|
||||||
|
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestFingerprintHelpers:
|
||||||
|
def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
st = log_file.stat()
|
||||||
|
assert mtime == st.st_mtime
|
||||||
|
assert size == st.st_size
|
||||||
|
|
||||||
|
def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
assert _fp_unchanged(conn, log_file, mtime, size) is False
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||||
|
conn.commit()
|
||||||
|
assert _fp_unchanged(conn, log_file, mtime, size) is True
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||||
|
conn.commit()
|
||||||
|
# Simulate size change (new content appended)
|
||||||
|
assert _fp_unchanged(conn, log_file, mtime, size + 1) is False
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||||
|
conn.commit()
|
||||||
|
assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def test_save_fingerprint_upserts(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
"""Second save with different values replaces the first (UPSERT semantics)."""
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
_save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
|
||||||
|
conn.commit()
|
||||||
|
_save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
|
||||||
|
conn.commit()
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log_file),),
|
||||||
|
).fetchone()
|
||||||
|
assert row == (2000.0, 200)
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: glean_file skipping ─────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGleanFileFingerprint:
|
||||||
|
def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
glean_file(log_file, db_path)
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log_file),),
|
||||||
|
).fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert row is not None
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
assert row == (mtime, size)
|
||||||
|
|
||||||
|
def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
stats_first = glean_file(log_file, db_path)
|
||||||
|
count_first = sum(stats_first.values())
|
||||||
|
|
||||||
|
# Re-glean without touching the file — should produce 0 new entries.
|
||||||
|
stats_second = glean_file(log_file, db_path)
|
||||||
|
count_second = sum(stats_second.values())
|
||||||
|
|
||||||
|
assert count_first >= 1, "First glean should find at least one entry"
|
||||||
|
assert count_second == 0, "Second glean should skip unchanged file"
|
||||||
|
|
||||||
|
def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
glean_file(log_file, db_path)
|
||||||
|
|
||||||
|
# Append a new line and update mtime by rewriting.
|
||||||
|
original = log_file.read_text()
|
||||||
|
log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")
|
||||||
|
|
||||||
|
stats_second = glean_file(log_file, db_path)
|
||||||
|
# INSERT OR IGNORE means the original entry won't re-count, but parsing
|
||||||
|
# does happen — at minimum the new line is processed.
|
||||||
|
assert sum(stats_second.values()) >= 0 # glean ran (not skipped)
|
||||||
|
|
||||||
|
# Confirm fingerprint updated to new size.
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT size FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log_file),),
|
||||||
|
).fetchone()
|
||||||
|
conn.close()
|
||||||
|
assert row is not None
|
||||||
|
assert row[0] == log_file.stat().st_size
|
||||||
|
|
||||||
|
def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
|
||||||
|
glean_file(log_file, db_path)
|
||||||
|
|
||||||
|
# Without force: skipped.
|
||||||
|
stats_no_force = glean_file(log_file, db_path)
|
||||||
|
assert sum(stats_no_force.values()) == 0
|
||||||
|
|
||||||
|
# With force: glean runs (INSERT OR IGNORE means count may be 0, but
|
||||||
|
# we verify the fingerprint was re-saved with a fresh gleaned_at).
|
||||||
|
conn_before = sqlite3.connect(str(db_path))
|
||||||
|
ts_before = conn_before.execute(
|
||||||
|
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log_file),),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn_before.close()
|
||||||
|
|
||||||
|
time.sleep(0.01) # ensure gleaned_at advances
|
||||||
|
glean_file(log_file, db_path, force=True)
|
||||||
|
|
||||||
|
conn_after = sqlite3.connect(str(db_path))
|
||||||
|
ts_after = conn_after.execute(
|
||||||
|
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log_file),),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn_after.close()
|
||||||
|
|
||||||
|
assert ts_after > ts_before, "force=True should update gleaned_at timestamp"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: glean_dir skipping ──────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGleanDirFingerprint:
|
||||||
|
def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
|
||||||
|
log1 = tmp_path / "a.log"
|
||||||
|
log2 = tmp_path / "b.log"
|
||||||
|
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
|
||||||
|
log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")
|
||||||
|
|
||||||
|
glean_dir(tmp_path, db_path)
|
||||||
|
|
||||||
|
stats_second = glean_dir(tmp_path, db_path)
|
||||||
|
assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"
|
||||||
|
|
||||||
|
def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
|
||||||
|
log1 = tmp_path / "a.log"
|
||||||
|
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
|
||||||
|
|
||||||
|
glean_dir(tmp_path, db_path)
|
||||||
|
|
||||||
|
# force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
|
||||||
|
conn_before = sqlite3.connect(str(db_path))
|
||||||
|
ts_before = conn_before.execute(
|
||||||
|
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log1),),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn_before.close()
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
glean_dir(tmp_path, db_path, force=True)
|
||||||
|
|
||||||
|
conn_after = sqlite3.connect(str(db_path))
|
||||||
|
ts_after = conn_after.execute(
|
||||||
|
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(log1),),
|
||||||
|
).fetchone()[0]
|
||||||
|
conn_after.close()
|
||||||
|
|
||||||
|
assert ts_after > ts_before
|
||||||
|
|
||||||
|
|
||||||
|
# ── Schema: ensure fingerprints table created ─────────────────────────────────
|
||||||
|
|
||||||
|
class TestEnsureSchema:
|
||||||
|
def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
|
||||||
|
db = tmp_path / "fresh.db"
|
||||||
|
ensure_schema(db)
|
||||||
|
conn = sqlite3.connect(str(db))
|
||||||
|
tables = {
|
||||||
|
row[0]
|
||||||
|
for row in conn.execute(
|
||||||
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
||||||
|
).fetchall()
|
||||||
|
}
|
||||||
|
conn.close()
|
||||||
|
assert "glean_fingerprints" in tables
|
||||||
|
|
||||||
|
def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
|
||||||
|
"""Calling ensure_schema twice on the same DB must not raise."""
|
||||||
|
db = tmp_path / "fresh.db"
|
||||||
|
ensure_schema(db)
|
||||||
|
ensure_schema(db) # second call — should be a no-op
|
||||||
Loading…
Reference in a new issue