"""Tests for the Tautulli webhook ingestor.""" from __future__ import annotations import pytest from app.ingest.tautulli import is_tautulli_payload, parse_webhook # --------------------------------------------------------------------------- # Sample payloads # --------------------------------------------------------------------------- _ERROR_PAYLOAD = { "action": "error", "timestamp": "1747195200", "user": "pyroballpcs", "player": "LG TV", "media_type": "episode", "title": "Episode 7", "grandparent_title": "Oshi no Ko", "quality": "1080p", "video_decision": "transcode", "audio_decision": "direct play", "error_message": "Lost connection to streaming client", "session_key": "abc123", } _BUFFER_PAYLOAD = { "action": "buffer", "timestamp": "1747195300", "user": "pyroballpcs", "player": "Roku", "media_type": "movie", "title": "Dune: Part Two", "grandparent_title": "", "quality": "4K", "video_decision": "direct play", "audio_decision": "direct play", "error_message": "", "session_key": "xyz789", } _PLAY_PAYLOAD = { "action": "play", "timestamp": "1747195400", "user": "alice", "player": "Chrome", "media_type": "episode", "title": "Pilot", "grandparent_title": "Severance", "quality": "1080p", "video_decision": "transcode", "audio_decision": "direct play", "error_message": "", "session_key": "def456", } _STOP_PAYLOAD = { "action": "stop", "timestamp": "1747195500", "user": "bob", "player": "Apple TV", "media_type": "movie", "title": "Arrival", "grandparent_title": "", "quality": "1080p", "video_decision": "direct play", "audio_decision": "direct play", "error_message": "", "session_key": "ghi789", } # --------------------------------------------------------------------------- # is_tautulli_payload # --------------------------------------------------------------------------- class TestIsPayload: def test_valid_payload_detected(self): assert is_tautulli_payload(_ERROR_PAYLOAD) def test_missing_action_rejected(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "action"} assert not is_tautulli_payload(payload) def test_missing_session_key_rejected(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "session_key"} assert not is_tautulli_payload(payload) def test_empty_dict_rejected(self): assert not is_tautulli_payload({}) def test_journald_like_rejected(self): assert not is_tautulli_payload({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}) # --------------------------------------------------------------------------- # parse_webhook — severity mapping # --------------------------------------------------------------------------- class TestSeverity: def test_error_action_maps_to_critical(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.severity == "CRITICAL" def test_buffer_action_maps_to_warn(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) assert entry.severity == "WARN" def test_play_action_maps_to_none(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert entry.severity is None def test_stop_action_maps_to_none(self): entry = parse_webhook(_STOP_PAYLOAD, []) assert entry.severity is None def test_unknown_action_maps_to_none(self): payload = {**_ERROR_PAYLOAD, "action": "watched"} entry = parse_webhook(payload, []) assert entry.severity is None # --------------------------------------------------------------------------- # parse_webhook — text formatting # --------------------------------------------------------------------------- class TestTextFormat: def test_error_text_starts_with_plex_error(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.text.startswith("[plex:error]") def test_error_text_includes_error_message(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert "Lost connection to streaming client" in entry.text def test_buffer_text_starts_with_plex_buffer(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) assert entry.text.startswith("[plex:buffer]") assert "buffering" in entry.text def test_play_text_starts_with_plex_play(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert entry.text.startswith("[plex:play]") def test_stop_text_starts_with_plex_stop(self): entry = parse_webhook(_STOP_PAYLOAD, []) assert entry.text.startswith("[plex:stop]") def test_tv_episode_formats_show_and_title(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert '"Oshi no Ko — Episode 7"' in entry.text def test_movie_omits_grandparent(self): entry = parse_webhook(_BUFFER_PAYLOAD, []) # grandparent_title is empty — should be just the title, no em dash assert '"Dune: Part Two"' in entry.text assert " — " not in entry.text def test_play_includes_stream_info(self): entry = parse_webhook(_PLAY_PAYLOAD, []) assert "1080p" in entry.text assert "transcode" in entry.text # --------------------------------------------------------------------------- # parse_webhook — timestamp handling # --------------------------------------------------------------------------- class TestTimestamp: def test_valid_timestamp_parsed(self): entry = parse_webhook(_ERROR_PAYLOAD, []) # epoch 1747195200 is a valid UTC time; iso must be non-empty assert entry.timestamp_iso assert "T" in entry.timestamp_iso # ISO 8601 format def test_missing_timestamp_falls_back_to_now(self): payload = {**_ERROR_PAYLOAD, "timestamp": ""} entry = parse_webhook(payload, []) # Should still produce a valid ISO timestamp, not empty assert entry.timestamp_iso assert "T" in entry.timestamp_iso def test_zero_timestamp_falls_back_to_now(self): payload = {**_ERROR_PAYLOAD, "timestamp": "0"} entry = parse_webhook(payload, []) assert entry.timestamp_iso assert "T" in entry.timestamp_iso def test_missing_timestamp_key_falls_back(self): payload = {k: v for k, v in _ERROR_PAYLOAD.items() if k != "timestamp"} entry = parse_webhook(payload, []) assert entry.timestamp_iso assert "T" in entry.timestamp_iso # --------------------------------------------------------------------------- # parse_webhook — entry_id uniqueness # --------------------------------------------------------------------------- class TestEntryId: def test_different_timestamps_produce_different_ids(self): payload_a = {**_ERROR_PAYLOAD, "timestamp": "1747195200"} payload_b = {**_ERROR_PAYLOAD, "timestamp": "1747195201"} entry_a = parse_webhook(payload_a, []) entry_b = parse_webhook(payload_b, []) assert entry_a.entry_id != entry_b.entry_id def test_same_payload_produces_same_id(self): entry_a = parse_webhook(_ERROR_PAYLOAD, []) entry_b = parse_webhook(_ERROR_PAYLOAD, []) assert entry_a.entry_id == entry_b.entry_id # --------------------------------------------------------------------------- # parse_webhook — fixed fields # --------------------------------------------------------------------------- class TestFixedFields: def test_source_id_is_tautulli(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.source_id == "tautulli" def test_repeat_count_is_one(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.repeat_count == 1 def test_out_of_order_is_false(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert entry.out_of_order is False def test_entry_id_is_nonempty_string(self): entry = parse_webhook(_ERROR_PAYLOAD, []) assert isinstance(entry.entry_id, str) and len(entry.entry_id) > 0