"""Stage 3: Root-Cause Hypothesizer — LLM + RAG context.""" from __future__ import annotations import json import logging from uuid import uuid4 from app.context.retriever import RetrievedContext from app.services.diagnose._llm_client import call_llm, extract_first_json_array, strip_json_fences from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, Hypothesis, SeverityLabel, ) logger = logging.getLogger(__name__) _VALID_SEVERITIES: frozenset[str] = frozenset({"CRITICAL", "ERROR", "WARN", "INFO", "DEBUG"}) _SYSTEM_PROMPT = ( "You are a Linux sysadmin log analyst. Analyze the following clustered log timeline " "and generate 2-4 root cause hypotheses as a JSON array.\n\n" "Each hypothesis must follow this exact JSON schema:\n" '{"title": str (≤80 chars), "description": str (2-4 sentences), ' '"confidence": float (0.0-1.0), "severity": str (one of: CRITICAL, ERROR, WARN, INFO), ' '"supporting_clusters": [str list of cluster IDs]}\n\n' "Return ONLY a valid JSON array. No prose, no markdown, no explanation outside the JSON." ) def _coerce_float(val: object, default: float) -> float: """Safely coerce LLM output to float, returning default on failure.""" try: return float(val) # type: ignore[arg-type] except (TypeError, ValueError): return default def _validate_severity(s: str) -> SeverityLabel: """Map a raw severity string to a valid SeverityLabel, defaulting to ERROR.""" upper = s.upper() if upper == "WARNING": return "WARN" return upper if upper in _VALID_SEVERITIES else "ERROR" # type: ignore[return-value] def _cluster_summary(cluster: EventCluster, severity: str) -> str: """Build a condensed single-line summary of a cluster for the prompt.""" sources = ", ".join(list(cluster.source_ids)[:3]) patterns = ", ".join(list(cluster.pattern_tags)[:5]) text_preview = cluster.representative_text[:200] summary = ( f"[{severity}] {cluster.start_iso or 'unknown'} " f"({sources}) — {text_preview}" ) if patterns: summary += f" [patterns: {patterns}]" return summary class RootCauseHypothesizer: """Generate ranked root-cause hypotheses from a classified log timeline.""" def __init__(self, max_hypotheses: int = 4) -> None: self._max_hypotheses = max_hypotheses def hypothesize( self, classified: ClassifiedTimeline, ctx: RetrievedContext, query: str, llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, ) -> list[Hypothesis]: """Generate hypotheses from a classified timeline and RAG context. Returns an empty list when no LLM is configured or there are no clusters to analyse. """ if not llm_url or not llm_model: return [] clusters = classified.timeline.clusters if not clusters: return [] cluster_lines = [ _cluster_summary(c, classified.cluster_severities.get(c.cluster_id, c.severity)) for c in clusters ] cluster_block = "\n".join(cluster_lines) context_parts: list[str] = [] for chunk in ctx.chunks[:5]: filename = chunk.get("filename", "unknown") text = chunk.get("text", "")[:300] context_parts.append(f"[{filename}] {text}") context_block = "\n".join(context_parts) if context_parts else "(none)" user_message = ( f"Query: {query}\n\n" f"Context from runbooks and known patterns:\n{context_block}\n\n" f"Log timeline (clustered, {len(clusters)} clusters):\n{cluster_block}\n\n" f"Generate up to {self._max_hypotheses} hypotheses. Return JSON array only." ) messages = [ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": user_message}, ] raw_response = call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, messages=messages, ) if raw_response is None: return [] return self._parse_response(raw_response) def _parse_response(self, raw: str) -> list[Hypothesis]: """Parse the LLM JSON response into a list of Hypothesis objects. Strips markdown code fences before parsing — some LLMs wrap JSON in triple-backtick fences despite being instructed not to. """ try: # extract_first_json_array handles reasoning models that emit valid # JSON then repeat it inside a markdown fence block. data = json.loads(extract_first_json_array(strip_json_fences(raw))) except json.JSONDecodeError: logger.warning( "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw ) return [] if not isinstance(data, list): logger.warning( "Hypothesizer: expected JSON array, got %s", type(data).__name__ ) return [] hypotheses: list[Hypothesis] = [] for item in data[: self._max_hypotheses]: if not isinstance(item, dict): continue severity_raw = item.get("severity", "ERROR") severity = _validate_severity(str(severity_raw)) hypothesis = Hypothesis( hypothesis_id=str(uuid4()), title=str(item.get("title", "Unknown"))[:80], description=str(item.get("description", "")), confidence=_coerce_float(item.get("confidence"), 0.5), supporting_cluster_ids=tuple( str(x) for x in (item.get("supporting_clusters") or []) ), runbook_refs=(), severity=severity, ) hypotheses.append(hypothesis) return hypotheses