# Changelog (Closes: #46):
# - Changed glob → rglob in glean_dir so corpus directories with format
#   subfolders (journald/, docker/, etc.) are fully ingested.
# - Fixed gen_corpus.py docker SOURCE to emit a "docker:<service>" prefix so the
#   pipeline correctly detects the format as 'docker' rather than 'plaintext'.
# - 17/17 gen_corpus tests passing.
"""Synthetic log corpus generator.

Produces realistic-but-entirely-artificial log files for demos, load tests,
and parser regression suites — no production data required.

Usage:
    python scripts/gen_corpus.py --days 7 --out /tmp/demo-corpus/
    python scripts/gen_corpus.py --days 1 --out /tmp/test-run/ --seed 42 --error-rate 0.15
    python scripts/gen_corpus.py --help

Output tree:
    <out>/journald/system.jsonl   — systemd/kernel journald JSON
    <out>/docker/services.jsonl   — containerised app stdout
    <out>/qbittorrent/qbt.log     — hotio-format qBittorrent log
    <out>/avcx/device.log         — AVCX device plaintext log
"""
|
|
from __future__ import annotations

import argparse
import json
import random
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Callable
|
|
|
|
# ── Severity distribution ──────────────────────────────────────────────────────

# Severity name -> syslog PRIORITY value (RFC 5424 numeric severities:
# 2=critical, 3=error, 4=warning, 6=informational, 7=debug). Kept as strings to
# match the other string-valued fields written into journald records.
_SYSLOG_PRIORITY = dict(CRITICAL="2", ERROR="3", WARN="4", INFO="6", DEBUG="7")

# Baseline probability of each severity before _pick_severity() applies its
# error-rate adjustment; the values sum to 1.0. Key order is significant: the
# keys become the population handed to rng.choices(), so reordering them
# changes what a given seed produces.
_SEVERITY_WEIGHTS = dict(INFO=0.70, DEBUG=0.10, WARN=0.12, ERROR=0.06, CRITICAL=0.02)
|
|
|
|
|
|
def _pick_severity(rng: random.Random, error_rate: float) -> str:
    """Draw one severity name, skewed towards ERROR/CRITICAL as *error_rate* grows.

    *error_rate* is scaled by 0.08 before use. At ``error_rate=1.0``, ERROR gains
    0.08 and CRITICAL 0.04 of probability mass. That mass is taken from INFO
    (0.096) and DEBUG (0.024), so the total is unchanged. WARN is never adjusted.

    Args:
        rng: Random source; a single ``rng.choices`` call is made.
        error_rate: Skew factor, expected in ``[0.0, 1.0]``.

    Returns:
        One of the keys of ``_SEVERITY_WEIGHTS``.
    """
    shift = error_rate * 0.08
    deltas = {
        "ERROR": shift,
        "CRITICAL": shift / 2,
        "INFO": -(shift * 1.2),
        "DEBUG": -(shift * 0.3),
    }
    # Keep _SEVERITY_WEIGHTS' own key order: rng.choices maps its draw onto the
    # population positionally, so order is part of the seeded output.
    levels = list(_SEVERITY_WEIGHTS)
    # Clamp at zero so an out-of-range error_rate cannot produce a negative weight.
    weights = [max(0.0, _SEVERITY_WEIGHTS[lvl] + deltas.get(lvl, 0.0)) for lvl in levels]
    return rng.choices(levels, weights=weights, k=1)[0]
|
|
|
|
|
|
# ── Timestamp helpers ──────────────────────────────────────────────────────────

def _ts_seq(start: datetime, end: datetime, rng: random.Random) -> list[datetime]:
    """Return uniformly random timestamps within ``[start, end]``, sorted ascending.

    The number of timestamps is ``int(span_seconds / 4)``. An empty or inverted
    window (``end <= start``) therefore yields an empty list.
    """
    span = (end - start).total_seconds()
    # About one event per ~4 s of window. Each generator calls this helper on its
    # own, so this is the density *per output file*, not across all sources.
    how_many = int(span / 4)
    offsets = [rng.uniform(0, span) for _ in range(how_many)]
    offsets.sort()
    return [start + timedelta(seconds=secs) for secs in offsets]
