turnstone/tests/test_ingest_syslog.py
pyr0ball 9ec60ea7ff feat: syslog and dmesg parsers with graceful journald fallback
- Add syslog.py — RFC 3164 parser for /var/log/syslog, /var/log/messages,
  auth.log, kern.log; ident prepended to message text for searchability
- Add dmesg_log.py — handles both relative [secs.usecs] and human-readable
  [Dow Mon DD HH:MM:SS YYYY] formats; relative timestamps preserved as raw
- Wire both into pipeline.py auto-detection (before plaintext fallback)
- Update export_journal.sh: checks for journalctl availability, falls back
  gracefully on non-systemd systems; adds dmesg -T export (falls back to
  plain dmesg on older kernels)
- Add syslog entries (commented) + dmesg source to sources.yaml
- 30 tests covering both parsers (detection + parse correctness)
2026-05-11 06:57:38 -07:00

70 lines
2.7 KiB
Python

"""Tests for the syslog (RFC 3164) ingestor."""
from __future__ import annotations
from app.ingest.syslog import is_syslog, parse
# Six RFC 3164-style lines used as parser input by the tests in this module.
# The CRON line (index 4) carries a single-digit day written in the
# space-padded form RFC 3164 prescribes ("May  1": month, space, pad, digit),
# so the space-padded-day test really exercises that layout.
SYSLOG_SAMPLE = """\
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May  1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
"""
class TestDetector:
    """Checks which single lines ``is_syslog`` accepts and which it rejects."""

    def test_detects_standard_line(self):
        """A line whose tag has the ``ident[pid]:`` shape is accepted."""
        assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")

    def test_detects_no_pid(self):
        """A tag without a ``[pid]`` suffix is still accepted."""
        assert is_syslog("May 11 14:23:01 example-node kernel: message")

    def test_detects_space_padded_day(self):
        """A single-digit day in space-padded form is accepted."""
        # RFC 3164 pads days below 10 with a leading space, so month and day
        # are separated by two spaces ("May  1"), not one.
        assert is_syslog("May  1 04:00:00 example-node CRON[9999]: message")

    def test_rejects_servarr(self):
        """A pipe-delimited line with an ISO-style timestamp is rejected."""
        assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")

    def test_rejects_journald_json(self):
        """A JSON object line is rejected."""
        assert not is_syslog('{"__REALTIME_TIMESTAMP": "12345", "MESSAGE": "hi"}')

    def test_rejects_dmesg_relative(self):
        """A line starting with a bracketed relative timestamp is rejected."""
        assert not is_syslog("[ 0.000000] Linux version 6.8.0")

    def test_rejects_plaintext(self):
        """Free-form text with no timestamp or tag is rejected."""
        assert not is_syslog("just a plain text line with no structure")
class TestParser:
    """Runs ``parse`` over ``SYSLOG_SAMPLE`` and inspects the resulting entries."""

    def _parse(self, text: str) -> list:
        """Split *text* into lines (newlines kept), parse them, return a list."""
        line_iter = iter(text.splitlines(keepends=True))
        return [*parse(line_iter, "syslog_test", [])]

    def test_entry_count(self):
        """Each of the six sample lines produces one entry."""
        parsed = self._parse(SYSLOG_SAMPLE)
        assert len(parsed) == 6

    def test_ident_prepended(self):
        """The first entry's text begins with the bracketed ident."""
        first = self._parse(SYSLOG_SAMPLE)[0]
        assert first.text.startswith("[sshd]")

    def test_timestamp_parsed(self):
        """The first entry's ISO timestamp contains the line's time of day."""
        first = self._parse(SYSLOG_SAMPLE)[0]
        assert "14:23:01" in first.timestamp_iso

    def test_space_padded_day(self):
        """The CRON entry (fifth line) keeps its time of day in the timestamp."""
        parsed = self._parse(SYSLOG_SAMPLE)
        cron = parsed[4]
        assert "04:00:00" in cron.timestamp_iso

    def test_source_id_propagated(self):
        """Every entry carries the source id handed to ``parse``."""
        source_ids = [entry.source_id for entry in self._parse(SYSLOG_SAMPLE)]
        assert all(sid == "syslog_test" for sid in source_ids)

    def test_sequence_is_monotonic(self):
        """Sequence numbers come out sorted and without duplicates."""
        numbers = [entry.sequence for entry in self._parse(SYSLOG_SAMPLE)]
        assert numbers == sorted(numbers)
        assert len(set(numbers)) == len(numbers)

    def test_severity_fallback(self):
        """Severity on the first entry is either absent or a string."""
        # The RFC 3164 body has no explicit severity field; per the original
        # note, the value comes from a detect_severity fallback.
        first = self._parse(SYSLOG_SAMPLE)[0]
        assert first.severity is None or isinstance(first.severity, str)