"""Tests for scripts/gen_corpus.py synthetic log generator.""" from __future__ import annotations import json import sys from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) from datetime import datetime, timezone from scripts.gen_corpus import generate, main # Fixed reference time keeps timestamps deterministic across test runs _REF_TIME = datetime(2026, 6, 10, 12, 0, 0, tzinfo=timezone.utc) # ── Helpers ──────────────────────────────────────────────────────────────────── def _run(tmp_path: Path, days: int = 1, seed: int = 42, error_rate: float = 0.05) -> dict[str, int]: return generate(tmp_path, days=days, seed=seed, error_rate=error_rate, reference_time=_REF_TIME) # ── Output structure ─────────────────────────────────────────────────────────── class TestOutputStructure: def test_creates_all_four_files(self, tmp_path: Path) -> None: _run(tmp_path) assert (tmp_path / "journald" / "system.jsonl").exists() assert (tmp_path / "docker" / "services.jsonl").exists() assert (tmp_path / "qbittorrent" / "qbt.log").exists() assert (tmp_path / "ext_device" / "device.log").exists() def test_returns_line_counts(self, tmp_path: Path) -> None: totals = _run(tmp_path) assert len(totals) == 4 assert all(v > 0 for v in totals.values()) # ── Reproducibility ──────────────────────────────────────────────────────────── class TestReproducibility: def test_same_seed_same_output(self, tmp_path: Path) -> None: out_a = tmp_path / "a" out_b = tmp_path / "b" _run(out_a, seed=99) _run(out_b, seed=99) for sub in ["journald/system.jsonl", "docker/services.jsonl"]: assert (out_a / sub).read_text() == (out_b / sub).read_text() def test_different_seeds_differ(self, tmp_path: Path) -> None: out_a = tmp_path / "a" out_b = tmp_path / "b" _run(out_a, seed=1) _run(out_b, seed=2) assert (out_a / "journald/system.jsonl").read_text() != (out_b / "journald/system.jsonl").read_text() # ── Journald format ──────────────────────────────────────────────────────────── 
class TestJournaldFormat:
    """Format checks for the generated ``journald/system.jsonl`` file."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines are JSON objects carrying the three required keys."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        # Without this guard an empty file would skip the loop and pass vacuously.
        assert lines
        for line in lines[:100]:
            obj = json.loads(line)
            assert "__REALTIME_TIMESTAMP" in obj
            assert "MESSAGE" in obj
            assert "PRIORITY" in obj

    def test_timestamp_is_microseconds(self, tmp_path: Path) -> None:
        """The first entry's timestamp is large enough to be epoch microseconds."""
        _run(tmp_path)
        lines = (tmp_path / "journald/system.jsonl").read_text().splitlines()
        ts = int(json.loads(lines[0])["__REALTIME_TIMESTAMP"])
        # microseconds since epoch — should be > year 2020
        # (1_577_836_800 s is 2020-01-01T00:00:00Z; a seconds or milliseconds
        # value for any recent date would fall below this bound.)
        assert ts > 1_577_836_800_000_000

    def test_parseable_by_journald_glean(self, tmp_path: Path) -> None:
        """The journald parser yields entries whose severities are in the known set."""
        from app.glean.journald import parse

        _run(tmp_path)
        with (tmp_path / "journald/system.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Entries with a falsy severity are ignored; the rest must be a subset.
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "DEBUG", "WARN", "ERROR", "CRITICAL"}


# ── Docker format ──────────────────────────────────────────────────────────────

class TestDockerFormat:
    """Format checks for the generated ``docker/services.jsonl`` file."""

    def test_valid_json_lines(self, tmp_path: Path) -> None:
        """The first 100 lines are JSON objects with ``SOURCE`` and ``MESSAGE``."""
        _run(tmp_path)
        lines = (tmp_path / "docker/services.jsonl").read_text().splitlines()
        # Without this guard an empty file would skip the loop and pass vacuously.
        assert lines
        for line in lines[:100]:
            obj = json.loads(line)
            assert "SOURCE" in obj
            assert "MESSAGE" in obj

    def test_parseable_by_docker_glean(self, tmp_path: Path) -> None:
        """The docker parser yields entries and detects a severity in over 80% of them."""
        from app.glean.docker_log import parse

        _run(tmp_path)
        with (tmp_path / "docker/services.jsonl").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # Severity should be detected in most entries (messages embed level= / LEVEL:)
        detected = [e for e in entries if e.severity is not None]
        assert len(detected) / len(entries) > 0.8


