turnstone/app/glean/tautulli.py
pyr0ball 12cd0a23d5 refactor: rename ingest → glean throughout codebase
Renames the app/ingest/ package to app/glean/ and updates all
references across Python modules, shell scripts, Vue components,
tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident
  (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
2026-05-20 23:02:55 -07:00

100 lines
3.1 KiB
Python

"""Tautulli webhook ingestor.
Parses a Tautulli notification agent JSON payload into a single RetrievedEntry.
Tautulli sends all template values as strings, so all fields are treated as str.
"""
from __future__ import annotations
from app.glean.base import (
apply_patterns,
epoch_float_to_iso,
make_entry_id,
now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
_ACTION_SEVERITY: dict[str, str | None] = {
"error": "CRITICAL",
"buffer": "WARN",
}
def _severity(action: str) -> str | None:
    """Look up the severity for a Tautulli action, ignoring letter case.

    Returns the mapped severity string, or None when the action has no entry
    in ``_ACTION_SEVERITY``.
    """
    normalized = action.lower()
    return _ACTION_SEVERITY.get(normalized)
def _format_text(p: dict) -> str:
action = p.get("action", "").lower()
user = p.get("user") or "unknown"
player = p.get("player") or "unknown player"
grandparent = p.get("grandparent_title", "").strip()
title = p.get("title", "").strip()
media = f'"{grandparent}{title}"' if grandparent else f'"{title}"'
quality = p.get("quality", "")
video_dec = p.get("video_decision", "")
stream = f"{quality}, {video_dec}" if quality and video_dec else quality or video_dec
err = p.get("error_message", "").strip()
if action == "error":
base = f"[plex:error] {user} on {player}: {media}"
return f"{base}{err}" if err else base
if action == "buffer":
return f"[plex:buffer] {user} on {player}: {media} is buffering"
if action in ("play", "resume"):
parts = [f"[plex:{action}] {user} on {player}: {media}"]
if stream:
parts.append(f"({stream})")
return " ".join(parts)
if action == "stop":
return f"[plex:stop] {user} stopped {media} on {player}"
if action == "pause":
return f"[plex:pause] {user} paused {media} on {player}"
return f"[plex:{action}] {user}: {media} on {player}"
def is_tautulli_payload(payload: dict) -> bool:
    """Tell whether *payload* has the keys expected of a Tautulli webhook.

    Both ``action`` and ``session_key`` must be present; their values are
    not inspected.
    """
    required_keys = ("action", "session_key")
    return all(key in payload for key in required_keys)
def parse_webhook(
    payload: dict,
    compiled_patterns: list[tuple[LogPattern, object]],
) -> RetrievedEntry:
    """Parse a Tautulli webhook payload into a single RetrievedEntry.

    Args:
        payload: Decoded webhook JSON. The ``action`` and ``timestamp`` keys
            are read here; the whole dict is also passed to ``_format_text``.
        compiled_patterns: Passed unchanged to ``apply_patterns`` together
            with the formatted text.

    Returns:
        One entry with ``source_id`` "tautulli" and ``sequence`` 0. When the
        payload carries a usable epoch ``timestamp`` the entry records it
        both raw (as a string) and as ISO text; otherwise ``timestamp_iso``
        falls back to the current time and ``timestamp_raw`` is None.
    """
    import math  # local import: only needed for the finiteness check below

    source_id = "tautulli"
    # ``get("action", "")`` would hand None to _severity for a JSON null.
    action = payload.get("action") or ""
    text = _format_text(payload)

    raw_ts = payload.get("timestamp") or ""
    try:
        ts_float = float(raw_ts) if raw_ts else 0.0
    except (ValueError, TypeError):
        ts_float = 0.0
    # float() also accepts "nan" and "inf"; neither is a real epoch value,
    # so treat them like a missing timestamp instead of forwarding them.
    if not math.isfinite(ts_float):
        ts_float = 0.0

    if ts_float:
        timestamp_iso: str | None = epoch_float_to_iso(ts_float)
        # str() keeps the declared type even when the payload held a number.
        timestamp_raw: str | None = str(raw_ts)
    else:
        timestamp_iso = now_iso()
        timestamp_raw = None

    ingest_time = now_iso()
    severity = _severity(action)
    matched = apply_patterns(text, compiled_patterns)

    # Prefer the payload's own timestamp as id material; fall back to the
    # ingest time when the payload did not send one.
    id_ts = str(raw_ts) if raw_ts else ingest_time
    entry_id = make_entry_id(source_id, 0, id_ts + text)

    return RetrievedEntry(
        entry_id=entry_id,
        source_id=source_id,
        sequence=0,
        timestamp_raw=timestamp_raw,
        timestamp_iso=timestamp_iso,
        ingest_time=ingest_time,
        severity=severity,
        repeat_count=1,
        out_of_order=False,
        matched_patterns=matched,
        text=text,
    )