"""Stage 3: Root-Cause Hypothesizer — LLM + RAG context.""" from __future__ import annotations import json import logging from uuid import uuid4 import httpx from app.context.retriever import RetrievedContext from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, Hypothesis, SeverityLabel, ) logger = logging.getLogger(__name__) _VALID_SEVERITIES: frozenset[str] = frozenset({"CRITICAL", "ERROR", "WARN", "INFO", "DEBUG"}) _SYSTEM_PROMPT = ( "You are a Linux sysadmin log analyst. Analyze the following clustered log timeline " "and generate 2-4 root cause hypotheses as a JSON array.\n\n" "Each hypothesis must follow this exact JSON schema:\n" '{"title": str (≤80 chars), "description": str (2-4 sentences), ' '"confidence": float (0.0-1.0), "severity": str (one of: CRITICAL, ERROR, WARN, INFO), ' '"supporting_clusters": [str list of cluster IDs]}\n\n' "Return ONLY a valid JSON array. No prose, no markdown, no explanation outside the JSON." ) def _coerce_float(val: object, default: float) -> float: """Safely coerce LLM output to float, returning default on failure.""" try: return float(val) # type: ignore[arg-type] except (TypeError, ValueError): return default def _validate_severity(s: str) -> SeverityLabel: """Map a raw severity string to a valid SeverityLabel, defaulting to ERROR.""" upper = s.upper() if upper == "WARNING": return "WARN" return upper if upper in _VALID_SEVERITIES else "ERROR" # type: ignore[return-value] def _cluster_summary(cluster: EventCluster, severity: str) -> str: """Build a condensed single-line summary of a cluster for the prompt.""" sources = ", ".join(list(cluster.source_ids)[:3]) patterns = ", ".join(list(cluster.pattern_tags)[:5]) text_preview = cluster.representative_text[:200] summary = ( f"[{severity}] {cluster.start_iso or 'unknown'} " f"({sources}) — {text_preview}" ) if patterns: summary += f" [patterns: {patterns}]" return summary def _extract_content(resp_json: dict) -> str | None: """Pull text content from an OpenAI-compat chat completion response.""" choices = resp_json.get("choices") or [] if not choices: return None return (choices[0].get("message", {}).get("content") or "").strip() or None class RootCauseHypothesizer: """Generate ranked root-cause hypotheses from a classified log timeline.""" def __init__(self, max_hypotheses: int = 4) -> None: self._max_hypotheses = max_hypotheses def hypothesize( self, classified: ClassifiedTimeline, ctx: RetrievedContext, query: str, llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, ) -> list[Hypothesis]: """Generate hypotheses from a classified timeline and RAG context. Returns an empty list when no LLM is configured or there are no clusters to analyse. """ if not llm_url or not llm_model: return [] clusters = classified.timeline.clusters if not clusters: return [] cluster_lines = [ _cluster_summary(c, classified.cluster_severities.get(c.cluster_id, c.severity)) for c in clusters ] cluster_block = "\n".join(cluster_lines) context_parts: list[str] = [] for chunk in ctx.chunks[:5]: filename = chunk.get("filename", "unknown") text = chunk.get("text", "")[:300] context_parts.append(f"[{filename}] {text}") context_block = "\n".join(context_parts) if context_parts else "(none)" user_message = ( f"Query: {query}\n\n" f"Context from runbooks and known patterns:\n{context_block}\n\n" f"Log timeline (clustered, {len(clusters)} clusters):\n{cluster_block}\n\n" f"Generate up to {self._max_hypotheses} hypotheses. Return JSON array only." ) messages = [ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": user_message}, ] raw_response = self._call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, messages=messages, ) if raw_response is None: return [] return self._parse_response(raw_response) def _call_llm( self, llm_url: str, llm_model: str, llm_api_key: str | None, messages: list[dict], ) -> str | None: """Send messages to the LLM and return raw text content.""" headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} # Try cf-orch task-based endpoint first. task_url = f"{llm_url.rstrip('/')}/api/inference/task" try: resp = httpx.post( task_url, json={ "product": "turnstone", "task": "log_analysis", "payload": {"messages": messages, "stream": False}, }, headers=headers, timeout=120.0, ) if resp.status_code == 200: return _extract_content(resp.json()) if resp.status_code != 404: resp.raise_for_status() logger.debug( "No task assignment for turnstone.log_analysis — falling back to direct model" ) except Exception as exc: logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc) # Fallback: OpenAI-compat endpoint with explicit model name. try: resp = httpx.post( f"{llm_url.rstrip('/')}/v1/chat/completions", json={"model": llm_model, "messages": messages, "stream": False}, headers=headers, timeout=120.0, ) resp.raise_for_status() return _extract_content(resp.json()) except Exception as exc: logger.warning( "LLM hypothesizer failed (%s): %s", type(exc).__name__, exc ) return None def _parse_response(self, raw: str) -> list[Hypothesis]: """Parse the LLM JSON response into a list of Hypothesis objects.""" try: data = json.loads(raw.strip()) except json.JSONDecodeError: logger.warning( "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw ) return [] if not isinstance(data, list): logger.warning( "Hypothesizer: expected JSON array, got %s", type(data).__name__ ) return [] hypotheses: list[Hypothesis] = [] for item in data[: self._max_hypotheses]: if not isinstance(item, dict): continue severity_raw = item.get("severity", "ERROR") severity = _validate_severity(str(severity_raw)) hypothesis = Hypothesis( hypothesis_id=str(uuid4()), title=str(item.get("title", "Unknown"))[:80], description=str(item.get("description", "")), confidence=_coerce_float(item.get("confidence"), 0.5), supporting_cluster_ids=tuple(item.get("supporting_clusters") or []), runbook_refs=(), severity=severity, ) hypotheses.append(hypothesis) return hypotheses