"""Stage 5: Summary Synthesizer — deterministic narrative from ranked hypotheses. Streaming upgrade (async SSE chunks) is tracked as a follow-up enhancement. This implementation is synchronous to match the rest of the pipeline. """ from __future__ import annotations import logging import httpx from app.context.retriever import RetrievedContext from app.services.diagnose.models import RankedHypothesis, TimelineResult logger = logging.getLogger(__name__) _SYSTEM_PROMPT = ( "You are a Linux sysadmin diagnosing a system incident. " "Write a concise, actionable incident diagnosis.\n\n" "Format your response exactly as:\n" "1. VERDICT: [CRITICAL|ERROR|WARN|INFO] — (% confidence)\n" "2. TIMELINE: \n" "3. ROOT CAUSES:\n" " - (%)\n" " - (%)\n" "4. RECOMMENDED ACTIONS:\n" " - \n" "5. INVESTIGATE FURTHER: " ) def _extract_content(resp_json: dict) -> str | None: """Pull text content from an OpenAI-compat chat completion response.""" choices = resp_json.get("choices") or [] if not choices: return None return (choices[0].get("message", {}).get("content") or "").strip() or None def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: """Build the hypothesis block for the prompt (non-suppressed only, top 3).""" active = [rh for rh in ranked if not rh.suppress][:3] if not active: return "(none)" lines: list[str] = [] for rh in active: h = rh.hypothesis conf_pct = int(h.confidence * 100) similar = ( f"Yes — suppressed, {rh.suppression_reason}" if rh.suppress and rh.suppression_reason else "No" ) novelty = f"{rh.novelty_score:.2f}" lines.append( f"- [{h.severity}, {conf_pct}%] {h.title}\n" f" Similar resolved incident? {similar} (novelty {novelty})" ) return "\n".join(lines) def _build_context_block(ctx: RetrievedContext) -> str: """Build the runbook context block for the prompt.""" parts: list[str] = [] for chunk in ctx.chunks[:5]: filename = chunk.get("filename", "unknown") text = chunk.get("text", "")[:300] parts.append(f"[{filename}] {text}") return "\n".join(parts) if parts else "(none)" def _deterministic_fallback( ranked: list[RankedHypothesis], timeline: TimelineResult, ) -> str: """Build a deterministic fallback text when no LLM is available.""" active = [rh for rh in ranked if not rh.suppress][:3] if active: top = active[0] verdict_severity = top.hypothesis.severity verdict_title = top.hypothesis.title verdict_conf = int(top.hypothesis.confidence * 100) elif ranked: top = ranked[0] verdict_severity = top.hypothesis.severity verdict_title = top.hypothesis.title verdict_conf = int(top.hypothesis.confidence * 100) else: verdict_severity = "UNKNOWN" verdict_title = "No hypotheses generated" verdict_conf = 0 root_causes = ", ".join( rh.hypothesis.title for rh in (active or ranked[:3]) ) or "None" return ( f"VERDICT: {verdict_severity} — {verdict_title} ({verdict_conf}% confidence)\n" f"TIMELINE: {timeline.total_entries} entries across {len(timeline.clusters)} clusters.\n" f"ROOT CAUSES: {root_causes}" ) class SummarySynthesizer: """Stage 5 of the multi-agent diagnose pipeline. Synthesizes a human-readable incident narrative from ranked hypotheses, the reconstructed timeline, and RAG context. When no LLM is configured, returns a deterministic fallback built from the hypothesis data. """ def synthesize( self, ranked: list[RankedHypothesis], timeline: TimelineResult, ctx: RetrievedContext, query: str, llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, ) -> str: """Return synthesis text (single string, synchronous). Falls back to a deterministic narrative when no LLM URL or model is provided, or when the LLM call fails. """ fallback = _deterministic_fallback(ranked, timeline) if not llm_url or not llm_model: return fallback hypothesis_block = _build_hypothesis_block(ranked) context_block = _build_context_block(ctx) dominant = ", ".join(timeline.dominant_sources[:5]) or "none" user_message = ( f"Query: {query}\n\n" f"Timeline summary:\n" f"- {len(timeline.clusters)} clusters, " f"{timeline.burst_count} bursts, " f"{timeline.gap_count} silence gaps\n" f"- Primary sources: {dominant}\n\n" f"Top hypotheses:\n{hypothesis_block}\n\n" f"Context from runbooks:\n{context_block}" ) messages = [ {"role": "system", "content": _SYSTEM_PROMPT}, {"role": "user", "content": user_message}, ] result = self._call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, messages=messages, ) return result if result else fallback def _call_llm( self, llm_url: str, llm_model: str, llm_api_key: str | None, messages: list[dict], ) -> str | None: """Send messages to the LLM and return raw text content. Tries the cf-orch task endpoint first, falls back to direct OpenAI-compat. """ headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} task_url = f"{llm_url.rstrip('/')}/api/inference/task" try: resp = httpx.post( task_url, json={ "product": "turnstone", "task": "log_analysis", "payload": {"messages": messages, "stream": False}, }, headers=headers, timeout=120.0, ) if resp.status_code == 200: return _extract_content(resp.json()) if resp.status_code != 404: resp.raise_for_status() logger.debug( "No task assignment for turnstone.log_analysis — falling back to direct model" ) except Exception as exc: logger.debug( "Task endpoint unavailable (%s) — falling back to direct model", exc ) try: resp = httpx.post( f"{llm_url.rstrip('/')}/v1/chat/completions", json={"model": llm_model, "messages": messages, "stream": False}, headers=headers, timeout=120.0, ) resp.raise_for_status() return _extract_content(resp.json()) except Exception as exc: logger.warning( "LLM synthesizer failed (%s): %s", type(exc).__name__, exc ) return None