refactor: pipeline cleanup — 6 follow-up fixes (#33–#38) #40

Merged
pyr0ball merged 3 commits from feat/pipeline-cleanup into main 2026-05-25 20:00:11 -07:00
7 changed files with 200 additions and 164 deletions
Showing only changes of commit 94d796e103 - Show all commits

View file

@ -227,7 +227,6 @@ async def diagnose_stream(
yield {"type": "status", "message": "Loading environment context…"} yield {"type": "status", "message": "Loading environment context…"}
ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query))
context_block = format_context_block(ctx)
yield { yield {
"type": "context", "type": "context",
"facts": ctx.facts, "facts": ctx.facts,
@ -320,6 +319,8 @@ async def diagnose_stream(
return # pipeline emits its own "done" event return # pipeline emits its own "done" event
if llm_url and llm_model and combined: if llm_url and llm_model and combined:
# Only compute context_block in the legacy path — pipeline uses ctx directly.
context_block = format_context_block(ctx)
yield {"type": "status", "message": "Analyzing with LLM…"} yield {"type": "status", "message": "Analyzing with LLM…"}
reasoning = await asyncio.to_thread( reasoning = await asyncio.to_thread(
lambda: summarize( lambda: summarize(

View file

@ -0,0 +1,116 @@
"""Shared LLM client for the multi-agent diagnose pipeline.
Both Stage 3 (RootCauseHypothesizer) and Stage 5 (SummarySynthesizer) send
messages to the same LLM backend using the same two-step pattern:
1. Try the cf-orch task endpoint — product-scoped inference routing.
2. Fall back to OpenAI-compat direct model call by name.
Centralising here means changes to auth headers, timeouts, retry logic, or
cf-orch payload structure only need to be made once.
"""
from __future__ import annotations
import logging
import re
import httpx
logger = logging.getLogger(__name__)
# Regex that strips ```json … ``` or ``` … ``` fences from LLM output.
# It has two alternatives; strip_json_fences() deletes every match of either:
#   ^```(?:json)?\s*   an opening fence at the start of a line, with an
#                      optional "json" tag and any whitespace (including the
#                      newline) that follows it
#   \s*```$            a closing fence at the end of a line, together with
#                      any whitespace that precedes it
# re.MULTILINE makes ^ and $ anchor at every line boundary rather than only
# at the start and end of the whole string.
_JSON_FENCE_RE = re.compile(
r"^```(?:json)?\s*|\s*```$",
re.MULTILINE,
)
def extract_content(resp_json: dict) -> str | None:
    """Pull text content from an OpenAI-compat chat completion response.

    Reads ``choices[0]["message"]["content"]`` and returns it with
    surrounding whitespace removed.

    Args:
        resp_json: Decoded JSON body of a chat completion response.

    Returns:
        The stripped content string, or None when the payload is not a
        mapping, has no choices, the first choice has no ``message``
        mapping, or the content is missing, not a string, or blank.
    """
    if not isinstance(resp_json, dict):
        return None
    choices = resp_json.get("choices") or []
    if not choices:
        return None

    # A null or malformed "message" (e.g. {"message": null}) must degrade to
    # None like every other unusable payload instead of raising
    # AttributeError from a chained .get() on a non-mapping.
    first_choice = choices[0]
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    if not isinstance(message, dict):
        return None

    # Only plain-text content is supported; anything else (null, a list of
    # content parts, a number) is reported as "no content".
    content = message.get("content")
    if not isinstance(content, str):
        return None
    return content.strip() or None
def strip_json_fences(raw: str) -> str:
    """Drop the markdown code fences that some LLMs put around JSON output.

    Every match of the module-level fence pattern is deleted from ``raw``,
    then the result is trimmed of leading and trailing whitespace.

    Example: '```json\\n[...]\\n```' -> '[...]'
    """
    unfenced = _JSON_FENCE_RE.sub("", raw)
    return unfenced.strip()
def call_llm(
    llm_url: str,
    llm_model: str,
    llm_api_key: str | None,
    messages: list[dict],
    task_name: str = "log_analysis",
    timeout: float = 120.0,
) -> str | None:
    """Ask the LLM backend for a completion and return its text.

    Two requests are attempted in order:

    1. POST ``<llm_url>/api/inference/task`` with product ``turnstone`` and
       the given ``task_name`` (cf-orch product-routed inference). A 200
       response ends the call: its extracted content is returned as-is,
       even when that content is None.
    2. POST ``<llm_url>/v1/chat/completions`` with ``llm_model`` (direct
       OpenAI-compat call). This runs when step 1 answered with anything
       other than 200, or raised any exception (connection error, timeout,
       error status, undecodable body, ...).

    Args:
        llm_url: Base URL of the LLM backend (e.g. ``http://10.1.10.71:7700``);
            a trailing slash is tolerated.
        llm_model: Model identifier sent in the OpenAI-compat fallback call.
        llm_api_key: Optional bearer token; when truthy it is sent as an
            ``Authorization: Bearer ...`` header on both requests.
        messages: OpenAI-style message list (system + user turns).
        task_name: cf-orch task name for product-routed inference
            (default: ``log_analysis``).
        timeout: Per-request timeout in seconds (default: 120).

    Returns:
        Raw text content string, or None if no usable text was obtained.
    """
    auth_headers: dict[str, str] = (
        {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
    )
    base_url = llm_url.rstrip("/")

    # --- Attempt 1: cf-orch task endpoint ---
    routed_body = {
        "product": "turnstone",
        "task": task_name,
        "payload": {"messages": messages, "stream": False},
    }
    try:
        routed = httpx.post(
            f"{base_url}/api/inference/task",
            json=routed_body,
            headers=auth_headers,
            timeout=timeout,
        )
        status = routed.status_code
        if status == 200:
            return extract_content(routed.json())
        if status != 404:
            # Error statuses raise here and are handled by the except below.
            routed.raise_for_status()
        logger.debug(
            "No task assignment for turnstone.%s — falling back to direct model",
            task_name,
        )
    except Exception as err:  # noqa: BLE001
        # Deliberately broad: whatever goes wrong with the task endpoint,
        # the direct-model request below should still get its chance.
        logger.debug(
            "Task endpoint unavailable (%s) — falling back to direct model", err
        )

    # --- Attempt 2: OpenAI-compat direct call ---
    direct_body = {"model": llm_model, "messages": messages, "stream": False}
    try:
        direct = httpx.post(
            f"{base_url}/v1/chat/completions",
            json=direct_body,
            headers=auth_headers,
            timeout=timeout,
        )
        direct.raise_for_status()
        return extract_content(direct.json())
    except Exception as err:  # noqa: BLE001
        logger.warning("LLM call failed (%s): %s", type(err).__name__, err)
        return None

View file

@ -19,6 +19,8 @@ import os
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from types import MappingProxyType
from app.services.diagnose.models import ( from app.services.diagnose.models import (
ClassifiedTimeline, ClassifiedTimeline,
EventCluster, EventCluster,
@ -243,7 +245,7 @@ class SeverityClassifier:
return ClassifiedTimeline( return ClassifiedTimeline(
timeline=timeline, timeline=timeline,
cluster_severities=cluster_severities, cluster_severities=MappingProxyType(cluster_severities),
classifier_used=classifier_used, # type: ignore[arg-type] classifier_used=classifier_used, # type: ignore[arg-type]
model_id=self._model_id if ml_available else None, model_id=self._model_id if ml_available else None,
) )

View file

@ -5,9 +5,8 @@ import json
import logging import logging
from uuid import uuid4 from uuid import uuid4
import httpx
from app.context.retriever import RetrievedContext from app.context.retriever import RetrievedContext
from app.services.diagnose._llm_client import call_llm, strip_json_fences
from app.services.diagnose.models import ( from app.services.diagnose.models import (
ClassifiedTimeline, ClassifiedTimeline,
EventCluster, EventCluster,
@ -60,14 +59,6 @@ def _cluster_summary(cluster: EventCluster, severity: str) -> str:
return summary return summary
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
class RootCauseHypothesizer: class RootCauseHypothesizer:
"""Generate ranked root-cause hypotheses from a classified log timeline.""" """Generate ranked root-cause hypotheses from a classified log timeline."""
@ -120,7 +111,7 @@ class RootCauseHypothesizer:
{"role": "user", "content": user_message}, {"role": "user", "content": user_message},
] ]
raw_response = self._call_llm( raw_response = call_llm(
llm_url=llm_url, llm_url=llm_url,
llm_model=llm_model, llm_model=llm_model,
llm_api_key=llm_api_key, llm_api_key=llm_api_key,
@ -131,59 +122,14 @@ class RootCauseHypothesizer:
return self._parse_response(raw_response) return self._parse_response(raw_response)
def _call_llm(
self,
llm_url: str,
llm_model: str,
llm_api_key: str | None,
messages: list[dict],
) -> str | None:
"""Send messages to the LLM and return raw text content."""
headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
# Try cf-orch task-based endpoint first.
task_url = f"{llm_url.rstrip('/')}/api/inference/task"
try:
resp = httpx.post(
task_url,
json={
"product": "turnstone",
"task": "log_analysis",
"payload": {"messages": messages, "stream": False},
},
headers=headers,
timeout=120.0,
)
if resp.status_code == 200:
return _extract_content(resp.json())
if resp.status_code != 404:
resp.raise_for_status()
logger.debug(
"No task assignment for turnstone.log_analysis — falling back to direct model"
)
except Exception as exc:
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
# Fallback: OpenAI-compat endpoint with explicit model name.
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",
json={"model": llm_model, "messages": messages, "stream": False},
headers=headers,
timeout=120.0,
)
resp.raise_for_status()
return _extract_content(resp.json())
except Exception as exc:
logger.warning(
"LLM hypothesizer failed (%s): %s", type(exc).__name__, exc
)
return None
def _parse_response(self, raw: str) -> list[Hypothesis]: def _parse_response(self, raw: str) -> list[Hypothesis]:
"""Parse the LLM JSON response into a list of Hypothesis objects.""" """Parse the LLM JSON response into a list of Hypothesis objects.
Strips markdown code fences before parsing — some LLMs wrap JSON in
triple-backtick fences despite being instructed not to.
"""
try: try:
data = json.loads(raw.strip()) data = json.loads(strip_json_fences(raw))
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( logger.warning(
"Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw
@ -207,7 +153,9 @@ class RootCauseHypothesizer:
title=str(item.get("title", "Unknown"))[:80], title=str(item.get("title", "Unknown"))[:80],
description=str(item.get("description", "")), description=str(item.get("description", "")),
confidence=_coerce_float(item.get("confidence"), 0.5), confidence=_coerce_float(item.get("confidence"), 0.5),
supporting_cluster_ids=tuple(item.get("supporting_clusters") or []), supporting_cluster_ids=tuple(
str(x) for x in (item.get("supporting_clusters") or [])
),
runbook_refs=(), runbook_refs=(),
severity=severity, severity=severity,
) )

View file

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from types import MappingProxyType
from typing import Literal from typing import Literal
SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"]
@ -40,10 +41,14 @@ class TimelineResult:
@dataclass(frozen=True) @dataclass(frozen=True)
class ClassifiedTimeline: class ClassifiedTimeline:
"""Timeline annotated with ML-assigned severity per cluster.""" """Timeline annotated with ML-assigned severity per cluster.
``cluster_severities`` is a ``MappingProxyType`` so the mapping is
fully immutable — consistent with the ``frozen=True`` intent.
"""
timeline: TimelineResult timeline: TimelineResult
cluster_severities: dict[str, SeverityLabel] cluster_severities: MappingProxyType # MappingProxyType[str, SeverityLabel]
classifier_used: Literal["ml", "pattern_tags", "regex"] classifier_used: Literal["ml", "pattern_tags", "regex"]
model_id: str | None model_id: str | None

View file

@ -53,9 +53,15 @@ async def run_pipeline(
""" """
# Stage 1: Timeline reconstruction # Stage 1: Timeline reconstruction
yield {"type": "status", "message": "Building timeline…"} yield {"type": "status", "message": "Building timeline…"}
timeline = await asyncio.to_thread( try:
TimelineReconstructor().reconstruct, entries timeline = await asyncio.to_thread(
) TimelineReconstructor().reconstruct, entries
)
except Exception as exc:
logger.exception("Stage 1 (timeline) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"}
yield {"type": "done"}
return
n_clusters = len(timeline.clusters) n_clusters = len(timeline.clusters)
burst = timeline.burst_count burst = timeline.burst_count
yield { yield {
@ -66,9 +72,15 @@ async def run_pipeline(
} }
# Stage 2: Severity classification # Stage 2: Severity classification
classified = await asyncio.to_thread( try:
SeverityClassifier().classify, timeline classified = await asyncio.to_thread(
) SeverityClassifier().classify, timeline
)
except Exception as exc:
logger.exception("Stage 2 (classifier) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"}
yield {"type": "done"}
return
sev_counts: dict[str, int] = {} sev_counts: dict[str, int] = {}
for sev in classified.cluster_severities.values(): for sev in classified.cluster_severities.values():
sev_counts[sev] = sev_counts.get(sev, 0) + 1 sev_counts[sev] = sev_counts.get(sev, 0) + 1
@ -81,15 +93,21 @@ async def run_pipeline(
} }
# Stage 3: Root-cause hypotheses # Stage 3: Root-cause hypotheses
hypotheses = await asyncio.to_thread( try:
RootCauseHypothesizer().hypothesize, hypotheses = await asyncio.to_thread(
classified, RootCauseHypothesizer().hypothesize,
ctx, classified,
query, ctx,
llm_url, query,
llm_model, llm_url,
llm_api_key, llm_model,
) llm_api_key,
)
except Exception as exc:
logger.exception("Stage 3 (hypothesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"}
yield {"type": "done"}
return
yield { yield {
"type": "pipeline_stage", "type": "pipeline_stage",
"stage": 3, "stage": 3,
@ -98,9 +116,15 @@ async def run_pipeline(
} }
# Stage 4: False-positive suppression # Stage 4: False-positive suppression
ranked = await asyncio.to_thread( try:
FalsePositiveSuppressor().suppress, hypotheses, db_path ranked = await asyncio.to_thread(
) FalsePositiveSuppressor().suppress, hypotheses, db_path
)
except Exception as exc:
logger.exception("Stage 4 (suppressor) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"}
yield {"type": "done"}
return
suppressed = sum(1 for rh in ranked if rh.suppress) suppressed = sum(1 for rh in ranked if rh.suppress)
active = len(ranked) - suppressed active = len(ranked) - suppressed
yield { yield {
@ -116,16 +140,22 @@ async def run_pipeline(
# Stage 5: Summary synthesis # Stage 5: Summary synthesis
yield {"type": "status", "message": "Synthesizing…"} yield {"type": "status", "message": "Synthesizing…"}
synthesis_text = await asyncio.to_thread( try:
SummarySynthesizer().synthesize, synthesis_text = await asyncio.to_thread(
ranked, SummarySynthesizer().synthesize,
timeline, ranked,
ctx, timeline,
query, ctx,
llm_url, query,
llm_model, llm_url,
llm_api_key, llm_model,
) llm_api_key,
)
except Exception as exc:
logger.exception("Stage 5 (synthesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"}
yield {"type": "done"}
return
if synthesis_text: if synthesis_text:
yield {"type": "reasoning", "text": synthesis_text} yield {"type": "reasoning", "text": synthesis_text}

View file

@ -7,9 +7,8 @@ from __future__ import annotations
import logging import logging
import httpx
from app.context.retriever import RetrievedContext from app.context.retriever import RetrievedContext
from app.services.diagnose._llm_client import call_llm
from app.services.diagnose.models import RankedHypothesis, TimelineResult from app.services.diagnose.models import RankedHypothesis, TimelineResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,14 +28,6 @@ _SYSTEM_PROMPT = (
) )
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str:
"""Build the hypothesis block for the prompt (non-suppressed only, top 3).""" """Build the hypothesis block for the prompt (non-suppressed only, top 3)."""
active = [rh for rh in ranked if not rh.suppress][:3] active = [rh for rh in ranked if not rh.suppress][:3]
@ -46,15 +37,10 @@ def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str:
for rh in active: for rh in active:
h = rh.hypothesis h = rh.hypothesis
conf_pct = int(h.confidence * 100) conf_pct = int(h.confidence * 100)
similar = (
f"Yes — suppressed, {rh.suppression_reason}"
if rh.suppress and rh.suppression_reason
else "No"
)
novelty = f"{rh.novelty_score:.2f}" novelty = f"{rh.novelty_score:.2f}"
lines.append( lines.append(
f"- [{h.severity}, {conf_pct}%] {h.title}\n" f"- [{h.severity}, {conf_pct}%] {h.title}\n"
f" Similar resolved incident? {similar} (novelty {novelty})" f" Novelty: {novelty}"
) )
return "\n".join(lines) return "\n".join(lines)
@ -149,62 +135,10 @@ class SummarySynthesizer:
{"role": "user", "content": user_message}, {"role": "user", "content": user_message},
] ]
result = self._call_llm( result = call_llm(
llm_url=llm_url, llm_url=llm_url,
llm_model=llm_model, llm_model=llm_model,
llm_api_key=llm_api_key, llm_api_key=llm_api_key,
messages=messages, messages=messages,
) )
return result if result else fallback return result if result else fallback
def _call_llm(
self,
llm_url: str,
llm_model: str,
llm_api_key: str | None,
messages: list[dict],
) -> str | None:
"""Send messages to the LLM and return raw text content.
Tries the cf-orch task endpoint first, falls back to direct OpenAI-compat.
"""
headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
task_url = f"{llm_url.rstrip('/')}/api/inference/task"
try:
resp = httpx.post(
task_url,
json={
"product": "turnstone",
"task": "log_analysis",
"payload": {"messages": messages, "stream": False},
},
headers=headers,
timeout=120.0,
)
if resp.status_code == 200:
return _extract_content(resp.json())
if resp.status_code != 404:
resp.raise_for_status()
logger.debug(
"No task assignment for turnstone.log_analysis — falling back to direct model"
)
except Exception as exc:
logger.debug(
"Task endpoint unavailable (%s) — falling back to direct model", exc
)
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",
json={"model": llm_model, "messages": messages, "stream": False},
headers=headers,
timeout=120.0,
)
resp.raise_for_status()
return _extract_content(resp.json())
except Exception as exc:
logger.warning(
"LLM synthesizer failed (%s): %s", type(exc).__name__, exc
)
return None