"""Tests for app/services/diagnose/synthesizer.py — SummarySynthesizer. All tests use mocking; no real LLM calls are made. """ from __future__ import annotations from unittest.mock import MagicMock, patch from app.context.retriever import RetrievedContext from app.services.diagnose.models import EventCluster, Hypothesis, RankedHypothesis, TimelineResult from app.services.diagnose.synthesizer import SummarySynthesizer, _build_timeline_block # --------------------------------------------------------------------------- # Fixture helpers # --------------------------------------------------------------------------- def _make_hypothesis( hypothesis_id: str = "h1", title: str = "SSH flood from external IPs", description: str = "Repeated failed login attempts from multiple IPs.", confidence: float = 0.87, severity: str = "CRITICAL", ) -> Hypothesis: return Hypothesis( hypothesis_id=hypothesis_id, title=title, description=description, confidence=confidence, supporting_cluster_ids=("c1",), runbook_refs=(), severity=severity, # type: ignore[arg-type] ) def _make_ranked( hypothesis: Hypothesis | None = None, novelty_score: float = 0.95, similarity_to_known: float = 0.05, suppress: bool = False, suppression_reason: str | None = None, ) -> RankedHypothesis: h = hypothesis or _make_hypothesis() return RankedHypothesis( hypothesis=h, novelty_score=novelty_score, similarity_to_known=similarity_to_known, suppress=suppress, suppression_reason=suppression_reason, ) def _make_cluster( cluster_id: str = "c1", start_iso: str | None = "2026-01-01T00:05:00+00:00", severity: str = "ERROR", source_ids: tuple[str, ...] = ("syslog",), pattern_tags: tuple[str, ...] = ("ssh_auth_failure",), burst: bool = False, gap_before_seconds: float = 0.0, representative_text: str = "Failed password for root from 1.2.3.4 port 22", ) -> EventCluster: return EventCluster( cluster_id=cluster_id, entries=("e1",), start_iso=start_iso, end_iso=None, duration_seconds=30.0, source_ids=source_ids, pattern_tags=pattern_tags, severity=severity, # type: ignore[arg-type] burst=burst, gap_before_seconds=gap_before_seconds, representative_text=representative_text, ) def _make_timeline( total_entries: int = 42, n_clusters: int = 3, clusters: tuple[EventCluster, ...] | None = None, ) -> TimelineResult: return TimelineResult( clusters=clusters if clusters is not None else tuple(), total_entries=total_entries, window_start="2026-01-01T00:00:00+00:00", window_end="2026-01-01T01:00:00+00:00", gap_count=1, burst_count=2, dominant_sources=("syslog", "auth"), ) def _make_ctx(chunks: list[dict] | None = None) -> RetrievedContext: return RetrievedContext( facts=[{"category": "network", "key": "host", "value": "heimdall", "source": "facts"}], chunks=chunks or [{"filename": "runbook.md", "text": "Restart sshd if flooded"}], ) # --------------------------------------------------------------------------- # Test cases # --------------------------------------------------------------------------- class TestSynthesizerWithHypotheses: """With hypotheses, result must contain VERDICT.""" def test_returns_verdict_string_with_llm(self): synthesizer = SummarySynthesizer() ranked = [_make_ranked()] timeline = _make_timeline() ctx = _make_ctx() mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { "choices": [{"message": {"content": "VERDICT: CRITICAL — SSH flood (87% confidence)\nTIMELINE: lots of hits."}}] } with patch("httpx.post", return_value=mock_resp): result = synthesizer.synthesize( ranked=ranked, timeline=timeline, ctx=ctx, query="ssh brute force", llm_url="http://localhost:11434", llm_model="llama3", ) assert "VERDICT" in result def test_returns_nonempty_string(self): synthesizer = SummarySynthesizer() ranked = [_make_ranked()] timeline = _make_timeline() ctx = _make_ctx() mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { "choices": [{"message": {"content": "VERDICT: CRITICAL — SSH flood (87% confidence)"}}] } with patch("httpx.post", return_value=mock_resp): result = synthesizer.synthesize( ranked=ranked, timeline=timeline, ctx=ctx, query="why is auth failing", llm_url="http://localhost:11434", llm_model="llama3", ) assert isinstance(result, str) assert len(result) > 0 class TestSynthesizerSuppressedHypotheses: """Suppressed hypotheses must be excluded from the LLM prompt.""" def test_suppressed_hypotheses_excluded_from_prompt(self): suppressed = _make_ranked( hypothesis=_make_hypothesis( hypothesis_id="h2", title="Wazuh alert processing backlog", severity="ERROR", confidence=0.72, ), suppress=True, suppression_reason="similar to 2025-04 SSH incident", novelty_score=0.1, ) active = _make_ranked( hypothesis=_make_hypothesis( hypothesis_id="h1", title="SSH flood from external IPs", severity="CRITICAL", confidence=0.87, ), suppress=False, novelty_score=0.95, ) captured_messages: list = [] def fake_post(url, json=None, headers=None, timeout=None): if json and "payload" in json: captured_messages.extend(json["payload"].get("messages", [])) elif json and "messages" in json: captured_messages.extend(json.get("messages", [])) mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { "choices": [{"message": {"content": "VERDICT: CRITICAL — SSH flood"}}] } return mock_resp synthesizer = SummarySynthesizer() with patch("httpx.post", side_effect=fake_post): synthesizer.synthesize( ranked=[active, suppressed], timeline=_make_timeline(), ctx=_make_ctx(), query="auth failures", llm_url="http://localhost:11434", llm_model="llama3", ) # The user message should contain the active hypothesis title # and NOT contain the suppressed one (or mark it suppressed) user_content = next( (m["content"] for m in captured_messages if m.get("role") == "user"), "" ) assert "SSH flood from external IPs" in user_content # Wazuh should not appear as a standalone top-level hypothesis # (suppressed items are excluded from the active list sent to the LLM) assert "Wazuh alert processing backlog" not in user_content class TestSynthesizerNoLLM: """No LLM configured: must return deterministic fallback (not empty).""" def test_no_llm_url_returns_fallback(self): synthesizer = SummarySynthesizer() ranked = [_make_ranked()] timeline = _make_timeline() ctx = _make_ctx() result = synthesizer.synthesize( ranked=ranked, timeline=timeline, ctx=ctx, query="disk errors", ) assert isinstance(result, str) assert len(result) > 0 assert "VERDICT" in result def test_no_llm_model_returns_fallback(self): synthesizer = SummarySynthesizer() ranked = [_make_ranked()] result = synthesizer.synthesize( ranked=ranked, timeline=_make_timeline(), ctx=_make_ctx(), query="oom killer", llm_url="http://localhost:11434", # llm_model omitted ) assert "VERDICT" in result assert "SSH flood from external IPs" in result def test_llm_failure_returns_fallback(self): synthesizer = SummarySynthesizer() ranked = [_make_ranked()] with patch("httpx.post", side_effect=ConnectionError("refused")): result = synthesizer.synthesize( ranked=ranked, timeline=_make_timeline(), ctx=_make_ctx(), query="why is disk full", llm_url="http://localhost:11434", llm_model="llama3", ) assert "VERDICT" in result assert len(result) > 0 class TestSynthesizerEmptyRanked: """Empty ranked list: must return deterministic fallback text, not raise.""" def test_empty_ranked_no_llm_returns_fallback(self): synthesizer = SummarySynthesizer() result = synthesizer.synthesize( ranked=[], timeline=_make_timeline(), ctx=_make_ctx(), query="check everything", ) assert isinstance(result, str) assert len(result) > 0 assert "VERDICT" in result def test_empty_ranked_with_llm_returns_fallback_or_llm_text(self): """Even with empty ranked, we attempt LLM and return something.""" synthesizer = SummarySynthesizer() mock_resp = MagicMock() mock_resp.status_code = 200 mock_resp.json.return_value = { "choices": [{"message": {"content": "VERDICT: UNKNOWN — no hypotheses generated"}}] } with patch("httpx.post", return_value=mock_resp): result = synthesizer.synthesize( ranked=[], timeline=_make_timeline(), ctx=_make_ctx(), query="nothing found", llm_url="http://localhost:11434", llm_model="llama3", ) assert isinstance(result, str) assert len(result) > 0 class TestBuildTimelineBlock: """Unit tests for _build_timeline_block helper.""" def test_empty_clusters_returns_placeholder(self): timeline = _make_timeline(clusters=tuple()) assert _build_timeline_block(timeline) == "(no clusters)" def test_single_cluster_basic_fields(self): cluster = _make_cluster( start_iso="2026-01-01T00:05:00+00:00", severity="ERROR", source_ids=("syslog",), representative_text="Failed password for root", ) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "Cluster 1" in block assert "2026-01-01T00:05:00+00:00" in block assert "[ERROR]" in block assert "syslog" in block assert "Failed password for root" in block def test_burst_label_applied(self): cluster = _make_cluster(burst=True) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "[BURST]" in block def test_no_burst_label_when_not_burst(self): cluster = _make_cluster(burst=False) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "[BURST]" not in block def test_gap_label_applied_when_over_threshold(self): cluster = _make_cluster(gap_before_seconds=120.0) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "silence" in block assert "120s" in block def test_gap_label_omitted_when_under_threshold(self): cluster = _make_cluster(gap_before_seconds=10.0) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "silence" not in block def test_pattern_tags_included(self): cluster = _make_cluster(pattern_tags=("ssh_auth_failure", "brute_force")) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "ssh_auth_failure" in block assert "brute_force" in block def test_no_patterns_section_when_empty(self): cluster = _make_cluster(pattern_tags=tuple()) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "[patterns:" not in block def test_multiple_clusters_numbered(self): c1 = _make_cluster(cluster_id="c1", representative_text="first event") c2 = _make_cluster(cluster_id="c2", representative_text="second event") timeline = _make_timeline(clusters=(c1, c2)) block = _build_timeline_block(timeline) assert "Cluster 1" in block assert "Cluster 2" in block assert "first event" in block assert "second event" in block def test_representative_text_truncated_at_200_chars(self): long_text = "x" * 300 cluster = _make_cluster(representative_text=long_text) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "x" * 200 in block assert "x" * 201 not in block def test_null_start_iso_renders_as_unknown(self): cluster = _make_cluster(start_iso=None) timeline = _make_timeline(clusters=(cluster,)) block = _build_timeline_block(timeline) assert "unknown" in block