feat: Tautulli webhook ingest endpoint — plex events -> log_entries

POST /turnstone/api/ingest/tautulli accepts Tautulli notification agent
payloads and stores them as log_entries under source 'tautulli'. Severity
maps error->CRITICAL, buffer->WARN, all others->None. Optional bearer token
auth via X-Tautulli-Token header + tautulli_token pref. FTS index rebuilt
as a background task after each write. 28 new tests, all passing.
This commit is contained in:
pyr0ball 2026-05-13 18:41:03 -07:00
parent 3501240231
commit 4fbac2554e
4 changed files with 453 additions and 2 deletions

99
app/ingest/tautulli.py Normal file
View file

@ -0,0 +1,99 @@
"""Tautulli webhook ingestor.
Parses a Tautulli notification agent JSON payload into a single RetrievedEntry.
Tautulli sends all template values as strings, so all fields are treated as str.
"""
from __future__ import annotations
from app.ingest.base import (
apply_patterns,
epoch_float_to_iso,
make_entry_id,
now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
_ACTION_SEVERITY: dict[str, str | None] = {
"error": "CRITICAL",
"buffer": "WARN",
}
def _severity(action: str) -> str | None:
return _ACTION_SEVERITY.get(action.lower())
def _format_text(p: dict) -> str:
action = p.get("action", "").lower()
user = p.get("user") or "unknown"
player = p.get("player") or "unknown player"
grandparent = p.get("grandparent_title", "").strip()
title = p.get("title", "").strip()
media = f'"{grandparent}{title}"' if grandparent else f'"{title}"'
quality = p.get("quality", "")
video_dec = p.get("video_decision", "")
stream = f"{quality}, {video_dec}" if quality and video_dec else quality or video_dec
err = p.get("error_message", "").strip()
if action == "error":
base = f"[plex:error] {user} on {player}: {media}"
return f"{base}{err}" if err else base
if action == "buffer":
return f"[plex:buffer] {user} on {player}: {media} is buffering"
if action in ("play", "resume"):
parts = [f"[plex:{action}] {user} on {player}: {media}"]
if stream:
parts.append(f"({stream})")
return " ".join(parts)
if action == "stop":
return f"[plex:stop] {user} stopped {media} on {player}"
if action == "pause":
return f"[plex:pause] {user} paused {media} on {player}"
return f"[plex:{action}] {user}: {media} on {player}"
def is_tautulli_payload(payload: dict) -> bool:
"""Return True if the payload looks like a Tautulli webhook."""
return "action" in payload and "session_key" in payload
def parse_webhook(
payload: dict,
compiled_patterns: list[tuple[LogPattern, object]],
) -> RetrievedEntry:
"""Parse a Tautulli webhook payload into a single RetrievedEntry."""
source_id = "tautulli"
action = payload.get("action", "")
text = _format_text(payload)
raw_ts = payload.get("timestamp") or ""
try:
ts_float = float(raw_ts) if raw_ts else 0.0
except (ValueError, TypeError):
ts_float = 0.0
if ts_float:
timestamp_iso: str | None = epoch_float_to_iso(ts_float)
timestamp_raw: str | None = raw_ts
else:
timestamp_iso = now_iso()
timestamp_raw = None
ingest_time = now_iso()
severity = _severity(action)
matched = apply_patterns(text, compiled_patterns)
entry_id = make_entry_id(source_id, 0, str(raw_ts) + text)
return RetrievedEntry(
entry_id=entry_id,
source_id=source_id,
sequence=0,
timestamp_raw=timestamp_raw,
timestamp_iso=timestamp_iso,
ingest_time=ingest_time,
severity=severity,
repeat_count=1,
out_of_order=False,
matched_patterns=matched,
text=text,
)

View file

@ -10,19 +10,22 @@ import asyncio
import dataclasses import dataclasses
import json import json
import os import os
import sqlite3
import urllib.error import urllib.error
import urllib.request import urllib.request
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, FastAPI, HTTPException, Query, UploadFile from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema from app.ingest.pipeline import ensure_schema
from app.ingest.base import _compile, load_patterns
from app.ingest.tautulli import parse_webhook as _parse_tautulli
from app.services.incidents import ( from app.services.incidents import (
build_bundle, build_bundle,
create_incident, create_incident,
@ -40,6 +43,7 @@ from app.services.search import (
recent_source_errors as _source_errors, recent_source_errors as _source_errors,
stats_summary as _stats, stats_summary as _stats,
format_results, format_results,
build_fts_index,
) )
from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream
from app.watch.watcher import Watcher, load_watch_config from app.watch.watcher import Watcher, load_watch_config
@ -61,8 +65,9 @@ DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
PATTERN_FILE = PATTERN_DIR / "default.yaml"
_watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml") _watcher = Watcher(DB_PATH, PATTERN_FILE)
@asynccontextmanager @asynccontextmanager
@ -459,6 +464,56 @@ def get_bundle_endpoint(bundle_id: str) -> dict:
return dataclasses.asdict(bundle) return dataclasses.asdict(bundle)
def _tautulli_write_entry(conn: sqlite3.Connection, entry) -> None:
conn.execute(
"""
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""",
(
entry.entry_id, entry.source_id, entry.sequence,
entry.timestamp_raw, entry.timestamp_iso, entry.ingest_time,
entry.severity, entry.repeat_count, int(entry.out_of_order),
json.dumps(list(entry.matched_patterns)), entry.text,
),
)
@router.post("/api/ingest/tautulli")
def ingest_tautulli(
payload: dict,
request: Request,
background_tasks: BackgroundTasks,
) -> dict:
"""Accept a Tautulli webhook POST and store the event as a log entry."""
prefs = _load_prefs()
token = prefs.get("tautulli_token", "")
if token:
header_token = request.headers.get("X-Tautulli-Token", "")
if header_token != token:
raise HTTPException(status_code=403, detail="Invalid Tautulli token")
if "action" not in payload:
raise HTTPException(status_code=400, detail="Missing required field: action")
compiled = _compile(load_patterns(PATTERN_FILE))
entry = _parse_tautulli(payload, compiled)
conn = sqlite3.connect(str(DB_PATH))
conn.execute("PRAGMA journal_mode=WAL")
try:
_tautulli_write_entry(conn, entry)
conn.commit()
finally:
conn.close()
background_tasks.add_task(build_fts_index, DB_PATH)
return {"stored": 1, "entry_id": entry.entry_id, "action": payload.get("action")}
app.include_router(router) app.include_router(router)
_ctx = APIRouter(prefix="/turnstone/api/context") _ctx = APIRouter(prefix="/turnstone/api/context")

63
docs/tautulli-setup.md Normal file
View file

@ -0,0 +1,63 @@
# Tautulli Webhook Setup
Tautulli is a Plex Media Server (PMS) monitoring application. This guide shows
how to configure Tautulli to send playback events to Turnstone.
## Triggers to enable
In your notification agent, enable these triggers:
- Playback Start
- Playback Stop
- Playback Pause
- Playback Resume
- Playback Error
- Playback Buffering
## JSON body template
Paste this into the **JSON Data** field of the Tautulli Custom Script / Webhook
notification agent:
```json
{
"action": "{action}",
"timestamp": "{timestamp}",
"user": "{user}",
"player": "{player}",
"media_type": "{media_type}",
"title": "{title}",
"grandparent_title": "{grandparent_title}",
"quality": "{quality}",
"video_decision": "{video_decision}",
"audio_decision": "{audio_decision}",
"error_message": "{error_message}",
"session_key": "{session_key}"
}
```
## Webhook URL
```
http://<turnstone-host>:8534/turnstone/api/ingest/tautulli
```
Replace `<turnstone-host>` with the hostname or IP of the machine running
Turnstone. If Turnstone is behind a reverse proxy, use the proxy URL instead.
## Optional token authentication
If you set `tautulli_token` in Turnstone settings, every webhook request must
include a matching header:
```
X-Tautulli-Token: <your-token>
```
Add this header in the Tautulli notification agent's **Headers** section.
Requests with a missing or wrong token are rejected with HTTP 403.
## Searching events
All events are stored under source `tautulli` and are immediately searchable
in Turnstone after each webhook is received.

View file

@ -0,0 +1,234 @@
"""Tests for the Tautulli webhook ingestor."""
from __future__ import annotations
import pytest
from app.ingest.tautulli import is_tautulli_payload, parse_webhook
# ---------------------------------------------------------------------------
# Sample payloads
# ---------------------------------------------------------------------------
_ERROR_PAYLOAD = {
"action": "error",
"timestamp": "1747195200",
"user": "pyroballpcs",
"player": "LG TV",
"media_type": "episode",
"title": "Episode 7",
"grandparent_title": "Oshi no Ko",
"quality": "1080p",
"video_decision": "transcode",
"audio_decision": "direct play",
"error_message": "Lost connection to streaming client",
"session_key": "abc123",
}
_BUFFER_PAYLOAD = {
"action": "buffer",
"timestamp": "1747195300",
"user": "pyroballpcs",
"player": "Roku",
"media_type": "movie",
"title": "Dune: Part Two",
"grandparent_title": "",
"quality": "4K",
"video_decision": "direct play",
"audio_decision": "direct play",
"error_message": "",
"session_key": "xyz789",
}
_PLAY_PAYLOAD = {
"action": "play",
"timestamp": "1747195400",
"user": "alice",
"player": "Chrome",
"media_type": "episode",
"title": "Pilot",
"grandparent_title": "Severance",
"quality": "1080p",
"video_decision": "transcode",
"audio_decision": "direct play",
"error_message": "",
"session_key": "def456",
}
_STOP_PAYLOAD = {
"action": "stop",
"timestamp": "1747195500",
"user": "bob",
"player": "Apple TV",
"media_type": "movie",
"title": "Arrival",
"grandparent_title": "",
"quality": "1080p",
"video_decision": "direct play",
"audio_decision": "direct play",
"error_message": "",
"session_key": "ghi789",
}
# ---------------------------------------------------------------------------
# is_tautulli_payload
# ---------------------------------------------------------------------------
class TestIsPayload:
def test_valid_payload_detected(self):
assert is_tautulli_payload(_ERROR_PAYLOAD)
def test_missing_action_rejected(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "action"}
assert not is_tautulli_payload(payload)
def test_missing_session_key_rejected(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "session_key"}
assert not is_tautulli_payload(payload)
def test_empty_dict_rejected(self):
assert not is_tautulli_payload({})
def test_journald_like_rejected(self):
assert not is_tautulli_payload({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"})
# ---------------------------------------------------------------------------
# parse_webhook — severity mapping
# ---------------------------------------------------------------------------
class TestSeverity:
def test_error_action_maps_to_critical(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.severity == "CRITICAL"
def test_buffer_action_maps_to_warn(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
assert entry.severity == "WARN"
def test_play_action_maps_to_none(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert entry.severity is None
def test_stop_action_maps_to_none(self):
entry = parse_webhook(_STOP_PAYLOAD, [])
assert entry.severity is None
def test_unknown_action_maps_to_none(self):
payload = {**_ERROR_PAYLOAD, "action": "watched"}
entry = parse_webhook(payload, [])
assert entry.severity is None
# ---------------------------------------------------------------------------
# parse_webhook — text formatting
# ---------------------------------------------------------------------------
class TestTextFormat:
def test_error_text_starts_with_plex_error(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.text.startswith("[plex:error]")
def test_error_text_includes_error_message(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert "Lost connection to streaming client" in entry.text
def test_buffer_text_starts_with_plex_buffer(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
assert entry.text.startswith("[plex:buffer]")
assert "buffering" in entry.text
def test_play_text_starts_with_plex_play(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert entry.text.startswith("[plex:play]")
def test_stop_text_starts_with_plex_stop(self):
entry = parse_webhook(_STOP_PAYLOAD, [])
assert entry.text.startswith("[plex:stop]")
def test_tv_episode_formats_show_and_title(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert '"Oshi no Ko — Episode 7"' in entry.text
def test_movie_omits_grandparent(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
# grandparent_title is empty — should be just the title, no em dash
assert '"Dune: Part Two"' in entry.text
assert "" not in entry.text
def test_play_includes_stream_info(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert "1080p" in entry.text
assert "transcode" in entry.text
# ---------------------------------------------------------------------------
# parse_webhook — timestamp handling
# ---------------------------------------------------------------------------
class TestTimestamp:
def test_valid_timestamp_parsed(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
# epoch 1747195200 is a valid UTC time; iso must be non-empty
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso # ISO 8601 format
def test_missing_timestamp_falls_back_to_now(self):
payload = {**_ERROR_PAYLOAD, "timestamp": ""}
entry = parse_webhook(payload, [])
# Should still produce a valid ISO timestamp, not empty
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
def test_zero_timestamp_falls_back_to_now(self):
payload = {**_ERROR_PAYLOAD, "timestamp": "0"}
entry = parse_webhook(payload, [])
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
def test_missing_timestamp_key_falls_back(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "timestamp"}
entry = parse_webhook(payload, [])
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
# ---------------------------------------------------------------------------
# parse_webhook — entry_id uniqueness
# ---------------------------------------------------------------------------
class TestEntryId:
def test_different_timestamps_produce_different_ids(self):
payload_a = {**_ERROR_PAYLOAD, "timestamp": "1747195200"}
payload_b = {**_ERROR_PAYLOAD, "timestamp": "1747195201"}
entry_a = parse_webhook(payload_a, [])
entry_b = parse_webhook(payload_b, [])
assert entry_a.entry_id != entry_b.entry_id
def test_same_payload_produces_same_id(self):
entry_a = parse_webhook(_ERROR_PAYLOAD, [])
entry_b = parse_webhook(_ERROR_PAYLOAD, [])
assert entry_a.entry_id == entry_b.entry_id
# ---------------------------------------------------------------------------
# parse_webhook — fixed fields
# ---------------------------------------------------------------------------
class TestFixedFields:
def test_source_id_is_tautulli(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.source_id == "tautulli"
def test_repeat_count_is_one(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.repeat_count == 1
def test_out_of_order_is_false(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.out_of_order is False
def test_entry_id_is_nonempty_string(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert isinstance(entry.entry_id, str) and len(entry.entry_id) > 0