"""Servarr (*arr) log parser.

Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr, Prowlarr,
Readarr, Whisparr, and Bazarr:

    2026-05-11 02:31:51.5|Info|ComponentName|Message text
    2024-05-09 00:02:25|INFO |root |Message text
"""

from __future__ import annotations

import re
from datetime import datetime, timezone
from typing import Iterator

from app.ingest.base import (
    SourceState,
    apply_patterns,
    detect_severity,
    make_entry_id,
    now_iso,
)
from app.services.models import LogPattern, RetrievedEntry

# One log header line: "<timestamp>|<level>|<component>|<message>".
# The group names are required by the m.group("...") lookups in parse().
_LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)"
    r"\|(?P<level>[^|]+)"
    r"\|(?P<component>[^|]*)"
    r"\|(?P<msg>.*)$"
)

# Lower-cased, stripped level token -> severity label.
# "trace" is listed explicitly with None so that it is NOT treated as an
# unknown level (unknown levels fall back to detect_severity on the message).
_LEVEL_MAP: dict[str, str | None] = {
    "trace": None,
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARN",
    "warning": "WARN",
    "error": "ERROR",
    "fatal": "CRITICAL",
}


def _parse_ts(ts_str: str) -> tuple[str, str]:
    """Return ``(raw, iso)`` for a ``YYYY-MM-DD HH:MM:SS[.fff]`` timestamp.

    The fractional-seconds suffix is discarded before parsing. The parsed
    value is naive, so ``astimezone`` interprets it as system local time
    before converting to UTC. If the digits do not form a valid date/time,
    the ISO part is returned as an empty string.
    """
    base = ts_str.split(".")[0]
    try:
        dt = datetime.strptime(base, "%Y-%m-%d %H:%M:%S").astimezone(timezone.utc)
    except ValueError:
        return ts_str, ""
    return ts_str, dt.isoformat()


def is_servarr_log(first_line: str) -> bool:
    """Return True if *first_line* (whitespace-stripped) is a Servarr header line."""
    return bool(_LINE_RE.match(first_line.strip()))


def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per Servarr log record found in *lines*.

    A record starts at a line matching ``_LINE_RE``. Every following
    non-matching line (e.g. a stack trace) is stripped and appended to that
    record, separated by newlines. Non-matching lines that appear before the
    first header are dropped.

    Args:
        lines: Raw log lines, with or without trailing line terminators.
        source_id: Identifier stored on each entry and used for its entry id.
        compiled_patterns: Passed unchanged to ``apply_patterns``.
        ingest_time: Value stored on every entry; defaults to ``now_iso()``.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Text fragments of the record being assembled; None until a header is seen.
    pending_parts: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() runs first: state.sequence is read only after it returns.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Strip "\r" as well: "." in the regex matches it, so a CRLF file
        # would otherwise leave a carriage return at the end of every message.
        line = raw_line.rstrip("\r\n")
        m = _LINE_RE.match(line)
        if m:
            # A new header closes the record collected so far.
            if pending_parts is not None:
                yield _emit("\n".join(pending_parts), pending_meta)

            ts_raw, ts_iso = _parse_ts(m.group("ts"))
            msg = m.group("msg")
            level_key = m.group("level").strip().lower()
            if level_key in _LEVEL_MAP:
                severity = _LEVEL_MAP[level_key]
            else:
                # Only unknown level tokens need the message-based fallback.
                severity = detect_severity(msg)

            component = m.group("component").strip()
            # Prepend component so it's searchable without needing a separate column
            text = f"[{component}] {msg}" if component else msg
            pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
            pending_parts = [text]
        elif pending_parts is not None:
            # Continuation line: belongs to the current record.
            pending_parts.append(line.strip())

    # Flush the final record, which has no following header to trigger it.
    if pending_parts is not None:
        yield _emit("\n".join(pending_parts), pending_meta)