# ── qBittorrent format ─────────────────────────────────────────────────────────

class TestQbittorrentFormat:
    """Format checks for the generated ``qbittorrent/qbt.log`` file."""

    def test_hotio_format_lines(self, tmp_path: Path) -> None:
        """The first 50 lines match ``(<N|I|W|C>) <ISO timestamp> - <message>``."""
        _run(tmp_path)
        lines = (tmp_path / "qbittorrent/qbt.log").read_text().splitlines()
        import re

        pattern = re.compile(r"^\([NIWC]\) \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} - .+$")
        # all() is True for an empty iterable, so require at least one line first.
        assert lines
        assert all(pattern.match(line) for line in lines[:50])

    def test_parseable_by_qbt_glean(self, tmp_path: Path) -> None:
        """The qBittorrent parser yields entries whose severities are in the known set."""
        from app.glean.qbittorrent import parse

        _run(tmp_path)
        with (tmp_path / "qbittorrent/qbt.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        severities = {e.severity for e in entries if e.severity}
        assert severities <= {"INFO", "WARN", "CRITICAL"}


# ── EXT_DEVICE format ────────────────────────────────────────────────────────────────

class TestAvcxFormat:
    """Format checks for the generated ``ext_device/device.log`` file."""

    def test_iso_timestamp_prefix(self, tmp_path: Path) -> None:
        """The first 50 lines match ``<ISO timestamp> [<tag>] <message>``."""
        _run(tmp_path)
        lines = (tmp_path / "ext_device/device.log").read_text().splitlines()
        import re

        pattern = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} \[.+\] .+$")
        # all() is True for an empty iterable, so require at least one line first.
        assert lines
        assert all(pattern.match(line) for line in lines[:50])

    def test_parseable_by_plaintext_glean(self, tmp_path: Path) -> None:
        """The plaintext parser yields entries and timestamps over 95% of them."""
        from app.glean.plaintext import parse

        _run(tmp_path)
        with (tmp_path / "ext_device/device.log").open() as fh:
            entries = list(parse(fh, "test", []))
        assert len(entries) > 0
        # ISO timestamps should parse cleanly
        timestamped = [e for e in entries if e.timestamp_iso]
        assert len(timestamped) / len(entries) > 0.95


# ── Error rate ─────────────────────────────────────────────────────────────────

class TestErrorRate:
    """Behaviour of the ``error_rate`` option."""

    def test_high_error_rate_increases_errors(self, tmp_path: Path) -> None:
        """With the same seed, error_rate=0.50 gives a higher error share than 0.01."""
        from app.glean.journald import parse

        low = tmp_path / "low"
        high = tmp_path / "high"
        _run(low, seed=7, error_rate=0.01)
        _run(high, seed=7, error_rate=0.50)

        def error_ratio(path: Path) -> float:
            """Return the fraction of parsed entries with ERROR or CRITICAL severity."""
            with path.open() as fh:
                entries = list(parse(fh, "test", []))
            errs = sum(1 for e in entries if e.severity in ("ERROR", "CRITICAL"))
            # An empty parse result counts as 0.0 rather than dividing by zero.
            return errs / len(entries) if entries else 0.0

        assert error_ratio(high / "journald/system.jsonl") > error_ratio(low / "journald/system.jsonl")

    def test_invalid_error_rate_returns_nonzero(self, tmp_path: Path) -> None:
        """``main`` returns a non-zero code when ``--error-rate`` is 1.5."""
        rc = main(["--days", "1", "--out", str(tmp_path), "--error-rate", "1.5"])
        assert rc != 0


# ── CLI ────────────────────────────────────────────────────────────────────────  class TestCLI: def test_acceptance_criteria(self, tmp_path: Path) -> None: """Acceptance: --days 7 --out