turnstone/app/services/diagnose/synthesizer.py

210 lines
7.1 KiB
Python

"""Stage 5: Summary Synthesizer — deterministic narrative from ranked hypotheses.
Streaming upgrade (async SSE chunks) is tracked as a follow-up enhancement.
This implementation is synchronous to match the rest of the pipeline.
"""
from __future__ import annotations
import logging
import httpx
from app.context.retriever import RetrievedContext
from app.services.diagnose.models import RankedHypothesis, TimelineResult
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# System prompt used as the "system" message of the synthesis request.
# It pins the reply to five numbered sections (VERDICT, TIMELINE, ROOT CAUSES,
# RECOMMENDED ACTIONS, INVESTIGATE FURTHER). The adjacent string literals are
# concatenated by the parser into a single prompt string.
_SYSTEM_PROMPT = (
"You are a Linux sysadmin diagnosing a system incident. "
"Write a concise, actionable incident diagnosis.\n\n"
"Format your response exactly as:\n"
"1. VERDICT: [CRITICAL|ERROR|WARN|INFO] — <what happened> (<X>% confidence)\n"
"2. TIMELINE: <what the logs show in sequence, 2-3 sentences>\n"
"3. ROOT CAUSES:\n"
" - <hypothesis 1 title> (<confidence>%)\n"
" - <hypothesis 2 title> (<confidence>%)\n"
"4. RECOMMENDED ACTIONS:\n"
" - <action based on hypotheses>\n"
"5. INVESTIGATE FURTHER: <open questions, if any>"
)
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str:
"""Build the hypothesis block for the prompt (non-suppressed only, top 3)."""
active = [rh for rh in ranked if not rh.suppress][:3]
if not active:
return "(none)"
lines: list[str] = []
for rh in active:
h = rh.hypothesis
conf_pct = int(h.confidence * 100)
similar = (
f"Yes — suppressed, {rh.suppression_reason}"
if rh.suppress and rh.suppression_reason
else "No"
)
novelty = f"{rh.novelty_score:.2f}"
lines.append(
f"- [{h.severity}, {conf_pct}%] {h.title}\n"
f" Similar resolved incident? {similar} (novelty {novelty})"
)
return "\n".join(lines)
def _build_context_block(ctx: RetrievedContext) -> str:
"""Build the runbook context block for the prompt."""
parts: list[str] = []
for chunk in ctx.chunks[:5]:
filename = chunk.get("filename", "unknown")
text = chunk.get("text", "")[:300]
parts.append(f"[{filename}] {text}")
return "\n".join(parts) if parts else "(none)"
def _deterministic_fallback(
ranked: list[RankedHypothesis],
timeline: TimelineResult,
) -> str:
"""Build a deterministic fallback text when no LLM is available."""
active = [rh for rh in ranked if not rh.suppress][:3]
if active:
top = active[0]
verdict_severity = top.hypothesis.severity
verdict_title = top.hypothesis.title
verdict_conf = int(top.hypothesis.confidence * 100)
elif ranked:
top = ranked[0]
verdict_severity = top.hypothesis.severity
verdict_title = top.hypothesis.title
verdict_conf = int(top.hypothesis.confidence * 100)
else:
verdict_severity = "UNKNOWN"
verdict_title = "No hypotheses generated"
verdict_conf = 0
root_causes = ", ".join(
rh.hypothesis.title for rh in (active or ranked[:3])
) or "None"
return (
f"VERDICT: {verdict_severity}{verdict_title} ({verdict_conf}% confidence)\n"
f"TIMELINE: {timeline.total_entries} entries across {len(timeline.clusters)} clusters.\n"
f"ROOT CAUSES: {root_causes}"
)
class SummarySynthesizer:
    """Stage 5 of the multi-agent diagnose pipeline.

    Turns ranked hypotheses, the reconstructed timeline and RAG context into
    a human-readable incident narrative. Without a configured LLM the
    narrative is a deterministic fallback derived from the hypothesis data.
    """

    def synthesize(
        self,
        ranked: list[RankedHypothesis],
        timeline: TimelineResult,
        ctx: RetrievedContext,
        query: str,
        llm_url: str | None = None,
        llm_model: str | None = None,
        llm_api_key: str | None = None,
    ) -> str:
        """Return the synthesis text as one string (synchronous).

        The deterministic narrative is returned when either the LLM URL or
        the model name is missing, and also when the LLM call yields no text.
        """
        fallback = _deterministic_fallback(ranked, timeline)
        if not (llm_url and llm_model):
            return fallback

        hypotheses_text = _build_hypothesis_block(ranked)
        runbook_text = _build_context_block(ctx)
        primary_sources = ", ".join(timeline.dominant_sources[:5]) or "none"

        # Assemble the user prompt piece by piece; joined with no separator
        # so each piece carries its own newlines.
        prompt_pieces = [
            f"Query: {query}\n\n",
            "Timeline summary:\n",
            f"- {len(timeline.clusters)} clusters, ",
            f"{timeline.burst_count} bursts, ",
            f"{timeline.gap_count} silence gaps\n",
            f"- Primary sources: {primary_sources}\n\n",
            f"Top hypotheses:\n{hypotheses_text}\n\n",
            f"Context from runbooks:\n{runbook_text}",
        ]
        conversation = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": "".join(prompt_pieces)},
        ]

        answer = self._call_llm(
            llm_url=llm_url,
            llm_model=llm_model,
            llm_api_key=llm_api_key,
            messages=conversation,
        )
        return answer or fallback

    def _call_llm(
        self,
        llm_url: str,
        llm_model: str,
        llm_api_key: str | None,
        messages: list[dict],
    ) -> str | None:
        """Send messages to the LLM and return the raw text content.

        The cf-orch task endpoint is tried first. A 200 reply is returned as
        is; any other outcome (including an exception) leads to a second
        attempt against the direct OpenAI-compat chat completions endpoint.
        Returns ``None`` when that second attempt fails too.
        """
        auth_headers: dict = {}
        if llm_api_key:
            auth_headers["Authorization"] = f"Bearer {llm_api_key}"
        base_url = llm_url.rstrip("/")

        # First attempt: orchestrator task endpoint.
        task_request = {
            "product": "turnstone",
            "task": "log_analysis",
            "payload": {"messages": messages, "stream": False},
        }
        try:
            task_resp = httpx.post(
                f"{base_url}/api/inference/task",
                json=task_request,
                headers=auth_headers,
                timeout=120.0,
            )
            status = task_resp.status_code
            if status == 200:
                return _extract_content(task_resp.json())
            if status != 404:
                # Error statuses raise here and are handled just below.
                task_resp.raise_for_status()
            logger.debug(
                "No task assignment for turnstone.log_analysis — falling back to direct model"
            )
        except Exception as task_err:
            logger.debug(
                "Task endpoint unavailable (%s) — falling back to direct model", task_err
            )

        # Second attempt: direct chat completions call with the given model.
        chat_request = {"model": llm_model, "messages": messages, "stream": False}
        try:
            chat_resp = httpx.post(
                f"{base_url}/v1/chat/completions",
                json=chat_request,
                headers=auth_headers,
                timeout=120.0,
            )
            chat_resp.raise_for_status()
            return _extract_content(chat_resp.json())
        except Exception as chat_err:
            logger.warning(
                "LLM synthesizer failed (%s): %s", type(chat_err).__name__, chat_err
            )
            return None