"""Tests for the Tautulli webhook gleaner.""" from __future__ import annotations import pytest from unittest.mock import patch from app.glean.tautulli import is_tautulli_payload, parse_webhook # --------------------------------------------------------------------------- # Sample payloads # --------------------------------------------------------------------------- _ERROR_PAYLOAD = { "action": "error", "timestamp": "1747195200", "user": "pyroballpcs", "player": "LG TV", "media_type": "episode", "title": "Episode 7", "grandparent_title": "Oshi no Ko", "quality": "1080p", "video_decision": "transcode", "audio_decision": "direct play", "error_message": "Lost connection to streaming client", "session_key": "abc123", } _BUFFER_PAYLOAD = { "action": "buffer", "timestamp": "1747195300", "user": "pyroballpcs", "player": "Roku", "media_type": "movie", "title": "Dune: Part Two", "grandparent_title": "", "quality": "4K", "video_decision": "direct play", "audio_decision": "direct play", "error_message": "", "session_key": "xyz789", } _PLAY_PAYLOAD = { "action": "play", "timestamp": "1747195400", "user": "alice", "player": "Chrome", "media_type": "episode", "title": "Pilot", "grandparent_title": "Severance", "quality": "1080p", "video_decision": "transcode", "audio_decision": "direct play", "error_message": "", "session_key": "def456", } _STOP_PAYLOAD = { "action": "stop", "timestamp": "1747195500", "user": "bob", "player": "Apple TV", "media_type": "movie", "title": "Arrival", "grandparent_title": "", "quality": "1080p", "video_decision": "direct play", "audio_decision": "direct play", "error_message": "", "session_key": "ghi789", } # --------------------------------------------------------------------------- # is_tautulli_payload # --------------------------------------------------------------------------- class TestIsPayload: def test_valid_payload_detected(self): assert is_tautulli_payload(_ERROR_PAYLOAD) def test_missing_action_rejected(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "action"} assert not is_tautulli_payload(payload) def test_missing_session_key_rejected(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "session_key"} assert not is_tautulli_payload(payload) def test_empty_dict_rejected(self): assert not is_tautulli_payload({}) def test_journald_like_rejected(self): assert not is_tautulli_payload({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}) # --------------------------------------------------------------------------- # parse_webhook — severity mapping # --------------------------------------------------------------------------- class TestSeverity: def test_error_action_maps_to_critical(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.severity == "CRITICAL" def test_buffer_action_maps_to_warn(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) assert entry.severity == "WARN" def test_play_action_maps_to_none(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert entry.severity is None def test_stop_action_maps_to_none(self): entry = parse_webhook(_STOP_PAYLOAD, []) assert entry.severity is None def test_unknown_action_maps_to_none(self): payload = {**_ERROR_PAYLOAD, "action": "watched"} entry = parse_webhook(payload, []) assert entry.severity is None # --------------------------------------------------------------------------- # parse_webhook — text formatting # --------------------------------------------------------------------------- class TestTextFormat: def test_error_text_starts_with_plex_error(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.text.startswith("[plex:error]") def test_error_text_includes_error_message(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert "Lost connection to streaming client" in entry.text def test_buffer_text_starts_with_plex_buffer(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) assert entry.text.startswith("[plex:buffer]") assert "buffering" in entry.text def test_play_text_starts_with_plex_play(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert entry.text.startswith("[plex:play]") def test_stop_text_starts_with_plex_stop(self): entry = parse_webhook(_STOP_PAYLOAD, []) assert entry.text.startswith("[plex:stop]") def test_pause_action(self): payload = {**_STOP_PAYLOAD, "action": "pause", "timestamp": "1747195600", "session_key": "pause01"} entry = parse_webhook(payload, []) assert entry.text.startswith("[plex:pause]") def test_resume_action(self): payload = {**_STOP_PAYLOAD, "action": "resume", "timestamp": "1747195700", "session_key": "resume01"} entry = parse_webhook(payload, []) assert entry.text.startswith("[plex:resume]") def test_tv_episode_formats_show_and_title(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert '"Oshi no Ko — Episode 7"' in entry.text def test_movie_omits_grandparent(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) # grandparent_title is empty — should be just the title, no em dash assert '"Dune: Part Two"' in entry.text assert " — " not in entry.text def test_play_includes_stream_info(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert "1080p" in entry.text assert "transcode" in entry.text # --------------------------------------------------------------------------- # parse_webhook — timestamp handling # --------------------------------------------------------------------------- class TestTimestamp: def test_valid_timestamp_parsed(self): entry = parse_webhook(_ERROR_PAYLOAD, []) # epoch 1747195200 is a valid UTC time; iso must be non-empty assert entry.timestamp_iso assert "T" in entry.timestamp_iso # ISO 8601 format def test_missing_timestamp_falls_back_to_now(self): payload = {**_ERROR_PAYLOAD, "timestamp": ""} entry = parse_webhook(payload, []) # Should still produce a valid ISO timestamp, not empty assert entry.timestamp_iso assert "T" in entry.timestamp_iso def test_zero_timestamp_falls_back_to_now(self): payload = {**_ERROR_PAYLOAD, "timestamp": "0"} entry = parse_webhook(payload, []) assert entry.timestamp_iso assert "T" in entry.timestamp_iso def test_missing_timestamp_key_falls_back(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "timestamp"} entry = parse_webhook(payload, []) assert entry.timestamp_iso assert "T" in entry.timestamp_iso # --------------------------------------------------------------------------- # parse_webhook — entry_id uniqueness # --------------------------------------------------------------------------- class TestEntryId: def test_different_timestamps_produce_different_ids(self): payload_a = {**_ERROR_PAYLOAD, "timestamp": "1747195200"} payload_b = {**_ERROR_PAYLOAD, "timestamp": "1747195201"} entry_a = parse_webhook(payload_a, []) entry_b = parse_webhook(payload_b, []) assert entry_a.entry_id != entry_b.entry_id def test_same_payload_produces_same_id(self): entry_a = parse_webhook(_ERROR_PAYLOAD, []) entry_b = parse_webhook(_ERROR_PAYLOAD, []) assert entry_a.entry_id == entry_b.entry_id # --------------------------------------------------------------------------- # parse_webhook — fixed fields # --------------------------------------------------------------------------- class TestFixedFields: def test_source_id_is_tautulli(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.source_id == "tautulli" def test_repeat_count_is_one(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.repeat_count == 1 def test_out_of_order_is_false(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.out_of_order is False def test_entry_id_is_nonempty_string(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert isinstance(entry.entry_id, str) and len(entry.entry_id) > 0 # --------------------------------------------------------------------------- # Endpoint tests — auth, validation, happy path # --------------------------------------------------------------------------- class TestEndpoint: @pytest.fixture def client(self, tmp_path): from fastapi.testclient import TestClient from app.glean.pipeline import ensure_schema import app.rest as rest_module db = tmp_path / "test.db" ensure_schema(db) with patch.object(rest_module, "DB_PATH", db), \ patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \ patch.object(rest_module, "_compiled_patterns", []): with TestClient(rest_module.app, raise_server_exceptions=True) as c: yield c def test_missing_action_returns_400(self, client): resp = client.post( "/turnstone/api/glean/tautulli", json={"session_key": "x"}, ) assert resp.status_code == 400 def test_wrong_token_returns_403(self, tmp_path): from fastapi.testclient import TestClient from app.glean.pipeline import ensure_schema import app.rest as rest_module db = tmp_path / "test.db" ensure_schema(db) prefs_path = tmp_path / "prefs.json" import json as _json prefs_path.write_text(_json.dumps({"tautulli_token": "secret"})) with patch.object(rest_module, "DB_PATH", db), \ patch.object(rest_module, "PREFS_PATH", prefs_path), \ patch.object(rest_module, "_compiled_patterns", []): with TestClient(rest_module.app, raise_server_exceptions=True) as c: resp = c.post( "/turnstone/api/glean/tautulli", json=_ERROR_PAYLOAD, headers={"X-Tautulli-Token": "wrong"}, ) assert resp.status_code == 403 def test_valid_payload_returns_200(self, client): resp = client.post( "/turnstone/api/glean/tautulli", json=_ERROR_PAYLOAD, ) assert resp.status_code == 200 data = resp.json() assert data["stored"] == 1 assert "entry_id" in data