Adds scripts/gen_corpus.py that produces realistic-but-artificial log files across all four supported formats (journald JSON, docker envelope, qBittorrent hotio, AVCX plaintext). Output feeds directly into glean_corpus.py for demo environments and parser regression tests with no production data required.

- Seed-based RNG with independent per-source sub-streams (same seed = same sequence for each file regardless of source count changes)
- Controllable time range, event density, and error injection rate
- Severity distribution mirrors real infrastructure (70% INFO, ~6% ERROR, ~2% CRITICAL) with adjustable boost via --error-rate
- 17 tests covering output structure, reproducibility, format correctness, parser round-trip, and CLI acceptance criteria

Also fixes a latent bug in app/glean/plaintext.py: ISO 8601 timestamps were silently failing to parse because the T separator was normalised to space in the input string but the strptime format string still contained T. Fix: apply the same normalisation to the format before calling strptime.

Closes: #46
197 lines
8.8 KiB
Python
197 lines
8.8 KiB
Python
"""Tests for scripts/gen_corpus.py synthetic log generator."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Put this file's grandparent directory at the front of sys.path so the
# `scripts` package imported below can be resolved (presumably the
# repository root -- confirm against the project layout).
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from scripts.gen_corpus import generate, main
|
|
|
|
# Fixed reference time keeps timestamps deterministic across test runs
|
|
_REF_TIME = datetime(year=2026, month=6, day=10, hour=12, tzinfo=timezone.utc)  # 2026-06-10 12:00:00 UTC
|
|
|
|
|
|
# ── Helpers ────────────────────────────────────────────────────────────────────
|
|
|
|
def _run(tmp_path: Path, days: int = 1, seed: int = 42, error_rate: float = 0.05) -> dict[str, int]:
    """Run ``generate`` into *tmp_path* with the module's fixed reference time.

    ``days``, ``seed`` and ``error_rate`` are forwarded unchanged; the result
    of ``generate`` is returned as-is (the tests treat it as per-file totals).
    """
    options = {
        "days": days,
        "seed": seed,
        "error_rate": error_rate,
        # Pinned so generated timestamps do not depend on when the test runs.
        "reference_time": _REF_TIME,
    }
    return generate(tmp_path, **options)
|
|
|
|
|
|
# ── Output structure ───────────────────────────────────────────────────────────
|
|
|
|
class TestOutputStructure:
    """Checks on the files a run writes and on the mapping it returns."""

    def test_creates_all_four_files(self, tmp_path: Path) -> None:
        """Each of the four expected output files exists after a run."""
        _run(tmp_path)
        expected = (
            ("journald", "system.jsonl"),
            ("docker", "services.jsonl"),
            ("qbittorrent", "qbt.log"),
            ("avcx", "device.log"),
        )
        for parts in expected:
            assert tmp_path.joinpath(*parts).exists()

    def test_returns_line_counts(self, tmp_path: Path) -> None:
        """The returned mapping has four entries and every value is positive."""
        counts = _run(tmp_path)
        assert len(counts) == 4
        assert not any(count <= 0 for count in counts.values())
|
|
|
|
|
|
# ── Reproducibility ────────────────────────────────────────────────────────────
|
|
|
|
class TestReproducibility:
    """Seed handling: equal seeds reproduce the corpus, different seeds do not."""

    # Every file a run is expected to write, relative to the output directory.
    # The same-seed check walks all of them so that a non-deterministic source
    # cannot hide behind the two JSON-lines outputs.
    _OUTPUTS = (
        "journald/system.jsonl",
        "docker/services.jsonl",
        "qbittorrent/qbt.log",
        "avcx/device.log",
    )

    def test_same_seed_same_output(self, tmp_path: Path) -> None:
        """Two runs with the same seed produce identical text in every output file."""
        out_a = tmp_path / "a"
        out_b = tmp_path / "b"
        _run(out_a, seed=99)
        _run(out_b, seed=99)
        for sub in self._OUTPUTS:
            # The relative path is the assertion message so a failure names the file.
            assert (out_a / sub).read_text() == (out_b / sub).read_text(), sub

    def test_different_seeds_differ(self, tmp_path: Path) -> None:
        """Runs with different seeds produce different journald output."""
        out_a = tmp_path / "a"
        out_b = tmp_path / "b"
        _run(out_a, seed=1)
        _run(out_b, seed=2)
        assert (out_a / "journald/system.jsonl").read_text() != (out_b / "journald/system.jsonl").read_text()
|
|
|
|
|
|
# ── Journald format ────────────────────────────────────────────────────────────
|
|
|
|
class TestJournaldFormat:
    """Shape and parser round-trip checks for ``journald/system.jsonl``."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines are JSON objects carrying the three required keys."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        # Without this guard an empty file would pass: the loop body never runs.
        assert lines, "journald/system.jsonl is empty"
        for line in lines[:100]:
            obj = json.loads(line)
            assert "__REALTIME_TIMESTAMP" in obj
            assert "MESSAGE" in obj
            assert "PRIORITY" in obj

    def test_timestamp_is_microseconds(self, tmp_path: Path) -> None:
        """The first record's timestamp is plausible as microseconds since the epoch."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        ts = int(json.loads(lines[0])["__REALTIME_TIMESTAMP"])
        # Lower bound: 2020-01-01 in microseconds (rejects seconds/milliseconds).
        # Upper bound: 2100-01-01 in microseconds (rejects nanoseconds, which a
        # lower bound alone would let through).
        assert 1_577_836_800_000_000 < ts < 4_102_444_800_000_000

    def test_parseable_by_journald_glean(self, tmp_path: Path) -> None:
        """The journald parser yields entries whose severities are all known labels."""
        from app.glean.journald import parse

        _run(tmp_path)
        with (tmp_path / "journald/system.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Entries with a falsy severity are ignored; only set values are checked.
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "DEBUG", "WARN", "ERROR", "CRITICAL"}
|
|
|
|
|
|
# ── Docker format ──────────────────────────────────────────────────────────────
|
|
|
|
class TestDockerFormat:
    """Shape and parser round-trip checks for ``docker/services.jsonl``."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines are JSON objects with ``SOURCE`` and ``MESSAGE`` keys."""
        _run(tmp_path)
        lines = (tmp_path / "docker/services.jsonl").read_text().splitlines()
        # Without this guard an empty file would pass: the loop body never runs.
        assert lines, "docker/services.jsonl is empty"
        for line in lines[:100]:
            obj = json.loads(line)
            assert "SOURCE" in obj
            assert "MESSAGE" in obj

    def test_parseable_by_docker_glean(self, tmp_path: Path) -> None:
        """The docker parser yields entries and assigns a severity to over 80% of them."""
        from app.glean.docker_log import parse

        _run(tmp_path)
        with (tmp_path / "docker/services.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Severity should be detected in most entries (messages embed level= / LEVEL:)
        detected = [e for e in entries if e.severity is not None]
        assert len(detected) / len(entries) > 0.8
|
|
|
|
|
|
# ── qBittorrent format ─────────────────────────────────────────────────────────
|
|
|
|
class TestQbittorrentFormat:
    """Line-shape and parser round-trip checks for ``qbittorrent/qbt.log``."""

    def test_hotio_format_lines(self, tmp_path: Path) -> None:
        """The first 50 lines match ``(<N|I|W|C>) <ISO timestamp> - <message>``."""
        import re

        _run(tmp_path)
        lines = (tmp_path / "qbittorrent/qbt.log").read_text().splitlines()
        # all()/"no mismatches" over an empty sample is vacuously true, so
        # require at least one line before checking shapes.
        assert lines, "qbittorrent/qbt.log is empty"
        pattern = re.compile(r"^\([NIWC]\) \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} - .+$")
        # Collect offenders so a failure shows the lines that did not match.
        mismatched = [line for line in lines[:50] if not pattern.match(line)]
        assert not mismatched, mismatched

    def test_parseable_by_qbt_glean(self, tmp_path: Path) -> None:
        """The qBittorrent parser yields entries whose severities are all known labels."""
        from app.glean.qbittorrent import parse

        _run(tmp_path)
        with (tmp_path / "qbittorrent/qbt.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Entries with a falsy severity are ignored; only set values are checked.
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "WARN", "CRITICAL"}
|
|
|
|
|
|
# ── AVCX format ────────────────────────────────────────────────────────────────
|
|
|
|
class TestAvcxFormat:
    """Line-shape and parser round-trip checks for ``avcx/device.log``."""

    def test_iso_timestamp_prefix(self, tmp_path: Path) -> None:
        """The first 50 lines match ``<ISO timestamp> [<tag>] <message>``."""
        import re

        _run(tmp_path)
        lines = (tmp_path / "avcx/device.log").read_text().splitlines()
        # all()/"no mismatches" over an empty sample is vacuously true, so
        # require at least one line before checking shapes.
        assert lines, "avcx/device.log is empty"
        pattern = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} \[.+\] .+$")
        # Collect offenders so a failure shows the lines that did not match.
        mismatched = [line for line in lines[:50] if not pattern.match(line)]
        assert not mismatched, mismatched

    def test_parseable_by_plaintext_glean(self, tmp_path: Path) -> None:
        """The plaintext parser yields entries and timestamps more than 95% of them."""
        from app.glean.plaintext import parse

        _run(tmp_path)
        with (tmp_path / "avcx/device.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # ISO timestamps should parse cleanly
        timestamped = [e for e in entries if e.timestamp_iso]
        assert len(timestamped) / len(entries) > 0.95
|
|
|
|
|
|
# ── Error rate ─────────────────────────────────────────────────────────────────
|
|
|
|
class TestErrorRate:
    """Behaviour of the ``error_rate`` knob and its CLI validation."""

    def test_high_error_rate_increases_errors(self, tmp_path: Path) -> None:
        """With the same seed, error_rate=0.50 gives a larger ERROR/CRITICAL share than 0.01."""
        from app.glean.journald import parse

        def error_ratio(path: Path) -> float:
            # Share of parsed entries that are ERROR or CRITICAL; 0.0 when nothing parsed.
            with path.open() as handle:
                parsed = list(parse(handle, "test", []))
            if not parsed:
                return 0.0
            failures = [entry for entry in parsed if entry.severity in ("ERROR", "CRITICAL")]
            return len(failures) / len(parsed)

        low = tmp_path / "low"
        high = tmp_path / "high"
        _run(low, seed=7, error_rate=0.01)
        _run(high, seed=7, error_rate=0.50)

        high_ratio = error_ratio(high / "journald/system.jsonl")
        low_ratio = error_ratio(low / "journald/system.jsonl")
        assert high_ratio > low_ratio

    def test_invalid_error_rate_returns_nonzero(self, tmp_path: Path) -> None:
        """``main`` returns a non-zero code when --error-rate is 1.5."""
        argv = ["--days", "1", "--out", str(tmp_path), "--error-rate", "1.5"]
        assert main(argv) != 0
|
|
|
|
|
|
# ── CLI ────────────────────────────────────────────────────────────────────────
|
|
|
|
class TestCLI:
    """End-to-end checks through the ``main`` entry point."""

    def test_acceptance_criteria(self, tmp_path: Path) -> None:
        """Acceptance: --days 7 --out <dir> yields a gleanable corpus with varied severities."""
        from app.glean.journald import parse

        exit_code = main(["--days", "7", "--out", str(tmp_path)])
        assert exit_code == 0

        journal = tmp_path / "journald/system.jsonl"
        with journal.open() as handle:
            parsed = list(parse(handle, "test", []))

        seen = {entry.severity for entry in parsed if entry.severity}
        assert seen >= {"INFO", "WARN", "ERROR", "CRITICAL"}
        assert len(parsed) > 100_000  # 7 days of ~86k/day

    def test_missing_out_fails(self, tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
        """Calling ``main`` without --out raises SystemExit with a non-zero code."""
        with pytest.raises(SystemExit) as raised:
            main(["--days", "1"])
        assert raised.value.code != 0
|