feat: syslog and dmesg parsers with graceful journald fallback

- Add syslog.py — RFC 3164 parser for /var/log/syslog, /var/log/messages,
  auth.log, kern.log; ident prepended to message text for searchability
- Add dmesg_log.py — handles both relative [secs.usecs] and human-readable
  [Dow Mon DD HH:MM:SS YYYY] formats; relative timestamps preserved as raw
- Wire both into pipeline.py auto-detection (before plaintext fallback)
- Update export_journal.sh: checks for journalctl availability, falls back
  gracefully on non-systemd systems; adds dmesg -T export (falls back to
  plain dmesg on older kernels)
- Add syslog entries (commented) + dmesg source to sources.yaml
- 30 tests covering both parsers (detection + parse correctness)
This commit is contained in:
pyr0ball 2026-05-11 06:57:38 -07:00
parent 286778d6a9
commit 346ea6e0c6
7 changed files with 424 additions and 17 deletions

102
app/ingest/dmesg_log.py Normal file
View file

@ -0,0 +1,102 @@
"""dmesg log parser.
Handles two formats:
Relative (always available):
[ 0.000000] Linux version 6.8.0-65-generic
[12345.678901] usb 1-1: USB disconnect, device number 2
Human-readable (dmesg -T, util-linux >= 2.21):
[Mon May 11 14:23:01 2026] usb 1-1: USB disconnect, device number 2
The export_journal.sh script exports with -T when available, falling back
to plain dmesg. Relative-timestamp entries get no timestamp_iso.
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Three-letter English weekday abbreviations (the first field of a
# human-readable dmesg timestamp such as "Mon May 11 14:23:01 2026").
# NOTE(review): not referenced by any code visible in this module — confirm
# whether it is still needed or was meant to validate the <dow> group.
_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
# English month abbreviation -> month number (1-12). Looked up by
# _parse_human_ts when it builds a datetime from a human-readable timestamp.
_MONTHS_ABBR = {
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
"Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}
# Relative form: seconds since boot inside brackets, optionally left-padded
# with spaces, followed by a non-empty message.
#   [    0.000000] ...      [12345.678901] ...
_RELATIVE_RE = re.compile(r"^\[\s*(?P<secs>\d+\.\d+)\]\s+(?P<msg>.+)$")

# Human-readable form: weekday, month, day, HH:MM:SS and a four-digit year
# inside brackets, followed by a non-empty message.
#   [Mon May 11 14:23:01 2026] ...
_HUMAN_RE = re.compile(
    r"^\[(?P<dow>\w{3})\s+(?P<month>\w{3})\s+(?P<day>\d{1,2})"
    r"\s+(?P<time>\d{2}:\d{2}:\d{2})\s+(?P<year>\d{4})\]\s+(?P<msg>.+)$"
)


def is_dmesg_log(first_line: str) -> bool:
    """Report whether *first_line* looks like a dmesg record.

    Surrounding whitespace is ignored. The line qualifies when it matches
    either the relative-timestamp pattern or the human-readable one.
    """
    candidate = first_line.strip()
    # Same probing order as the parser's patterns are declared: relative,
    # then human-readable.
    for pattern in (_RELATIVE_RE, _HUMAN_RE):
        if pattern.match(candidate) is not None:
            return True
    return False
def _parse_human_ts(m: re.Match) -> tuple[str, str]:
    """Turn a human-readable timestamp match into ``(raw, iso)`` strings.

    Args:
        m: A match carrying the named groups ``dow``, ``month``, ``day``,
            ``time`` (``HH:MM:SS``) and ``year``.

    Returns:
        A tuple ``(timestamp_raw, timestamp_iso)``. ``timestamp_raw`` is the
        five fields joined by single spaces. ``timestamp_iso`` is the ISO-8601
        rendering of the parsed datetime, or ``""`` when the month
        abbreviation is unknown or the fields do not form a valid date/time.
    """
    ts_raw = (
        f"{m.group('dow')} {m.group('month')} {m.group('day')} "
        f"{m.group('time')} {m.group('year')}"
    )

    # The pattern accepts any three word characters as a month, so an
    # unrecognised abbreviation can reach this point. Report "no timestamp"
    # rather than silently pretending the entry happened in January.
    month = _MONTHS_ABBR.get(m.group("month"))
    if month is None:
        return ts_raw, ""

    try:
        # NOTE(review): the value is tagged as UTC although the text carries
        # no zone information — confirm this matches how the export is made.
        dt = datetime(
            int(m.group("year")), month, int(m.group("day")),
            *[int(p) for p in m.group("time").split(":")],
            tzinfo=timezone.utc,
        )
    except ValueError:
        # Out-of-range day/hour/minute/second (e.g. "Feb 31", "25:00:00").
        return ts_raw, ""
    return ts_raw, dt.isoformat()
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per recognisable dmesg line.

    Args:
        lines: Raw lines of the dmesg export.
        source_id: Identifier stamped on every emitted entry.
        compiled_patterns: Patterns handed to ``apply_patterns`` for each
            message.
        ingest_time: Ingest timestamp for all entries; ``now_iso()`` is used
            when it is omitted or empty.

    Blank lines and lines matching neither dmesg format are dropped.
    Human-readable timestamps are converted via ``_parse_human_ts``; relative
    ones keep ``"[secs]"`` as the raw value and get an empty ISO timestamp.
    """
    stamp = ingest_time or now_iso()
    tracker = SourceState()

    for raw in lines:
        # strip() also removes the trailing newline.
        stripped = raw.strip()
        if not stripped:
            continue

        human = _HUMAN_RE.match(stripped)
        if human is not None:
            ts_raw, ts_iso = _parse_human_ts(human)
            body = human.group("msg")
        else:
            relative = _RELATIVE_RE.match(stripped)
            if relative is None:
                # Neither format: not a dmesg record, ignore it.
                continue
            ts_raw = f"[{relative.group('secs')}]"
            ts_iso = ""
            body = relative.group("msg")

        level = detect_severity(body)
        # An empty ISO timestamp is reported to the tracker as None.
        repeats, disordered = tracker.observe(body, ts_iso or None)
        hits = apply_patterns(body, compiled_patterns)
        # tracker.sequence is read only after observe() has run.
        seq = tracker.sequence

        yield RetrievedEntry(
            entry_id=make_entry_id(source_id, seq, body),
            source_id=source_id,
            sequence=seq,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=stamp,
            severity=level,
            repeat_count=repeats,
            out_of_order=disordered,
            matched_patterns=hits,
            text=body,
        )

