- Add servarr.py parser for all *arr services (sonarr/radarr/lidarr/prowlarr/readarr/whisparr/bazarr) — pipe-delimited format with component prefix prepended for searchability
- Add ingest_sources() to pipeline.py; reads sources.yaml, skips missing paths with a warning so cron keeps running if a service is down
- Add --sources mode to ingest_corpus.py CLI; legacy positional args unchanged for backward compat
- Add patterns/sources.yaml with all of Contributor2's discovered service log paths (qbit, 7 servarr services, nzbget, tautulli, jellyseerr)
- Replace per-service volume mounts in podman-standalone.sh with /opt:/opt:ro + /var/log:/var/log:ro; adding a new source now requires only editing sources.yaml — no container restart
99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
"""Servarr (*arr) log parser.
|
|
|
|
Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr,
|
|
Prowlarr, Readarr, Whisparr, and Bazarr:
|
|
|
|
2026-05-11 02:31:51.5|Info|ComponentName|Message text
|
|
2024-05-09 00:02:25|INFO |root |Message text
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Iterator
|
|
|
|
from app.ingest.base import (
|
|
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
# Header line of a log record: four pipe-separated fields
# "<timestamp>|<level>|<component>|<message>". The timestamp may carry
# fractional seconds; the component may be empty; the message is everything
# after the third pipe (so it may itself contain pipes).
_LINE_RE = re.compile(
    "^"
    + r"\|".join(
        (
            r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)",
            r"(?P<level>[^|]+)",
            r"(?P<component>[^|]*)",
            r"(?P<msg>.*)",
        )
    )
    + "$"
)
|
|
|
|
_LEVEL_MAP: dict[str, str | None] = {
|
|
"trace": None,
|
|
"debug": "DEBUG",
|
|
"info": "INFO",
|
|
"warn": "WARN",
|
|
"warning": "WARN",
|
|
"error": "ERROR",
|
|
"fatal": "CRITICAL",
|
|
}
|
|
|
|
|
|
def _parse_ts(ts_str: str) -> tuple[str, str]:
    """Return ``(raw, iso)`` for a ``YYYY-MM-DD HH:MM:SS[.fff]`` timestamp.

    Anything after the first ``.`` (the fractional seconds) is discarded
    before parsing, and the parsed value is tagged as UTC. ``raw`` is always
    the input unchanged; ``iso`` is the empty string when the text cannot be
    parsed.
    """
    whole_seconds, _, _ = ts_str.partition(".")
    try:
        parsed = datetime.strptime(whole_seconds, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return ts_str, ""
    return ts_str, parsed.replace(tzinfo=timezone.utc).isoformat()
|
|
|
|
|
|
def is_servarr_log(first_line: str) -> bool:
    """Report whether *first_line* looks like a Servarr log header line.

    Surrounding whitespace is ignored; the remainder must match the
    pipe-delimited ``timestamp|level|component|message`` pattern.
    """
    candidate = first_line.strip()
    return _LINE_RE.match(candidate) is not None
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per log record found in *lines*.

    A record starts at a line matching the pipe-delimited header pattern.
    Following non-matching lines (e.g. stack traces) are stripped and
    appended to that record, separated by newlines. Non-matching lines that
    appear before the first header are dropped.

    Args:
        lines: Raw log lines; trailing ``\\n`` or ``\\r\\n`` is removed.
        source_id: Identifier stored on every entry and fed into the entry id.
        compiled_patterns: Passed through to ``apply_patterns`` for each
            record's text.
        ingest_time: Timestamp stored on every entry; defaults to
            ``now_iso()`` when omitted or empty.

    Yields:
        Entries in input order. A record is only emitted once the next header
        (or end of input) is seen, because continuation lines may follow it.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Text fragments of the record being assembled; None until the first
    # header line has been seen.
    pending_parts: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() must run before state.sequence is read for this entry.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Strip "\r" as well as "\n": on CRLF input a leftover "\r" would be
        # captured into the message by the header pattern's trailing ".*".
        line = raw_line.rstrip("\r\n")
        m = _LINE_RE.match(line)

        if m is None:
            # Continuation of the current record; ignored if none is open.
            if pending_parts is not None:
                pending_parts.append(line.strip())
            continue

        # A new header closes the record that was being assembled.
        if pending_parts is not None:
            yield _emit("\n".join(pending_parts), pending_meta)

        ts_raw, ts_iso = _parse_ts(m.group("ts"))
        msg = m.group("msg")
        level_key = m.group("level").strip().lower()
        if level_key in _LEVEL_MAP:
            # Includes levels explicitly mapped to None (no severity).
            severity = _LEVEL_MAP[level_key]
        else:
            # Unknown level name: fall back to inspecting the message. Only
            # evaluated when needed rather than on every line.
            severity = detect_severity(msg)
        component = m.group("component").strip()
        # Prepend component so it's searchable without needing a separate column
        text = f"[{component}] {msg}" if component else msg
        pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
        pending_parts = [text]

    # Flush the final record, which has no following header to trigger it.
    if pending_parts is not None:
        yield _emit("\n".join(pending_parts), pending_meta)
|