Renames the app/ingest/ package to app/glean/ and updates all references across Python modules, shell scripts, Vue components, tests, and documentation. Intentionally preserved: - SQLite column name ingest_time (avoids schema migration) - RetrievedEntry.ingest_time field (maps to the column above) - Any public-facing JSON keys that reference ingest_time Changes by category: - app/ingest/ → app/glean/ (full package move, all parsers) - app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py - scripts/ingest_corpus.py → scripts/glean_corpus.py - tests/test_ingest_*.py → tests/test_glean_*.py - Docstrings, log messages, comments: ingest → glean - Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL - Shell scripts: glean.log, glean_corpus.py references - README.md: multi-source ingest → multi-source glean - .env.example: updated env var name - patterns/: new diagnostic patterns from 2026-05-20 SSH incident (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict) - SourcesView.vue: pipeline label updated - All test import paths updated to app.glean.* 285 tests passing.
103 lines
3.2 KiB
Python
103 lines
3.2 KiB
Python
"""Plex Media Server log parser.

Handles the standard Plex log format:

    Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message text here

Severity is read directly from the log level field. The EAE crash signature
(plex_eae_failure pattern) is matched by the shared pattern library.
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Iterator
|
|
|
|
from app.glean.base import (
|
|
SourceState, apply_patterns, make_entry_id, now_iso,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
|
|
# Header line of a Plex log record, for example:
#   Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message
# One fragment per captured field; adjacent literals concatenate into a
# single pattern anchored at both ends of the line.
_LINE_RE = re.compile(
    r"^(?P<month>\w{3})"                     # three word characters, e.g. "Jan"
    r"\s+(?P<day>\d{1,2}),"                  # one- or two-digit day, then a comma
    r"\s+(?P<year>\d{4})"                    # four-digit year
    r"\s+(?P<time>\d{2}:\d{2}:\d{2}\.\d+)"   # HH:MM:SS with fractional seconds
    r"\s+\[(?P<pid>\d+)\]"                   # numeric id in square brackets
    r"\s+(?P<level>[A-Z]+)"                  # upper-case level token
    r"\s+-\s+(?P<msg>.*)$"                   # everything after the " - " separator
)
|
|
|
|
# Translation table from the level token captured on a log header line to
# the severity label stored on the emitted entry. It is consulted with
# .get(), so a token that is not listed here yields None.
_LEVEL_MAP: dict[str, str] = {
    # Tokens that are kept as-is.
    "DEBUG": "DEBUG",
    "INFO": "INFO",
    "WARN": "WARN",
    # Long spelling folded onto the short one.
    "WARNING": "WARN",
    "ERROR": "ERROR",
    # FATAL is reported under the CRITICAL label.
    "FATAL": "CRITICAL",
}
|
|
|
|
|
|
def _parse_ts(month: str, day: str, year: str, time: str) -> tuple[str, str]:
    """Rebuild a Plex timestamp from its captured parts and convert it to UTC.

    Args:
        month: Abbreviated month name as captured from the log line.
        day: Day of month digits.
        year: Four-digit year.
        time: Clock time with fractional seconds (``HH:MM:SS.fff``).

    Returns:
        A ``(raw, iso)`` pair. ``raw`` is the reassembled timestamp text.
        ``iso`` is its ISO-8601 rendering in UTC, or an empty string when
        the text cannot be parsed with the expected format.
    """
    stamp = f"{month} {day}, {year} {time}"
    try:
        # strptime yields a naive datetime; astimezone() then treats it as
        # system-local time (Plex logs use local time) and shifts it to UTC
        # for consistent DB storage.
        local_dt = datetime.strptime(stamp, "%b %d, %Y %H:%M:%S.%f")
        utc_dt = local_dt.astimezone(timezone.utc)
    except ValueError:
        return stamp, ""
    return stamp, utc_dt.isoformat()
|
|
|
|
|
|
def is_plex_log(first_line: str) -> bool:
    """Return True when *first_line* matches the Plex log header pattern.

    Leading and trailing whitespace is removed before matching.
    """
    candidate = first_line.strip()
    return _LINE_RE.match(candidate) is not None
|
|
|
|
|
|
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one RetrievedEntry per Plex log record found in *lines*.

    A record begins at a line matching the Plex header pattern. Following
    lines that do not match are treated as continuations (stack traces,
    wrapped messages): each is stripped and appended to the record text
    after a newline. Lines that appear before the first header are ignored.

    Args:
        lines: Raw log lines. Trailing ``\\r`` and ``\\n`` characters are
            removed, so LF and CRLF input produce the same message text.
        source_id: Stored on every entry and passed to ``make_entry_id``.
        compiled_patterns: Handed unchanged to ``apply_patterns``.
        ingest_time: Stored on every entry. When omitted or empty,
            ``now_iso()`` is called once as iteration starts.

    Yields:
        Entries in the order their header lines appear in the input.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Text fragments of the record being accumulated; None until the first
    # header line has been seen.
    pending_parts: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() runs before state.sequence is read below; presumably it
        # advances the sequence counter — verify against SourceState.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Strip CR as well as LF: "." in the header pattern matches "\r", so
        # a CRLF-terminated line would otherwise leave a stray carriage
        # return at the end of the captured message.
        line = raw_line.rstrip("\r\n")
        header = _LINE_RE.match(line)
        if header:
            # A new header closes the record accumulated so far.
            if pending_parts is not None:
                yield _emit("\n".join(pending_parts), pending_meta)

            ts_raw, ts_iso = _parse_ts(
                header.group("month"),
                header.group("day"),
                header.group("year"),
                header.group("time"),
            )
            pending_meta = {
                "ts_raw": ts_raw,
                "ts_iso": ts_iso,
                "severity": _LEVEL_MAP.get(header.group("level").upper()),
            }
            pending_parts = [header.group("msg")]
        elif pending_parts is not None:
            # Continuation line (stack trace, wrapped message)
            pending_parts.append(line.strip())

    # Flush the final record, which has no following header to close it.
    if pending_parts is not None:
        yield _emit("\n".join(pending_parts), pending_meta)
|