Add scan_log_directories() to discover.py that recursively walks /var/log and /opt, filters to readable log files, and scores each candidate by recency (mtime, 0.7 weight), file size (0.3), and keyword match against an optional problem-context query (shifts weights to 0.4/0.2/0.4 when a query is provided). - GET /api/setup/scan?query=...&max_results=N — new API endpoint - SourcesView: "Scan" button opens a panel with ranked candidates, checkboxes, and "Add selected" to write to sources.yaml - 13 new unit tests, 466 passing total Closes: #23
133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
"""Tests for scan_log_directories in app.services.discover."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from app.services.discover import scan_log_directories, _path_to_source_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_log(tmp_path: Path, name: str, content: str = "hello\n", age_days: float = 0) -> Path:
|
|
p = tmp_path / name
|
|
p.write_text(content)
|
|
mtime = time.time() - age_days * 86400
|
|
os.utime(p, (mtime, mtime))
|
|
return p
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _path_to_source_id
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_path_to_source_id_basic():
    """The id for a nested log path starts with the dashed path and has no slashes or spaces."""
    source_id = _path_to_source_id(Path("/var/log/nginx/access.log"))

    assert source_id.startswith("var-log-nginx-access")
    for forbidden in ("/", " "):
        assert forbidden not in source_id
|
|
|
|
|
|
def test_path_to_source_id_max_length():
    """The id generated for a 200+ character path is at most 64 characters long."""
    oversized_name = "a" * 200 + ".log"
    source_id = _path_to_source_id(Path("/" + oversized_name))

    assert len(source_id) <= 64
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# scan_log_directories
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_scan_finds_log_files(tmp_path):
    """Both log files created in the scanned directory appear in the results."""
    _make_log(tmp_path, "app.log", "error: something\n")
    _make_log(tmp_path, "system.log", "kernel: ok\n")

    reported = [entry["path"] for entry in scan_log_directories(dirs=[str(tmp_path)])]

    for filename in ("app.log", "system.log"):
        assert str(tmp_path / filename) in reported
|
|
|
|
|
|
def test_scan_ignores_empty_files(tmp_path):
    """A zero-byte log file is excluded while a non-empty sibling is still reported."""
    _make_log(tmp_path, "empty.log", "")
    # Control file: without it the negative assertions below would also pass
    # if the scanner returned nothing at all.
    _make_log(tmp_path, "filled.log", "line\n")

    results = scan_log_directories(dirs=[str(tmp_path)])
    paths = [r["path"] for r in results]

    assert str(tmp_path / "filled.log") in paths
    # Check by path as well as label so the test does not depend on how the
    # label is derived.
    assert str(tmp_path / "empty.log") not in paths
    assert not any(r["label"] == "empty.log" for r in results)
|
|
|
|
|
|
def test_scan_ignores_non_log_extensions(tmp_path):
    """YAML and JSON files are skipped while a .log file next to them is reported."""
    (tmp_path / "config.yaml").write_text("key: value\n")
    (tmp_path / "data.json").write_text('{"a":1}\n')
    # Control file: proves the scan actually ran, so the exclusions below are
    # not satisfied by an empty result list.
    (tmp_path / "app.log").write_text("line\n")

    results = scan_log_directories(dirs=[str(tmp_path)])
    names = [r["label"] for r in results]
    paths = [r["path"] for r in results]

    assert str(tmp_path / "app.log") in paths
    assert "config.yaml" not in names
    assert "data.json" not in names
    # Path-based checks do not depend on how the label is derived.
    assert str(tmp_path / "config.yaml") not in paths
    assert str(tmp_path / "data.json") not in paths
|
|
|
|
|
|
def test_scan_ignores_compressed(tmp_path):
    """A ``.gz`` file is excluded while an uncompressed log beside it is reported."""
    _make_log(tmp_path, "old.log.gz", "compressed content")
    # Control file: without it the negative assertions below would also pass
    # if the scanner returned nothing at all.
    _make_log(tmp_path, "current.log", "line\n")

    results = scan_log_directories(dirs=[str(tmp_path)])
    paths = [r["path"] for r in results]

    assert str(tmp_path / "current.log") in paths
    assert not any(r["label"].endswith(".gz") for r in results)
    # Path-based check does not depend on how the label is derived.
    assert str(tmp_path / "old.log.gz") not in paths
|
|
|
|
|
|
def test_scan_respects_max_results(tmp_path):
    """With 20 candidate logs and ``max_results=5`` exactly five entries come back."""
    for i in range(20):
        _make_log(tmp_path, f"app{i}.log", f"log line {i}\n")

    results = scan_log_directories(dirs=[str(tmp_path)], max_results=5)

    # An upper-bound check alone would also accept an empty list; there are
    # more candidates than the cap, so the result must be filled to the cap.
    assert len(results) == 5
|
|
|
|
|
|
def test_scan_recent_files_score_higher(tmp_path):
    """A file modified just now scores above one last modified 60 days ago."""
    fresh_path = _make_log(tmp_path, "recent.log", "new stuff\n", age_days=0)
    aged_path = _make_log(tmp_path, "old.log", "old stuff\n", age_days=60)

    score_by_path = {}
    for entry in scan_log_directories(dirs=[str(tmp_path)]):
        score_by_path[entry["path"]] = entry["score"]

    fresh_score = score_by_path[str(fresh_path)]
    aged_score = score_by_path[str(aged_path)]
    assert fresh_score > aged_score
|
|
|
|
|
|
def test_scan_keyword_match_boosts_score(tmp_path):
    """A file whose name matches a query keyword outranks an otherwise equal file."""
    # Both files get the same content and age, so size and recency are equal
    # and only the keyword term can separate the scores. (Different contents
    # would let the size term alone decide the outcome.)
    content = "GET / 200\n"
    nginx_log = _make_log(tmp_path, "nginx.log", content, age_days=5)
    other_log = _make_log(tmp_path, "kernel.log", content, age_days=5)

    results = scan_log_directories(query="nginx 502 error", dirs=[str(tmp_path)])
    scores = {r["path"]: r["score"] for r in results}

    assert scores[str(nginx_log)] > scores[str(other_log)]
|
|
|
|
|
|
def test_scan_returns_required_fields(tmp_path):
    """The first result is a file entry carrying every expected key and marked available."""
    _make_log(tmp_path, "test.log", "data\n")

    results = scan_log_directories(dirs=[str(tmp_path)])
    assert results

    first = results[0]
    assert first["type"] == "file"
    for key in ("id", "path", "label", "size_bytes", "mtime", "score"):
        assert key in first
    assert first["available"] is True
|
|
|
|
|
|
def test_scan_missing_dir_is_graceful():
    """Scanning a directory that does not exist returns an empty list."""
    missing_dir = "/nonexistent/path/xyz"

    assert scan_log_directories(dirs=[missing_dir]) == []
|
|
|
|
|
|
def test_scan_subdirectory_recursive(tmp_path):
    """A log file inside a subdirectory of the scanned directory is reported."""
    nested_dir = tmp_path / "subapp"
    nested_dir.mkdir()
    nested_log = _make_log(nested_dir, "subapp.log", "nested log\n")

    reported = [entry["path"] for entry in scan_log_directories(dirs=[str(tmp_path)])]

    assert str(nested_log) in reported
|
|
|
|
|
|
def test_scan_no_query_weights_recency_heavily(tmp_path):
    """With no query, a small fresh file outscores a larger 20-day-old one.

    Recency is weighted 0.7 against 0.3 for size when no query is supplied.
    """
    fresh_path = _make_log(tmp_path, "fresh.log", "x" * 100, age_days=0)
    stale_path = _make_log(tmp_path, "stale.log", "x" * 10000, age_days=20)

    score_by_path = {}
    for entry in scan_log_directories(query=None, dirs=[str(tmp_path)]):
        score_by_path[entry["path"]] = entry["score"]

    fresh_score = score_by_path[str(fresh_path)]
    stale_score = score_by_path[str(stale_path)]
    assert fresh_score > stale_score
|