Ingest pipeline (journald / Caddy / Docker-wrapped formats) with per-source state tracking (repeat dedup, out-of-order detection), named pattern tagging at ingest time, and idempotent SHA1-keyed writes. FTS5 search layer with porter stemmer, severity/source/pattern/time filters, and BM25 ranking. MCP server (FastMCP stdio) with three tools: search_logs, diagnose, list_log_sources — compatible with both Claude Code and Copilot CLI. WAL mode enabled on all connections. FTS index auto-built after ingest. MCP configs included for Claude Code (.mcp.json) and Copilot CLI (.github/copilot/mcp.json).
75 lines
2.1 KiB
Python
75 lines
2.1 KiB
Python
"""Journald JSON (-o json) log parser."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Iterator
|
|
|
|
from app.ingest.base import (
|
|
SourceState, apply_patterns, epoch_micros_to_iso,
|
|
make_entry_id, now_iso, SYSLOG_PRIORITY,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
|
|
def _extract_message(raw: dict) -> str:
|
|
msg = raw.get("MESSAGE", "")
|
|
# journald encodes binary messages as arrays of ints
|
|
if isinstance(msg, list):
|
|
try:
|
|
return bytes(msg).decode("utf-8", errors="replace")
|
|
except Exception:
|
|
return repr(msg)
|
|
return str(msg)
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield a ``RetrievedEntry`` for each usable journald JSON line.

    A line is skipped (never raised on) when it is blank, is not valid
    JSON, does not decode to a JSON object, has no
    ``__REALTIME_TIMESTAMP`` key, or has an empty message.

    Args:
        lines: Raw text lines, one JSON document per line.
        source_id: Base identifier for the log source. When the record has
            a ``_HOSTNAME`` the emitted source id becomes
            ``"{source_id}:{hostname}:{unit}"``.
        compiled_patterns: Pattern pairs handed unchanged to
            ``apply_patterns`` together with the message text.
        ingest_time: Ingest timestamp stamped on every entry; defaults to
            ``now_iso()`` evaluated once when iteration starts.

    Yields:
        One ``RetrievedEntry`` per accepted record, in input order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            continue

        # Valid JSON is not necessarily an object: a bare number, string,
        # list or null would otherwise raise on the key lookups below and
        # abort the whole ingest over a single bad line.
        if not isinstance(entry, dict):
            continue
        if "__REALTIME_TIMESTAMP" not in entry:
            continue

        text = _extract_message(entry)
        if not text:
            continue

        ts_raw = entry["__REALTIME_TIMESTAMP"]
        ts_iso = epoch_micros_to_iso(ts_raw)

        # PRIORITY is normalised to str for the table lookup; an absent or
        # unrecognised value yields severity None.
        priority = entry.get("PRIORITY", "")
        severity = SYSLOG_PRIORITY.get(str(priority))

        hostname = entry.get("_HOSTNAME", "")
        unit = entry.get("_SYSTEMD_UNIT") or entry.get("SYSLOG_IDENTIFIER", "")
        # The composed id is fed to make_entry_id, so its format must stay
        # stable for previously ingested entries to keep the same id.
        src = f"{source_id}:{hostname}:{unit}" if hostname else source_id

        # NOTE(review): state.sequence is read after observe(); presumably
        # observe() advances it for each record -- confirm in SourceState.
        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)

        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )
|