From 34fb8f501dabb184a6160e16da847d2159898d30 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 13:49:18 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Stage=203=20=E2=80=94=20RootCauseHypoth?= =?UTF-8?q?esizer=20for=20multi-agent=20diagnose=20pipeline=20(issue=20#29?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add app/services/diagnose/hypothesizer.py with RootCauseHypothesizer class - Stage 3 of the multi-agent diagnose pipeline: accepts ClassifiedTimeline + RetrievedContext, builds a structured JSON prompt, calls the LLM via the same cf-orch task → OpenAI-compat fallback pattern used by llm.py - Parses JSON array response into list[Hypothesis] dataclasses with UUID ids, severity validation (WARNING→WARN, unknown→ERROR), confidence coercion - Gracefully returns [] when llm_url/llm_model absent or clusters empty - Add tests/test_diagnose_hypothesizer.py: 12 tests, all mocked, no LLM I/O covering: valid response, UUID generation, malformed JSON, non-list JSON, empty clusters, missing URL/model, max_hypotheses cap, severity mapping, confidence string coercion - 340 tests passing (328 prior + 12 new) Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/29 --- app/services/diagnose/hypothesizer.py | 208 ++++++++++++ tests/test_diagnose_hypothesizer.py | 451 ++++++++++++++++++++++++++ 2 files changed, 659 insertions(+) create mode 100644 app/services/diagnose/hypothesizer.py create mode 100644 tests/test_diagnose_hypothesizer.py diff --git a/app/services/diagnose/hypothesizer.py b/app/services/diagnose/hypothesizer.py new file mode 100644 index 0000000..d7d3261 --- /dev/null +++ b/app/services/diagnose/hypothesizer.py @@ -0,0 +1,208 @@ +"""Stage 3: Root-Cause Hypothesizer — LLM + RAG context.""" +from __future__ import annotations + +import json +import logging +from uuid import uuid4 + +import httpx + +from app.context.retriever import RetrievedContext +from app.services.diagnose.models import ( + ClassifiedTimeline, + EventCluster, + Hypothesis, + SeverityLabel, +) + +logger = logging.getLogger(__name__) + +_VALID_SEVERITIES: frozenset[str] = frozenset({"CRITICAL", "ERROR", "WARN", "INFO", "DEBUG"}) + +_SYSTEM_PROMPT = ( + "You are a Linux sysadmin log analyst. Analyze the following clustered log timeline " + "and generate 2-4 root cause hypotheses as a JSON array.\n\n" + "Each hypothesis must follow this exact JSON schema:\n" + '{"title": str (≤80 chars), "description": str (2-4 sentences), ' + '"confidence": float (0.0-1.0), "severity": str (one of: CRITICAL, ERROR, WARN, INFO), ' + '"supporting_clusters": [str list of cluster IDs]}\n\n' + "Return ONLY a valid JSON array. No prose, no markdown, no explanation outside the JSON." +) + + +def _validate_severity(s: str) -> SeverityLabel: + """Map a raw severity string to a valid SeverityLabel, defaulting to ERROR.""" + upper = s.upper() + if upper == "WARNING": + return "WARN" + return upper if upper in _VALID_SEVERITIES else "ERROR" # type: ignore[return-value] + + +def _cluster_summary(cluster: EventCluster, severity: str) -> str: + """Build a condensed single-line summary of a cluster for the prompt.""" + sources = ", ".join(list(cluster.source_ids)[:3]) + patterns = ", ".join(list(cluster.pattern_tags)[:5]) + text_preview = cluster.representative_text[:200] + summary = ( + f"[{severity}] {cluster.start_iso or 'unknown'} " + f"({sources}) — {text_preview}" + ) + if patterns: + summary += f" [patterns: {patterns}]" + return summary + + +def _extract_content(resp_json: dict) -> str | None: + """Pull text content from an OpenAI-compat chat completion response.""" + choices = resp_json.get("choices") or [] + if not choices: + return None + return (choices[0].get("message", {}).get("content") or "").strip() or None + + +class RootCauseHypothesizer: + """Generate ranked root-cause hypotheses from a classified log timeline.""" + + def __init__(self, max_hypotheses: int = 4) -> None: + self._max_hypotheses = max_hypotheses + + def hypothesize( + self, + classified: ClassifiedTimeline, + ctx: RetrievedContext, + query: str, + llm_url: str | None = None, + llm_model: str | None = None, + llm_api_key: str | None = None, + ) -> list[Hypothesis]: + """Generate hypotheses from a classified timeline and RAG context. + + Returns an empty list when no LLM is configured or there are no + clusters to analyse. + """ + if not llm_url or not llm_model: + return [] + + clusters = classified.timeline.clusters + if not clusters: + return [] + + cluster_lines = [ + _cluster_summary(c, classified.cluster_severities.get(c.cluster_id, c.severity)) + for c in clusters + ] + cluster_block = "\n".join(cluster_lines) + + context_parts: list[str] = [] + for chunk in ctx.chunks[:5]: + filename = chunk.get("filename", "unknown") + text = chunk.get("text", "")[:300] + context_parts.append(f"[{filename}] {text}") + context_block = "\n".join(context_parts) if context_parts else "(none)" + + user_message = ( + f"Query: {query}\n\n" + f"Context from runbooks and known patterns:\n{context_block}\n\n" + f"Log timeline (clustered, {len(clusters)} clusters):\n{cluster_block}\n\n" + f"Generate up to {self._max_hypotheses} hypotheses. Return JSON array only." + ) + + messages = [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": user_message}, + ] + + raw_response = self._call_llm( + llm_url=llm_url, + llm_model=llm_model, + llm_api_key=llm_api_key, + messages=messages, + ) + if raw_response is None: + return [] + + return self._parse_response(raw_response) + + def _call_llm( + self, + llm_url: str, + llm_model: str, + llm_api_key: str | None, + messages: list[dict], + ) -> str | None: + """Send messages to the LLM and return raw text content.""" + headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} + + # Try cf-orch task-based endpoint first. + task_url = f"{llm_url.rstrip('/')}/api/inference/task" + try: + resp = httpx.post( + task_url, + json={ + "product": "turnstone", + "task": "log_analysis", + "payload": {"messages": messages, "stream": False}, + }, + headers=headers, + timeout=120.0, + ) + if resp.status_code == 200: + return _extract_content(resp.json()) + if resp.status_code != 404: + resp.raise_for_status() + logger.debug( + "No task assignment for turnstone.log_analysis — falling back to direct model" + ) + except Exception as exc: + logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc) + + # Fallback: OpenAI-compat endpoint with explicit model name. + try: + resp = httpx.post( + f"{llm_url.rstrip('/')}/v1/chat/completions", + json={"model": llm_model, "messages": messages, "stream": False}, + headers=headers, + timeout=120.0, + ) + resp.raise_for_status() + return _extract_content(resp.json()) + except Exception as exc: + logger.warning( + "LLM hypothesizer failed (%s): %s", type(exc).__name__, exc + ) + return None + + def _parse_response(self, raw: str) -> list[Hypothesis]: + """Parse the LLM JSON response into a list of Hypothesis objects.""" + try: + data = json.loads(raw.strip()) + except json.JSONDecodeError: + logger.warning( + "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw + ) + return [] + + if not isinstance(data, list): + logger.warning( + "Hypothesizer: expected JSON array, got %s", type(data).__name__ + ) + return [] + + hypotheses: list[Hypothesis] = [] + for item in data[: self._max_hypotheses]: + if not isinstance(item, dict): + continue + severity_raw = item.get("severity", "ERROR") + severity = _validate_severity(str(severity_raw)) + hypothesis = Hypothesis( + hypothesis_id=str(uuid4()), + title=str(item.get("title", "Unknown"))[:80], + description=str(item.get("description", "")), + confidence=float(item.get("confidence", 0.5)), + supporting_cluster_ids=tuple(item.get("supporting_clusters", [])), + runbook_refs=(), + severity=severity, + ) + hypotheses.append(hypothesis) + + return hypotheses diff --git a/tests/test_diagnose_hypothesizer.py b/tests/test_diagnose_hypothesizer.py new file mode 100644 index 0000000..b0368ec --- /dev/null +++ b/tests/test_diagnose_hypothesizer.py @@ -0,0 +1,451 @@ +"""Tests for app/services/diagnose/hypothesizer.py — RootCauseHypothesizer. + +All tests use mocking; no real LLM calls are made. +""" +from __future__ import annotations + +import json +import re +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +from app.context.retriever import RetrievedContext +from app.services.diagnose.hypothesizer import RootCauseHypothesizer +from app.services.diagnose.models import ( + ClassifiedTimeline, + EventCluster, + Hypothesis, + TimelineResult, +) + + +# --------------------------------------------------------------------------- +# Fixture helpers +# --------------------------------------------------------------------------- + + +def _make_cluster( + cluster_id: str = "c1", + representative_text: str = "kernel: oom-killer invoked", + severity: str = "ERROR", + source_ids: tuple[str, ...] = ("syslog",), + pattern_tags: tuple[str, ...] = ("oom",), + start_iso: str | None = "2024-01-01T00:00:00+00:00", +) -> EventCluster: + return EventCluster( + cluster_id=cluster_id, + entries=("e1",), + start_iso=start_iso, + end_iso=None, + duration_seconds=1.0, + source_ids=source_ids, + pattern_tags=pattern_tags, + severity=severity, # type: ignore[arg-type] + burst=False, + gap_before_seconds=0.0, + representative_text=representative_text, + ) + + +def _make_timeline(clusters: tuple[EventCluster, ...] = ()) -> TimelineResult: + return TimelineResult( + clusters=clusters, + total_entries=len(clusters), + window_start=None, + window_end=None, + gap_count=0, + burst_count=0, + dominant_sources=(), + ) + + +def _make_classified( + clusters: tuple[EventCluster, ...] = (), + cluster_severities: dict | None = None, +) -> ClassifiedTimeline: + if cluster_severities is None: + cluster_severities = {c.cluster_id: c.severity for c in clusters} + return ClassifiedTimeline( + timeline=_make_timeline(clusters), + cluster_severities=cluster_severities, + classifier_used="pattern_tags", + model_id=None, + ) + + +def _make_ctx(chunks: list[dict] | None = None) -> RetrievedContext: + return RetrievedContext( + facts=[], + chunks=chunks or [{"text": "Memory pressure runbook.", "filename": "runbook.md"}], + ) + + +def _llm_json_response(items: list[dict[str, Any]]) -> MagicMock: + """Build a mock httpx.Response that returns the given list as JSON.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "choices": [{"message": {"content": json.dumps(items)}}] + } + return mock_resp + + +_SAMPLE_HYPOTHESES = [ + { + "title": "OOM killer terminated critical process", + "description": "The kernel invoked the OOM killer due to memory exhaustion. A process was terminated unexpectedly. This caused service disruption.", + "confidence": 0.85, + "severity": "CRITICAL", + "supporting_clusters": ["c1"], + }, + { + "title": "Disk I/O saturation", + "description": "High disk I/O latency was detected. Write operations stalled causing log backpressure. Check iostat for device utilisation.", + "confidence": 0.6, + "severity": "ERROR", + "supporting_clusters": ["c2"], + }, +] + + +# --------------------------------------------------------------------------- +# Test 1: Valid JSON response returns correct Hypothesis objects +# --------------------------------------------------------------------------- + + +def test_valid_json_response_returns_hypotheses(): + """Valid LLM JSON array produces a list of Hypothesis objects with correct fields.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + mock_resp = _llm_json_response(_SAMPLE_HYPOTHESES) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="why is memory failing?", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 2 + assert isinstance(results[0], Hypothesis) + assert results[0].title == "OOM killer terminated critical process" + assert results[0].confidence == pytest.approx(0.85) + assert results[0].severity == "CRITICAL" + assert results[0].supporting_cluster_ids == ("c1",) + assert results[1].title == "Disk I/O saturation" + assert results[1].severity == "ERROR" + + +# --------------------------------------------------------------------------- +# Test 2: hypothesis_id is a non-empty UUID string on each result +# --------------------------------------------------------------------------- + + +_UUID_RE = re.compile( + r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" +) + + +def test_hypothesis_id_is_uuid(): + """Each returned Hypothesis carries a distinct UUID v4 hypothesis_id.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + mock_resp = _llm_json_response(_SAMPLE_HYPOTHESES) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 2 + for h in results: + assert h.hypothesis_id, "hypothesis_id must not be empty" + assert _UUID_RE.match(h.hypothesis_id), ( + f"hypothesis_id {h.hypothesis_id!r} is not a UUID v4" + ) + # Each ID must be distinct + ids = [h.hypothesis_id for h in results] + assert len(set(ids)) == len(ids), "hypothesis_ids must be unique" + + +# --------------------------------------------------------------------------- +# Test 3: Malformed JSON response returns [] with a logged warning +# --------------------------------------------------------------------------- + + +def test_malformed_json_returns_empty_and_warns(caplog): + """When the LLM returns non-JSON text, hypothesize() returns [] and logs a warning.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + bad_resp = MagicMock() + bad_resp.status_code = 200 + bad_resp.json.return_value = { + "choices": [{"message": {"content": "not valid json"}}] + } + + import logging + with caplog.at_level(logging.WARNING), patch("httpx.post", return_value=bad_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert results == [] + assert any("invalid JSON" in r.message or "JSON" in r.message for r in caplog.records) + + +# --------------------------------------------------------------------------- +# Test 4: Non-list JSON (dict) returns [] +# --------------------------------------------------------------------------- + + +def test_non_list_json_returns_empty(caplog): + """When the LLM returns a JSON object instead of an array, hypothesize() returns [].""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + dict_resp = MagicMock() + dict_resp.status_code = 200 + dict_resp.json.return_value = { + "choices": [{"message": {"content": '{"error": "oops"}'}}] + } + + import logging + with caplog.at_level(logging.WARNING), patch("httpx.post", return_value=dict_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert results == [] + assert any("array" in r.message.lower() or "list" in r.message.lower() for r in caplog.records) + + +# --------------------------------------------------------------------------- +# Test 5: Empty clusters returns [] without any LLM call +# --------------------------------------------------------------------------- + + +def test_empty_clusters_returns_empty_no_llm_call(): + """ClassifiedTimeline with no clusters returns [] and never calls the LLM.""" + classified = _make_classified(clusters=()) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + with patch("httpx.post") as mock_post: + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert results == [] + mock_post.assert_not_called() + + +# --------------------------------------------------------------------------- +# Test 6: No LLM URL returns [] without any HTTP call +# --------------------------------------------------------------------------- + + +def test_no_llm_url_returns_empty_no_http_call(): + """When llm_url is None, hypothesize() returns [] immediately with no HTTP requests.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + with patch("httpx.post") as mock_post: + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url=None, + llm_model="llama3", + ) + + assert results == [] + mock_post.assert_not_called() + + +def test_empty_llm_url_returns_empty_no_http_call(): + """When llm_url is empty string, hypothesize() returns [] immediately.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + with patch("httpx.post") as mock_post: + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="", + llm_model="llama3", + ) + + assert results == [] + mock_post.assert_not_called() + + +def test_no_llm_model_returns_empty_no_http_call(): + """When llm_model is None, hypothesize() returns [] immediately.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + with patch("httpx.post") as mock_post: + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model=None, + ) + + assert results == [] + mock_post.assert_not_called() + + +# --------------------------------------------------------------------------- +# Test 7: max_hypotheses is respected +# --------------------------------------------------------------------------- + + +def test_max_hypotheses_respected(): + """When LLM returns more items than max_hypotheses, only max_hypotheses are returned.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer(max_hypotheses=3) + + six_items = [ + { + "title": f"Hypothesis {i}", + "description": "Some description. A second sentence. Third sentence here.", + "confidence": 0.5, + "severity": "ERROR", + "supporting_clusters": ["c1"], + } + for i in range(6) + ] + mock_resp = _llm_json_response(six_items) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 3 + + +# --------------------------------------------------------------------------- +# Test 8: Severity validation — WARNING → WARN, garbage → ERROR +# --------------------------------------------------------------------------- + + +def test_severity_warning_maps_to_warn(): + """'WARNING' from the LLM is normalised to 'WARN'.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + items = [ + { + "title": "A warning severity hypothesis", + "description": "Test description. Second sentence. Third.", + "confidence": 0.7, + "severity": "WARNING", + "supporting_clusters": ["c1"], + } + ] + mock_resp = _llm_json_response(items) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 1 + assert results[0].severity == "WARN" + + +def test_severity_garbage_maps_to_error(): + """An unrecognised severity string from the LLM defaults to 'ERROR'.""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + items = [ + { + "title": "A garbage severity hypothesis", + "description": "Test description. Second sentence. Third.", + "confidence": 0.4, + "severity": "GARBAGE", + "supporting_clusters": ["c1"], + } + ] + mock_resp = _llm_json_response(items) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 1 + assert results[0].severity == "ERROR" + + +# --------------------------------------------------------------------------- +# Test 9: Confidence field works with string floats from the LLM +# --------------------------------------------------------------------------- + + +def test_confidence_string_float_coercion(): + """A confidence value returned as a string by the LLM is coerced to float via float().""" + cluster = _make_cluster() + classified = _make_classified(clusters=(cluster,)) + ctx = _make_ctx() + hypothesizer = RootCauseHypothesizer() + + items = [ + { + "title": "String confidence test", + "description": "Some description. Second sentence. Third.", + "confidence": "0.8", # LLM returned a string, not a float + "severity": "INFO", + "supporting_clusters": ["c1"], + } + ] + mock_resp = _llm_json_response(items) + + with patch("httpx.post", return_value=mock_resp): + results = hypothesizer.hypothesize( + classified, ctx, query="test", + llm_url="http://localhost:11434", + llm_model="llama3", + ) + + assert len(results) == 1 + assert isinstance(results[0].confidence, float) + assert results[0].confidence == pytest.approx(0.8)