"""Tests for the syslog (RFC 3164) gleaner."""
from __future__ import annotations

from datetime import datetime

from app.glean.syslog import is_syslog, parse

# Six sample records, one per line: entries with a PID (sshd, sudo, CRON,
# systemd), one without (kernel), and one whose single-digit day is
# space-padded ("May  1") as the RFC 3164 TIMESTAMP format requires.
SYSLOG_SAMPLE = """\
May 11 14:23:01 testhost sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 testhost sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 testhost sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 testhost kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May  1 04:00:00 testhost CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 testhost systemd[1]: Started NetworkManager.
"""


class TestDetector:
    """Checks which single lines ``is_syslog`` accepts and which it rejects."""

    def test_detects_standard_line(self):
        """A line with ``ident[pid]:`` is recognised."""
        assert is_syslog("May 11 14:23:01 testhost sshd[1234]: message")

    def test_detects_no_pid(self):
        """A line whose ident carries no ``[pid]`` is recognised."""
        assert is_syslog("May 11 14:23:01 testhost kernel: message")

    def test_detects_space_padded_day(self):
        """A single-digit day padded with a leading space is recognised."""
        # Two spaces after the month: RFC 3164 pads days < 10 with a space.
        assert is_syslog("May  1 04:00:00 testhost CRON[9999]: message")

    def test_rejects_servarr(self):
        """A pipe-delimited line with an ISO-style date is not syslog."""
        assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")

    def test_rejects_journald_json(self):
        """A journald-style JSON object is not syslog."""
        assert not is_syslog('{"__REALTIME_TIMESTAMP": "12345", "MESSAGE": "hi"}')

    def test_rejects_dmesg_relative(self):
        """A dmesg line with a bracketed relative timestamp is not syslog."""
        assert not is_syslog("[ 0.000000] Linux version 6.8.0")

    def test_rejects_plaintext(self):
        """Unstructured text is not syslog."""
        assert not is_syslog("just a plain text line with no structure")


class TestParser:
    """Checks the entries ``parse`` yields for ``SYSLOG_SAMPLE``."""

    def _parse(self, text: str) -> list:
        """Run ``parse`` over *text* split into lines and collect the entries.

        The source id is fixed to ``"syslog_test"``; the third argument is an
        empty list (its meaning is defined by ``parse`` — not visible here).
        """
        lines = iter(text.splitlines(keepends=True))
        return list(parse(lines, "syslog_test", []))

    def test_entry_count(self):
        """Each of the six sample lines produces exactly one entry."""
        assert len(self._parse(SYSLOG_SAMPLE)) == 6

    def test_ident_prepended(self):
        """The entry text starts with the bracketed ident."""
        entries = self._parse(SYSLOG_SAMPLE)
        assert entries[0].text.startswith("[sshd]")

    def test_timestamp_parsed(self):
        """The first entry's timestamp is tz-aware and reads 14:23:01 locally."""
        entries = self._parse(SYSLOG_SAMPLE)
        ts = datetime.fromisoformat(entries[0].timestamp_iso)
        assert ts.utcoffset() is not None  # must be timezone-aware
        # Converting back to the system zone should reproduce the wall-clock
        # time written in the log line.
        local = ts.astimezone()
        assert (local.hour, local.minute, local.second) == (14, 23, 1)

    def test_space_padded_day(self):
        """The ``May  1`` entry parses to the right date and time of day."""
        entries = self._parse(SYSLOG_SAMPLE)
        ts = datetime.fromisoformat(entries[4].timestamp_iso)
        local = ts.astimezone()
        # Check the date as well, so a mis-parsed padded day cannot slip by.
        assert (local.month, local.day) == (5, 1)
        assert (local.hour, local.minute, local.second) == (4, 0, 0)

    def test_source_id_propagated(self):
        """Every entry carries the source id handed to ``parse``."""
        assert all(e.source_id == "syslog_test" for e in self._parse(SYSLOG_SAMPLE))

    def test_sequence_is_monotonic(self):
        """Sequence numbers are strictly increasing (sorted and unique)."""
        entries = self._parse(SYSLOG_SAMPLE)
        seqs = [e.sequence for e in entries]
        assert seqs == sorted(seqs) and len(set(seqs)) == len(seqs)

    def test_severity_fallback(self):
        """Severity is either absent (``None``) or a string."""
        # No explicit severity in syslog RFC3164 body — per the original note
        # the parser falls back to detect_severity (helper not visible here).
        entries = self._parse(SYSLOG_SAMPLE)
        severity = entries[0].severity
        assert severity is None or isinstance(severity, str)