scripts/syslog_receiver.py: asyncio UDP server listening on port 5140, appends raw syslog lines to network-syslog.txt for the Turnstone live watcher to tail. Requires no root — port 5140 is non-privileged. scripts/turnstone-syslog-receiver.service: systemd unit for auto-start. app/ingest/syslog.py: strip optional RFC 3164 <PRI> prefix before parsing so network-forwarded syslog (OpenWRT logd, Arista EOS, etc.) is handled correctly without the PRI value breaking the regex.
103 lines
3.6 KiB
Python
103 lines
3.6 KiB
Python
"""Traditional syslog (RFC 3164) parser.
|
|
|
|
Handles the format written by rsyslog and syslogd on most Linux distros:
|
|
|
|
May 11 14:23:01 hostname sshd[1234]: Accepted publickey for x from ...
|
|
May 11 14:23:01 hostname kernel: [12345.678] usb disconnect
|
|
|
|
Files: /var/log/syslog (Debian/Ubuntu), /var/log/messages (RHEL/Fedora),
|
|
/var/log/auth.log, /var/log/kern.log
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Iterator
|
|
|
|
from app.ingest.base import (
|
|
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
# Three-letter English month abbreviation -> calendar month number (1-12).
_MONTHS = {
    abbr: number
    for number, abbr in enumerate(
        (
            "Jan", "Feb", "Mar", "Apr", "May", "Jun",
            "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
        ),
        start=1,
    )
}

# Optional RFC 3164 PRI prefix carried by network-forwarded syslog, e.g.
#   <134>May 11 14:23:01 ...
_PRI_RE = re.compile(r"^<\d{1,3}>")

# One record header followed by its message, e.g.
#   May 11 14:23:01 hostname ident[pid]: message
#   May  1 04:00:00 hostname ident: message   (no pid, day may be space-padded)
_LINE_RE = re.compile(
    # Month abbreviation.
    r"^(?P<month>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)"
    # Day of month (one or two digits) and HH:MM:SS time.
    r"\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})"
    # Originating host.
    r"\s+(?P<host>\S+)"
    # Program identifier, optional "[pid]", a colon, then the message text.
    r"\s+(?P<ident>[^\[:\s]{1,48})(?:\[(?P<pid>\d+)\])?:\s*(?P<msg>.*)$"
)
|
|
|
|
|
|
def is_syslog(first_line: str) -> bool:
    """Report whether *first_line* looks like a traditional syslog record.

    Surrounding whitespace is trimmed and at most one leading ``<PRI>``
    prefix is removed before the remainder is matched against the syslog
    line pattern.
    """
    candidate = first_line.strip()
    without_pri = _PRI_RE.sub("", candidate, count=1)
    return _LINE_RE.match(without_pri) is not None
|
|
|
|
|
|
# How far (in seconds) a parsed timestamp may lead the reference clock before
# the record is assumed to belong to the previous year instead.
_MAX_FUTURE_SKEW_SECONDS = 24 * 60 * 60


def _parse_ts(
    month_str: str,
    day: str,
    time_str: str,
    *,
    now: datetime | None = None,
) -> tuple[str, str]:
    """Build the raw and ISO-8601 forms of a year-less syslog timestamp.

    RFC 3164 timestamps carry no year, so one has to be inferred. The year
    of the reference clock is tried first; if that date does not exist in
    that year (Feb 29) or lies more than a day ahead of the reference clock
    (e.g. a December record read in January), the previous year is tried.
    If neither year yields a non-future date, the first date that could be
    constructed is used, so the result is never worse than simply assuming
    the current year.

    Args:
        month_str: Month abbreviation; an unknown value is treated as
            January.
        day: Day of month as a decimal string.
        time_str: Time of day as ``HH:MM:SS``.
        now: Reference clock used for year inference. Must be
            timezone-aware; defaults to the current UTC time.

    Returns:
        ``(ts_raw, ts_iso)``. ``ts_raw`` is the normalised
        ``"Mon DD HH:MM:SS"`` text with the day space-padded to two
        columns. ``ts_iso`` is the timestamp interpreted as UTC in
        ISO-8601 form, or ``""`` when the fields do not form a valid date.

    Raises:
        ValueError: If *day* is not a decimal integer.
    """
    month = _MONTHS.get(month_str, 1)
    reference = now if now is not None else datetime.now(timezone.utc)
    day_num = int(day)
    ts_raw = f"{month_str} {day_num:2d} {time_str}"

    try:
        time_parts = [int(p) for p in time_str.split(":")]
    except ValueError:
        return ts_raw, ""

    fallback: datetime | None = None
    for year in (reference.year, reference.year - 1):
        try:
            candidate = datetime(year, month, day_num, *time_parts,
                                 tzinfo=timezone.utc)
        except ValueError:
            # Date does not exist in this year (e.g. Feb 29) or a field is
            # out of range; the other candidate year may still work.
            continue
        lead = (candidate - reference).total_seconds()
        if lead <= _MAX_FUTURE_SKEW_SECONDS:
            return ts_raw, candidate.isoformat()
        if fallback is None:
            fallback = candidate

    if fallback is None:
        return ts_raw, ""
    return ts_raw, fallback.isoformat()
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Turn raw syslog lines into ``RetrievedEntry`` records.

    A line matching the syslog header pattern starts a new entry. Any
    following line that does not match is appended to the entry in progress
    as a continuation, separated by a newline. Non-matching lines seen
    before the first header are discarded. An optional leading ``<PRI>``
    prefix is removed from every line before matching.

    Args:
        lines: Raw log lines, with or without their line terminator.
        source_id: Identifier stored on every emitted entry and passed to
            ``make_entry_id``.
        compiled_patterns: Patterns handed to ``apply_patterns`` for each
            entry's text.
        ingest_time: Ingest timestamp stored on every entry; defaults to
            ``now_iso()`` evaluated once per call.

    Yields:
        One ``RetrievedEntry`` per syslog record, in input order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Text fragments of the entry being assembled; None until a header line
    # has been seen.
    pending_parts: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() runs first: state.sequence is read afterwards for both
        # the entry id and the sequence field.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Strip "\r" as well as "\n": the pattern's ".*" would otherwise
        # capture a trailing carriage return into the message on CRLF input.
        line = _PRI_RE.sub("", raw_line.rstrip("\r\n"), count=1)
        m = _LINE_RE.match(line)

        if m is None:
            # Continuation line: attach it to the entry in progress, if any.
            if pending_parts is not None:
                pending_parts.append(line.strip())
            continue

        # A new header closes the entry in progress.
        if pending_parts is not None:
            yield _emit("\n".join(pending_parts), pending_meta)

        ts_raw, ts_iso = _parse_ts(m.group("month"), m.group("day"), m.group("time"))
        ident = m.group("ident").strip()
        msg = m.group("msg")
        text = f"[{ident}] {msg}" if ident else msg
        severity = detect_severity(msg)
        pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
        pending_parts = [text]

    # Flush the final entry once the input is exhausted.
    if pending_parts is not None:
        yield _emit("\n".join(pending_parts), pending_meta)
|