- app/ingest/plex.py: Plex Media Server log parser. Regex-based line parser for the 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg' format. Handles multi-line entries (stack traces). Detects plex_eae_failure and all other patterns via the shared pattern library.
- app/ingest/plaintext.py: generic fallback parser for unrecognized formats. Extracts timestamps (ISO 8601, syslog, common log) and severity via regex.
- pipeline.py: detect the Plex format via is_plex_log(); fall back to plaintext instead of skipping; process *.log files alongside *.jsonl; add ingest_file() for single-file ingestion.
- scripts/ingest_corpus.py: accept a single file or a directory as the target.
- manage.sh: the ingest-plex command SSHes to Cass (or the HOST arg), pulls Plex Media Server.log, and ingests it directly.
103 lines
3.2 KiB
Python
103 lines
3.2 KiB
Python
"""Plex Media Server log parser.
|
|
|
|
Handles the standard Plex log format:
|
|
Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message text here
|
|
|
|
Severity is read directly from the log level field. The EAE crash signature
|
|
(plex_eae_failure pattern) is matched by the shared pattern library.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Iterator
|
|
|
|
from app.ingest.base import (
|
|
SourceState, apply_patterns, make_entry_id, now_iso,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
# Header line of a Plex log entry, for example:
#   Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message
# Lines that do not match are handled by parse() as continuation lines.
_LINE_RE = re.compile(
    r"^(?P<month>\w{3})\s+(?P<day>\d{1,2}),\s+(?P<year>\d{4})"  # Mon DD, YYYY
    r"\s+(?P<time>\d{2}:\d{2}:\d{2}\.\d+)"  # HH:MM:SS.fraction
    r"\s+\[(?P<pid>\d+)\]"  # [pid]
    r"\s+(?P<level>[A-Z]+)"  # upper-case level token
    r"\s+-\s+(?P<msg>.*)$"  # " - " separator, then the message
)
|
|
|
|
# Translation table from the level token captured by _LINE_RE to the severity
# label stored on each entry. A token that is not listed here has no mapping,
# so a ``.get()`` lookup on this table yields None for it.
_LEVEL_MAP = {
    # Levels kept under their own name.
    "DEBUG": "DEBUG",
    "INFO": "INFO",
    "WARN": "WARN",
    # Long spelling folded into the short one.
    "WARNING": "WARN",
    "ERROR": "ERROR",
    # FATAL is reported as CRITICAL.
    "FATAL": "CRITICAL",
}
|
|
|
|
|
|
def _parse_ts(month: str, day: str, year: str, time: str) -> tuple[str, str]:
    """Rebuild the timestamp text of a log line and convert it to ISO 8601.

    Args:
        month: Abbreviated month name, e.g. ``"Jan"``.
        day: Day of the month as text.
        year: Four-digit year as text.
        time: Clock time with fractional seconds, e.g. ``"12:00:00.000"``.

    Returns:
        A ``(raw, iso)`` pair. ``raw`` is the reassembled
        ``"Mon DD, YYYY HH:MM:SS.fff"`` text. ``iso`` is the ISO 8601 form
        tagged as UTC, or ``""`` when ``raw`` cannot be parsed.
    """
    stamp = f"{month} {day}, {year} {time}"
    try:
        parsed = datetime.strptime(stamp, "%b %d, %Y %H:%M:%S.%f")
    except ValueError:
        # Unparseable timestamp: keep the raw text, leave the ISO form empty.
        return stamp, ""
    # Plex logs are local time with no zone marker; the value is labelled
    # UTC for now rather than converted.
    return stamp, parsed.replace(tzinfo=timezone.utc).isoformat()
|
|
|
|
|
|
def is_plex_log(first_line: str) -> bool:
    """Report whether *first_line* looks like a Plex log header line.

    Surrounding whitespace is stripped before the line is tested against
    ``_LINE_RE``.
    """
    candidate = first_line.strip()
    return _LINE_RE.match(candidate) is not None
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per Plex log entry found in *lines*.

    A line matching ``_LINE_RE`` starts a new entry. Every following line that
    does not match is appended to that entry as a continuation (stack trace,
    wrapped message), joined with ``"\\n"`` after stripping surrounding
    whitespace. Lines seen before the first header line are dropped.

    Args:
        lines: Raw log lines, with or without trailing line terminators.
        source_id: Identifier stored on every entry and passed to
            ``make_entry_id``.
        compiled_patterns: Pattern pairs handed unchanged to
            ``apply_patterns`` for each entry's text.
        ingest_time: Ingest timestamp stored on every entry; defaults to
            ``now_iso()`` when omitted or empty.

    Yields:
        Entries in input order. ``severity`` is the ``_LEVEL_MAP`` translation
        of the header's level token, or None for an unlisted level.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Lines of the entry being accumulated; None until the first header line.
    pending_lines: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() runs first so that state.sequence is read after the
        # tracker has seen this entry (same call order as before).
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Strip "\r" as well as "\n": on CRLF input the regex's ".*" would
        # otherwise capture a trailing "\r" into the message text, while
        # continuation lines (strip()) would not carry one.
        line = raw_line.rstrip("\r\n")
        m = _LINE_RE.match(line)
        if m:
            # A new header closes the entry accumulated so far.
            if pending_lines is not None:
                yield _emit("\n".join(pending_lines), pending_meta)

            ts_raw, ts_iso = _parse_ts(
                m.group("month"), m.group("day"), m.group("year"), m.group("time")
            )
            pending_meta = {
                "ts_raw": ts_raw,
                "ts_iso": ts_iso,
                "severity": _LEVEL_MAP.get(m.group("level").upper()),
            }
            pending_lines = [m.group("msg")]
        elif pending_lines is not None:
            # Continuation line (stack trace, wrapped message).
            pending_lines.append(line.strip())

    # Flush the final entry, which no later header line closed.
    if pending_lines is not None:
        yield _emit("\n".join(pending_lines), pending_meta)
|