diff --git a/app/glean/plaintext.py b/app/glean/plaintext.py
index a205fc0..65e36cd 100644
--- a/app/glean/plaintext.py
+++ b/app/glean/plaintext.py
@@ -32,10 +32,11 @@ def _extract_ts(line: str) -> tuple[str, str]:
         if m:
             ts_raw = m.group("ts")
             try:
-                # Strip fractional seconds / TZ for strptime compat
+                # Strip fractional seconds / TZ for strptime compat.
+                # Normalise ISO 8601 T-separator to space so strptime format matches.
                 clean = re.sub(r"(\.\d+)?([Zz]|[+-]\d{2}:?\d{2})?$", "", ts_raw).strip()
                 clean = clean.replace("T", " ")
-                dt = datetime.strptime(clean, fmt)
+                dt = datetime.strptime(clean, fmt.replace("T", " "))
                 if dt.year == 1900:
                     dt = dt.replace(year=datetime.now().year)
                 dt = dt.astimezone(timezone.utc)
diff --git a/scripts/gen_corpus.py b/scripts/gen_corpus.py
new file mode 100644
index 0000000..1ad9ae5
--- /dev/null
+++ b/scripts/gen_corpus.py
@@ -0,0 +1,386 @@
+"""Synthetic log corpus generator.
+
+Produces realistic-but-entirely-artificial log files for demos, load tests,
+and parser regression suites — no production data required.
+
+Usage:
+    python scripts/gen_corpus.py --days 7 --out /tmp/demo-corpus/
+    python scripts/gen_corpus.py --days 1 --out /tmp/test-run/ --seed 42 --error-rate 0.15
+    python scripts/gen_corpus.py --help
+
+Output tree (paths are relative to the --out directory):
+    journald/system.jsonl    — systemd/kernel journald JSON
+    docker/services.jsonl    — containerised app stdout
+    qbittorrent/qbt.log      — hotio-format qBittorrent log
+    avcx/device.log          — AVCX device plaintext log
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import random
+import re
+import sys
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Callable
+
+# ── Severity distribution ──────────────────────────────────────────────────────
+
+_SYSLOG_PRIORITY = {
+    "CRITICAL": "2",
+    "ERROR": "3",
+    "WARN": "4",
+    "INFO": "6",
+    "DEBUG": "7",
+}
+
+_SEVERITY_WEIGHTS = {
+    "INFO": 0.70,
+    "DEBUG": 0.10,
+    "WARN": 0.12,
+    "ERROR": 0.06,
+    "CRITICAL": 0.02,
+}
+
+
+def _pick_severity(rng: random.Random, error_rate: float) -> str:
+    """Return a severity string, boosting ERROR/CRITICAL by error_rate."""
+    weights = dict(_SEVERITY_WEIGHTS)
+    boost = error_rate * 0.08  # distribute extra weight to error tiers
+    weights["ERROR"] += boost
+    weights["CRITICAL"] += boost / 2
+    weights["INFO"] -= boost * 1.2
+    weights["DEBUG"] -= boost * 0.3
+    choices = list(weights.keys())
+    probs = [max(0.0, weights[k]) for k in choices]  # clamp: weights must not go negative
+    return rng.choices(choices, weights=probs, k=1)[0]
+
+
+# ── Timestamp helpers ──────────────────────────────────────────────────────────
+
+def _ts_seq(start: datetime, end: datetime, rng: random.Random) -> list[datetime]:
+    """Return a sorted list of random timestamps between start and end."""
+    total_seconds = (end - start).total_seconds()
+    # Roughly 1 event every ~4 seconds on average across all sources
+    count = int(total_seconds / 4)
+    offsets = sorted(rng.uniform(0, total_seconds) for _ in range(count))
+    return [start + timedelta(seconds=o) for o in offsets]
+
+
+def _micros(dt: datetime) -> str:
+    """Journald __REALTIME_TIMESTAMP: microseconds since epoch, as string."""
+    return str(int(dt.timestamp() * 1_000_000))
+
+
+# ── Message libraries ──────────────────────────────────────────────────────────
+
+_JOURNALD_UNITS = [
+    "sshd.service", "nginx.service", "docker.service", "systemd-resolved.service",
+    "cron.service", "systemd-journald.service", "NetworkManager.service",
+    "turnstone.service", "podman.service", "fail2ban.service",
+]
+
+_JOURNALD_MESSAGES: dict[str, list[str]] = {
+    "INFO": [
+        "Started {unit}.",
+        "Listening on {port}/tcp.",
+        "Reloaded configuration for {unit}.",
+        "New connection from {ip}:{port}",
+        "Session opened for user {user} by (uid=0)",
+        "Accepted publickey for {user} from {ip} port {port}",
+        "System time synchronized from NTP server {ip}",
+        "Unit {unit} entered active state.",
+        "Loaded kernel module {module}.",
+        "DNS query resolved: {host} -> {ip}",
+    ],
+    "DEBUG": [
+        "Polling interval set to {n}ms",
+        "Cache hit for key '{key}'",
+        "Heartbeat OK from {host}",
+        "Timer {n} fired",
+        "Worker {n} idle",
+    ],
+    "WARN": [
+        "High memory usage on {unit}: {pct}% used",
+        "Slow DNS response ({ms}ms) for {host}",
+        "Deprecated option '{key}' in config — will be removed in next release",
+        "Retrying connection to {host} (attempt {n}/5)",
+        "Journal size limit reached, rotating",
+        "Disk usage at {pct}% on /dev/sda1",
+    ],
+    "ERROR": [
+        "Failed to start {unit}: exit code {n}",
+        "Connection refused to {host}:{port}",
+        "Segmentation fault in {unit} (core dumped)",
+        "Authentication failure for user {user} from {ip}",
+        "Timeout waiting for {unit} to become ready",
+        "Failed to bind {port}/tcp: address already in use",
+    ],
+    "CRITICAL": [
+        "Kernel panic — not syncing: {msg}",
+        "Out of memory: killed process {n} ({unit})",
+        "Hardware error on /dev/sda1: I/O error",
+        "Disk quota exceeded on /home for user {user}",
+        "Critical service {unit} failed; system may be unstable",
+    ],
+}
+
+_DOCKER_SERVICES = [
+    "caddy", "postgres", "redis", "turnstone", "avocet",
+    "prometheus", "grafana", "loki", "minio", "vllm",
+]
+
+_DOCKER_MESSAGES: dict[str, list[str]] = {
+    "INFO": [
+        "level=info msg=\"Server listening on 0.0.0.0:{port}\"",
+        "level=info msg=\"Connected to database at {host}:5432\"",
+        'level=info msg="GET /api/health 200 {ms}ms" user={user}',
+        'level=info msg="POST /api/v1/jobs 201 {ms}ms"',
+        "INFO: Worker pool size: {n}",
+        "INFO: Cache warmed — {n} entries loaded",
+        "INFO: Startup complete in {ms}ms",
+        "INFO: Scheduled job '{key}' executed successfully",
+    ],
+    "DEBUG": [
+        "DEBUG: SQL query took {ms}ms: SELECT * FROM {key}",
+        "DEBUG: Redis HIT for key {key}",
+        "level=debug msg=\"span {key} completed\" duration={ms}ms",
+        "DEBUG: Trace ID {key}: handler returned 200",
+    ],
+    "WARN": [
+        "level=warn msg=\"Slow query ({ms}ms) on table {key}\"",
+        "WARN: Connection pool at {pct}% capacity",
+        "WARN: Rate limit approaching for client {ip}",
+        "WARN: Deprecated endpoint /v1/{key} called by {ip}",
+        "level=warn msg=\"GC pause {ms}ms — possible memory pressure\"",
+    ],
+    "ERROR": [
+        "level=error msg=\"Unhandled exception in handler '{key}'\" err={msg}",
+        "ERROR: Database connection lost: {msg}",
+        "level=error msg=\"Failed to acquire lock on {key} after {ms}ms\"",
+        "ERROR: HTTP 500 POST /api/v1/{key}: internal server error",
+        "ERROR: Redis NOAUTH: authentication required",
+    ],
+    "CRITICAL": [
+        "level=critical msg=\"Panic: nil pointer dereference in {key}\"",
+        "CRITICAL: Fatal: cannot open database: {msg}",
+        "CRITICAL: OOM killer invoked — process {n} terminated",
+    ],
+}
+
+_QBT_MESSAGES: dict[str, list[str]] = {
+    "INFO": [
+        "Successfully listening on IP: 0.0.0.0; port: {port}",
+        "Torrent '{key}' added to download queue",
+        "Download of '{key}' complete ({n} MB)",
+        "Seeding '{key}' at {n} KB/s",
+        "Tracker '{host}' working, {n} seeds",
+        "Peer {ip} connected to torrent '{key}'",
+        "Free disk space: {n} GB",
+    ],
+    "WARN": [
+        "Tracker '{host}' is not working (retrying)",
+        "Slow download speed ({n} KB/s) for '{key}'",
+        "Too many open files — reducing connection limit",
+        "DHT bootstrap failed, retrying in {n}s",
+    ],
+    "CRITICAL": [
+        "Not enough space on disk to download '{key}'",
+        "File I/O error for torrent '{key}': {msg}",
+        "Unable to bind listen port {port}",
+    ],
+}
+
+_AVCX_CODES: dict[str, list[str]] = {
+    "INFO": [
+        "SYS-0100 Device boot complete, firmware v{n}.{n}.{n}",
+        "SYS-0101 Sensor array calibration OK",
+        "NET-0200 Link established on interface eth{n}",
+        "CFG-0300 Configuration loaded from flash",
+        "HW-0400 Fan speed nominal: {n} RPM",
+    ],
+    "WARN": [
+        "NET-0210 Link quality degraded: RSSI -{n} dBm",
+        "HW-0410 Fan speed elevated: {n} RPM (threshold: {n} RPM)",
+        "CFG-0310 Unknown config key '{key}' ignored",
+        "SYS-0110 Watchdog near timeout — {n}ms remaining",
+    ],
+    "ERROR": [
+        "ERR-1001 Sensor read failure on channel {n}: timeout",
+        "ERR-1002 I2C bus {n} NACK from address 0x{key}",
+        "ERR-2001 Network tx queue overflow — dropped {n} packets",
+        "ERR-3001 Flash write error at sector {n}",
+    ],
+    "CRITICAL": [
+        "ERR-9001 Thermal runaway detected — initiating shutdown",
+        "ERR-9002 Supply voltage out of range: {n}mV",
+        "ERR-9003 Memory parity error at address 0x{key}",
+    ],
+}
+
+
+# ── Template substitution ──────────────────────────────────────────────────────
+
+_HOSTS = ["heimdall", "navi", "sif", "strahl", "bastion", "xanderland"]
+_USERS = ["alan", "root", "deployer", "backup", "nobody"]
+_MODULES = ["btrfs", "xfs", "nf_conntrack", "ip6table_filter", "overlay"]
+
+def _fill(template: str, rng: random.Random) -> str:
+    """Replace {placeholder} tokens with plausible random values."""
+    def _sub(m: re.Match) -> str:
+        # Unrecognised placeholder names fall through and are emitted unchanged.
+        key = m.group(1)
+        if key == "ip": return f"10.{rng.randint(0,255)}.{rng.randint(0,255)}.{rng.randint(1,254)}"
+        if key == "port": return str(rng.randint(1024, 65535))
+        if key == "n": return str(rng.randint(1, 9999))
+        if key == "pct": return str(rng.randint(50, 99))
+        if key == "ms": return str(rng.randint(1, 5000))
+        if key == "unit": return rng.choice(_JOURNALD_UNITS)
+        if key == "user": return rng.choice(_USERS)
+        if key == "host": return rng.choice(_HOSTS)
+        if key == "module": return rng.choice(_MODULES)
+        if key == "msg": return rng.choice(["unexpected EOF", "connection reset", "no such file"])
+        if key == "key": return rng.choice(["auth", "jobs", "cache", "index", "sessions", "queue"])
+        return m.group(0)
+    return re.sub(r"\{(\w+)\}", _sub, template)
+
+
+def _pick_msg(library: dict[str, list[str]], severity: str, rng: random.Random) -> str:
+    """Pick a random template for severity (falling back to the INFO list) and fill it."""
+    candidates = library.get(severity) or library.get("INFO", ["log entry"])
+    return _fill(rng.choice(candidates), rng)
+
+
+# ── Per-format generators ──────────────────────────────────────────────────────
+
+def gen_journald(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
+    """Emit journald JSON lines (-o json format); return the number of lines written."""
+    lines = 0
+    hostname = rng.choice(_HOSTS)
+    with path.open("w", encoding="utf-8") as fh:
+        for dt in _ts_seq(start, end, rng):
+            severity = _pick_severity(rng, error_rate)
+            unit = rng.choice(_JOURNALD_UNITS)
+            msg = _pick_msg(_JOURNALD_MESSAGES, severity, rng)
+            entry = {
+                "__REALTIME_TIMESTAMP": _micros(dt),
+                "MESSAGE": msg,
+                "PRIORITY": _SYSLOG_PRIORITY.get(severity, "6"),
+                "_HOSTNAME": hostname,
+                "_SYSTEMD_UNIT": unit,
+                "SYSLOG_IDENTIFIER": unit.replace(".service", ""),
+            }
+            fh.write(json.dumps(entry) + "\n")
+            lines += 1
+    return lines
+
+
+def gen_docker(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
+    """Emit Docker-format JSON lines (SOURCE + MESSAGE envelope); return the number of lines written."""
+    lines = 0
+    with path.open("w", encoding="utf-8") as fh:
+        for dt in _ts_seq(start, end, rng):  # NOTE(review): dt is unused, entries carry no timestamp; confirm intended
+            severity = _pick_severity(rng, error_rate)
+            service = rng.choice(_DOCKER_SERVICES)
+            msg = _pick_msg(_DOCKER_MESSAGES, severity, rng)
+            entry = {
+                "SOURCE": service,
+                "MESSAGE": msg,
+            }
+            fh.write(json.dumps(entry) + "\n")
+            lines += 1
+    return lines
+
+
+def gen_qbittorrent(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
+    """Emit hotio-format qBittorrent plaintext log; return the number of lines written."""
+    _CODE = {"INFO": "N", "WARN": "W", "CRITICAL": "C", "ERROR": "C", "DEBUG": "N"}
+    lines = 0
+    # Explicit UTF-8: several templates contain a non-ASCII em dash.
+    with path.open("w", encoding="utf-8") as fh:
+        for dt in _ts_seq(start, end, rng):
+            severity = _pick_severity(rng, error_rate)
+            msg = _pick_msg(_QBT_MESSAGES, severity, rng)
+            code = _CODE.get(severity, "N")
+            ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
+            fh.write(f"({code}) {ts_str} - {msg}\n")
+            lines += 1
+    return lines
+
+
+def gen_avcx(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
+    """Emit AVCX device plaintext log (ISO timestamp + level + ERR/SYS/NET code + message)."""
+    lines = 0
+    with path.open("w", encoding="utf-8") as fh:  # templates contain a non-ASCII em dash
+        for dt in _ts_seq(start, end, rng):
+            severity = _pick_severity(rng, error_rate)
+            msg = _pick_msg(_AVCX_CODES, severity, rng)
+            ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
+            fh.write(f"{ts_str} [{severity}] {msg}\n")
+            lines += 1
+    return lines
+
+
+# ── Orchestration ──────────────────────────────────────────────────────────────
+
+_GENERATORS: list[tuple[str, str, Callable]] = [
+    ("journald", "system.jsonl", gen_journald),
+    ("docker", "services.jsonl", gen_docker),
+    ("qbittorrent", "qbt.log", gen_qbittorrent),
+    ("avcx", "device.log", gen_avcx),
+]
+
+
+def generate(
+    out: Path,
+    days: int,
+    seed: int | None,
+    error_rate: float,
+    reference_time: datetime | None = None,
+) -> dict[str, int]:
+    """Write every corpus file under out; return a map of relative path to line count."""
+    rng = random.Random(seed)
+    end = reference_time or datetime.now(tz=timezone.utc)
+    start = end - timedelta(days=days)
+
+    totals: dict[str, int] = {}
+    for subdir, filename, gen_fn in _GENERATORS:
+        dest = out / subdir / filename
+        dest.parent.mkdir(parents=True, exist_ok=True)
+        # Each source gets its own seeded sub-RNG so streams are independent
+        sub_rng = random.Random(rng.randint(0, 2**31))
+        count = gen_fn(dest, start, end, sub_rng, error_rate)
+        totals[str(dest.relative_to(out))] = count
+        print(f" {dest.relative_to(out)}: {count:,} lines")
+
+    return totals
+
+
+# ── CLI ────────────────────────────────────────────────────────────────────────
+
+def main(argv: list[str] | None = None) -> int:
+    """Parse CLI arguments, generate the corpus, and return a process exit code."""
+    parser = argparse.ArgumentParser(
+        description="Generate a synthetic Turnstone log corpus for demos and testing."
+    )
+    parser.add_argument("--days", type=int, default=7, help="Days of history to generate (default: 7)")
+    parser.add_argument("--out", type=Path, required=True, help="Output directory")
+    parser.add_argument("--seed", type=int, default=None, help="RNG seed for reproducibility")
+    parser.add_argument("--error-rate", type=float, default=0.05, help="Error injection rate 0.0-1.0 (default: 0.05)")
+    args = parser.parse_args(argv)
+
+    if not 0.0 <= args.error_rate <= 1.0:
+        print("ERROR: --error-rate must be between 0.0 and 1.0", file=sys.stderr)
+        return 1
+
+    args.out.mkdir(parents=True, exist_ok=True)
+    print(f"Generating {args.days}-day corpus → {args.out} (seed={args.seed}, error_rate={args.error_rate})")
+
+    totals = generate(args.out, args.days, args.seed, args.error_rate)
+    total_lines = sum(totals.values())
+    print(f"Done — {total_lines:,} total log lines across {len(totals)} files")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/tests/test_gen_corpus.py b/tests/test_gen_corpus.py
new file mode 100644
index 0000000..0a03d75
--- /dev/null
+++ b/tests/test_gen_corpus.py
@@ -0,0 +1,197 @@
+"""Tests for scripts/gen_corpus.py synthetic log generator."""
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from datetime import datetime, timezone
+
+from scripts.gen_corpus import generate, main
+
+# Fixed reference time keeps timestamps deterministic across test runs
+_REF_TIME = datetime(2026, 6, 10, 12, 0, 0, tzinfo=timezone.utc)
+
+
+# ── Helpers ────────────────────────────────────────────────────────────────────
+
+def _run(tmp_path: Path, days: int = 1, seed: int = 42, error_rate: float = 0.05) -> dict[str, int]:
+    return generate(tmp_path, days=days, seed=seed, error_rate=error_rate, reference_time=_REF_TIME)
+
+
+# ── Output structure ───────────────────────────────────────────────────────────
+
+class TestOutputStructure:
+    def test_creates_all_four_files(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        assert (tmp_path / "journald" / "system.jsonl").exists()
+        assert (tmp_path / "docker" / "services.jsonl").exists()
+        assert (tmp_path / "qbittorrent" / "qbt.log").exists()
+        assert (tmp_path / "avcx" / "device.log").exists()
+
+    def test_returns_line_counts(self, tmp_path: Path) -> None:
+        totals = _run(tmp_path)
+        assert len(totals) == 4
+        assert all(v > 0 for v in totals.values())
+
+
+# ── Reproducibility ────────────────────────────────────────────────────────────
+
+class TestReproducibility:
+    def test_same_seed_same_output(self, tmp_path: Path) -> None:
+        out_a = tmp_path / "a"
+        out_b = tmp_path / "b"
+        _run(out_a, seed=99)
+        totals = _run(out_b, seed=99)
+        for sub in totals:  # keys are the relative paths of every generated file
+            assert (out_a / sub).read_text(encoding="utf-8") == (out_b / sub).read_text(encoding="utf-8")
+
+    def test_different_seeds_differ(self, tmp_path: Path) -> None:
+        out_a = tmp_path / "a"
+        out_b = tmp_path / "b"
+        _run(out_a, seed=1)
+        _run(out_b, seed=2)
+        assert (out_a / "journald/system.jsonl").read_text(encoding="utf-8") != (out_b / "journald/system.jsonl").read_text(encoding="utf-8")
+
+
+# ── Journald format ────────────────────────────────────────────────────────────
+
+class TestJournaldFormat:
+    def test_valid_json_lines(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        lines = (tmp_path / "journald/system.jsonl").read_text(encoding="utf-8").splitlines()
+        for line in lines[:100]:
+            obj = json.loads(line)
+            assert "__REALTIME_TIMESTAMP" in obj
+            assert "MESSAGE" in obj
+            assert "PRIORITY" in obj
+
+    def test_timestamp_is_microseconds(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        lines = (tmp_path / "journald/system.jsonl").read_text(encoding="utf-8").splitlines()
+        ts = int(json.loads(lines[0])["__REALTIME_TIMESTAMP"])
+        # microseconds since epoch — should be > year 2020
+        assert ts > 1_577_836_800_000_000
+
+    def test_parseable_by_journald_glean(self, tmp_path: Path) -> None:
+        from app.glean.journald import parse
+        _run(tmp_path)
+        with (tmp_path / "journald/system.jsonl").open(encoding="utf-8") as fh:
+            entries = list(parse(fh, "test", []))
+        assert len(entries) > 0
+        severities = {e.severity for e in entries if e.severity}
+        assert severities <= {"INFO", "DEBUG", "WARN", "ERROR", "CRITICAL"}
+
+
+# ── Docker format ──────────────────────────────────────────────────────────────
+
+class TestDockerFormat:
+    def test_valid_json_lines(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        lines = (tmp_path / "docker/services.jsonl").read_text(encoding="utf-8").splitlines()
+        for line in lines[:100]:
+            obj = json.loads(line)
+            assert "SOURCE" in obj
+            assert "MESSAGE" in obj
+
+    def test_parseable_by_docker_glean(self, tmp_path: Path) -> None:
+        from app.glean.docker_log import parse
+        _run(tmp_path)
+        with (tmp_path / "docker/services.jsonl").open(encoding="utf-8") as fh:
+            entries = list(parse(fh, "test", []))
+        assert len(entries) > 0
+        # Severity should be detected in most entries (messages embed level= / LEVEL:)
+        detected = [e for e in entries if e.severity is not None]
+        assert len(detected) / len(entries) > 0.8
+
+
+# ── qBittorrent format ─────────────────────────────────────────────────────────
+
+class TestQbittorrentFormat:
+    def test_hotio_format_lines(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        lines = (tmp_path / "qbittorrent/qbt.log").read_text(encoding="utf-8").splitlines()
+        import re
+        pattern = re.compile(r"^\([NIWC]\) \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} - .+$")
+        assert all(pattern.match(line) for line in lines[:50])
+
+    def test_parseable_by_qbt_glean(self, tmp_path: Path) -> None:
+        from app.glean.qbittorrent import parse
+        _run(tmp_path)
+        with (tmp_path / "qbittorrent/qbt.log").open(encoding="utf-8") as fh:
+            entries = list(parse(fh, "test", []))
+        assert len(entries) > 0
+        severities = {e.severity for e in entries if e.severity}
+        assert severities <= {"INFO", "WARN", "CRITICAL"}
+
+
+# ── AVCX format ────────────────────────────────────────────────────────────────
+
+class TestAvcxFormat:
+    def test_iso_timestamp_prefix(self, tmp_path: Path) -> None:
+        _run(tmp_path)
+        lines = (tmp_path / "avcx/device.log").read_text(encoding="utf-8").splitlines()
+        import re
+        pattern = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2} \[.+\] .+$")
+        assert all(pattern.match(line) for line in lines[:50])
+
+    def test_parseable_by_plaintext_glean(self, tmp_path: Path) -> None:
+        from app.glean.plaintext import parse
+        _run(tmp_path)
+        with (tmp_path / "avcx/device.log").open(encoding="utf-8") as fh:
+            entries = list(parse(fh, "test", []))
+        assert len(entries) > 0
+        # ISO timestamps should parse cleanly
+        timestamped = [e for e in entries if e.timestamp_iso]
+        assert len(timestamped) / len(entries) > 0.95
+
+
+# ── Error rate ─────────────────────────────────────────────────────────────────
+
+class TestErrorRate:
+    def test_high_error_rate_increases_errors(self, tmp_path: Path) -> None:
+        from app.glean.journald import parse
+
+        low = tmp_path / "low"
+        high = tmp_path / "high"
+        _run(low, seed=7, error_rate=0.01)
+        _run(high, seed=7, error_rate=0.50)
+
+        def error_ratio(path: Path) -> float:
+            with path.open(encoding="utf-8") as fh:
+                entries = list(parse(fh, "test", []))
+            errs = sum(1 for e in entries if e.severity in ("ERROR", "CRITICAL"))
+            return errs / len(entries) if entries else 0.0
+
+        assert error_ratio(high / "journald/system.jsonl") > error_ratio(low / "journald/system.jsonl")
+
+    def test_invalid_error_rate_returns_nonzero(self, tmp_path: Path) -> None:
+        rc = main(["--days", "1", "--out", str(tmp_path), "--error-rate", "1.5"])
+        assert rc != 0
+
+
+# ── CLI ────────────────────────────────────────────────────────────────────────
+
+class TestCLI:
+    def test_acceptance_criteria(self, tmp_path: Path) -> None:
+        """Acceptance: --days 7 --out DIR produces a gleanable corpus with varied severities."""
+        from app.glean.journald import parse
+
+        rc = main(["--days", "7", "--out", str(tmp_path)])
+        assert rc == 0
+
+        with (tmp_path / "journald/system.jsonl").open(encoding="utf-8") as fh:
+            entries = list(parse(fh, "test", []))
+
+        severities = {e.severity for e in entries if e.severity}
+        assert {"INFO", "WARN", "ERROR", "CRITICAL"}.issubset(severities)
+        assert len(entries) > 100_000  # file holds 7 days x 21,600 lines/day (one per ~4 s) = 151,200
+
+    def test_missing_out_fails(self) -> None:
+        with pytest.raises(SystemExit) as exc_info:  # argparse exits when the required --out is absent
+            main(["--days", "1"])
+        assert exc_info.value.code != 0