Merge pull request 'refactor: pipeline cleanup — 6 follow-up fixes (#33–#38)' (#40) from feat/pipeline-cleanup into main

This commit is contained in:
pyr0ball 2026-05-25 20:00:11 -07:00
commit b19bea8f2a
9 changed files with 231 additions and 164 deletions

View file

@ -26,3 +26,18 @@
# --- Periodic batch glean --- # --- Periodic batch glean ---
# Seconds between automatic glean runs from sources.yaml. Set to 0 to disable. # Seconds between automatic glean runs from sources.yaml. Set to 0 to disable.
# TURNSTONE_GLEAN_INTERVAL=900 # TURNSTONE_GLEAN_INTERVAL=900
# --- Multi-agent diagnose pipeline (experimental) ---
# Enable the 5-stage ML pipeline instead of the single-LLM summarize() call.
# TURNSTONE_MULTI_AGENT_DIAGNOSE=true
# Stage 2 — ML severity classifier (optional; falls back to pattern_tags then regex).
# Recommended: byviz/bylastic_classification_logs (~300MB, downloaded from HuggingFace)
# TURNSTONE_CLASSIFIER_MODEL=byviz/bylastic_classification_logs
# Stage 4 — Embedding backend for false-positive suppression.
# sentence_transformers: in-process local model (downloads on first use)
# ollama: uses a running Ollama instance (no download needed if model is already pulled)
# TURNSTONE_EMBED_BACKEND=sentence_transformers
# TURNSTONE_EMBED_MODEL=BAAI/bge-small-en-v1.5
# TURNSTONE_EMBED_DEVICE=cpu

View file

@ -227,7 +227,6 @@ async def diagnose_stream(
yield {"type": "status", "message": "Loading environment context…"} yield {"type": "status", "message": "Loading environment context…"}
ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query))
context_block = format_context_block(ctx)
yield { yield {
"type": "context", "type": "context",
"facts": ctx.facts, "facts": ctx.facts,
@ -320,6 +319,8 @@ async def diagnose_stream(
return # pipeline emits its own "done" event return # pipeline emits its own "done" event
if llm_url and llm_model and combined: if llm_url and llm_model and combined:
# Only compute context_block in the legacy path — pipeline uses ctx directly.
context_block = format_context_block(ctx)
yield {"type": "status", "message": "Analyzing with LLM…"} yield {"type": "status", "message": "Analyzing with LLM…"}
reasoning = await asyncio.to_thread( reasoning = await asyncio.to_thread(
lambda: summarize( lambda: summarize(

View file

@ -0,0 +1,116 @@
"""Shared LLM client for the multi-agent diagnose pipeline.
Both Stage 3 (RootCauseHypothesizer) and Stage 5 (SummarySynthesizer) send
messages to the same LLM backend using the same two-step pattern:
1. Try the cf-orch task endpoint (product-scoped inference routing).
2. Fall back to OpenAI-compat direct model call by name.
Centralising here means changes to auth headers, timeouts, retry logic, or
cf-orch payload structure only need to be made once.
"""
from __future__ import annotations
import logging
import re
import httpx
logger = logging.getLogger(__name__)
# Regex that strips ```json … ``` or ``` … ``` fences from LLM output.
# It is an alternation of two patterns:
#   ^```(?:json)?\s*  - an opening fence at the start of a line, an optional
#                       lowercase "json" tag, and any whitespace after it
#                       (including the newline that ends the fence line).
#   \s*```$           - a closing fence at the end of a line, together with
#                       any whitespace that precedes it.
# re.MULTILINE makes ^ and $ match at every line boundary, so a fence is
# matched on whichever line it sits, not only at the very start or end of
# the string. No IGNORECASE flag is set, so a tag such as "JSON" is not
# consumed along with its fence.
_JSON_FENCE_RE = re.compile(
    r"^```(?:json)?\s*|\s*```$",
    re.MULTILINE,
)
def extract_content(resp_json: dict) -> str | None:
    """Return the first choice's message text from a chat-completion response.

    Only ``choices[0]["message"]["content"]`` is consulted. The text is
    returned with surrounding whitespace removed.

    Args:
        resp_json: Decoded JSON body of an OpenAI-compat chat completion.

    Returns:
        The stripped content string, or None when ``choices`` is missing or
        empty, or when the content is missing, empty, or whitespace-only.
    """
    choices = resp_json.get("choices")
    if not choices:
        return None

    message = choices[0].get("message", {})
    text = message.get("content")
    if not text:
        return None

    stripped = text.strip()
    return stripped if stripped else None
def strip_json_fences(raw: str) -> str:
    """Return *raw* without the markdown code fences some LLMs add around JSON.

    Every match of the module-level ``_JSON_FENCE_RE`` pattern is deleted,
    then leading and trailing whitespace is trimmed from the result.

    Example: '```json\\n[...]\\n```' -> '[...]'
    """
    without_fences = _JSON_FENCE_RE.sub("", raw)
    return without_fences.strip()
def call_llm(
    llm_url: str,
    llm_model: str,
    llm_api_key: str | None,
    messages: list[dict],
    task_name: str = "log_analysis",
    timeout: float = 120.0,
) -> str | None:
    """Send *messages* to the LLM backend and return the reply text.

    Two requests are attempted in order:

    1. ``POST {llm_url}/api/inference/task`` (cf-orch, product ``turnstone``).
       A 200 response is final: its extracted content is returned as-is, even
       when that content is None. No fallback happens in that case.
    2. ``POST {llm_url}/v1/chat/completions`` (OpenAI-compat, model by name).
       Used when step 1 answers 404, answers any other non-200 status, or
       raises for any reason (connection error, timeout, undecodable body).

    Args:
        llm_url: Base URL of the LLM backend; a trailing ``/`` is ignored.
        llm_model: Model identifier sent in the OpenAI-compat fallback call.
        llm_api_key: Optional bearer token; when falsy no ``Authorization``
            header is sent.
        messages: OpenAI-style message list (system + user turns).
        task_name: cf-orch task name used in step 1 (default ``log_analysis``).
        timeout: Per-request timeout in seconds (default 120).

    Returns:
        The text produced by ``extract_content``, or None when the fallback
        request fails or the chosen response carries no content.
    """
    auth_headers: dict[str, str] = (
        {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
    )
    base_url = llm_url.rstrip("/")

    # --- Attempt 1: cf-orch task endpoint ---
    task_request = {
        "product": "turnstone",
        "task": task_name,
        "payload": {"messages": messages, "stream": False},
    }
    try:
        task_resp = httpx.post(
            f"{base_url}/api/inference/task",
            json=task_request,
            headers=auth_headers,
            timeout=timeout,
        )
        if task_resp.status_code == 200:
            return extract_content(task_resp.json())
        if task_resp.status_code != 404:
            # Turn other error statuses into an exception so the handler
            # below logs them before the fallback runs.
            task_resp.raise_for_status()
        logger.debug(
            "No task assignment for turnstone.%s — falling back to direct model",
            task_name,
        )
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: whatever goes wrong on the task endpoint must
        # not stop the direct-model attempt below.
        logger.debug(
            "Task endpoint unavailable (%s) — falling back to direct model", exc
        )

    # --- Attempt 2: OpenAI-compat chat completions ---
    chat_request = {"model": llm_model, "messages": messages, "stream": False}
    try:
        chat_resp = httpx.post(
            f"{base_url}/v1/chat/completions",
            json=chat_request,
            headers=auth_headers,
            timeout=timeout,
        )
        chat_resp.raise_for_status()
        return extract_content(chat_resp.json())
    except Exception as exc:  # noqa: BLE001
        logger.warning("LLM call failed (%s): %s", type(exc).__name__, exc)
        return None

View file

@ -19,6 +19,8 @@ import os
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from types import MappingProxyType
from app.services.diagnose.models import ( from app.services.diagnose.models import (
ClassifiedTimeline, ClassifiedTimeline,
EventCluster, EventCluster,
@ -243,7 +245,7 @@ class SeverityClassifier:
return ClassifiedTimeline( return ClassifiedTimeline(
timeline=timeline, timeline=timeline,
cluster_severities=cluster_severities, cluster_severities=MappingProxyType(cluster_severities),
classifier_used=classifier_used, # type: ignore[arg-type] classifier_used=classifier_used, # type: ignore[arg-type]
model_id=self._model_id if ml_available else None, model_id=self._model_id if ml_available else None,
) )

View file

@ -5,9 +5,8 @@ import json
import logging import logging
from uuid import uuid4 from uuid import uuid4
import httpx
from app.context.retriever import RetrievedContext from app.context.retriever import RetrievedContext
from app.services.diagnose._llm_client import call_llm, strip_json_fences
from app.services.diagnose.models import ( from app.services.diagnose.models import (
ClassifiedTimeline, ClassifiedTimeline,
EventCluster, EventCluster,
@ -60,14 +59,6 @@ def _cluster_summary(cluster: EventCluster, severity: str) -> str:
return summary return summary
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
class RootCauseHypothesizer: class RootCauseHypothesizer:
"""Generate ranked root-cause hypotheses from a classified log timeline.""" """Generate ranked root-cause hypotheses from a classified log timeline."""
@ -120,7 +111,7 @@ class RootCauseHypothesizer:
{"role": "user", "content": user_message}, {"role": "user", "content": user_message},
] ]
raw_response = self._call_llm( raw_response = call_llm(
llm_url=llm_url, llm_url=llm_url,
llm_model=llm_model, llm_model=llm_model,
llm_api_key=llm_api_key, llm_api_key=llm_api_key,
@ -131,59 +122,14 @@ class RootCauseHypothesizer:
return self._parse_response(raw_response) return self._parse_response(raw_response)
def _call_llm(
self,
llm_url: str,
llm_model: str,
llm_api_key: str | None,
messages: list[dict],
) -> str | None:
"""Send messages to the LLM and return raw text content."""
headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
# Try cf-orch task-based endpoint first.
task_url = f"{llm_url.rstrip('/')}/api/inference/task"
try:
resp = httpx.post(
task_url,
json={
"product": "turnstone",
"task": "log_analysis",
"payload": {"messages": messages, "stream": False},
},
headers=headers,
timeout=120.0,
)
if resp.status_code == 200:
return _extract_content(resp.json())
if resp.status_code != 404:
resp.raise_for_status()
logger.debug(
"No task assignment for turnstone.log_analysis — falling back to direct model"
)
except Exception as exc:
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
# Fallback: OpenAI-compat endpoint with explicit model name.
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",
json={"model": llm_model, "messages": messages, "stream": False},
headers=headers,
timeout=120.0,
)
resp.raise_for_status()
return _extract_content(resp.json())
except Exception as exc:
logger.warning(
"LLM hypothesizer failed (%s): %s", type(exc).__name__, exc
)
return None
def _parse_response(self, raw: str) -> list[Hypothesis]: def _parse_response(self, raw: str) -> list[Hypothesis]:
"""Parse the LLM JSON response into a list of Hypothesis objects.""" """Parse the LLM JSON response into a list of Hypothesis objects.
Strips markdown code fences before parsing, because some LLMs wrap JSON in
triple-backtick fences despite being instructed not to.
"""
try: try:
data = json.loads(raw.strip()) data = json.loads(strip_json_fences(raw))
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning( logger.warning(
"Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw
@ -207,7 +153,9 @@ class RootCauseHypothesizer:
title=str(item.get("title", "Unknown"))[:80], title=str(item.get("title", "Unknown"))[:80],
description=str(item.get("description", "")), description=str(item.get("description", "")),
confidence=_coerce_float(item.get("confidence"), 0.5), confidence=_coerce_float(item.get("confidence"), 0.5),
supporting_cluster_ids=tuple(item.get("supporting_clusters") or []), supporting_cluster_ids=tuple(
str(x) for x in (item.get("supporting_clusters") or [])
),
runbook_refs=(), runbook_refs=(),
severity=severity, severity=severity,
) )

View file

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from types import MappingProxyType
from typing import Literal from typing import Literal
SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"]
@ -40,10 +41,14 @@ class TimelineResult:
@dataclass(frozen=True) @dataclass(frozen=True)
class ClassifiedTimeline: class ClassifiedTimeline:
"""Timeline annotated with ML-assigned severity per cluster.""" """Timeline annotated with ML-assigned severity per cluster.
``cluster_severities`` is a ``MappingProxyType`` so the mapping is
fully immutable, consistent with the ``frozen=True`` intent.
"""
timeline: TimelineResult timeline: TimelineResult
cluster_severities: dict[str, SeverityLabel] cluster_severities: MappingProxyType # MappingProxyType[str, SeverityLabel]
classifier_used: Literal["ml", "pattern_tags", "regex"] classifier_used: Literal["ml", "pattern_tags", "regex"]
model_id: str | None model_id: str | None

View file

@ -5,10 +5,17 @@ from __future__ import annotations
import asyncio import asyncio
import dataclasses import dataclasses
import logging import logging
import os
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
# Optional ML classifier model for Stage 2.
# When empty (default), Stage 2 falls back to pattern_tags then regex.
# Set TURNSTONE_CLASSIFIER_MODEL to a HuggingFace model ID to enable ML classification.
# Recommended: byviz/bylastic_classification_logs (DistilBERT, ~300MB)
_CLASSIFIER_MODEL: str = os.environ.get("TURNSTONE_CLASSIFIER_MODEL", "")
from app.context.retriever import RetrievedContext from app.context.retriever import RetrievedContext
from app.services.diagnose.classifier import SeverityClassifier from app.services.diagnose.classifier import SeverityClassifier
from app.services.diagnose.hypothesizer import RootCauseHypothesizer from app.services.diagnose.hypothesizer import RootCauseHypothesizer
@ -53,9 +60,15 @@ async def run_pipeline(
""" """
# Stage 1: Timeline reconstruction # Stage 1: Timeline reconstruction
yield {"type": "status", "message": "Building timeline…"} yield {"type": "status", "message": "Building timeline…"}
try:
timeline = await asyncio.to_thread( timeline = await asyncio.to_thread(
TimelineReconstructor().reconstruct, entries TimelineReconstructor().reconstruct, entries
) )
except Exception as exc:
logger.exception("Stage 1 (timeline) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"}
yield {"type": "done"}
return
n_clusters = len(timeline.clusters) n_clusters = len(timeline.clusters)
burst = timeline.burst_count burst = timeline.burst_count
yield { yield {
@ -66,9 +79,15 @@ async def run_pipeline(
} }
# Stage 2: Severity classification # Stage 2: Severity classification
try:
classified = await asyncio.to_thread( classified = await asyncio.to_thread(
SeverityClassifier().classify, timeline SeverityClassifier(model_id=_CLASSIFIER_MODEL).classify, timeline
) )
except Exception as exc:
logger.exception("Stage 2 (classifier) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"}
yield {"type": "done"}
return
sev_counts: dict[str, int] = {} sev_counts: dict[str, int] = {}
for sev in classified.cluster_severities.values(): for sev in classified.cluster_severities.values():
sev_counts[sev] = sev_counts.get(sev, 0) + 1 sev_counts[sev] = sev_counts.get(sev, 0) + 1
@ -81,6 +100,7 @@ async def run_pipeline(
} }
# Stage 3: Root-cause hypotheses # Stage 3: Root-cause hypotheses
try:
hypotheses = await asyncio.to_thread( hypotheses = await asyncio.to_thread(
RootCauseHypothesizer().hypothesize, RootCauseHypothesizer().hypothesize,
classified, classified,
@ -90,6 +110,11 @@ async def run_pipeline(
llm_model, llm_model,
llm_api_key, llm_api_key,
) )
except Exception as exc:
logger.exception("Stage 3 (hypothesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"}
yield {"type": "done"}
return
yield { yield {
"type": "pipeline_stage", "type": "pipeline_stage",
"stage": 3, "stage": 3,
@ -98,9 +123,15 @@ async def run_pipeline(
} }
# Stage 4: False-positive suppression # Stage 4: False-positive suppression
try:
ranked = await asyncio.to_thread( ranked = await asyncio.to_thread(
FalsePositiveSuppressor().suppress, hypotheses, db_path FalsePositiveSuppressor().suppress, hypotheses, db_path
) )
except Exception as exc:
logger.exception("Stage 4 (suppressor) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"}
yield {"type": "done"}
return
suppressed = sum(1 for rh in ranked if rh.suppress) suppressed = sum(1 for rh in ranked if rh.suppress)
active = len(ranked) - suppressed active = len(ranked) - suppressed
yield { yield {
@ -116,6 +147,7 @@ async def run_pipeline(
# Stage 5: Summary synthesis # Stage 5: Summary synthesis
yield {"type": "status", "message": "Synthesizing…"} yield {"type": "status", "message": "Synthesizing…"}
try:
synthesis_text = await asyncio.to_thread( synthesis_text = await asyncio.to_thread(
SummarySynthesizer().synthesize, SummarySynthesizer().synthesize,
ranked, ranked,
@ -126,6 +158,11 @@ async def run_pipeline(
llm_model, llm_model,
llm_api_key, llm_api_key,
) )
except Exception as exc:
logger.exception("Stage 5 (synthesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"}
yield {"type": "done"}
return
if synthesis_text: if synthesis_text:
yield {"type": "reasoning", "text": synthesis_text} yield {"type": "reasoning", "text": synthesis_text}

View file

@ -7,9 +7,8 @@ from __future__ import annotations
import logging import logging
import httpx
from app.context.retriever import RetrievedContext from app.context.retriever import RetrievedContext
from app.services.diagnose._llm_client import call_llm
from app.services.diagnose.models import RankedHypothesis, TimelineResult from app.services.diagnose.models import RankedHypothesis, TimelineResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,14 +28,6 @@ _SYSTEM_PROMPT = (
) )
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str:
"""Build the hypothesis block for the prompt (non-suppressed only, top 3).""" """Build the hypothesis block for the prompt (non-suppressed only, top 3)."""
active = [rh for rh in ranked if not rh.suppress][:3] active = [rh for rh in ranked if not rh.suppress][:3]
@ -46,15 +37,10 @@ def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str:
for rh in active: for rh in active:
h = rh.hypothesis h = rh.hypothesis
conf_pct = int(h.confidence * 100) conf_pct = int(h.confidence * 100)
similar = (
f"Yes — suppressed, {rh.suppression_reason}"
if rh.suppress and rh.suppression_reason
else "No"
)
novelty = f"{rh.novelty_score:.2f}" novelty = f"{rh.novelty_score:.2f}"
lines.append( lines.append(
f"- [{h.severity}, {conf_pct}%] {h.title}\n" f"- [{h.severity}, {conf_pct}%] {h.title}\n"
f" Similar resolved incident? {similar} (novelty {novelty})" f" Novelty: {novelty}"
) )
return "\n".join(lines) return "\n".join(lines)
@ -149,62 +135,10 @@ class SummarySynthesizer:
{"role": "user", "content": user_message}, {"role": "user", "content": user_message},
] ]
result = self._call_llm( result = call_llm(
llm_url=llm_url, llm_url=llm_url,
llm_model=llm_model, llm_model=llm_model,
llm_api_key=llm_api_key, llm_api_key=llm_api_key,
messages=messages, messages=messages,
) )
return result if result else fallback return result if result else fallback
def _call_llm(
self,
llm_url: str,
llm_model: str,
llm_api_key: str | None,
messages: list[dict],
) -> str | None:
"""Send messages to the LLM and return raw text content.
Tries the cf-orch task endpoint first, falls back to direct OpenAI-compat.
"""
headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
task_url = f"{llm_url.rstrip('/')}/api/inference/task"
try:
resp = httpx.post(
task_url,
json={
"product": "turnstone",
"task": "log_analysis",
"payload": {"messages": messages, "stream": False},
},
headers=headers,
timeout=120.0,
)
if resp.status_code == 200:
return _extract_content(resp.json())
if resp.status_code != 404:
resp.raise_for_status()
logger.debug(
"No task assignment for turnstone.log_analysis — falling back to direct model"
)
except Exception as exc:
logger.debug(
"Task endpoint unavailable (%s) — falling back to direct model", exc
)
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",
json={"model": llm_model, "messages": messages, "stream": False},
headers=headers,
timeout=120.0,
)
resp.raise_for_status()
return _extract_content(resp.json())
except Exception as exc:
logger.warning(
"LLM synthesizer failed (%s): %s", type(exc).__name__, exc
)
return None

View file

@ -38,6 +38,15 @@ PATTERN_DIR="${TURNSTONE_PATTERNS:-$([[ -d /devl/turnstone-cluster/patterns ]] &
CONDA_BASE="${CONDA_BASE:-/devl/miniconda3}" CONDA_BASE="${CONDA_BASE:-/devl/miniconda3}"
PYTHON="${CONDA_BASE}/envs/cf/bin/python" PYTHON="${CONDA_BASE}/envs/cf/bin/python"
# Source .env if present — loads TURNSTONE_MULTI_AGENT_DIAGNOSE, GPU_SERVER_URL, etc.
# NOTE: plain assignments in .env override values already set in the environment;
# set -a / set +a only auto-exports the variables assigned while sourcing.
if [[ -f "${SCRIPT_DIR}/.env" ]]; then
set -a
# shellcheck source=/dev/null
source "${SCRIPT_DIR}/.env"
set +a
fi
# ── Helpers ─────────────────────────────────────────────────────────────────── # ── Helpers ───────────────────────────────────────────────────────────────────
_is_alive() { _is_alive() {