turnstone/scripts/gen_corpus.py
pyr0ball 7ab92a5cf4 feat(corpus): synthetic log corpus generator for demos and testing
Adds scripts/gen_corpus.py that produces realistic-but-artificial log
files across all four supported formats (journald JSON, docker envelope,
qBittorrent hotio, AVCX plaintext). Output feeds directly into
glean_corpus.py for demo environments and parser regression tests with
no production data required.

- Seed-based RNG with independent per-source sub-streams (same seed =
  same sequence for each file regardless of source count changes)
- Controllable time range, event density, and error injection rate
- Severity distribution mirrors real infrastructure (70% INFO, ~6% ERROR,
  ~2% CRITICAL) with adjustable boost via --error-rate
- 17 tests covering output structure, reproducibility, format correctness,
  parser round-trip, and CLI acceptance criteria

Also fixes a latent bug in app/glean/plaintext.py: ISO 8601 timestamps
were silently failing to parse because the T separator was normalised to
space in the input string but the strptime format string still contained T.
Fix: apply the same normalisation to the format before calling strptime.

Closes: #46
2026-06-11 10:57:20 -07:00

383 lines
16 KiB
Python

"""Synthetic log corpus generator.
Produces realistic-but-entirely-artificial log files for demos, load tests,
and parser regression suites — no production data required.
Usage:
python scripts/gen_corpus.py --days 7 --out /tmp/demo-corpus/
python scripts/gen_corpus.py --days 1 --out /tmp/test-run/ --seed 42 --error-rate 0.15
python scripts/gen_corpus.py --help
Output tree:
<out>/journald/system.jsonl — systemd/kernel journald JSON
<out>/docker/services.jsonl — containerised app stdout
<out>/qbittorrent/qbt.log — hotio-format qBittorrent log
<out>/avcx/device.log — AVCX device plaintext log
"""
from __future__ import annotations
import argparse
import json
import random
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Callable
# ── Severity distribution ──────────────────────────────────────────────────────
_SYSLOG_PRIORITY = {
"CRITICAL": "2",
"ERROR": "3",
"WARN": "4",
"INFO": "6",
"DEBUG": "7",
}
_SEVERITY_WEIGHTS = {
"INFO": 0.70,
"DEBUG": 0.10,
"WARN": 0.12,
"ERROR": 0.06,
"CRITICAL": 0.02,
}
def _pick_severity(rng: random.Random, error_rate: float) -> str:
"""Return a severity string, boosting ERROR/CRITICAL by error_rate."""
weights = dict(_SEVERITY_WEIGHTS)
boost = error_rate * 0.08 # distribute extra weight to error tiers
weights["ERROR"] += boost
weights["CRITICAL"] += boost / 2
weights["INFO"] -= boost * 1.2
weights["DEBUG"] -= boost * 0.3
choices = list(weights.keys())
probs = [max(0.0, weights[k]) for k in choices]
return rng.choices(choices, weights=probs, k=1)[0]
# ── Timestamp helpers ──────────────────────────────────────────────────────────
def _ts_seq(start: datetime, end: datetime, rng: random.Random) -> list[datetime]:
"""Return a sorted list of random timestamps between start and end."""
total_seconds = (end - start).total_seconds()
# Roughly 1 event every ~4 seconds on average across all sources
count = int(total_seconds / 4)
offsets = sorted(rng.uniform(0, total_seconds) for _ in range(count))
return [start + timedelta(seconds=o) for o in offsets]
def _micros(dt: datetime) -> str:
"""Journald __REALTIME_TIMESTAMP: microseconds since epoch, as string."""
return str(int(dt.timestamp() * 1_000_000))
# ── Message libraries ──────────────────────────────────────────────────────────
_JOURNALD_UNITS = [
"sshd.service", "nginx.service", "docker.service", "systemd-resolved.service",
"cron.service", "systemd-journald.service", "NetworkManager.service",
"turnstone.service", "podman.service", "fail2ban.service",
]
_JOURNALD_MESSAGES: dict[str, list[str]] = {
"INFO": [
"Started {unit}.",
"Listening on {port}/tcp.",
"Reloaded configuration for {unit}.",
"New connection from {ip}:{port}",
"Session opened for user {user} by (uid=0)",
"Accepted publickey for {user} from {ip} port {port}",
"System time synchronized from NTP server {ip}",
"Unit {unit} entered active state.",
"Loaded kernel module {module}.",
"DNS query resolved: {host} -> {ip}",
],
"DEBUG": [
"Polling interval set to {n}ms",
"Cache hit for key '{key}'",
"Heartbeat OK from {host}",
"Timer {n} fired",
"Worker {n} idle",
],
"WARN": [
"High memory usage on {unit}: {pct}% used",
"Slow DNS response ({ms}ms) for {host}",
"Deprecated option '{key}' in config — will be removed in next release",
"Retrying connection to {host} (attempt {n}/5)",
"Journal size limit reached, rotating",
"Disk usage at {pct}% on /dev/sda1",
],
"ERROR": [
"Failed to start {unit}: exit code {n}",
"Connection refused to {host}:{port}",
"Segmentation fault in {unit} (core dumped)",
"Authentication failure for user {user} from {ip}",
"Timeout waiting for {unit} to become ready",
"Failed to bind {port}/tcp: address already in use",
],
"CRITICAL": [
"Kernel panic — not syncing: {msg}",
"Out of memory: killed process {n} ({unit})",
"Hardware error on /dev/sda1: I/O error",
"Disk quota exceeded on /home for user {user}",
"Critical service {unit} failed; system may be unstable",
],
}
_DOCKER_SERVICES = [
"caddy", "postgres", "redis", "turnstone", "avocet",
"prometheus", "grafana", "loki", "minio", "vllm",
]
_DOCKER_MESSAGES: dict[str, list[str]] = {
"INFO": [
"level=info msg=\"Server listening on 0.0.0.0:{port}\"",
"level=info msg=\"Connected to database at {host}:5432\"",
'level=info msg="GET /api/health 200 {ms}ms" user={user}',
'level=info msg="POST /api/v1/jobs 201 {ms}ms"',
"INFO: Worker pool size: {n}",
"INFO: Cache warmed — {n} entries loaded",
"INFO: Startup complete in {ms}ms",
"INFO: Scheduled job '{key}' executed successfully",
],
"DEBUG": [
"DEBUG: SQL query took {ms}ms: SELECT * FROM {key}",
"DEBUG: Redis HIT for key {key}",
"level=debug msg=\"span {key} completed\" duration={ms}ms",
"DEBUG: Trace ID {key}: handler returned 200",
],
"WARN": [
"level=warn msg=\"Slow query ({ms}ms) on table {key}\"",
"WARN: Connection pool at {pct}% capacity",
"WARN: Rate limit approaching for client {ip}",
"WARN: Deprecated endpoint /v1/{key} called by {ip}",
"level=warn msg=\"GC pause {ms}ms — possible memory pressure\"",
],
"ERROR": [
"level=error msg=\"Unhandled exception in handler '{key}'\" err={msg}",
"ERROR: Database connection lost: {msg}",
"level=error msg=\"Failed to acquire lock on {key} after {ms}ms\"",
"ERROR: HTTP 500 POST /api/v1/{key}: internal server error",
"ERROR: Redis NOAUTH: authentication required",
],
"CRITICAL": [
"level=critical msg=\"Panic: nil pointer dereference in {key}\"",
"CRITICAL: Fatal: cannot open database: {msg}",
"CRITICAL: OOM killer invoked — process {n} terminated",
],
}
_QBT_MESSAGES: dict[str, list[str]] = {
"INFO": [
"Successfully listening on IP: 0.0.0.0; port: {port}",
"Torrent '{key}' added to download queue",
"Download of '{key}' complete ({n} MB)",
"Seeding '{key}' at {n} KB/s",
"Tracker '{host}' working, {n} seeds",
"Peer {ip} connected to torrent '{key}'",
"Free disk space: {n} GB",
],
"WARN": [
"Tracker '{host}' is not working (retrying)",
"Slow download speed ({n} KB/s) for '{key}'",
"Too many open files — reducing connection limit",
"DHT bootstrap failed, retrying in {n}s",
],
"CRITICAL": [
"Not enough space on disk to download '{key}'",
"File I/O error for torrent '{key}': {msg}",
"Unable to bind listen port {port}",
],
}
_AVCX_CODES: dict[str, list[str]] = {
"INFO": [
"SYS-0100 Device boot complete, firmware v{n}.{n}.{n}",
"SYS-0101 Sensor array calibration OK",
"NET-0200 Link established on interface eth{n}",
"CFG-0300 Configuration loaded from flash",
"HW-0400 Fan speed nominal: {n} RPM",
],
"WARN": [
"NET-0210 Link quality degraded: RSSI -{n} dBm",
"HW-0410 Fan speed elevated: {n} RPM (threshold: {n} RPM)",
"CFG-0310 Unknown config key '{key}' ignored",
"SYS-0110 Watchdog near timeout — {n}ms remaining",
],
"ERROR": [
"ERR-1001 Sensor read failure on channel {n}: timeout",
"ERR-1002 I2C bus {n} NACK from address 0x{key}",
"ERR-2001 Network tx queue overflow — dropped {n} packets",
"ERR-3001 Flash write error at sector {n}",
],
"CRITICAL": [
"ERR-9001 Thermal runaway detected — initiating shutdown",
"ERR-9002 Supply voltage out of range: {n}mV",
"ERR-9003 Memory parity error at address 0x{key}",
],
}
# ── Template substitution ──────────────────────────────────────────────────────
_HOSTS = ["heimdall", "navi", "sif", "strahl", "bastion", "xanderland"]
_USERS = ["alan", "root", "deployer", "backup", "nobody"]
_MODULES = ["btrfs", "xfs", "nf_conntrack", "ip6table_filter", "overlay"]
def _fill(template: str, rng: random.Random) -> str:
"""Replace {placeholder} tokens with plausible random values."""
def _sub(m: re.Match) -> str:
import re
key = m.group(1)
if key == "ip": return f"10.{rng.randint(0,255)}.{rng.randint(0,255)}.{rng.randint(1,254)}"
if key == "port": return str(rng.randint(1024, 65535))
if key == "n": return str(rng.randint(1, 9999))
if key == "pct": return str(rng.randint(50, 99))
if key == "ms": return str(rng.randint(1, 5000))
if key == "unit": return rng.choice(_JOURNALD_UNITS)
if key == "user": return rng.choice(_USERS)
if key == "host": return rng.choice(_HOSTS)
if key == "module": return rng.choice(_MODULES)
if key == "msg": return rng.choice(["unexpected EOF", "connection reset", "no such file"])
if key == "key": return rng.choice(["auth", "jobs", "cache", "index", "sessions", "queue"])
return m.group(0)
import re
return re.sub(r"\{(\w+)\}", _sub, template)
def _pick_msg(library: dict[str, list[str]], severity: str, rng: random.Random) -> str:
candidates = library.get(severity) or library.get("INFO", ["log entry"])
return _fill(rng.choice(candidates), rng)
# ── Per-format generators ──────────────────────────────────────────────────────
def gen_journald(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
"""Emit journald JSON lines (-o json format)."""
lines = 0
hostname = rng.choice(_HOSTS)
with path.open("w") as fh:
for dt in _ts_seq(start, end, rng):
severity = _pick_severity(rng, error_rate)
unit = rng.choice(_JOURNALD_UNITS)
msg = _pick_msg(_JOURNALD_MESSAGES, severity, rng)
entry = {
"__REALTIME_TIMESTAMP": _micros(dt),
"MESSAGE": msg,
"PRIORITY": _SYSLOG_PRIORITY.get(severity, "6"),
"_HOSTNAME": hostname,
"_SYSTEMD_UNIT": unit,
"SYSLOG_IDENTIFIER": unit.replace(".service", ""),
}
fh.write(json.dumps(entry) + "\n")
lines += 1
return lines
def gen_docker(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
"""Emit Docker-format JSON lines (SOURCE + MESSAGE envelope)."""
lines = 0
with path.open("w") as fh:
for dt in _ts_seq(start, end, rng):
severity = _pick_severity(rng, error_rate)
service = rng.choice(_DOCKER_SERVICES)
msg = _pick_msg(_DOCKER_MESSAGES, severity, rng)
entry = {
"SOURCE": service,
"MESSAGE": msg,
}
fh.write(json.dumps(entry) + "\n")
lines += 1
return lines
def gen_qbittorrent(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
"""Emit hotio-format qBittorrent plaintext log."""
_CODE = {"INFO": "N", "WARN": "W", "CRITICAL": "C", "ERROR": "C", "DEBUG": "N"}
lines = 0
with path.open("w") as fh:
for dt in _ts_seq(start, end, rng):
severity = _pick_severity(rng, error_rate)
msg = _pick_msg(_QBT_MESSAGES, severity, rng)
code = _CODE.get(severity, "N")
ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
fh.write(f"({code}) {ts_str} - {msg}\n")
lines += 1
return lines
def gen_avcx(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
"""Emit AVCX device plaintext log (ISO timestamp + level + ERR/SYS/NET code + message)."""
lines = 0
with path.open("w") as fh:
for dt in _ts_seq(start, end, rng):
severity = _pick_severity(rng, error_rate)
msg = _pick_msg(_AVCX_CODES, severity, rng)
ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
fh.write(f"{ts_str} [{severity}] {msg}\n")
lines += 1
return lines
# ── Orchestration ──────────────────────────────────────────────────────────────
_GENERATORS: list[tuple[str, str, Callable]] = [
("journald", "system.jsonl", gen_journald),
("docker", "services.jsonl", gen_docker),
("qbittorrent", "qbt.log", gen_qbittorrent),
("avcx", "device.log", gen_avcx),
]
def generate(
out: Path,
days: int,
seed: int | None,
error_rate: float,
reference_time: datetime | None = None,
) -> dict[str, int]:
rng = random.Random(seed)
end = reference_time or datetime.now(tz=timezone.utc)
start = end - timedelta(days=days)
totals: dict[str, int] = {}
for subdir, filename, gen_fn in _GENERATORS:
dest = out / subdir / filename
dest.parent.mkdir(parents=True, exist_ok=True)
# Each source gets its own seeded sub-RNG so streams are independent
sub_rng = random.Random(rng.randint(0, 2**31))
count = gen_fn(dest, start, end, sub_rng, error_rate)
totals[str(dest.relative_to(out))] = count
print(f" {dest.relative_to(out)}: {count:,} lines")
return totals
# ── CLI ────────────────────────────────────────────────────────────────────────
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Generate a synthetic Turnstone log corpus for demos and testing."
)
parser.add_argument("--days", type=int, default=7, help="Days of history to generate (default: 7)")
parser.add_argument("--out", type=Path, required=True, help="Output directory")
parser.add_argument("--seed", type=int, default=None, help="RNG seed for reproducibility")
parser.add_argument("--error-rate", type=float, default=0.05, help="Error injection rate 0.0-1.0 (default: 0.05)")
args = parser.parse_args(argv)
if not 0.0 <= args.error_rate <= 1.0:
print("ERROR: --error-rate must be between 0.0 and 1.0", file=sys.stderr)
return 1
args.out.mkdir(parents=True, exist_ok=True)
print(f"Generating {args.days}-day corpus → {args.out} (seed={args.seed}, error_rate={args.error_rate})")
totals = generate(args.out, args.days, args.seed, args.error_rate)
total_lines = sum(totals.values())
print(f"Done — {total_lines:,} total log lines across {len(totals)} files")
return 0
if __name__ == "__main__":
sys.exit(main())