turnstone/tests/test_gen_corpus.py
pyr0ball f904658d6f chore: replace vendor product name with generic ext_device throughout
- Rename _AVCX_CODES → _EXT_DEVICE_CODES, gen_avcx → gen_ext_device
- Rename corpus output directory avcx/ → ext_device/
- Update default.yaml placeholder pattern name and description
- Update tests to match new directory and class names
- Corresponding Forgejo issue titles updated (#43, #44, #54)
2026-06-13 22:03:26 -07:00

197 lines
8.8 KiB
Python

"""Tests for scripts/gen_corpus.py synthetic log generator."""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from datetime import datetime, timezone
from scripts.gen_corpus import generate, main
# Fixed reference time keeps timestamps deterministic across test runs.
# The _run helper passes it to generate() as ``reference_time``.
_REF_TIME = datetime(2026, 6, 10, 12, 0, 0, tzinfo=timezone.utc)
# ── Helpers ────────────────────────────────────────────────────────────────────
def _run(tmp_path: Path, days: int = 1, seed: int = 42, error_rate: float = 0.05) -> dict[str, int]:
    """Generate a corpus under *tmp_path* using the module's fixed reference time.

    ``days``, ``seed`` and ``error_rate`` are forwarded to ``generate`` unchanged;
    the value ``generate`` returns is handed back to the caller as-is.
    """
    options = {
        "days": days,
        "seed": seed,
        "error_rate": error_rate,
        "reference_time": _REF_TIME,
    }
    return generate(tmp_path, **options)
# ── Output structure ───────────────────────────────────────────────────────────
class TestOutputStructure:
    """Checks on the files a run writes and the totals it returns."""

    def test_creates_all_four_files(self, tmp_path: Path) -> None:
        """Every expected output file exists after a single run."""
        _run(tmp_path)
        expected = (
            ("journald", "system.jsonl"),
            ("docker", "services.jsonl"),
            ("qbittorrent", "qbt.log"),
            ("ext_device", "device.log"),
        )
        for folder, filename in expected:
            assert (tmp_path / folder / filename).exists()

    def test_returns_line_counts(self, tmp_path: Path) -> None:
        """The returned mapping has four entries, each strictly positive."""
        counts = _run(tmp_path)
        assert len(counts) == 4
        for value in counts.values():
            assert value > 0
# ── Reproducibility ────────────────────────────────────────────────────────────
class TestReproducibility:
    """Seed handling: equal seeds reproduce output, different seeds do not."""

    def test_same_seed_same_output(self, tmp_path: Path) -> None:
        """Two runs with seed 99 write identical journald and docker files."""
        first = tmp_path / "a"
        second = tmp_path / "b"
        for target in (first, second):
            _run(target, seed=99)
        for rel in ("journald/system.jsonl", "docker/services.jsonl"):
            left = (first / rel).read_text()
            right = (second / rel).read_text()
            assert left == right

    def test_different_seeds_differ(self, tmp_path: Path) -> None:
        """Runs seeded with 1 and 2 write different journald files."""
        dir_one = tmp_path / "a"
        dir_two = tmp_path / "b"
        for target, seed in ((dir_one, 1), (dir_two, 2)):
            _run(target, seed=seed)
        rel = "journald/system.jsonl"
        assert (dir_one / rel).read_text() != (dir_two / rel).read_text()
# ── Journald format ────────────────────────────────────────────────────────────
class TestJournaldFormat:
    """Format checks for the generated ``journald/system.jsonl`` file."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines decode as JSON and carry the required keys."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        # Without this guard an empty file would pass: the loop would assert nothing.
        assert lines, "journald/system.jsonl is empty"
        for lineno, line in enumerate(lines[:100], start=1):
            obj = json.loads(line)
            for key in ("__REALTIME_TIMESTAMP", "MESSAGE", "PRIORITY"):
                assert key in obj, f"line {lineno} is missing {key!r}"

    def test_timestamp_is_microseconds(self, tmp_path: Path) -> None:
        """The first entry's timestamp is large enough to be epoch microseconds."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        first_entry = json.loads(lines[0])
        ts = int(first_entry["__REALTIME_TIMESTAMP"])
        # microseconds since epoch — should be > year 2020
        assert ts > 1_577_836_800_000_000

    def test_parseable_by_journald_glean(self, tmp_path: Path) -> None:
        """The journald parser yields entries whose severities are all known levels."""
        from app.glean.journald import parse

        _run(tmp_path)
        with (tmp_path / "journald/system.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Entries with a falsy severity are ignored for this check.
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "DEBUG", "WARN", "ERROR", "CRITICAL"}
# ── Docker format ──────────────────────────────────────────────────────────────
class TestDockerFormat:
    """Format checks for the generated ``docker/services.jsonl`` file."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines decode as JSON and carry ``SOURCE`` and ``MESSAGE``."""
        _run(tmp_path)
        lines = (tmp_path / "docker/services.jsonl").read_text().splitlines()
        # Without this guard an empty file would pass: the loop would assert nothing.
        assert lines, "docker/services.jsonl is empty"
        for lineno, line in enumerate(lines[:100], start=1):
            obj = json.loads(line)
            for key in ("SOURCE", "MESSAGE"):
                assert key in obj, f"line {lineno} is missing {key!r}"

    def test_parseable_by_docker_glean(self, tmp_path: Path) -> None:
        """The docker parser yields entries and detects a severity on over 80% of them."""
        from app.glean.docker_log import parse

        _run(tmp_path)
        with (tmp_path / "docker/services.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Severity should be detected in most entries (messages embed level= / LEVEL:)
        detected = [e for e in entries if e.severity is not None]
        assert len(detected) / len(entries) > 0.8
# ── qBittorrent format ─────────────────────────────────────────────────────────
class TestQbittorrentFormat:
    """Format checks for the generated ``qbittorrent/qbt.log`` file."""

    def test_hotio_format_lines(self, tmp_path: Path) -> None:
        """The first 50 lines look like ``(X) <ISO timestamp> - <message>``."""
        import re

        _run(tmp_path)
        lines = (tmp_path / "qbittorrent/qbt.log").read_text().splitlines()
        pattern = re.compile(r"^\([NIWC]\) \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} - .+$")
        # all()/any-style checks over an empty sequence succeed, so require content first.
        assert lines, "qbittorrent/qbt.log is empty"
        mismatched = [line for line in lines[:50] if not pattern.match(line)]
        # Report the first offending line so a failure is diagnosable.
        assert not mismatched, f"unexpected line format: {mismatched[0]!r}"

    def test_parseable_by_qbt_glean(self, tmp_path: Path) -> None:
        """The qBittorrent parser yields entries limited to INFO, WARN and CRITICAL."""
        from app.glean.qbittorrent import parse

        _run(tmp_path)
        with (tmp_path / "qbittorrent/qbt.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Entries with a falsy severity are ignored for this check.
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "WARN", "CRITICAL"}
# ── Vendor device format ────────────────────────────────────────────────────────────────
class TestAvcxFormat:
    """Format checks for the generated ``ext_device/device.log`` file.

    NOTE(review): the class name still uses the older vendor-specific identifier
    while the output directory is ``ext_device``; consider renaming for consistency.
    """

    def test_iso_timestamp_prefix(self, tmp_path: Path) -> None:
        """The first 50 lines look like ``<ISO timestamp> [<tag>] <message>``."""
        import re

        _run(tmp_path)
        lines = (tmp_path / "ext_device/device.log").read_text().splitlines()
        pattern = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} \[.+\] .+$")
        # all()/any-style checks over an empty sequence succeed, so require content first.
        assert lines, "ext_device/device.log is empty"
        mismatched = [line for line in lines[:50] if not pattern.match(line)]
        # Report the first offending line so a failure is diagnosable.
        assert not mismatched, f"unexpected line format: {mismatched[0]!r}"

    def test_parseable_by_plaintext_glean(self, tmp_path: Path) -> None:
        """The plaintext parser yields entries and timestamps more than 95% of them."""
        from app.glean.plaintext import parse

        _run(tmp_path)
        with (tmp_path / "ext_device/device.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # ISO timestamps should parse cleanly
        timestamped = [e for e in entries if e.timestamp_iso]
        assert len(timestamped) / len(entries) > 0.95
# ── Error rate ─────────────────────────────────────────────────────────────────
class TestErrorRate:
    """Checks on how the ``error_rate`` setting affects output and validation."""

    def test_high_error_rate_increases_errors(self, tmp_path: Path) -> None:
        """A 0.50 error rate yields a larger ERROR/CRITICAL share than 0.01 at the same seed."""
        from app.glean.journald import parse

        def error_ratio(path: Path) -> float:
            # Fraction of parsed entries whose severity is ERROR or CRITICAL.
            with path.open() as fh:
                entries = list(parse(fh, "test", []))
            if not entries:
                return 0.0
            bad = [e for e in entries if e.severity in ("ERROR", "CRITICAL")]
            return len(bad) / len(entries)

        low_dir = tmp_path / "low"
        high_dir = tmp_path / "high"
        _run(low_dir, seed=7, error_rate=0.01)
        _run(high_dir, seed=7, error_rate=0.50)

        rel = "journald/system.jsonl"
        high_ratio = error_ratio(high_dir / rel)
        low_ratio = error_ratio(low_dir / rel)
        assert high_ratio > low_ratio

    def test_invalid_error_rate_returns_nonzero(self, tmp_path: Path) -> None:
        """``main`` returns a non-zero code when ``--error-rate`` is 1.5."""
        argv = ["--days", "1", "--out", str(tmp_path), "--error-rate", "1.5"]
        exit_code = main(argv)
        assert exit_code != 0
# ── CLI ────────────────────────────────────────────────────────────────────────
class TestCLI:
    """End-to-end checks that drive the generator through ``main``."""

    def test_acceptance_criteria(self, tmp_path: Path) -> None:
        """Acceptance: ``--days 7 --out <dir>`` yields a parseable corpus with varied severities."""
        from app.glean.journald import parse

        exit_code = main(["--days", "7", "--out", str(tmp_path)])
        assert exit_code == 0

        journal = tmp_path / "journald/system.jsonl"
        with journal.open() as fh:
            entries = list(parse(fh, "test", []))
        seen = {e.severity for e in entries if e.severity}
        assert seen >= {"INFO", "WARN", "ERROR", "CRITICAL"}
        # 7 days of ~86k/day
        assert len(entries) > 100_000

    def test_missing_out_fails(self, tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
        """Omitting ``--out`` makes ``main`` exit with a non-zero status."""
        argv = ["--days", "1"]
        with pytest.raises(SystemExit) as exc_info:
            main(argv)
        raised = exc_info.value
        assert raised.code != 0