feat: syslog and dmesg parsers with graceful journald fallback
- Add syslog.py — RFC 3164 parser for /var/log/syslog, /var/log/messages, auth.log, kern.log; ident prepended to message text for searchability - Add dmesg_log.py — handles both relative [secs.usecs] and human-readable [Dow Mon DD HH:MM:SS YYYY] formats; relative timestamps preserved as raw - Wire both into pipeline.py auto-detection (before plaintext fallback) - Update export_journal.sh: checks for journalctl availability, falls back gracefully on non-systemd systems; adds dmesg -T export (falls back to plain dmesg on older kernels) - Add syslog entries (commented) + dmesg source to sources.yaml - 30 tests covering both parsers (detection + parse correctness)
This commit is contained in:
parent
1b6482701c
commit
9ec60ea7ff
7 changed files with 424 additions and 17 deletions
102
app/ingest/dmesg_log.py
Normal file
102
app/ingest/dmesg_log.py
Normal file
|
|
@ -0,0 +1,102 @@
|
||||||
|
"""dmesg log parser.
|
||||||
|
|
||||||
|
Handles two formats:
|
||||||
|
|
||||||
|
Relative (always available):
|
||||||
|
[ 0.000000] Linux version 6.8.0-65-generic
|
||||||
|
[12345.678901] usb 1-1: USB disconnect, device number 2
|
||||||
|
|
||||||
|
Human-readable (dmesg -T, util-linux >= 2.21):
|
||||||
|
[Mon May 11 14:23:01 2026] usb 1-1: USB disconnect, device number 2
|
||||||
|
|
||||||
|
The export_journal.sh script exports with -T when available, falling back
|
||||||
|
to plain dmesg. Relative-timestamp entries get no timestamp_iso.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
from app.ingest.base import (
|
||||||
|
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
||||||
|
)
|
||||||
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
|
|
||||||
|
_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
|
||||||
|
_MONTHS_ABBR = {
|
||||||
|
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
|
||||||
|
"Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
|
||||||
|
}
|
||||||
|
|
||||||
|
# [ 0.000000] or [12345.678901]
|
||||||
|
_RELATIVE_RE = re.compile(r"^\[\s*(?P<secs>\d+\.\d+)\]\s+(?P<msg>.+)$")
|
||||||
|
# [Mon May 11 14:23:01 2026]
|
||||||
|
_HUMAN_RE = re.compile(
|
||||||
|
r"^\[(?P<dow>\w{3})\s+(?P<month>\w{3})\s+(?P<day>\d{1,2})"
|
||||||
|
r"\s+(?P<time>\d{2}:\d{2}:\d{2})\s+(?P<year>\d{4})\]\s+(?P<msg>.+)$"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_dmesg_log(first_line: str) -> bool:
|
||||||
|
line = first_line.strip()
|
||||||
|
return bool(_RELATIVE_RE.match(line) or _HUMAN_RE.match(line))
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_human_ts(m: re.Match) -> tuple[str, str]:
|
||||||
|
ts_raw = f"{m.group('dow')} {m.group('month')} {m.group('day')} {m.group('time')} {m.group('year')}"
|
||||||
|
month = _MONTHS_ABBR.get(m.group("month"), 1)
|
||||||
|
try:
|
||||||
|
dt = datetime(
|
||||||
|
int(m.group("year")), month, int(m.group("day")),
|
||||||
|
*[int(p) for p in m.group("time").split(":")],
|
||||||
|
tzinfo=timezone.utc,
|
||||||
|
)
|
||||||
|
return ts_raw, dt.isoformat()
|
||||||
|
except ValueError:
|
||||||
|
return ts_raw, ""
|
||||||
|
|
||||||
|
|
||||||
|
def parse(
|
||||||
|
lines: Iterator[str],
|
||||||
|
source_id: str,
|
||||||
|
compiled_patterns: list[tuple[LogPattern, object]],
|
||||||
|
ingest_time: str | None = None,
|
||||||
|
) -> Iterator[RetrievedEntry]:
|
||||||
|
ingest_time = ingest_time or now_iso()
|
||||||
|
state = SourceState()
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
line = raw_line.rstrip("\n").strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = _HUMAN_RE.match(line)
|
||||||
|
if m:
|
||||||
|
ts_raw, ts_iso = _parse_human_ts(m)
|
||||||
|
msg = m.group("msg")
|
||||||
|
else:
|
||||||
|
m = _RELATIVE_RE.match(line)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
ts_raw = f"[{m.group('secs')}]"
|
||||||
|
ts_iso = ""
|
||||||
|
msg = m.group("msg")
|
||||||
|
|
||||||
|
severity = detect_severity(msg)
|
||||||
|
repeat, out_of_order = state.observe(msg, ts_iso or None)
|
||||||
|
matched = apply_patterns(msg, compiled_patterns)
|
||||||
|
|
||||||
|
yield RetrievedEntry(
|
||||||
|
entry_id=make_entry_id(source_id, state.sequence, msg),
|
||||||
|
source_id=source_id,
|
||||||
|
sequence=state.sequence,
|
||||||
|
timestamp_raw=ts_raw,
|
||||||
|
timestamp_iso=ts_iso,
|
||||||
|
ingest_time=ingest_time,
|
||||||
|
severity=severity,
|
||||||
|
repeat_count=repeat,
|
||||||
|
out_of_order=out_of_order,
|
||||||
|
matched_patterns=matched,
|
||||||
|
text=msg,
|
||||||
|
)
|
||||||
|
|
@ -10,7 +10,7 @@ from typing import Iterator
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent, servarr
|
from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog
|
||||||
from app.ingest.base import _compile, load_patterns, now_iso
|
from app.ingest.base import _compile, load_patterns, now_iso
|
||||||
from app.services.models import LogPattern, RetrievedEntry
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
from app.services.search import build_fts_index
|
from app.services.search import build_fts_index
|
||||||
|
|
@ -99,6 +99,10 @@ def _detect_format(first_line: str) -> str:
|
||||||
return "qbittorrent"
|
return "qbittorrent"
|
||||||
if servarr.is_servarr_log(first_line):
|
if servarr.is_servarr_log(first_line):
|
||||||
return "servarr"
|
return "servarr"
|
||||||
|
if dmesg_log.is_dmesg_log(first_line):
|
||||||
|
return "dmesg"
|
||||||
|
if syslog.is_syslog(first_line):
|
||||||
|
return "syslog"
|
||||||
return "plaintext"
|
return "plaintext"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -136,6 +140,10 @@ def _parse_file(
|
||||||
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
|
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
elif fmt == "servarr":
|
elif fmt == "servarr":
|
||||||
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
|
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
|
elif fmt == "dmesg":
|
||||||
|
yield from dmesg_log.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
|
elif fmt == "syslog":
|
||||||
|
yield from syslog.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
else:
|
else:
|
||||||
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
|
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
|
|
||||||
|
|
|
||||||
100
app/ingest/syslog.py
Normal file
100
app/ingest/syslog.py
Normal file
|
|
@ -0,0 +1,100 @@
|
||||||
|
"""Traditional syslog (RFC 3164) parser.
|
||||||
|
|
||||||
|
Handles the format written by rsyslog and syslogd on most Linux distros:
|
||||||
|
|
||||||
|
May 11 14:23:01 hostname sshd[1234]: Accepted publickey for x from ...
|
||||||
|
May 11 14:23:01 hostname kernel: [12345.678] usb disconnect
|
||||||
|
|
||||||
|
Files: /var/log/syslog (Debian/Ubuntu), /var/log/messages (RHEL/Fedora),
|
||||||
|
/var/log/auth.log, /var/log/kern.log
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
from app.ingest.base import (
|
||||||
|
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
||||||
|
)
|
||||||
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
|
|
||||||
|
_MONTHS = {
|
||||||
|
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
|
||||||
|
"Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
|
||||||
|
}
|
||||||
|
|
||||||
|
# May 11 14:23:01 hostname ident[pid]: message
|
||||||
|
# May 1 04:00:00 hostname ident: message (no pid, day may be space-padded)
|
||||||
|
_LINE_RE = re.compile(
|
||||||
|
r"^(?P<month>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)"
|
||||||
|
r"\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})"
|
||||||
|
r"\s+(?P<host>\S+)"
|
||||||
|
r"\s+(?P<ident>[^\[:\s]{1,48})(?:\[(?P<pid>\d+)\])?:\s*(?P<msg>.*)$"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_syslog(first_line: str) -> bool:
|
||||||
|
return bool(_LINE_RE.match(first_line.strip()))
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ts(month_str: str, day: str, time_str: str) -> tuple[str, str]:
|
||||||
|
month = _MONTHS.get(month_str, 1)
|
||||||
|
year = datetime.now(timezone.utc).year
|
||||||
|
ts_raw = f"{month_str} {int(day):2d} {time_str}"
|
||||||
|
try:
|
||||||
|
dt = datetime(year, month, int(day),
|
||||||
|
*[int(p) for p in time_str.split(":")],
|
||||||
|
tzinfo=timezone.utc)
|
||||||
|
return ts_raw, dt.isoformat()
|
||||||
|
except ValueError:
|
||||||
|
return ts_raw, ""
|
||||||
|
|
||||||
|
|
||||||
|
def parse(
|
||||||
|
lines: Iterator[str],
|
||||||
|
source_id: str,
|
||||||
|
compiled_patterns: list[tuple[LogPattern, object]],
|
||||||
|
ingest_time: str | None = None,
|
||||||
|
) -> Iterator[RetrievedEntry]:
|
||||||
|
ingest_time = ingest_time or now_iso()
|
||||||
|
state = SourceState()
|
||||||
|
pending_text: str | None = None
|
||||||
|
pending_meta: dict = {}
|
||||||
|
|
||||||
|
def _emit(text: str, meta: dict) -> RetrievedEntry:
|
||||||
|
repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
|
||||||
|
matched = apply_patterns(text, compiled_patterns)
|
||||||
|
return RetrievedEntry(
|
||||||
|
entry_id=make_entry_id(source_id, state.sequence, text),
|
||||||
|
source_id=source_id,
|
||||||
|
sequence=state.sequence,
|
||||||
|
timestamp_raw=meta.get("ts_raw", ""),
|
||||||
|
timestamp_iso=meta.get("ts_iso", ""),
|
||||||
|
ingest_time=ingest_time,
|
||||||
|
severity=meta.get("severity"),
|
||||||
|
repeat_count=repeat,
|
||||||
|
out_of_order=out_of_order,
|
||||||
|
matched_patterns=matched,
|
||||||
|
text=text,
|
||||||
|
)
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
line = raw_line.rstrip("\n")
|
||||||
|
m = _LINE_RE.match(line)
|
||||||
|
if m:
|
||||||
|
if pending_text is not None:
|
||||||
|
yield _emit(pending_text, pending_meta)
|
||||||
|
|
||||||
|
ts_raw, ts_iso = _parse_ts(m.group("month"), m.group("day"), m.group("time"))
|
||||||
|
ident = m.group("ident").strip()
|
||||||
|
msg = m.group("msg")
|
||||||
|
text = f"[{ident}] {msg}" if ident else msg
|
||||||
|
severity = detect_severity(msg)
|
||||||
|
pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
|
||||||
|
pending_text = text
|
||||||
|
elif pending_text is not None:
|
||||||
|
pending_text += "\n" + line.strip()
|
||||||
|
|
||||||
|
if pending_text is not None:
|
||||||
|
yield _emit(pending_text, pending_meta)
|
||||||
|
|
@ -10,10 +10,30 @@
|
||||||
# services that are temporarily down.
|
# services that are temporarily down.
|
||||||
|
|
||||||
sources:
|
sources:
|
||||||
# ── System journal (exported by export_journal.sh on the host) ────────────
|
# ── System (exported by export_journal.sh on the host) ───────────────────
|
||||||
|
# journal-export.jsonl and dmesg-export.txt are written to /opt/turnstone/data/
|
||||||
|
# by the export script before each ingest run.
|
||||||
- id: system-journal
|
- id: system-journal
|
||||||
path: /data/journal-export.jsonl
|
path: /data/journal-export.jsonl
|
||||||
|
|
||||||
|
- id: dmesg
|
||||||
|
path: /data/dmesg-export.txt
|
||||||
|
|
||||||
|
# ── Syslog / rsyslog (direct file reads via /var/log bind mount) ──────────
|
||||||
|
# Uncomment the file(s) present on your system.
|
||||||
|
# Debian/Ubuntu:
|
||||||
|
# - id: syslog
|
||||||
|
# path: /var/log/syslog
|
||||||
|
# - id: auth-log
|
||||||
|
# path: /var/log/auth.log
|
||||||
|
# - id: kern-log
|
||||||
|
# path: /var/log/kern.log
|
||||||
|
# RHEL/Fedora/Rocky:
|
||||||
|
# - id: messages
|
||||||
|
# path: /var/log/messages
|
||||||
|
# - id: secure
|
||||||
|
# path: /var/log/secure
|
||||||
|
|
||||||
# ── Download ─────────────────────────────────────────────────────────────
|
# ── Download ─────────────────────────────────────────────────────────────
|
||||||
- id: qbittorrent
|
- id: qbittorrent
|
||||||
path: /opt/qbittorrent/config/data/logs/qbittorrent.log
|
path: /opt/qbittorrent/config/data/logs/qbittorrent.log
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,17 @@
|
||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# Export recent journald entries to a JSONL file the Turnstone container can ingest.
|
# Export recent system messages to files the Turnstone container can ingest.
|
||||||
#
|
#
|
||||||
# Run this on the HOST before the container ingest step. The output file lands in
|
# Exports:
|
||||||
# /opt/turnstone/data/ which is bind-mounted at /data/ inside the container.
|
# journal-export.jsonl — journald (if journalctl is available)
|
||||||
|
# dmesg-export.txt — kernel ring buffer (always)
|
||||||
#
|
#
|
||||||
# Priority filter 0-5 (emerg→notice) skips debug/info noise while keeping
|
# Output files land in /opt/turnstone/data/ which is bind-mounted at /data/
|
||||||
# all warnings, errors, and service lifecycle events.
|
# inside the container.
|
||||||
#
|
#
|
||||||
# Usage (standalone):
|
# Usage (standalone):
|
||||||
# sudo bash /opt/turnstone/scripts/export_journal.sh
|
# sudo bash /opt/turnstone/scripts/export_journal.sh
|
||||||
#
|
#
|
||||||
# Typical cron (combined with ingest — see crontab comment below):
|
# Cron (combined with ingest):
|
||||||
# */15 * * * * bash /opt/turnstone/scripts/export_journal.sh && \
|
# */15 * * * * bash /opt/turnstone/scripts/export_journal.sh && \
|
||||||
# podman exec turnstone python scripts/ingest_corpus.py \
|
# podman exec turnstone python scripts/ingest_corpus.py \
|
||||||
# --sources /patterns/sources.yaml --db /data/turnstone.db \
|
# --sources /patterns/sources.yaml --db /data/turnstone.db \
|
||||||
|
|
@ -18,15 +19,31 @@
|
||||||
|
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
OUT=/opt/turnstone/data/journal-export.jsonl
|
DATA_DIR=/opt/turnstone/data
|
||||||
|
|
||||||
|
# ── journald ─────────────────────────────────────────────────────────────────
|
||||||
# 20-minute window (slightly wider than the 15-min cron interval) ensures no
|
# 20-minute window (slightly wider than the 15-min cron interval) ensures no
|
||||||
# gaps between runs. Ingest is idempotent via entry_id hash, so overlap is safe.
|
# gaps between runs. Ingest deduplicates via entry_id hash so overlap is safe.
|
||||||
journalctl \
|
if command -v journalctl &>/dev/null; then
|
||||||
--output=json \
|
journalctl \
|
||||||
--priority=0..5 \
|
--output=json \
|
||||||
--since "20 minutes ago" \
|
--priority=0..5 \
|
||||||
--no-pager \
|
--since "20 minutes ago" \
|
||||||
> "${OUT}"
|
--no-pager \
|
||||||
|
> "${DATA_DIR}/journal-export.jsonl"
|
||||||
|
echo "journald: $(wc -l < "${DATA_DIR}/journal-export.jsonl") entries"
|
||||||
|
else
|
||||||
|
# No journald — write an empty file so the sources.yaml entry doesn't warn
|
||||||
|
: > "${DATA_DIR}/journal-export.jsonl"
|
||||||
|
echo "journald: not available (skipped)"
|
||||||
|
fi
|
||||||
|
|
||||||
echo "Exported $(wc -l < "${OUT}") journal entries to ${OUT}"
|
# ── dmesg ─────────────────────────────────────────────────────────────────────
|
||||||
|
# Use -T for human-readable timestamps when available (util-linux >= 2.21).
|
||||||
|
# Fall back to plain dmesg if -T is not supported.
|
||||||
|
if dmesg -T &>/dev/null; then
|
||||||
|
dmesg -T > "${DATA_DIR}/dmesg-export.txt"
|
||||||
|
else
|
||||||
|
dmesg > "${DATA_DIR}/dmesg-export.txt"
|
||||||
|
fi
|
||||||
|
echo "dmesg: $(wc -l < "${DATA_DIR}/dmesg-export.txt") lines"
|
||||||
|
|
|
||||||
90
tests/test_ingest_dmesg.py
Normal file
90
tests/test_ingest_dmesg.py
Normal file
|
|
@ -0,0 +1,90 @@
|
||||||
|
"""Tests for the dmesg log ingestor."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.ingest.dmesg_log import is_dmesg_log, parse
|
||||||
|
|
||||||
|
RELATIVE_SAMPLE = """\
|
||||||
|
[ 0.000000] Linux version 6.8.0-65-generic
|
||||||
|
[ 0.012345] Command line: BOOT_IMAGE=/vmlinuz-6.8.0-65-generic
|
||||||
|
[12345.678901] usb 1-1: USB disconnect, device number 2
|
||||||
|
[12400.000000] EXT4-fs error (device sda1): ext4_find_entry: reading directory
|
||||||
|
[12401.000000] Out of memory: Kill process 1234 (firefox) score 900
|
||||||
|
"""
|
||||||
|
|
||||||
|
HUMAN_SAMPLE = """\
|
||||||
|
[Mon May 11 14:23:01 2026] Linux version 6.8.0-65-generic
|
||||||
|
[Mon May 11 14:23:02 2026] usb 1-1: USB disconnect, device number 2
|
||||||
|
[Mon May 11 14:24:00 2026] ata1: SATA link up 6.0 Gbps (SStatus 133 SControl 300)
|
||||||
|
[Mon May 11 14:25:00 2026] Out of memory: Kill process 5678 (python3) score 800
|
||||||
|
"""
|
||||||
|
|
||||||
|
MIXED_SAMPLE = "[ 0.000000] boot message\n[Mon May 11 14:23:01 2026] human ts message\n"
|
||||||
|
|
||||||
|
|
||||||
|
class TestDetector:
|
||||||
|
def test_detects_relative(self):
|
||||||
|
assert is_dmesg_log("[ 0.000000] Linux version 6.8.0")
|
||||||
|
|
||||||
|
def test_detects_relative_large_offset(self):
|
||||||
|
assert is_dmesg_log("[12345.678901] usb disconnect")
|
||||||
|
|
||||||
|
def test_detects_human_timestamp(self):
|
||||||
|
assert is_dmesg_log("[Mon May 11 14:23:01 2026] message")
|
||||||
|
|
||||||
|
def test_rejects_syslog(self):
|
||||||
|
assert not is_dmesg_log("May 11 14:23:01 hostname sshd[1234]: message")
|
||||||
|
|
||||||
|
def test_rejects_servarr(self):
|
||||||
|
assert not is_dmesg_log("2026-05-11 02:31:51.5|Info|Component|Message")
|
||||||
|
|
||||||
|
def test_rejects_plaintext(self):
|
||||||
|
assert not is_dmesg_log("just a plain log line")
|
||||||
|
|
||||||
|
|
||||||
|
class TestRelativeParser:
|
||||||
|
def _parse(self, text: str) -> list:
|
||||||
|
return list(parse(iter(text.splitlines(keepends=True)), "dmesg_test", []))
|
||||||
|
|
||||||
|
def test_entry_count(self):
|
||||||
|
assert len(self._parse(RELATIVE_SAMPLE)) == 5
|
||||||
|
|
||||||
|
def test_relative_ts_raw_preserved(self):
|
||||||
|
entries = self._parse(RELATIVE_SAMPLE)
|
||||||
|
assert entries[0].timestamp_raw == "[0.000000]"
|
||||||
|
|
||||||
|
def test_relative_ts_iso_empty(self):
|
||||||
|
# No absolute time available from relative timestamps
|
||||||
|
entries = self._parse(RELATIVE_SAMPLE)
|
||||||
|
assert entries[0].timestamp_iso == ""
|
||||||
|
|
||||||
|
def test_message_text(self):
|
||||||
|
entries = self._parse(RELATIVE_SAMPLE)
|
||||||
|
assert "Linux version" in entries[0].text
|
||||||
|
|
||||||
|
def test_source_id_propagated(self):
|
||||||
|
assert all(e.source_id == "dmesg_test" for e in self._parse(RELATIVE_SAMPLE))
|
||||||
|
|
||||||
|
def test_sequence_is_monotonic(self):
|
||||||
|
entries = self._parse(RELATIVE_SAMPLE)
|
||||||
|
seqs = [e.sequence for e in entries]
|
||||||
|
assert seqs == sorted(seqs) and len(set(seqs)) == len(seqs)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHumanParser:
|
||||||
|
def _parse(self, text: str) -> list:
|
||||||
|
return list(parse(iter(text.splitlines(keepends=True)), "dmesg_human", []))
|
||||||
|
|
||||||
|
def test_entry_count(self):
|
||||||
|
assert len(self._parse(HUMAN_SAMPLE)) == 4
|
||||||
|
|
||||||
|
def test_timestamp_parsed(self):
|
||||||
|
entries = self._parse(HUMAN_SAMPLE)
|
||||||
|
assert entries[0].timestamp_iso == "2026-05-11T14:23:01+00:00"
|
||||||
|
|
||||||
|
def test_timestamp_raw(self):
|
||||||
|
entries = self._parse(HUMAN_SAMPLE)
|
||||||
|
assert "Mon May 11" in entries[0].timestamp_raw
|
||||||
|
|
||||||
|
def test_message_text(self):
|
||||||
|
entries = self._parse(HUMAN_SAMPLE)
|
||||||
|
assert "Linux version" in entries[0].text
|
||||||
70
tests/test_ingest_syslog.py
Normal file
70
tests/test_ingest_syslog.py
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
"""Tests for the syslog (RFC 3164) ingestor."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.ingest.syslog import is_syslog, parse
|
||||||
|
|
||||||
|
SYSLOG_SAMPLE = """\
|
||||||
|
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
|
||||||
|
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
|
||||||
|
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
|
||||||
|
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
|
||||||
|
May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
|
||||||
|
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TestDetector:
|
||||||
|
def test_detects_standard_line(self):
|
||||||
|
assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")
|
||||||
|
|
||||||
|
def test_detects_no_pid(self):
|
||||||
|
assert is_syslog("May 11 14:23:01 example-node kernel: message")
|
||||||
|
|
||||||
|
def test_detects_space_padded_day(self):
|
||||||
|
assert is_syslog("May 1 04:00:00 example-node CRON[9999]: message")
|
||||||
|
|
||||||
|
def test_rejects_servarr(self):
|
||||||
|
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")
|
||||||
|
|
||||||
|
def test_rejects_journald_json(self):
|
||||||
|
assert not is_syslog('{"__REALTIME_TIMESTAMP": "12345", "MESSAGE": "hi"}')
|
||||||
|
|
||||||
|
def test_rejects_dmesg_relative(self):
|
||||||
|
assert not is_syslog("[ 0.000000] Linux version 6.8.0")
|
||||||
|
|
||||||
|
def test_rejects_plaintext(self):
|
||||||
|
assert not is_syslog("just a plain text line with no structure")
|
||||||
|
|
||||||
|
|
||||||
|
class TestParser:
|
||||||
|
def _parse(self, text: str) -> list:
|
||||||
|
return list(parse(iter(text.splitlines(keepends=True)), "syslog_test", []))
|
||||||
|
|
||||||
|
def test_entry_count(self):
|
||||||
|
assert len(self._parse(SYSLOG_SAMPLE)) == 6
|
||||||
|
|
||||||
|
def test_ident_prepended(self):
|
||||||
|
entries = self._parse(SYSLOG_SAMPLE)
|
||||||
|
assert entries[0].text.startswith("[sshd]")
|
||||||
|
|
||||||
|
def test_timestamp_parsed(self):
|
||||||
|
entries = self._parse(SYSLOG_SAMPLE)
|
||||||
|
assert "14:23:01" in entries[0].timestamp_iso
|
||||||
|
|
||||||
|
def test_space_padded_day(self):
|
||||||
|
entries = self._parse(SYSLOG_SAMPLE)
|
||||||
|
cron_entry = entries[4]
|
||||||
|
assert "04:00:00" in cron_entry.timestamp_iso
|
||||||
|
|
||||||
|
def test_source_id_propagated(self):
|
||||||
|
assert all(e.source_id == "syslog_test" for e in self._parse(SYSLOG_SAMPLE))
|
||||||
|
|
||||||
|
def test_sequence_is_monotonic(self):
|
||||||
|
entries = self._parse(SYSLOG_SAMPLE)
|
||||||
|
seqs = [e.sequence for e in entries]
|
||||||
|
assert seqs == sorted(seqs) and len(set(seqs)) == len(seqs)
|
||||||
|
|
||||||
|
def test_severity_fallback(self):
|
||||||
|
# No explicit severity in syslog RFC3164 body — falls back to detect_severity
|
||||||
|
entries = self._parse(SYSLOG_SAMPLE)
|
||||||
|
assert entries[0].severity is None or isinstance(entries[0].severity, str)
|
||||||
Loading…
Reference in a new issue