turnstone/tests/test_ingest_tautulli.py
pyr0ball 24dd4bc568 feat: Tautulli webhook ingest endpoint — plex events -> log_entries
POST /turnstone/api/ingest/tautulli accepts Tautulli notification agent
payloads and stores them as log_entries under source 'tautulli'. Severity
maps error->CRITICAL, buffer->WARN, all others->None. Optional bearer token
auth via X-Tautulli-Token header + tautulli_token pref. FTS index rebuilt
as a background task after each write. 28 new tests, all passing.
2026-05-13 18:41:03 -07:00

234 lines
7.9 KiB
Python

"""Tests for the Tautulli webhook ingestor."""
from __future__ import annotations
import pytest
from app.ingest.tautulli import is_tautulli_payload, parse_webhook
# ---------------------------------------------------------------------------
# Sample payloads
# ---------------------------------------------------------------------------
_ERROR_PAYLOAD = {
"action": "error",
"timestamp": "1747195200",
"user": "pyroballpcs",
"player": "LG TV",
"media_type": "episode",
"title": "Episode 7",
"grandparent_title": "Oshi no Ko",
"quality": "1080p",
"video_decision": "transcode",
"audio_decision": "direct play",
"error_message": "Lost connection to streaming client",
"session_key": "abc123",
}
_BUFFER_PAYLOAD = {
"action": "buffer",
"timestamp": "1747195300",
"user": "pyroballpcs",
"player": "Roku",
"media_type": "movie",
"title": "Dune: Part Two",
"grandparent_title": "",
"quality": "4K",
"video_decision": "direct play",
"audio_decision": "direct play",
"error_message": "",
"session_key": "xyz789",
}
_PLAY_PAYLOAD = {
"action": "play",
"timestamp": "1747195400",
"user": "alice",
"player": "Chrome",
"media_type": "episode",
"title": "Pilot",
"grandparent_title": "Severance",
"quality": "1080p",
"video_decision": "transcode",
"audio_decision": "direct play",
"error_message": "",
"session_key": "def456",
}
_STOP_PAYLOAD = {
"action": "stop",
"timestamp": "1747195500",
"user": "bob",
"player": "Apple TV",
"media_type": "movie",
"title": "Arrival",
"grandparent_title": "",
"quality": "1080p",
"video_decision": "direct play",
"audio_decision": "direct play",
"error_message": "",
"session_key": "ghi789",
}
# ---------------------------------------------------------------------------
# is_tautulli_payload
# ---------------------------------------------------------------------------
class TestIsPayload:
def test_valid_payload_detected(self):
assert is_tautulli_payload(_ERROR_PAYLOAD)
def test_missing_action_rejected(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "action"}
assert not is_tautulli_payload(payload)
def test_missing_session_key_rejected(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "session_key"}
assert not is_tautulli_payload(payload)
def test_empty_dict_rejected(self):
assert not is_tautulli_payload({})
def test_journald_like_rejected(self):
assert not is_tautulli_payload({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"})
# ---------------------------------------------------------------------------
# parse_webhook — severity mapping
# ---------------------------------------------------------------------------
class TestSeverity:
def test_error_action_maps_to_critical(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.severity == "CRITICAL"
def test_buffer_action_maps_to_warn(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
assert entry.severity == "WARN"
def test_play_action_maps_to_none(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert entry.severity is None
def test_stop_action_maps_to_none(self):
entry = parse_webhook(_STOP_PAYLOAD, [])
assert entry.severity is None
def test_unknown_action_maps_to_none(self):
payload = {**_ERROR_PAYLOAD, "action": "watched"}
entry = parse_webhook(payload, [])
assert entry.severity is None
# ---------------------------------------------------------------------------
# parse_webhook — text formatting
# ---------------------------------------------------------------------------
class TestTextFormat:
def test_error_text_starts_with_plex_error(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.text.startswith("[plex:error]")
def test_error_text_includes_error_message(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert "Lost connection to streaming client" in entry.text
def test_buffer_text_starts_with_plex_buffer(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
assert entry.text.startswith("[plex:buffer]")
assert "buffering" in entry.text
def test_play_text_starts_with_plex_play(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert entry.text.startswith("[plex:play]")
def test_stop_text_starts_with_plex_stop(self):
entry = parse_webhook(_STOP_PAYLOAD, [])
assert entry.text.startswith("[plex:stop]")
def test_tv_episode_formats_show_and_title(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert '"Oshi no Ko — Episode 7"' in entry.text
def test_movie_omits_grandparent(self):
entry = parse_webhook(_BUFFER_PAYLOAD, [])
# grandparent_title is empty — should be just the title, no em dash
assert '"Dune: Part Two"' in entry.text
assert "" not in entry.text
def test_play_includes_stream_info(self):
entry = parse_webhook(_PLAY_PAYLOAD, [])
assert "1080p" in entry.text
assert "transcode" in entry.text
# ---------------------------------------------------------------------------
# parse_webhook — timestamp handling
# ---------------------------------------------------------------------------
class TestTimestamp:
def test_valid_timestamp_parsed(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
# epoch 1747195200 is a valid UTC time; iso must be non-empty
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso # ISO 8601 format
def test_missing_timestamp_falls_back_to_now(self):
payload = {**_ERROR_PAYLOAD, "timestamp": ""}
entry = parse_webhook(payload, [])
# Should still produce a valid ISO timestamp, not empty
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
def test_zero_timestamp_falls_back_to_now(self):
payload = {**_ERROR_PAYLOAD, "timestamp": "0"}
entry = parse_webhook(payload, [])
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
def test_missing_timestamp_key_falls_back(self):
payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "timestamp"}
entry = parse_webhook(payload, [])
assert entry.timestamp_iso
assert "T" in entry.timestamp_iso
# ---------------------------------------------------------------------------
# parse_webhook — entry_id uniqueness
# ---------------------------------------------------------------------------
class TestEntryId:
def test_different_timestamps_produce_different_ids(self):
payload_a = {**_ERROR_PAYLOAD, "timestamp": "1747195200"}
payload_b = {**_ERROR_PAYLOAD, "timestamp": "1747195201"}
entry_a = parse_webhook(payload_a, [])
entry_b = parse_webhook(payload_b, [])
assert entry_a.entry_id != entry_b.entry_id
def test_same_payload_produces_same_id(self):
entry_a = parse_webhook(_ERROR_PAYLOAD, [])
entry_b = parse_webhook(_ERROR_PAYLOAD, [])
assert entry_a.entry_id == entry_b.entry_id
# ---------------------------------------------------------------------------
# parse_webhook — fixed fields
# ---------------------------------------------------------------------------
class TestFixedFields:
def test_source_id_is_tautulli(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.source_id == "tautulli"
def test_repeat_count_is_one(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.repeat_count == 1
def test_out_of_order_is_false(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert entry.out_of_order is False
def test_entry_id_is_nonempty_string(self):
entry = parse_webhook(_ERROR_PAYLOAD, [])
assert isinstance(entry.entry_id, str) and len(entry.entry_id) > 0