turnstone/app/ingest/plex.py
pyr0ball f8a2f8007b feat: plain-text and Plex log ingestors
- app/ingest/plex.py: Plex Media Server log parser
  Regex-based line parser for 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg'
  format. Handles multi-line entries (stack traces). Detects plex_eae_failure
  and all other patterns via shared pattern library.
- app/ingest/plaintext.py: generic fallback parser for unrecognized formats
  Extracts timestamps (ISO 8601, syslog, common log) and severity via regex.
- pipeline.py: detect plex format via is_plex_log(); fall back to plaintext
  instead of skipping; process *.log files alongside *.jsonl; add ingest_file()
  for single-file ingestion.
- scripts/ingest_corpus.py: accept single file or directory as target
- manage.sh: ingest-plex command SSHes to Cass (or HOST arg), pulls
  Plex Media Server.log, and ingests it directly
2026-05-08 17:50:01 -07:00

103 lines
3.2 KiB
Python

"""Plex Media Server log parser.
Handles the standard Plex log format:
Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message text here
Severity is read directly from the log level field. The EAE crash signature
(plex_eae_failure pattern) is matched by the shared pattern library.
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message
_LINE_RE = re.compile(
r"^(?P<month>\w{3})\s+(?P<day>\d{1,2}),\s+(?P<year>\d{4})"
r"\s+(?P<time>\d{2}:\d{2}:\d{2}\.\d+)"
r"\s+\[(?P<pid>\d+)\]"
r"\s+(?P<level>[A-Z]+)"
r"\s+-\s+(?P<msg>.*)$"
)
_LEVEL_MAP = {
"DEBUG": "DEBUG",
"INFO": "INFO",
"WARN": "WARN",
"WARNING": "WARN",
"ERROR": "ERROR",
"FATAL": "CRITICAL",
}
def _parse_ts(month: str, day: str, year: str, time: str) -> tuple[str, str]:
raw = f"{month} {day}, {year} {time}"
try:
# Plex logs are local time — treat as UTC for now (no TZ in log)
dt = datetime.strptime(raw, "%b %d, %Y %H:%M:%S.%f").replace(tzinfo=timezone.utc)
return raw, dt.isoformat()
except ValueError:
return raw, ""
def is_plex_log(first_line: str) -> bool:
return bool(_LINE_RE.match(first_line.strip()))
def parse(
lines: Iterator[str],
source_id: str,
compiled_patterns: list[tuple[LogPattern, object]],
ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
ingest_time = ingest_time or now_iso()
state = SourceState()
pending_text: str | None = None
pending_meta: dict = {}
def _emit(text: str, meta: dict) -> RetrievedEntry:
repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
matched = apply_patterns(text, compiled_patterns)
return RetrievedEntry(
entry_id=make_entry_id(source_id, state.sequence, text),
source_id=source_id,
sequence=state.sequence,
timestamp_raw=meta.get("ts_raw", ""),
timestamp_iso=meta.get("ts_iso", ""),
ingest_time=ingest_time,
severity=meta.get("severity"),
repeat_count=repeat,
out_of_order=out_of_order,
matched_patterns=matched,
text=text,
)
for raw_line in lines:
line = raw_line.rstrip("\n")
m = _LINE_RE.match(line)
if m:
# Flush any accumulated multi-line entry
if pending_text is not None:
yield _emit(pending_text, pending_meta)
ts_raw, ts_iso = _parse_ts(
m.group("month"), m.group("day"), m.group("year"), m.group("time")
)
pending_meta = {
"ts_raw": ts_raw,
"ts_iso": ts_iso,
"severity": _LEVEL_MAP.get(m.group("level").upper()),
}
pending_text = m.group("msg")
elif pending_text is not None:
# Continuation line (stack trace, wrapped message)
pending_text += "\n" + line.strip()
if pending_text is not None:
yield _emit(pending_text, pending_meta)