turnstone/app/ingest/servarr.py
pyr0ball f64b834177 fix: ingestors treat naive log timestamps as local time, not UTC
All five parsers (plex, syslog, servarr, qbittorrent, plaintext) were
using .replace(tzinfo=timezone.utc) on naive datetimes parsed from log
files, which slaps a UTC label on what is actually local-time data.
On a UTC-7 system a 2pm entry was stored as 14:00Z instead of 21:00Z,
causing time-window searches to return zero results.

Fix: use .astimezone(timezone.utc) instead, which treats the naive
datetime as local time and converts correctly.

Tests updated to round-trip back to local time for assertion so they
pass on any timezone, not just UTC.
2026-05-13 18:16:33 -07:00

99 lines
3.1 KiB
Python

"""Servarr (*arr) log parser.
Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr,
Prowlarr, Readarr, Whisparr, and Bazarr:
2026-05-11 02:31:51.5|Info|ComponentName|Message text
2024-05-09 00:02:25|INFO |root |Message text
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
_LINE_RE = re.compile(
r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)"
r"\|(?P<level>[^|]+)"
r"\|(?P<component>[^|]*)"
r"\|(?P<msg>.*)$"
)
_LEVEL_MAP: dict[str, str | None] = {
"trace": None,
"debug": "DEBUG",
"info": "INFO",
"warn": "WARN",
"warning": "WARN",
"error": "ERROR",
"fatal": "CRITICAL",
}
def _parse_ts(ts_str: str) -> tuple[str, str]:
base = ts_str.split(".")[0]
try:
dt = datetime.strptime(base, "%Y-%m-%d %H:%M:%S").astimezone(timezone.utc)
return ts_str, dt.isoformat()
except ValueError:
return ts_str, ""
def is_servarr_log(first_line: str) -> bool:
return bool(_LINE_RE.match(first_line.strip()))
def parse(
lines: Iterator[str],
source_id: str,
compiled_patterns: list[tuple[LogPattern, object]],
ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
ingest_time = ingest_time or now_iso()
state = SourceState()
pending_text: str | None = None
pending_meta: dict = {}
def _emit(text: str, meta: dict) -> RetrievedEntry:
repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
matched = apply_patterns(text, compiled_patterns)
return RetrievedEntry(
entry_id=make_entry_id(source_id, state.sequence, text),
source_id=source_id,
sequence=state.sequence,
timestamp_raw=meta.get("ts_raw", ""),
timestamp_iso=meta.get("ts_iso", ""),
ingest_time=ingest_time,
severity=meta.get("severity"),
repeat_count=repeat,
out_of_order=out_of_order,
matched_patterns=matched,
text=text,
)
for raw_line in lines:
line = raw_line.rstrip("\n")
m = _LINE_RE.match(line)
if m:
if pending_text is not None:
yield _emit(pending_text, pending_meta)
ts_raw, ts_iso = _parse_ts(m.group("ts"))
level_key = m.group("level").strip().lower()
severity = _LEVEL_MAP.get(level_key, detect_severity(m.group("msg")))
component = m.group("component").strip()
msg = m.group("msg")
# Prepend component so it's searchable without needing a separate column
text = f"[{component}] {msg}" if component else msg
pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
pending_text = text
elif pending_text is not None:
pending_text += "\n" + line.strip()
if pending_text is not None:
yield _emit(pending_text, pending_meta)