From 24dd4bc568f949fee2a57fa2c8efa1197a1e0f00 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 13 May 2026 18:41:03 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Tautulli=20webhook=20ingest=20endpoint?= =?UTF-8?q?=20=E2=80=94=20plex=20events=20->=20log=5Fentries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /turnstone/api/ingest/tautulli accepts Tautulli notification agent payloads and stores them as log_entries under source 'tautulli'. Severity maps error->CRITICAL, buffer->WARN, all others->None. Optional bearer token auth via X-Tautulli-Token header + tautulli_token pref. FTS index rebuilt as a background task after each write. 28 new tests, all passing. --- app/ingest/tautulli.py | 99 ++++++++++++++ app/rest.py | 59 ++++++++- docs/tautulli-setup.md | 63 +++++++++ tests/test_ingest_tautulli.py | 234 ++++++++++++++++++++++++++++++++++ 4 files changed, 453 insertions(+), 2 deletions(-) create mode 100644 app/ingest/tautulli.py create mode 100644 docs/tautulli-setup.md create mode 100644 tests/test_ingest_tautulli.py diff --git a/app/ingest/tautulli.py b/app/ingest/tautulli.py new file mode 100644 index 0000000..19878bc --- /dev/null +++ b/app/ingest/tautulli.py @@ -0,0 +1,99 @@ +"""Tautulli webhook ingestor. + +Parses a Tautulli notification agent JSON payload into a single RetrievedEntry. +Tautulli sends all template values as strings, so all fields are treated as str. +""" +from __future__ import annotations + +from app.ingest.base import ( + apply_patterns, + epoch_float_to_iso, + make_entry_id, + now_iso, +) +from app.services.models import LogPattern, RetrievedEntry + +_ACTION_SEVERITY: dict[str, str | None] = { + "error": "CRITICAL", + "buffer": "WARN", +} + + +def _severity(action: str) -> str | None: + return _ACTION_SEVERITY.get(action.lower()) + + +def _format_text(p: dict) -> str: + action = p.get("action", "").lower() + user = p.get("user") or "unknown" + player = p.get("player") or "unknown player" + grandparent = p.get("grandparent_title", "").strip() + title = p.get("title", "").strip() + media = f'"{grandparent} — {title}"' if grandparent else f'"{title}"' + quality = p.get("quality", "") + video_dec = p.get("video_decision", "") + stream = f"{quality}, {video_dec}" if quality and video_dec else quality or video_dec + err = p.get("error_message", "").strip() + + if action == "error": + base = f"[plex:error] {user} on {player}: {media}" + return f"{base} — {err}" if err else base + if action == "buffer": + return f"[plex:buffer] {user} on {player}: {media} is buffering" + if action in ("play", "resume"): + parts = [f"[plex:{action}] {user} on {player}: {media}"] + if stream: + parts.append(f"({stream})") + return " ".join(parts) + if action == "stop": + return f"[plex:stop] {user} stopped {media} on {player}" + if action == "pause": + return f"[plex:pause] {user} paused {media} on {player}" + return f"[plex:{action}] {user}: {media} on {player}" + + +def is_tautulli_payload(payload: dict) -> bool: + """Return True if the payload looks like a Tautulli webhook.""" + return "action" in payload and "session_key" in payload + + +def parse_webhook( + payload: dict, + compiled_patterns: list[tuple[LogPattern, object]], +) -> RetrievedEntry: + """Parse a Tautulli webhook payload into a single RetrievedEntry.""" + source_id = "tautulli" + action = payload.get("action", "") + text = _format_text(payload) + + raw_ts = payload.get("timestamp") or "" + try: + ts_float = float(raw_ts) if raw_ts else 0.0 + except (ValueError, TypeError): + ts_float = 0.0 + + if ts_float: + timestamp_iso: str | None = epoch_float_to_iso(ts_float) + timestamp_raw: str | None = raw_ts + else: + timestamp_iso = now_iso() + timestamp_raw = None + + ingest_time = now_iso() + severity = _severity(action) + matched = apply_patterns(text, compiled_patterns) + entry_id = make_entry_id(source_id, 0, str(raw_ts) + text) + + return RetrievedEntry( + entry_id=entry_id, + source_id=source_id, + sequence=0, + timestamp_raw=timestamp_raw, + timestamp_iso=timestamp_iso, + ingest_time=ingest_time, + severity=severity, + repeat_count=1, + out_of_order=False, + matched_patterns=matched, + text=text, + ) diff --git a/app/rest.py b/app/rest.py index 1355cf6..fa9e031 100644 --- a/app/rest.py +++ b/app/rest.py @@ -10,19 +10,22 @@ import asyncio import dataclasses import json import os +import sqlite3 import urllib.error import urllib.request from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated -from fastapi import APIRouter, FastAPI, HTTPException, Query, UploadFile +from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from app.ingest.pipeline import ensure_schema +from app.ingest.base import _compile, load_patterns +from app.ingest.tautulli import parse_webhook as _parse_tautulli from app.services.incidents import ( build_bundle, create_incident, @@ -40,6 +43,7 @@ from app.services.search import ( recent_source_errors as _source_errors, stats_summary as _stats, format_results, + build_fts_index, ) from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream from app.watch.watcher import Watcher, load_watch_config @@ -61,8 +65,9 @@ DIST_DIR = Path(__file__).parent.parent / "web" / "dist" SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) +PATTERN_FILE = PATTERN_DIR / "default.yaml" -_watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml") +_watcher = Watcher(DB_PATH, PATTERN_FILE) @asynccontextmanager @@ -459,6 +464,56 @@ def get_bundle_endpoint(bundle_id: str) -> dict: return dataclasses.asdict(bundle) +def _tautulli_write_entry(conn: sqlite3.Connection, entry) -> None: + conn.execute( + """ + INSERT OR IGNORE INTO log_entries + (id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text) + VALUES (?,?,?,?,?,?,?,?,?,?,?) + """, + ( + entry.entry_id, entry.source_id, entry.sequence, + entry.timestamp_raw, entry.timestamp_iso, entry.ingest_time, + entry.severity, entry.repeat_count, int(entry.out_of_order), + json.dumps(list(entry.matched_patterns)), entry.text, + ), + ) + + +@router.post("/api/ingest/tautulli") +def ingest_tautulli( + payload: dict, + request: Request, + background_tasks: BackgroundTasks, +) -> dict: + """Accept a Tautulli webhook POST and store the event as a log entry.""" + prefs = _load_prefs() + token = prefs.get("tautulli_token", "") + if token: + header_token = request.headers.get("X-Tautulli-Token", "") + if header_token != token: + raise HTTPException(status_code=403, detail="Invalid Tautulli token") + + if "action" not in payload: + raise HTTPException(status_code=400, detail="Missing required field: action") + + compiled = _compile(load_patterns(PATTERN_FILE)) + entry = _parse_tautulli(payload, compiled) + + conn = sqlite3.connect(str(DB_PATH)) + conn.execute("PRAGMA journal_mode=WAL") + try: + _tautulli_write_entry(conn, entry) + conn.commit() + finally: + conn.close() + + background_tasks.add_task(build_fts_index, DB_PATH) + return {"stored": 1, "entry_id": entry.entry_id, "action": payload.get("action")} + + app.include_router(router) _ctx = APIRouter(prefix="/turnstone/api/context") diff --git a/docs/tautulli-setup.md b/docs/tautulli-setup.md new file mode 100644 index 0000000..dce1da9 --- /dev/null +++ b/docs/tautulli-setup.md @@ -0,0 +1,63 @@ +# Tautulli Webhook Setup + +Tautulli is a Plex Media Server (PMS) monitoring application. This guide shows +how to configure Tautulli to send playback events to Turnstone. + +## Triggers to enable + +In your notification agent, enable these triggers: + +- Playback Start +- Playback Stop +- Playback Pause +- Playback Resume +- Playback Error +- Playback Buffering + +## JSON body template + +Paste this into the **JSON Data** field of the Tautulli Custom Script / Webhook +notification agent: + +```json +{ + "action": "{action}", + "timestamp": "{timestamp}", + "user": "{user}", + "player": "{player}", + "media_type": "{media_type}", + "title": "{title}", + "grandparent_title": "{grandparent_title}", + "quality": "{quality}", + "video_decision": "{video_decision}", + "audio_decision": "{audio_decision}", + "error_message": "{error_message}", + "session_key": "{session_key}" +} +``` + +## Webhook URL + +``` +http://:8534/turnstone/api/ingest/tautulli +``` + +Replace `` with the hostname or IP of the machine running +Turnstone. If Turnstone is behind a reverse proxy, use the proxy URL instead. + +## Optional token authentication + +If you set `tautulli_token` in Turnstone settings, every webhook request must +include a matching header: + +``` +X-Tautulli-Token: +``` + +Add this header in the Tautulli notification agent's **Headers** section. +Requests with a missing or wrong token are rejected with HTTP 403. + +## Searching events + +All events are stored under source `tautulli` and are immediately searchable +in Turnstone after each webhook is received. diff --git a/tests/test_ingest_tautulli.py b/tests/test_ingest_tautulli.py new file mode 100644 index 0000000..28f5d3d --- /dev/null +++ b/tests/test_ingest_tautulli.py @@ -0,0 +1,234 @@ +"""Tests for the Tautulli webhook ingestor.""" +from __future__ import annotations + +import pytest + +from app.ingest.tautulli import is_tautulli_payload, parse_webhook + + +# --------------------------------------------------------------------------- +# Sample payloads +# --------------------------------------------------------------------------- + +_ERROR_PAYLOAD = { + "action": "error", + "timestamp": "1747195200", + "user": "pyroballpcs", + "player": "LG TV", + "media_type": "episode", + "title": "Episode 7", + "grandparent_title": "Oshi no Ko", + "quality": "1080p", + "video_decision": "transcode", + "audio_decision": "direct play", + "error_message": "Lost connection to streaming client", + "session_key": "abc123", +} + +_BUFFER_PAYLOAD = { + "action": "buffer", + "timestamp": "1747195300", + "user": "pyroballpcs", + "player": "Roku", + "media_type": "movie", + "title": "Dune: Part Two", + "grandparent_title": "", + "quality": "4K", + "video_decision": "direct play", + "audio_decision": "direct play", + "error_message": "", + "session_key": "xyz789", +} + +_PLAY_PAYLOAD = { + "action": "play", + "timestamp": "1747195400", + "user": "alice", + "player": "Chrome", + "media_type": "episode", + "title": "Pilot", + "grandparent_title": "Severance", + "quality": "1080p", + "video_decision": "transcode", + "audio_decision": "direct play", + "error_message": "", + "session_key": "def456", +} + +_STOP_PAYLOAD = { + "action": "stop", + "timestamp": "1747195500", + "user": "bob", + "player": "Apple TV", + "media_type": "movie", + "title": "Arrival", + "grandparent_title": "", + "quality": "1080p", + "video_decision": "direct play", + "audio_decision": "direct play", + "error_message": "", + "session_key": "ghi789", +} + + +# --------------------------------------------------------------------------- +# is_tautulli_payload +# --------------------------------------------------------------------------- + +class TestIsPayload: + def test_valid_payload_detected(self): + assert is_tautulli_payload(_ERROR_PAYLOAD) + + def test_missing_action_rejected(self): + payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "action"} + assert not is_tautulli_payload(payload) + + def test_missing_session_key_rejected(self): + payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "session_key"} + assert not is_tautulli_payload(payload) + + def test_empty_dict_rejected(self): + assert not is_tautulli_payload({}) + + def test_journald_like_rejected(self): + assert not is_tautulli_payload({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}) + + +# --------------------------------------------------------------------------- +# parse_webhook — severity mapping +# --------------------------------------------------------------------------- + +class TestSeverity: + def test_error_action_maps_to_critical(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert entry.severity == "CRITICAL" + + def test_buffer_action_maps_to_warn(self): + entry = parse_webhook(_BUFFER_PAYLOAD, []) + assert entry.severity == "WARN" + + def test_play_action_maps_to_none(self): + entry = parse_webhook(_PLAY_PAYLOAD, []) + assert entry.severity is None + + def test_stop_action_maps_to_none(self): + entry = parse_webhook(_STOP_PAYLOAD, []) + assert entry.severity is None + + def test_unknown_action_maps_to_none(self): + payload = {**_ERROR_PAYLOAD, "action": "watched"} + entry = parse_webhook(payload, []) + assert entry.severity is None + + +# --------------------------------------------------------------------------- +# parse_webhook — text formatting +# --------------------------------------------------------------------------- + +class TestTextFormat: + def test_error_text_starts_with_plex_error(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert entry.text.startswith("[plex:error]") + + def test_error_text_includes_error_message(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert "Lost connection to streaming client" in entry.text + + def test_buffer_text_starts_with_plex_buffer(self): + entry = parse_webhook(_BUFFER_PAYLOAD, []) + assert entry.text.startswith("[plex:buffer]") + assert "buffering" in entry.text + + def test_play_text_starts_with_plex_play(self): + entry = parse_webhook(_PLAY_PAYLOAD, []) + assert entry.text.startswith("[plex:play]") + + def test_stop_text_starts_with_plex_stop(self): + entry = parse_webhook(_STOP_PAYLOAD, []) + assert entry.text.startswith("[plex:stop]") + + def test_tv_episode_formats_show_and_title(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert '"Oshi no Ko — Episode 7"' in entry.text + + def test_movie_omits_grandparent(self): + entry = parse_webhook(_BUFFER_PAYLOAD, []) + # grandparent_title is empty — should be just the title, no em dash + assert '"Dune: Part Two"' in entry.text + assert " — " not in entry.text + + def test_play_includes_stream_info(self): + entry = parse_webhook(_PLAY_PAYLOAD, []) + assert "1080p" in entry.text + assert "transcode" in entry.text + + +# --------------------------------------------------------------------------- +# parse_webhook — timestamp handling +# --------------------------------------------------------------------------- + +class TestTimestamp: + def test_valid_timestamp_parsed(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + # epoch 1747195200 is a valid UTC time; iso must be non-empty + assert entry.timestamp_iso + assert "T" in entry.timestamp_iso # ISO 8601 format + + def test_missing_timestamp_falls_back_to_now(self): + payload = {**_ERROR_PAYLOAD, "timestamp": ""} + entry = parse_webhook(payload, []) + # Should still produce a valid ISO timestamp, not empty + assert entry.timestamp_iso + assert "T" in entry.timestamp_iso + + def test_zero_timestamp_falls_back_to_now(self): + payload = {**_ERROR_PAYLOAD, "timestamp": "0"} + entry = parse_webhook(payload, []) + assert entry.timestamp_iso + assert "T" in entry.timestamp_iso + + def test_missing_timestamp_key_falls_back(self): + payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "timestamp"} + entry = parse_webhook(payload, []) + assert entry.timestamp_iso + assert "T" in entry.timestamp_iso + + +# --------------------------------------------------------------------------- +# parse_webhook — entry_id uniqueness +# --------------------------------------------------------------------------- + +class TestEntryId: + def test_different_timestamps_produce_different_ids(self): + payload_a = {**_ERROR_PAYLOAD, "timestamp": "1747195200"} + payload_b = {**_ERROR_PAYLOAD, "timestamp": "1747195201"} + entry_a = parse_webhook(payload_a, []) + entry_b = parse_webhook(payload_b, []) + assert entry_a.entry_id != entry_b.entry_id + + def test_same_payload_produces_same_id(self): + entry_a = parse_webhook(_ERROR_PAYLOAD, []) + entry_b = parse_webhook(_ERROR_PAYLOAD, []) + assert entry_a.entry_id == entry_b.entry_id + + +# --------------------------------------------------------------------------- +# parse_webhook — fixed fields +# --------------------------------------------------------------------------- + +class TestFixedFields: + def test_source_id_is_tautulli(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert entry.source_id == "tautulli" + + def test_repeat_count_is_one(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert entry.repeat_count == 1 + + def test_out_of_order_is_false(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert entry.out_of_order is False + + def test_entry_id_is_nonempty_string(self): + entry = parse_webhook(_ERROR_PAYLOAD, []) + assert isinstance(entry.entry_id, str) and len(entry.entry_id) > 0