|
|
|
|
|
|
def _micros(dt: datetime) -> str:
    """Journald ``__REALTIME_TIMESTAMP``: microseconds since the Unix epoch, as a string.

    Uses exact integer ``timedelta`` arithmetic instead of
    ``int(dt.timestamp() * 1_000_000)``. Once timestamps exceed 2**31 seconds
    (after January 2038), the float product can round below the true value and
    ``int()`` truncates that into an off-by-one-microsecond result.

    A naive *dt* is interpreted as local time, as ``datetime.timestamp()`` does.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # astimezone() on a naive datetime attaches the system local zone.
    aware = dt.astimezone() if dt.utcoffset() is None else dt
    return str((aware - epoch) // timedelta(microseconds=1))
|
|
|
|
|
|
# ── Message libraries ──────────────────────────────────────────────────────────

# systemd units attributed to journald records. Also the value pool for the
# {unit} placeholder in _fill(). List order matters for seeded output because
# rng.choice() selects by index.
_JOURNALD_UNITS = [
    "sshd.service", "nginx.service", "docker.service", "systemd-resolved.service",
    "cron.service", "systemd-journald.service", "NetworkManager.service",
    "turnstone.service", "podman.service", "fail2ban.service",
]

# Journald MESSAGE templates keyed by severity. The {placeholder} tokens are
# filled by _fill() after _pick_msg() selects a template.
_JOURNALD_MESSAGES: dict[str, list[str]] = {
    "INFO": [
        "Started {unit}.",
        "Listening on {port}/tcp.",
        "Reloaded configuration for {unit}.",
        "New connection from {ip}:{port}",
        "Session opened for user {user} by (uid=0)",
        "Accepted publickey for {user} from {ip} port {port}",
        "System time synchronized from NTP server {ip}",
        "Unit {unit} entered active state.",
        "Loaded kernel module {module}.",
        "DNS query resolved: {host} -> {ip}",
    ],
    "DEBUG": [
        "Polling interval set to {n}ms",
        "Cache hit for key '{key}'",
        "Heartbeat OK from {host}",
        "Timer {n} fired",
        "Worker {n} idle",
    ],
    "WARN": [
        "High memory usage on {unit}: {pct}% used",
        "Slow DNS response ({ms}ms) for {host}",
        "Deprecated option '{key}' in config — will be removed in next release",
        "Retrying connection to {host} (attempt {n}/5)",
        "Journal size limit reached, rotating",
        "Disk usage at {pct}% on /dev/sda1",
    ],
    "ERROR": [
        "Failed to start {unit}: exit code {n}",
        "Connection refused to {host}:{port}",
        "Segmentation fault in {unit} (core dumped)",
        "Authentication failure for user {user} from {ip}",
        "Timeout waiting for {unit} to become ready",
        "Failed to bind {port}/tcp: address already in use",
    ],
    "CRITICAL": [
        "Kernel panic — not syncing: {msg}",
        "Out of memory: killed process {n} ({unit})",
        "Hardware error on /dev/sda1: I/O error",
        "Disk quota exceeded on /home for user {user}",
        "Critical service {unit} failed; system may be unstable",
    ],
}
|
|
|
|
# Container names. gen_docker() writes them as the "docker:<service>" SOURCE value.
# List order matters for seeded output because rng.choice() selects by index.
_DOCKER_SERVICES = [
    "caddy", "postgres", "redis", "turnstone", "avocet",
    "prometheus", "grafana", "loki", "minio", "vllm",
]

# Container stdout templates keyed by severity. They mix logfmt-style lines
# (level=... msg="...") with "LEVEL: ..."-prefixed lines. {placeholders} are
# filled by _fill().
_DOCKER_MESSAGES: dict[str, list[str]] = {
    "INFO": [
        "level=info msg=\"Server listening on 0.0.0.0:{port}\"",
        "level=info msg=\"Connected to database at {host}:5432\"",
        'level=info msg="GET /api/health 200 {ms}ms" user={user}',
        'level=info msg="POST /api/v1/jobs 201 {ms}ms"',
        "INFO: Worker pool size: {n}",
        "INFO: Cache warmed — {n} entries loaded",
        "INFO: Startup complete in {ms}ms",
        "INFO: Scheduled job '{key}' executed successfully",
    ],
    "DEBUG": [
        "DEBUG: SQL query took {ms}ms: SELECT * FROM {key}",
        "DEBUG: Redis HIT for key {key}",
        "level=debug msg=\"span {key} completed\" duration={ms}ms",
        "DEBUG: Trace ID {key}: handler returned 200",
    ],
    "WARN": [
        "level=warn msg=\"Slow query ({ms}ms) on table {key}\"",
        "WARN: Connection pool at {pct}% capacity",
        "WARN: Rate limit approaching for client {ip}",
        "WARN: Deprecated endpoint /v1/{key} called by {ip}",
        "level=warn msg=\"GC pause {ms}ms — possible memory pressure\"",
    ],
    "ERROR": [
        # NOTE(review): err={msg} is unquoted, and _fill() supplies values with
        # spaces ("unexpected EOF"); strict logfmt parsers may split it.
        "level=error msg=\"Unhandled exception in handler '{key}'\" err={msg}",
        "ERROR: Database connection lost: {msg}",
        "level=error msg=\"Failed to acquire lock on {key} after {ms}ms\"",
        "ERROR: HTTP 500 POST /api/v1/{key}: internal server error",
        "ERROR: Redis NOAUTH: authentication required",
    ],
    "CRITICAL": [
        "level=critical msg=\"Panic: nil pointer dereference in {key}\"",
        "CRITICAL: Fatal: cannot open database: {msg}",
        "CRITICAL: OOM killer invoked — process {n} terminated",
    ],
}
|
|
|
|
# qBittorrent message templates keyed by severity. There are no DEBUG or ERROR
# lists; for those severities _pick_msg() falls back to another list.
_QBT_MESSAGES: dict[str, list[str]] = {
    "INFO": [
        "Successfully listening on IP: 0.0.0.0; port: {port}",
        "Torrent '{key}' added to download queue",
        "Download of '{key}' complete ({n} MB)",
        "Seeding '{key}' at {n} KB/s",
        "Tracker '{host}' working, {n} seeds",
        "Peer {ip} connected to torrent '{key}'",
        "Free disk space: {n} GB",
    ],
    "WARN": [
        "Tracker '{host}' is not working (retrying)",
        "Slow download speed ({n} KB/s) for '{key}'",
        "Too many open files — reducing connection limit",
        "DHT bootstrap failed, retrying in {n}s",
    ],
    "CRITICAL": [
        "Not enough space on disk to download '{key}'",
        "File I/O error for torrent '{key}': {msg}",
        "Unable to bind listen port {port}",
    ],
}
|
|
|
|
# AVCX device templates keyed by severity. Each starts with an AREA-NNNN status
# code (SYS/NET/CFG/HW for informational areas, ERR for faults). There is no
# DEBUG list, so DEBUG lines take their text from _pick_msg()'s fallback.
# NOTE(review): "0x{key}" is filled from _fill()'s word list (e.g. "0xauth"),
# so those addresses are not hexadecimal; a dedicated placeholder may be wanted.
_AVCX_CODES: dict[str, list[str]] = {
    "INFO": [
        "SYS-0100 Device boot complete, firmware v{n}.{n}.{n}",
        "SYS-0101 Sensor array calibration OK",
        "NET-0200 Link established on interface eth{n}",
        "CFG-0300 Configuration loaded from flash",
        "HW-0400 Fan speed nominal: {n} RPM",
    ],
    "WARN": [
        "NET-0210 Link quality degraded: RSSI -{n} dBm",
        # The two {n} values are drawn independently, so the "threshold" may
        # be larger than the reported speed.
        "HW-0410 Fan speed elevated: {n} RPM (threshold: {n} RPM)",
        "CFG-0310 Unknown config key '{key}' ignored",
        "SYS-0110 Watchdog near timeout — {n}ms remaining",
    ],
    "ERROR": [
        "ERR-1001 Sensor read failure on channel {n}: timeout",
        "ERR-1002 I2C bus {n} NACK from address 0x{key}",
        "ERR-2001 Network tx queue overflow — dropped {n} packets",
        "ERR-3001 Flash write error at sector {n}",
    ],
    "CRITICAL": [
        "ERR-9001 Thermal runaway detected — initiating shutdown",
        "ERR-9002 Supply voltage out of range: {n}mV",
        "ERR-9003 Memory parity error at address 0x{key}",
    ],
}
|
|
|
|
|
|
# ── Template substitution ──────────────────────────────────────────────────────

# Value pools for _fill()'s {host}, {user} and {module} placeholders. _HOSTS also
# supplies gen_journald()'s per-file hostname. Order matters for seeded output
# because rng.choice() selects by index.
_HOSTS = ["heimdall", "navi", "sif", "strahl", "bastion", "example-node"]
_USERS = ["alan", "root", "deployer", "backup", "nobody"]
_MODULES = ["btrfs", "xfs", "nf_conntrack", "ip6table_filter", "overlay"]
|
|
|
|
# Matches a {placeholder} token; group 1 is the placeholder name.
_PLACEHOLDER_RE = re.compile(r"\{(\w+)\}")


def _fill(template: str, rng: random.Random) -> str:
    """Replace ``{placeholder}`` tokens in *template* with plausible random values.

    Recognised placeholders are ``ip``, ``port``, ``n``, ``pct``, ``ms``,
    ``unit``, ``user``, ``host``, ``module``, ``msg`` and ``key``. Every
    occurrence draws a fresh value from *rng*, so ``{n}.{n}`` yields two
    independent numbers. Unknown placeholders are left in place verbatim.

    Args:
        template: Text containing zero or more ``{name}`` tokens.
        rng: Random source; consumed left to right, once per recognised token.

    Returns:
        The template with every recognised token substituted.
    """
    # One zero-argument producer per placeholder, each closing over *rng*.
    # Only the producer for a matched token is called, so RNG consumption per
    # token is the same as an explicit if/elif chain.
    producers = {
        "ip": lambda: f"10.{rng.randint(0, 255)}.{rng.randint(0, 255)}.{rng.randint(1, 254)}",
        "port": lambda: str(rng.randint(1024, 65535)),
        "n": lambda: str(rng.randint(1, 9999)),
        "pct": lambda: str(rng.randint(50, 99)),
        "ms": lambda: str(rng.randint(1, 5000)),
        "unit": lambda: rng.choice(_JOURNALD_UNITS),
        "user": lambda: rng.choice(_USERS),
        "host": lambda: rng.choice(_HOSTS),
        "module": lambda: rng.choice(_MODULES),
        "msg": lambda: rng.choice(["unexpected EOF", "connection reset", "no such file"]),
        "key": lambda: rng.choice(["auth", "jobs", "cache", "index", "sessions", "queue"]),
    }

    def _sub(match: re.Match) -> str:
        produce = producers.get(match.group(1))
        return produce() if produce is not None else match.group(0)

    return _PLACEHOLDER_RE.sub(_sub, template)
|
|
|
|
|
|
# Severities to try, in order, when a library has no templates for the
# requested one. INFO is always the last resort after these.
_SEVERITY_FALLBACKS: dict[str, tuple[str, ...]] = {
    "DEBUG": ("INFO",),
    "ERROR": ("CRITICAL", "WARN"),
    "CRITICAL": ("ERROR", "WARN"),
}


def _pick_msg(library: dict[str, list[str]], severity: str, rng: random.Random) -> str:
    """Choose a template from *library* for *severity* and fill its placeholders.

    If *library* has no template list, or an empty one, for *severity*, the
    related severities in ``_SEVERITY_FALLBACKS`` are tried before INFO. This
    keeps, e.g., an ERROR-tagged line from carrying a benign INFO message when a
    library only defines CRITICAL for its fault messages. If nothing usable
    remains, the generic template ``"log entry"`` is used rather than letting
    ``rng.choice`` fail on an empty list.

    Args:
        library: Severity name -> list of ``{placeholder}`` templates.
        severity: Requested severity name.
        rng: Random source for the template choice and its placeholder values.

    Returns:
        The filled-in message text.
    """
    for level in (severity, *_SEVERITY_FALLBACKS.get(severity, ()), "INFO"):
        candidates = library.get(level)
        if candidates:
            break
    else:
        candidates = ["log entry"]
    return _fill(rng.choice(candidates), rng)
|
|
|
|
|
|
# ── Per-format generators ──────────────────────────────────────────────────────

def gen_journald(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
    """Write journald records as JSON lines, one object per line (``-o json`` style).

    A single hostname is drawn for the whole file. Each record then gets its own
    severity, unit and message, plus the matching ``PRIORITY`` value.

    Args:
        path: Destination file (overwritten).
        start: Start of the time window.
        end: End of the time window.
        rng: Random source driving every choice in this file.
        error_rate: Passed to ``_pick_severity``.

    Returns:
        Number of lines written.
    """
    node = rng.choice(_HOSTS)
    count = 0
    with path.open("w") as sink:
        for moment in _ts_seq(start, end, rng):
            level = _pick_severity(rng, error_rate)
            unit = rng.choice(_JOURNALD_UNITS)
            record = {
                "__REALTIME_TIMESTAMP": _micros(moment),
                "MESSAGE": _pick_msg(_JOURNALD_MESSAGES, level, rng),
                "PRIORITY": _SYSLOG_PRIORITY.get(level, "6"),
                "_HOSTNAME": node,
                "_SYSTEMD_UNIT": unit,
                "SYSLOG_IDENTIFIER": unit.replace(".service", ""),
            }
            sink.write(json.dumps(record) + "\n")
            count += 1
    return count
|
|
|
|
|
|
def gen_docker(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
    """Write Docker-style JSON lines, each a ``{"SOURCE": ..., "MESSAGE": ...}`` object.

    ``SOURCE`` is ``"docker:<service>"``, with the service drawn from
    ``_DOCKER_SERVICES``. The records carry no timestamp field; ``_ts_seq`` only
    determines how many lines are written.
    NOTE(review): confirm the ingest pipeline does not need per-line times here.

    Args:
        path: Destination file (overwritten).
        start: Start of the time window.
        end: End of the time window.
        rng: Random source driving every choice in this file.
        error_rate: Passed to ``_pick_severity``.

    Returns:
        Number of lines written.
    """
    written = 0
    with path.open("w") as out:
        for _ in _ts_seq(start, end, rng):
            level = _pick_severity(rng, error_rate)
            service = rng.choice(_DOCKER_SERVICES)
            record = {
                "SOURCE": f"docker:{service}",
                "MESSAGE": _pick_msg(_DOCKER_MESSAGES, level, rng),
            }
            out.write(json.dumps(record) + "\n")
            written += 1
    return written
|
|
|
|
|
|
def gen_qbittorrent(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
    """Write a hotio-format qBittorrent plaintext log.

    Each line has the form ``(<code>) <YYYY-MM-DDTHH:MM:SS> - <message>``.

    Args:
        path: Destination file (overwritten, UTF-8).
        start: Start of the time window.
        end: End of the time window.
        rng: Random source driving every choice in this file.
        error_rate: Passed to ``_pick_severity``.

    Returns:
        Number of lines written.
    """
    # Only the N/W/C letters are used: DEBUG folds into N and ERROR into C.
    level_codes = {"INFO": "N", "WARN": "W", "CRITICAL": "C", "ERROR": "C", "DEBUG": "N"}
    lines = 0
    # Several templates contain non-ASCII text ("—"). Write UTF-8 explicitly so
    # the file's bytes do not depend on the platform's locale encoding.
    with path.open("w", encoding="utf-8") as fh:
        for dt in _ts_seq(start, end, rng):
            severity = _pick_severity(rng, error_rate)
            msg = _pick_msg(_QBT_MESSAGES, severity, rng)
            code = level_codes.get(severity, "N")
            ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
            fh.write(f"({code}) {ts_str} - {msg}\n")
            lines += 1
    return lines
|
|
|
|
|
|
def gen_avcx(path: Path, start: datetime, end: datetime, rng: random.Random, error_rate: float) -> int:
    """Write an AVCX device plaintext log.

    Each line is ``<YYYY-MM-DDTHH:MM:SS> [<SEVERITY>] <message>``. Every message
    template begins with an AREA-NNNN status code (SYS/NET/CFG/HW/ERR).

    Args:
        path: Destination file (overwritten, UTF-8).
        start: Start of the time window.
        end: End of the time window.
        rng: Random source driving every choice in this file.
        error_rate: Passed to ``_pick_severity``.

    Returns:
        Number of lines written.
    """
    lines = 0
    # Several templates contain non-ASCII text ("—"). Write UTF-8 explicitly so
    # the file's bytes do not depend on the platform's locale encoding.
    with path.open("w", encoding="utf-8") as fh:
        for dt in _ts_seq(start, end, rng):
            severity = _pick_severity(rng, error_rate)
            msg = _pick_msg(_AVCX_CODES, severity, rng)
            ts_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
            fh.write(f"{ts_str} [{severity}] {msg}\n")
            lines += 1
    return lines
|
|
|
|
|
|
# ── Orchestration ──────────────────────────────────────────────────────────────

# (subdirectory, filename, writer) for every source that generate() emits.
# Each writer takes (path, start, end, rng, error_rate) and returns the number of
# lines it wrote. generate() walks this list in order and draws one sub-seed per
# entry from its master RNG, so reordering entries changes which seed each source
# receives — and therefore the seeded output.
_GENERATORS: list[tuple[str, str, Callable[[Path, datetime, datetime, random.Random, float], int]]] = [
    ("journald", "system.jsonl", gen_journald),
    ("docker", "services.jsonl", gen_docker),
    ("qbittorrent", "qbt.log", gen_qbittorrent),
    ("avcx", "device.log", gen_avcx),
]
|
|
|
|
|
|
def generate(
    out: Path,
    days: int,
    seed: int | None,
    error_rate: float,
    reference_time: datetime | None = None,
) -> dict[str, int]:
    """Write every synthetic source under *out* and return per-file line counts.

    Args:
        out: Corpus root. Each source goes to ``<out>/<subdir>/<filename>`` as
            listed in ``_GENERATORS``; parent directories are created.
        days: Length of the history window, which ends at *reference_time*.
        seed: Seed for the master RNG; ``None`` gives a non-reproducible corpus.
        error_rate: Passed through unchanged to every generator.
        reference_time: End of the window; defaults to the current UTC time.

    Returns:
        Mapping from each file's path relative to *out* (as ``str``) to the
        number of lines written. A one-line summary per file is also printed.
    """
    master_rng = random.Random(seed)
    window_end = datetime.now(tz=timezone.utc) if reference_time is None else reference_time
    window_start = window_end - timedelta(days=days)

    line_counts: dict[str, int] = {}
    for subdir, filename, write_source in _GENERATORS:
        target = out / subdir / filename
        target.parent.mkdir(parents=True, exist_ok=True)
        # Each source draws from its own sub-RNG, seeded from the master RNG, so
        # one generator's draws never shift another's.
        source_rng = random.Random(master_rng.randint(0, 2**31))
        relative = target.relative_to(out)
        written = write_source(target, window_start, window_end, source_rng, error_rate)
        line_counts[str(relative)] = written
        print(f" {relative}: {written:,} lines")
    return line_counts
|
|
|
|
|
|
# ── CLI ────────────────────────────────────────────────────────────────────────

def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments, validate them, and generate the corpus.

    Args:
        argv: Arguments to parse; ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        Exit status: 0 after writing the corpus, or 1 if ``--error-rate`` or
        ``--days`` is out of range (the reason is printed to stderr). Malformed
        arguments make argparse exit on its own (status 2).
    """
    parser = argparse.ArgumentParser(
        description="Generate a synthetic Turnstone log corpus for demos and testing."
    )
    parser.add_argument("--days", type=int, default=7, help="Days of history to generate (default: 7)")
    parser.add_argument("--out", type=Path, required=True, help="Output directory")
    parser.add_argument("--seed", type=int, default=None, help="RNG seed for reproducibility")
    parser.add_argument("--error-rate", type=float, default=0.05, help="Error injection rate 0.0-1.0 (default: 0.05)")
    args = parser.parse_args(argv)

    if not 0.0 <= args.error_rate <= 1.0:
        print("ERROR: --error-rate must be between 0.0 and 1.0", file=sys.stderr)
        return 1
    # With days <= 0 the time window is empty or inverted, so every generator
    # would silently write an empty file. Reject it before touching the disk.
    if args.days < 1:
        print("ERROR: --days must be a positive integer", file=sys.stderr)
        return 1

    args.out.mkdir(parents=True, exist_ok=True)
    print(f"Generating {args.days}-day corpus → {args.out} (seed={args.seed}, error_rate={args.error_rate})")

    totals = generate(args.out, args.days, args.seed, args.error_rate)
    total_lines = sum(totals.values())
    print(f"Done — {total_lines:,} total log lines across {len(totals)} files")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value (0 or 1) as the process exit status.
    raise SystemExit(main())
|