View file

@ -10,7 +10,7 @@ from typing import Iterator
import yaml import yaml
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent, servarr from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog
from app.ingest.base import _compile, load_patterns, now_iso from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index from app.services.search import build_fts_index
@ -99,6 +99,10 @@ def _detect_format(first_line: str) -> str:
return "qbittorrent" return "qbittorrent"
if servarr.is_servarr_log(first_line): if servarr.is_servarr_log(first_line):
return "servarr" return "servarr"
if dmesg_log.is_dmesg_log(first_line):
return "dmesg"
if syslog.is_syslog(first_line):
return "syslog"
return "plaintext" return "plaintext"
@ -136,6 +140,10 @@ def _parse_file(
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "servarr": elif fmt == "servarr":
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time) yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "dmesg":
yield from dmesg_log.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "syslog":
yield from syslog.parse(all_lines(), source_id, compiled, ingest_time)
else: else:
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)

100
app/ingest/syslog.py Normal file
View file

@ -0,0 +1,100 @@
"""Traditional syslog (RFC 3164) parser.
Handles the format written by rsyslog and syslogd on most Linux distros:
May 11 14:23:01 hostname sshd[1234]: Accepted publickey for x from ...
May 11 14:23:01 hostname kernel: [12345.678] usb disconnect
Files: /var/log/syslog (Debian/Ubuntu), /var/log/messages (RHEL/Fedora),
/var/log/auth.log, /var/log/kern.log
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# English month abbreviation -> month number (1-12). Looked up by _parse_ts
# when it builds a datetime from the month field of a syslog line.
_MONTHS = {
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
"Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}
# Shape of one record header:
#   <Mon> <day> <HH:MM:SS> <host> <ident>[<pid>]: <message>
# The "[pid]" part is optional, the day may be one or two digits with any
# amount of whitespace before it, and the message may be empty.
#   May 11 14:23:01 hostname sshd[1234]: Accepted publickey ...
#   May  1 04:00:00 hostname kernel: ...
_LINE_RE = re.compile(
    r"^(?P<month>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)"
    r"\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})"
    r"\s+(?P<host>\S+)"
    r"\s+(?P<ident>[^\[:\s]{1,48})(?:\[(?P<pid>\d+)\])?:\s*(?P<msg>.*)$"
)


def is_syslog(first_line: str) -> bool:
    """Report whether *first_line* has the traditional syslog header shape.

    Surrounding whitespace is ignored before the line is matched against
    ``_LINE_RE``.
    """
    candidate = first_line.strip()
    header = _LINE_RE.match(candidate)
    return header is not None
def _parse_ts(month_str: str, day: str, time_str: str) -> tuple[str, str]:
    """Build ``(timestamp_raw, timestamp_iso)`` from syslog header fields.

    The header carries no year, so one is inferred: the current UTC year is
    tried first. If that candidate is not a valid date (e.g. "Feb 29" in a
    non-leap year) or lies more than a day in the future, the previous year
    is used instead when it yields a valid date. This keeps December entries
    that are read in January from being stamped almost a year ahead.

    Args:
        month_str: Three-letter month abbreviation (key of ``_MONTHS``).
        day: Day of month as a decimal string.
        time_str: Time of day as ``HH:MM:SS``.

    Returns:
        ``timestamp_raw`` is ``"<Mon> <day padded to width 2> <time>"``.
        ``timestamp_iso`` is the ISO-8601 form of the inferred datetime, or
        ``""`` when the month is unknown or no valid datetime can be built.

    Raises:
        ValueError: If *day* is not a decimal integer (raised while building
            ``timestamp_raw``).
    """
    ts_raw = f"{month_str} {int(day):2d} {time_str}"

    month = _MONTHS.get(month_str)
    if month is None:
        # Unknown abbreviation: do not invent a January timestamp.
        return ts_raw, ""

    # One day of slack before a timestamp is considered "in the future".
    # NOTE(review): values are tagged as UTC although the header has no zone;
    # the slack is meant to absorb a local-time offset — confirm.
    future_slack_seconds = 24 * 60 * 60
    now = datetime.now(timezone.utc)

    def _at_year(year: int):
        """Return the datetime for *year*, or None if the fields are invalid."""
        try:
            return datetime(
                year, month, int(day),
                *[int(p) for p in time_str.split(":")],
                tzinfo=timezone.utc,
            )
        except ValueError:
            return None

    current = _at_year(now.year)
    too_far_ahead = (
        current is not None
        and (current - now).total_seconds() > future_slack_seconds
    )

    if current is None or too_far_ahead:
        previous = _at_year(now.year - 1)
        if previous is not None:
            return ts_raw, previous.isoformat()

    if current is None:
        return ts_raw, ""
    return ts_raw, current.isoformat()
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per syslog record.

    A record starts at a line matching ``_LINE_RE``. Following lines that do
    not match are treated as continuations and appended to the record's text,
    separated by newlines. Non-matching lines before the first record are
    dropped. The ident is prepended to the message as ``"[ident] "``.

    Args:
        lines: Raw lines of the syslog file.
        source_id: Identifier stamped on every emitted entry.
        compiled_patterns: Patterns handed to ``apply_patterns`` for each
            record's full text.
        ingest_time: Ingest timestamp for all entries; ``now_iso()`` is used
            when it is omitted or empty.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # The record currently being assembled; emitted once the next header
    # line (or the end of input) shows that it is complete.
    pending_text: str | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        """Register *text* with the source state and build its entry."""
        # An empty ISO timestamp is passed as None, the same way the dmesg
        # parser reports entries that have no usable timestamp.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso") or None)
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        line = raw_line.rstrip("\n")
        m = _LINE_RE.match(line)
        if m:
            # A new header closes the record collected so far.
            if pending_text is not None:
                yield _emit(pending_text, pending_meta)
            ts_raw, ts_iso = _parse_ts(
                m.group("month"), m.group("day"), m.group("time")
            )
            ident = m.group("ident").strip()
            msg = m.group("msg")
            text = f"[{ident}] {msg}" if ident else msg
            # Severity is detected on the message alone, without the ident.
            severity = detect_severity(msg)
            pending_meta = {
                "ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity,
            }
            pending_text = text
        elif pending_text is not None:
            continuation = line.strip()
            # Blank lines carry no content; appending them would only leave
            # stray newlines in the record text.
            if continuation:
                pending_text += "\n" + continuation

    # Flush the last record, which no following header line closed.
    if pending_text is not None:
        yield _emit(pending_text, pending_meta)

View file

@ -10,10 +10,30 @@
# services that are temporarily down. # services that are temporarily down.
sources: sources:
# ── System journal (exported by export_journal.sh on the host) ──────────── # ── System (exported by export_journal.sh on the host) ───────────────────
# journal-export.jsonl and dmesg-export.txt are written to /opt/turnstone/data/
# by the export script before each ingest run.
- id: system-journal - id: system-journal
path: /data/journal-export.jsonl path: /data/journal-export.jsonl
- id: dmesg
path: /data/dmesg-export.txt
# ── Syslog / rsyslog (direct file reads via /var/log bind mount) ──────────
# Uncomment the file(s) present on your system.
# Debian/Ubuntu:
# - id: syslog
# path: /var/log/syslog
# - id: auth-log
# path: /var/log/auth.log
# - id: kern-log
# path: /var/log/kern.log
# RHEL/Fedora/Rocky:
# - id: messages
# path: /var/log/messages
# - id: secure
# path: /var/log/secure
# ── Download ───────────────────────────────────────────────────────────── # ── Download ─────────────────────────────────────────────────────────────
- id: qbittorrent - id: qbittorrent
path: /opt/qbittorrent/config/data/logs/qbittorrent.log path: /opt/qbittorrent/config/data/logs/qbittorrent.log

View file

@ -1,16 +1,17 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Export recent journald entries to a JSONL file the Turnstone container can ingest. # Export recent system messages to files the Turnstone container can ingest.
# #
# Run this on the HOST before the container ingest step. The output file lands in # Exports:
# /opt/turnstone/data/ which is bind-mounted at /data/ inside the container. # journal-export.jsonl — journald (if journalctl is available)
# dmesg-export.txt — kernel ring buffer (always)
# #
# Priority filter 0-5 (emerg→notice) skips debug/info noise while keeping # Output files land in /opt/turnstone/data/ which is bind-mounted at /data/
# all warnings, errors, and service lifecycle events. # inside the container.
# #
# Usage (standalone): # Usage (standalone):
# sudo bash /opt/turnstone/scripts/export_journal.sh # sudo bash /opt/turnstone/scripts/export_journal.sh
# #
# Typical cron (combined with ingest — see crontab comment below): # Cron (combined with ingest):
# */15 * * * * bash /opt/turnstone/scripts/export_journal.sh && \ # */15 * * * * bash /opt/turnstone/scripts/export_journal.sh && \
# podman exec turnstone python scripts/ingest_corpus.py \ # podman exec turnstone python scripts/ingest_corpus.py \
# --sources /patterns/sources.yaml --db /data/turnstone.db \ # --sources /patterns/sources.yaml --db /data/turnstone.db \
@ -18,15 +19,31 @@
set -euo pipefail set -euo pipefail
OUT=/opt/turnstone/data/journal-export.jsonl DATA_DIR=/opt/turnstone/data
# ── journald ─────────────────────────────────────────────────────────────────
# 20-minute window (slightly wider than the 15-min cron interval) ensures no # 20-minute window (slightly wider than the 15-min cron interval) ensures no
# gaps between runs. Ingest is idempotent via entry_id hash, so overlap is safe. # gaps between runs. Ingest deduplicates via entry_id hash so overlap is safe.
journalctl \ if command -v journalctl &>/dev/null; then
--output=json \ journalctl \
--priority=0..5 \ --output=json \
--since "20 minutes ago" \ --priority=0..5 \
--no-pager \ --since "20 minutes ago" \
> "${OUT}" --no-pager \
> "${DATA_DIR}/journal-export.jsonl"
echo "journald: $(wc -l < "${DATA_DIR}/journal-export.jsonl") entries"
else
# No journald — write an empty file so the sources.yaml entry doesn't warn
: > "${DATA_DIR}/journal-export.jsonl"
echo "journald: not available (skipped)"
fi
echo "Exported $(wc -l < "${OUT}") journal entries to ${OUT}" # ── dmesg ─────────────────────────────────────────────────────────────────────
# Use -T for human-readable timestamps when available (util-linux >= 2.21).
# Fall back to plain dmesg if -T is not supported.
if dmesg -T &>/dev/null; then
dmesg -T > "${DATA_DIR}/dmesg-export.txt"
else
dmesg > "${DATA_DIR}/dmesg-export.txt"
fi
echo "dmesg: $(wc -l < "${DATA_DIR}/dmesg-export.txt") lines"

View file

@ -0,0 +1,90 @@
"""Tests for the dmesg log ingestor."""
from __future__ import annotations
from app.ingest.dmesg_log import is_dmesg_log, parse
# Five dmesg lines using relative (seconds-since-boot) timestamps.
RELATIVE_SAMPLE = """\
[ 0.000000] Linux version 6.8.0-65-generic
[ 0.012345] Command line: BOOT_IMAGE=/vmlinuz-6.8.0-65-generic
[12345.678901] usb 1-1: USB disconnect, device number 2
[12400.000000] EXT4-fs error (device sda1): ext4_find_entry: reading directory
[12401.000000] Out of memory: Kill process 1234 (firefox) score 900
"""
# Four dmesg lines using human-readable timestamps.
HUMAN_SAMPLE = """\
[Mon May 11 14:23:01 2026] Linux version 6.8.0-65-generic
[Mon May 11 14:23:02 2026] usb 1-1: USB disconnect, device number 2
[Mon May 11 14:24:00 2026] ata1: SATA link up 6.0 Gbps (SStatus 133 SControl 300)
[Mon May 11 14:25:00 2026] Out of memory: Kill process 5678 (python3) score 800
"""
# One relative line followed by one human-readable line.
# NOTE(review): not used by any test visible in this file — confirm whether a
# mixed-format test was intended.
MIXED_SAMPLE = "[ 0.000000] boot message\n[Mon May 11 14:23:01 2026] human ts message\n"
class TestDetector:
    """``is_dmesg_log`` accepts both dmesg formats and rejects other logs."""

    def test_detects_relative(self):
        candidate = "[ 0.000000] Linux version 6.8.0"
        assert is_dmesg_log(candidate)

    def test_detects_relative_large_offset(self):
        candidate = "[12345.678901] usb disconnect"
        assert is_dmesg_log(candidate)

    def test_detects_human_timestamp(self):
        candidate = "[Mon May 11 14:23:01 2026] message"
        assert is_dmesg_log(candidate)

    def test_rejects_syslog(self):
        candidate = "May 11 14:23:01 hostname sshd[1234]: message"
        verdict = is_dmesg_log(candidate)
        assert not verdict

    def test_rejects_servarr(self):
        candidate = "2026-05-11 02:31:51.5|Info|Component|Message"
        verdict = is_dmesg_log(candidate)
        assert not verdict

    def test_rejects_plaintext(self):
        candidate = "just a plain log line"
        verdict = is_dmesg_log(candidate)
        assert not verdict
class TestRelativeParser:
    """Parsing of dmesg lines that carry relative timestamps."""

    def _parse(self, text: str) -> list:
        """Run the parser over *text* and collect every entry it yields."""
        line_iter = iter(text.splitlines(keepends=True))
        entries = parse(line_iter, "dmesg_test", [])
        return list(entries)

    def test_entry_count(self):
        parsed = self._parse(RELATIVE_SAMPLE)
        assert len(parsed) == 5

    def test_relative_ts_raw_preserved(self):
        first = self._parse(RELATIVE_SAMPLE)[0]
        assert first.timestamp_raw == "[0.000000]"

    def test_relative_ts_iso_empty(self):
        # A relative timestamp gives no absolute time to convert.
        first = self._parse(RELATIVE_SAMPLE)[0]
        assert first.timestamp_iso == ""

    def test_message_text(self):
        first = self._parse(RELATIVE_SAMPLE)[0]
        assert "Linux version" in first.text

    def test_source_id_propagated(self):
        parsed = self._parse(RELATIVE_SAMPLE)
        mismatched = [entry for entry in parsed if not entry.source_id == "dmesg_test"]
        assert not mismatched

    def test_sequence_is_monotonic(self):
        numbers = [entry.sequence for entry in self._parse(RELATIVE_SAMPLE)]
        # Ordered ...
        assert numbers == sorted(numbers)
        # ... and free of duplicates.
        assert len(set(numbers)) == len(numbers)
class TestHumanParser:
    """Parsing of dmesg lines that carry human-readable timestamps."""

    def _parse(self, text: str) -> list:
        """Run the parser over *text* and collect every entry it yields."""
        line_iter = iter(text.splitlines(keepends=True))
        entries = parse(line_iter, "dmesg_human", [])
        return list(entries)

    def test_entry_count(self):
        parsed = self._parse(HUMAN_SAMPLE)
        assert len(parsed) == 4

    def test_timestamp_parsed(self):
        first = self._parse(HUMAN_SAMPLE)[0]
        assert first.timestamp_iso == "2026-05-11T14:23:01+00:00"

    def test_timestamp_raw(self):
        first = self._parse(HUMAN_SAMPLE)[0]
        assert "Mon May 11" in first.timestamp_raw

    def test_message_text(self):
        first = self._parse(HUMAN_SAMPLE)[0]
        assert "Linux version" in first.text

View file

@ -0,0 +1,70 @@
"""Tests for the syslog (RFC 3164) ingestor."""
from __future__ import annotations
from app.ingest.syslog import is_syslog, parse
# Six syslog records: idents with a pid (sshd, sudo, CRON, systemd), one
# without (kernel), and one record dated on a single-digit day.
SYSLOG_SAMPLE = """\
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
"""
class TestDetector:
    """``is_syslog`` accepts traditional syslog headers and rejects other logs."""

    def test_detects_standard_line(self):
        candidate = "May 11 14:23:01 example-node sshd[1234]: message"
        assert is_syslog(candidate)

    def test_detects_no_pid(self):
        candidate = "May 11 14:23:01 example-node kernel: message"
        assert is_syslog(candidate)

    def test_detects_space_padded_day(self):
        candidate = "May 1 04:00:00 example-node CRON[9999]: message"
        assert is_syslog(candidate)

    def test_rejects_servarr(self):
        candidate = "2026-05-11 02:31:51.5|Info|ComponentName|Message"
        verdict = is_syslog(candidate)
        assert not verdict

    def test_rejects_journald_json(self):
        candidate = '{"__REALTIME_TIMESTAMP": "12345", "MESSAGE": "hi"}'
        verdict = is_syslog(candidate)
        assert not verdict

    def test_rejects_dmesg_relative(self):
        candidate = "[ 0.000000] Linux version 6.8.0"
        verdict = is_syslog(candidate)
        assert not verdict

    def test_rejects_plaintext(self):
        candidate = "just a plain text line with no structure"
        verdict = is_syslog(candidate)
        assert not verdict
class TestParser:
    """End-to-end parsing of the syslog sample."""

    def _parse(self, text: str) -> list:
        """Run the parser over *text* and collect every entry it yields."""
        line_iter = iter(text.splitlines(keepends=True))
        entries = parse(line_iter, "syslog_test", [])
        return list(entries)

    def test_entry_count(self):
        parsed = self._parse(SYSLOG_SAMPLE)
        assert len(parsed) == 6

    def test_ident_prepended(self):
        first = self._parse(SYSLOG_SAMPLE)[0]
        assert first.text.startswith("[sshd]")

    def test_timestamp_parsed(self):
        first = self._parse(SYSLOG_SAMPLE)[0]
        assert "14:23:01" in first.timestamp_iso

    def test_space_padded_day(self):
        # The CRON record is the fifth one in the sample.
        cron = self._parse(SYSLOG_SAMPLE)[4]
        assert "04:00:00" in cron.timestamp_iso

    def test_source_id_propagated(self):
        parsed = self._parse(SYSLOG_SAMPLE)
        mismatched = [entry for entry in parsed if not entry.source_id == "syslog_test"]
        assert not mismatched

    def test_sequence_is_monotonic(self):
        numbers = [entry.sequence for entry in self._parse(SYSLOG_SAMPLE)]
        # Ordered ...
        assert numbers == sorted(numbers)
        # ... and free of duplicates.
        assert len(set(numbers)) == len(numbers)

    def test_severity_fallback(self):
        # The RFC 3164 body has no explicit severity field, so the value
        # comes from detect_severity: either absent or a string.
        level = self._parse(SYSLOG_SAMPLE)[0].severity
        assert level is None or isinstance(level, str)