- Add _coerce_float() module-level helper: catches TypeError/ValueError from
non-numeric LLM output (e.g. 'high', 'N/A') and returns a caller-supplied
default instead of raising.
- Replace float(item.get('confidence', 0.5)) with
_coerce_float(item.get('confidence'), 0.5) in _parse_response.
- Guard supporting_cluster_ids: tuple(item.get(...) or []) so a JSON null
from the LLM does not cause TypeError('NoneType is not iterable').
- runbook_refs is hardcoded as () and not sourced from LLM output; no change
needed there.
- Add test_non_numeric_confidence_uses_default (Test 10) to cover the 'high'
string case: asserts no exception and confidence == 0.5.
- 341 tests passing (+1).
Closes: #29
216 lines
7.5 KiB
Python
216 lines
7.5 KiB
Python
"""Stage 3: Root-Cause Hypothesizer — LLM + RAG context."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from uuid import uuid4
|
|
|
|
import httpx
|
|
|
|
from app.context.retriever import RetrievedContext
|
|
from app.services.diagnose.models import (
|
|
ClassifiedTimeline,
|
|
EventCluster,
|
|
Hypothesis,
|
|
SeverityLabel,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Severity labels accepted (after upper-casing) by _validate_severity; any
# other value is replaced by that function's fallback.
_VALID_SEVERITIES: frozenset[str] = frozenset({"CRITICAL", "ERROR", "WARN", "INFO", "DEBUG"})

# System message sent ahead of the user message on every LLM call. It pins
# the JSON schema that RootCauseHypothesizer._parse_response reads back
# ("title", "description", "confidence", "severity", "supporting_clusters").
# NOTE: the schema text offers only CRITICAL/ERROR/WARN/INFO, whereas
# _VALID_SEVERITIES additionally accepts DEBUG.
_SYSTEM_PROMPT = (
    "You are a Linux sysadmin log analyst. Analyze the following clustered log timeline "
    "and generate 2-4 root cause hypotheses as a JSON array.\n\n"
    "Each hypothesis must follow this exact JSON schema:\n"
    '{"title": str (≤80 chars), "description": str (2-4 sentences), '
    '"confidence": float (0.0-1.0), "severity": str (one of: CRITICAL, ERROR, WARN, INFO), '
    '"supporting_clusters": [str list of cluster IDs]}\n\n'
    "Return ONLY a valid JSON array. No prose, no markdown, no explanation outside the JSON."
)
|
|
|
|
|
|
def _coerce_float(val: object, default: float) -> float:
|
|
"""Safely coerce LLM output to float, returning default on failure."""
|
|
try:
|
|
return float(val) # type: ignore[arg-type]
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _validate_severity(s: str) -> SeverityLabel:
    """Normalise a raw severity string into a valid SeverityLabel.

    The input is upper-cased; "WARNING" is treated as an alias for "WARN",
    and any label not present in ``_VALID_SEVERITIES`` becomes "ERROR".
    """
    label = s.upper()
    if label == "WARNING":
        label = "WARN"
    elif label not in _VALID_SEVERITIES:
        label = "ERROR"
    return label  # type: ignore[return-value]
|
|
|
|
|
|
def _cluster_summary(cluster: EventCluster, severity: str) -> str:
|
|
"""Build a condensed single-line summary of a cluster for the prompt."""
|
|
sources = ", ".join(list(cluster.source_ids)[:3])
|
|
patterns = ", ".join(list(cluster.pattern_tags)[:5])
|
|
text_preview = cluster.representative_text[:200]
|
|
summary = (
|
|
f"[{severity}] {cluster.start_iso or 'unknown'} "
|
|
f"({sources}) — {text_preview}"
|
|
)
|
|
if patterns:
|
|
summary += f" [patterns: {patterns}]"
|
|
return summary
|
|
|
|
|
|
def _extract_content(resp_json: dict) -> str | None:
|
|
"""Pull text content from an OpenAI-compat chat completion response."""
|
|
choices = resp_json.get("choices") or []
|
|
if not choices:
|
|
return None
|
|
return (choices[0].get("message", {}).get("content") or "").strip() or None
|
|
|
|
|
|
class RootCauseHypothesizer:
    """Generate root-cause hypotheses from a classified log timeline via an LLM."""

    def __init__(self, max_hypotheses: int = 4) -> None:
        """Store the upper bound on hypotheses requested from and kept per call."""
        self._max_hypotheses = max_hypotheses

    def hypothesize(
        self,
        classified: ClassifiedTimeline,
        ctx: RetrievedContext,
        query: str,
        llm_url: str | None = None,
        llm_model: str | None = None,
        llm_api_key: str | None = None,
    ) -> list[Hypothesis]:
        """Generate hypotheses from a classified timeline and RAG context.

        Args:
            classified: Timeline whose clusters are summarised into the prompt;
                ``cluster_severities`` overrides each cluster's own severity.
            ctx: Retrieved context; at most the first five chunks are used,
                each truncated to 300 characters.
            query: The user's question, placed at the top of the user message.
            llm_url: Base URL of the LLM service.
            llm_model: Model name used for the direct chat-completions call.
            llm_api_key: Optional bearer token.

        Returns:
            Parsed hypotheses, or an empty list when no LLM is configured,
            there are no clusters to analyse, or the LLM call yields nothing.
        """
        if not llm_url or not llm_model:
            return []

        clusters = classified.timeline.clusters
        if not clusters:
            return []

        # NOTE(review): the system prompt asks for supporting cluster IDs, but
        # _cluster_summary does not include cluster_id in these lines — confirm
        # whether the model is expected to see the IDs.
        cluster_block = "\n".join(
            _cluster_summary(c, classified.cluster_severities.get(c.cluster_id, c.severity))
            for c in clusters
        )

        context_parts: list[str] = []
        for chunk in ctx.chunks[:5]:
            # ``or`` (not a .get default) so an explicit null does not crash
            # the slice or show up as the literal text "None".
            filename = chunk.get("filename") or "unknown"
            text = (chunk.get("text") or "")[:300]
            context_parts.append(f"[{filename}] {text}")
        context_block = "\n".join(context_parts) if context_parts else "(none)"

        user_message = (
            f"Query: {query}\n\n"
            f"Context from runbooks and known patterns:\n{context_block}\n\n"
            f"Log timeline (clustered, {len(clusters)} clusters):\n{cluster_block}\n\n"
            f"Generate up to {self._max_hypotheses} hypotheses. Return JSON array only."
        )

        messages = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": user_message},
        ]

        raw_response = self._call_llm(
            llm_url=llm_url,
            llm_model=llm_model,
            llm_api_key=llm_api_key,
            messages=messages,
        )
        if raw_response is None:
            return []

        return self._parse_response(raw_response)

    def _call_llm(
        self,
        llm_url: str,
        llm_model: str,
        llm_api_key: str | None,
        messages: list[dict],
    ) -> str | None:
        """Send messages to the LLM and return raw text content.

        The task-based endpoint (``/api/inference/task``) is tried first; a
        200 response is returned immediately. Any other outcome (404, other
        status, or an exception) falls through to the OpenAI-compatible
        ``/v1/chat/completions`` endpoint with the explicit model name.

        Returns:
            The extracted content, or ``None`` when the response carries no
            content or the fallback request fails (the failure is logged).
        """
        headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
        base_url = llm_url.rstrip("/")

        # Try cf-orch task-based endpoint first.
        try:
            resp = httpx.post(
                f"{base_url}/api/inference/task",
                json={
                    "product": "turnstone",
                    "task": "log_analysis",
                    "payload": {"messages": messages, "stream": False},
                },
                headers=headers,
                timeout=120.0,
            )
            if resp.status_code == 200:
                return _extract_content(resp.json())
            if resp.status_code != 404:
                resp.raise_for_status()
            logger.debug(
                "No task assignment for turnstone.log_analysis — falling back to direct model"
            )
        except Exception as exc:
            # Deliberately broad: any failure here only means "use the fallback".
            logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)

        # Fallback: OpenAI-compat endpoint with explicit model name.
        try:
            resp = httpx.post(
                f"{base_url}/v1/chat/completions",
                json={"model": llm_model, "messages": messages, "stream": False},
                headers=headers,
                timeout=120.0,
            )
            resp.raise_for_status()
            return _extract_content(resp.json())
        except Exception as exc:
            logger.warning(
                "LLM hypothesizer failed (%s): %s", type(exc).__name__, exc
            )
            return None

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Return *raw* stripped, with one enclosing Markdown code fence removed.

        Text that is not fully wrapped in triple backticks is only stripped.
        """
        text = raw.strip()
        if len(text) < 6 or not (text.startswith("```") and text.endswith("```")):
            return text
        inner = text[3:-3]
        # Drop an optional language tag (e.g. "json") on the opening fence line,
        # but keep the line when it already starts the JSON payload.
        first_line, newline, rest = inner.partition("\n")
        if newline and not first_line.strip().startswith(("[", "{")):
            inner = rest
        return inner.strip()

    @staticmethod
    def _clamp_confidence(value: float) -> float:
        """Clamp *value* into 0.0–1.0; non-finite values become 0.5."""
        import math  # local import: keeps this helper self-contained

        if not math.isfinite(value):
            return 0.5
        return min(1.0, max(0.0, value))

    @staticmethod
    def _cluster_id_tuple(value: object) -> tuple:
        """Turn the LLM's ``supporting_clusters`` value into a tuple of IDs.

        A list/tuple is converted as-is, a single non-empty string is treated
        as one ID (not split into characters), and anything else — null,
        numbers, objects — yields an empty tuple.
        """
        if isinstance(value, str):
            return (value,) if value else ()
        if isinstance(value, (list, tuple)):
            return tuple(value)
        return ()

    def _parse_response(self, raw: str) -> list[Hypothesis]:
        """Parse the LLM JSON response into a list of Hypothesis objects.

        Invalid JSON or a non-array payload is logged and yields ``[]``.
        Only the first ``max_hypotheses`` array items are considered, and
        non-object items among them are skipped. Missing or null fields fall
        back to defaults ("Unknown" title, empty description, 0.5 confidence,
        "ERROR" severity, no supporting clusters).
        """
        try:
            data = json.loads(self._strip_code_fence(raw))
        except json.JSONDecodeError:
            logger.warning(
                "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw
            )
            return []

        if not isinstance(data, list):
            logger.warning(
                "Hypothesizer: expected JSON array, got %s", type(data).__name__
            )
            return []

        hypotheses: list[Hypothesis] = []
        for item in data[: self._max_hypotheses]:
            if not isinstance(item, dict):
                continue
            severity = _validate_severity(str(item.get("severity") or "ERROR"))
            confidence = self._clamp_confidence(_coerce_float(item.get("confidence"), 0.5))
            hypotheses.append(
                Hypothesis(
                    hypothesis_id=str(uuid4()),
                    # ``or`` so a JSON null does not become the text "None".
                    title=str(item.get("title") or "Unknown")[:80],
                    description=str(item.get("description") or ""),
                    confidence=confidence,
                    supporting_cluster_ids=self._cluster_id_tuple(
                        item.get("supporting_clusters")
                    ),
                    # Not sourced from LLM output.
                    runbook_refs=(),
                    severity=severity,
                )
            )

        return hypotheses
|