turnstone/tests/test_discover_scan.py
pyr0ball 600e5a9eac feat(sources): context-aware filesystem log scanner (#23)
Add scan_log_directories() to discover.py that recursively walks
/var/log and /opt, filters to readable log files, and scores each
candidate by recency (mtime, 0.7 weight), file size (0.3), and
keyword match against an optional problem-context query (shifts
weights to 0.4/0.2/0.4 when a query is provided).

- GET /api/setup/scan?query=...&max_results=N — new API endpoint
- SourcesView: "Scan" button opens a panel with ranked candidates,
  checkboxes, and "Add selected" to write to sources.yaml
- 13 new unit tests, 466 passing total

Closes: #23
2026-06-14 14:01:45 -07:00

133 lines
4.6 KiB
Python

"""Tests for scan_log_directories in app.services.discover."""
from __future__ import annotations
import os
import time
from pathlib import Path
import pytest
from app.services.discover import scan_log_directories, _path_to_source_id
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_log(tmp_path: Path, name: str, content: str = "hello\n", age_days: float = 0) -> Path:
p = tmp_path / name
p.write_text(content)
mtime = time.time() - age_days * 86400
os.utime(p, (mtime, mtime))
return p
# ---------------------------------------------------------------------------
# _path_to_source_id
# ---------------------------------------------------------------------------
def test_path_to_source_id_basic():
    """A typical log path maps to an id with no slashes or spaces."""
    source_id = _path_to_source_id(Path("/var/log/nginx/access.log"))
    assert source_id.startswith("var-log-nginx-access")
    for forbidden in ("/", " "):
        assert forbidden not in source_id
def test_path_to_source_id_max_length():
    """The id derived from a very long path is at most 64 long."""
    oversized = Path("/" + "a" * 200 + ".log")
    source_id = _path_to_source_id(oversized)
    assert len(source_id) <= 64
# ---------------------------------------------------------------------------
# scan_log_directories
# ---------------------------------------------------------------------------
def test_scan_finds_log_files(tmp_path):
    """Non-empty ``.log`` files in the scanned directory are reported by path."""
    expected = [
        _make_log(tmp_path, "app.log", "error: something\n"),
        _make_log(tmp_path, "system.log", "kernel: ok\n"),
    ]
    found = [entry["path"] for entry in scan_log_directories(dirs=[str(tmp_path)])]
    for log_file in expected:
        assert str(log_file) in found
def test_scan_ignores_empty_files(tmp_path):
    """A zero-byte log file does not show up among the result labels."""
    _make_log(tmp_path, "empty.log", "")
    labels = [entry["label"] for entry in scan_log_directories(dirs=[str(tmp_path)])]
    assert "empty.log" not in labels
def test_scan_ignores_non_log_extensions(tmp_path):
    """YAML and JSON files in the scanned directory are not reported."""
    tmp_path.joinpath("config.yaml").write_text("key: value\n")
    tmp_path.joinpath("data.json").write_text('{"a":1}\n')
    labels = [entry["label"] for entry in scan_log_directories(dirs=[str(tmp_path)])]
    for unwanted in ("config.yaml", "data.json"):
        assert unwanted not in labels
def test_scan_ignores_compressed(tmp_path):
    """No result label ends in ``.gz`` when only a ``.log.gz`` file exists."""
    _make_log(tmp_path, "old.log.gz", "compressed content")
    entries = scan_log_directories(dirs=[str(tmp_path)])
    assert all(not entry["label"].endswith(".gz") for entry in entries)
def test_scan_respects_max_results(tmp_path):
    """With more candidates than ``max_results``, exactly that many come back.

    Twenty non-empty ``.log`` files are created and the scan is capped at
    five. Asserting equality (rather than ``<=``) keeps the test from
    passing vacuously when the scanner returns too few results, or none.
    """
    limit = 5
    for i in range(20):
        _make_log(tmp_path, f"app{i}.log", f"log line {i}\n")
    results = scan_log_directories(dirs=[str(tmp_path)], max_results=limit)
    # 20 candidates exist, so the cap itself must be the binding constraint.
    assert len(results) == limit
def test_scan_recent_files_score_higher(tmp_path):
    """A file modified just now scores above one backdated by 60 days."""
    fresh_path = str(_make_log(tmp_path, "recent.log", "new stuff\n", age_days=0))
    aged_path = str(_make_log(tmp_path, "old.log", "old stuff\n", age_days=60))
    score_by_path = {}
    for entry in scan_log_directories(dirs=[str(tmp_path)]):
        score_by_path[entry["path"]] = entry["score"]
    assert score_by_path[aged_path] < score_by_path[fresh_path]
def test_scan_keyword_match_boosts_score(tmp_path):
    """A file named after a query keyword outranks an otherwise equal file.

    Both files are given identical content and the same nominal age, so
    the size component of the score cannot favour either one. The only
    remaining difference that the query ``"nginx 502 error"`` can pick up
    is the file name (``nginx.log`` vs ``kernel.log``).
    """
    # Same bytes for both files: a size difference would otherwise let the
    # test pass even if keyword matching contributed nothing.
    content = "GET / 200\n"
    nginx_log = _make_log(tmp_path, "nginx.log", content, age_days=5)
    other_log = _make_log(tmp_path, "kernel.log", content, age_days=5)
    results = scan_log_directories(query="nginx 502 error", dirs=[str(tmp_path)])
    scores = {r["path"]: r["score"] for r in results}
    assert scores[str(nginx_log)] > scores[str(other_log)]
def test_scan_returns_required_fields(tmp_path):
    """The first scan result carries the expected keys and fixed values.

    ``type`` must equal ``"file"``, ``available`` must be ``True``, and
    the remaining keys only need to be present.
    """
    _make_log(tmp_path, "test.log", "data\n")
    results = scan_log_directories(dirs=[str(tmp_path)])
    assert results
    first = results[0]
    assert first["type"] == "file"
    for key in ("id", "path", "label", "size_bytes", "mtime", "score"):
        assert key in first
    assert first["available"] is True
def test_scan_missing_dir_is_graceful():
    """Scanning a path expected not to exist returns an empty list."""
    missing_dir = "/nonexistent/path/xyz"
    assert scan_log_directories(dirs=[missing_dir]) == []
def test_scan_subdirectory_recursive(tmp_path):
    """A log file one directory below the scan root is still reported."""
    nested_dir = tmp_path / "subapp"
    nested_dir.mkdir()
    nested_log = _make_log(nested_dir, "subapp.log", "nested log\n")
    found = [entry["path"] for entry in scan_log_directories(dirs=[str(tmp_path)])]
    assert str(nested_log) in found
def test_scan_no_query_weights_recency_heavily(tmp_path):
    """With no query, a small fresh file outscores a larger 20-day-old one."""
    fresh_path = str(_make_log(tmp_path, "fresh.log", "x" * 100, age_days=0))
    stale_path = str(_make_log(tmp_path, "stale.log", "x" * 10000, age_days=20))
    score_by_path = {}
    for entry in scan_log_directories(query=None, dirs=[str(tmp_path)]):
        score_by_path[entry["path"]] = entry["score"]
    assert score_by_path[stale_path] < score_by_path[fresh_path]