From 85e7a70536d80014da9fcf5c263d23736f029f1c Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 19:05:56 -0700 Subject: [PATCH 1/3] =?UTF-8?q?refactor:=20pipeline=20cleanup=20=E2=80=94?= =?UTF-8?q?=206=20follow-up=20fixes=20(#33-#38)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - #33: Wrap ClassifiedTimeline.cluster_severities in MappingProxyType for true immutability (frozen=True only blocks field reassignment, not dict mutation). - #34: Remove dead suppression branch in synthesizer._build_hypothesis_block. active[] is already filtered to not rh.suppress, so the 'Yes — suppressed' branch was unreachable. Now shows novelty score only. - #35: Extract shared _llm_client.py with call_llm() + extract_content() + strip_json_fences(). Both RootCauseHypothesizer and SummarySynthesizer now import from one source. Also strips JSON fences from LLM output before parsing in hypothesizer._parse_response. - #36: Add per-stage try/except in pipeline.run_pipeline(). Unhandled stage exceptions now emit {type: 'error'} + {type: 'done'} SSE events instead of silently closing the stream. - #37: Move format_context_block() call inside the legacy LLM branch in diagnose/__init__.py — it was being computed unconditionally but only used in the non-pipeline path. - #38: Coerce supporting_cluster_ids items to str() in hypothesizer _parse_response to guard against LLMs returning integers instead of string cluster IDs. 
--- app/services/diagnose/__init__.py | 3 +- app/services/diagnose/_llm_client.py | 116 ++++++++++++++++++++++++++ app/services/diagnose/classifier.py | 4 +- app/services/diagnose/hypothesizer.py | 74 +++------------- app/services/diagnose/models.py | 9 +- app/services/diagnose/pipeline.py | 86 ++++++++++++------- app/services/diagnose/synthesizer.py | 72 +--------------- 7 files changed, 200 insertions(+), 164 deletions(-) create mode 100644 app/services/diagnose/_llm_client.py diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py index 51613cf..80d8a0f 100644 --- a/app/services/diagnose/__init__.py +++ b/app/services/diagnose/__init__.py @@ -227,7 +227,6 @@ async def diagnose_stream( yield {"type": "status", "message": "Loading environment context…"} ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) - context_block = format_context_block(ctx) yield { "type": "context", "facts": ctx.facts, @@ -320,6 +319,8 @@ async def diagnose_stream( return # pipeline emits its own "done" event if llm_url and llm_model and combined: + # Only compute context_block in the legacy path — pipeline uses ctx directly. + context_block = format_context_block(ctx) yield {"type": "status", "message": "Analyzing with LLM…"} reasoning = await asyncio.to_thread( lambda: summarize( diff --git a/app/services/diagnose/_llm_client.py b/app/services/diagnose/_llm_client.py new file mode 100644 index 0000000..b2fad32 --- /dev/null +++ b/app/services/diagnose/_llm_client.py @@ -0,0 +1,116 @@ +"""Shared LLM client for the multi-agent diagnose pipeline. + +Both Stage 3 (RootCauseHypothesizer) and Stage 5 (SummarySynthesizer) send +messages to the same LLM backend using the same two-step pattern: + 1. Try the cf-orch task endpoint → product-scoped inference routing. + 2. Fall back to OpenAI-compat → direct model call by name. 
+ +Centralising here means changes to auth headers, timeouts, retry logic, or +cf-orch payload structure only need to be made once. +""" +from __future__ import annotations + +import logging +import re + +import httpx + +logger = logging.getLogger(__name__) + +# Regex that strips ```json … ``` or ``` … ``` fences from LLM output. +_JSON_FENCE_RE = re.compile( + r"^```(?:json)?\s*|\s*```$", + re.MULTILINE, +) + + +def extract_content(resp_json: dict) -> str | None: + """Pull text content from an OpenAI-compat chat completion response. + + Returns None when the response has no choices or empty content. + """ + choices = resp_json.get("choices") or [] + if not choices: + return None + return (choices[0].get("message", {}).get("content") or "").strip() or None + + +def strip_json_fences(raw: str) -> str: + """Remove markdown code fences that some LLMs wrap around JSON output. + + Example: '```json\\n[...]\\n```' → '[...]' + """ + return _JSON_FENCE_RE.sub("", raw).strip() + + +def call_llm( + llm_url: str, + llm_model: str, + llm_api_key: str | None, + messages: list[dict], + task_name: str = "log_analysis", + timeout: float = 120.0, +) -> str | None: + """Send messages to the LLM; return raw text or None on failure. + + Tries the cf-orch task endpoint first (product-routed inference). + Falls back to a direct OpenAI-compat ``/v1/chat/completions`` call when: + - The task endpoint returns 404 (no assignment for this task). + - The task endpoint fails or is unreachable (any other non-200 status, connection error, timeout, etc.). + + Args: + llm_url: Base URL of the LLM backend (e.g. ``http://<host>:7700``). + llm_model: Model identifier used in the OpenAI-compat fallback call. + llm_api_key: Optional bearer token for authenticated endpoints. + messages: OpenAI-style message list (system + user turns). + task_name: cf-orch task name for product-routed inference (default: ``log_analysis``). + timeout: Request timeout in seconds (default: 120). + + Returns: + Raw text content string, or None if both paths fail. 
+ """ + headers: dict[str, str] = {} + if llm_api_key: + headers["Authorization"] = f"Bearer {llm_api_key}" + + # --- Path 1: cf-orch task endpoint --- + task_url = f"{llm_url.rstrip('/')}/api/inference/task" + try: + resp = httpx.post( + task_url, + json={ + "product": "turnstone", + "task": task_name, + "payload": {"messages": messages, "stream": False}, + }, + headers=headers, + timeout=timeout, + ) + if resp.status_code == 200: + return extract_content(resp.json()) + if resp.status_code != 404: + resp.raise_for_status() + logger.debug( + "No task assignment for turnstone.%s — falling back to direct model", + task_name, + ) + except Exception as exc: # noqa: BLE001 + # Broad catch is intentional: captures network errors, timeouts, and + # any backend-specific exceptions so the pipeline can fall back. + logger.debug( + "Task endpoint unavailable (%s) — falling back to direct model", exc + ) + + # --- Path 2: OpenAI-compat fallback --- + try: + resp = httpx.post( + f"{llm_url.rstrip('/')}/v1/chat/completions", + json={"model": llm_model, "messages": messages, "stream": False}, + headers=headers, + timeout=timeout, + ) + resp.raise_for_status() + return extract_content(resp.json()) + except Exception as exc: # noqa: BLE001 + logger.warning("LLM call failed (%s): %s", type(exc).__name__, exc) + return None diff --git a/app/services/diagnose/classifier.py b/app/services/diagnose/classifier.py index b7aa8ed..b340c39 100644 --- a/app/services/diagnose/classifier.py +++ b/app/services/diagnose/classifier.py @@ -19,6 +19,8 @@ import os from pathlib import Path from typing import Any +from types import MappingProxyType + from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, @@ -243,7 +245,7 @@ class SeverityClassifier: return ClassifiedTimeline( timeline=timeline, - cluster_severities=cluster_severities, + cluster_severities=MappingProxyType(cluster_severities), classifier_used=classifier_used, # type: ignore[arg-type] model_id=self._model_id if 
ml_available else None, ) diff --git a/app/services/diagnose/hypothesizer.py b/app/services/diagnose/hypothesizer.py index 7c5c3e6..827332f 100644 --- a/app/services/diagnose/hypothesizer.py +++ b/app/services/diagnose/hypothesizer.py @@ -5,9 +5,8 @@ import json import logging from uuid import uuid4 -import httpx - from app.context.retriever import RetrievedContext +from app.services.diagnose._llm_client import call_llm, strip_json_fences from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, @@ -60,14 +59,6 @@ def _cluster_summary(cluster: EventCluster, severity: str) -> str: return summary -def _extract_content(resp_json: dict) -> str | None: - """Pull text content from an OpenAI-compat chat completion response.""" - choices = resp_json.get("choices") or [] - if not choices: - return None - return (choices[0].get("message", {}).get("content") or "").strip() or None - - class RootCauseHypothesizer: """Generate ranked root-cause hypotheses from a classified log timeline.""" @@ -120,7 +111,7 @@ class RootCauseHypothesizer: {"role": "user", "content": user_message}, ] - raw_response = self._call_llm( + raw_response = call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, @@ -131,59 +122,14 @@ class RootCauseHypothesizer: return self._parse_response(raw_response) - def _call_llm( - self, - llm_url: str, - llm_model: str, - llm_api_key: str | None, - messages: list[dict], - ) -> str | None: - """Send messages to the LLM and return raw text content.""" - headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} - - # Try cf-orch task-based endpoint first. 
- task_url = f"{llm_url.rstrip('/')}/api/inference/task" - try: - resp = httpx.post( - task_url, - json={ - "product": "turnstone", - "task": "log_analysis", - "payload": {"messages": messages, "stream": False}, - }, - headers=headers, - timeout=120.0, - ) - if resp.status_code == 200: - return _extract_content(resp.json()) - if resp.status_code != 404: - resp.raise_for_status() - logger.debug( - "No task assignment for turnstone.log_analysis — falling back to direct model" - ) - except Exception as exc: - logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc) - - # Fallback: OpenAI-compat endpoint with explicit model name. - try: - resp = httpx.post( - f"{llm_url.rstrip('/')}/v1/chat/completions", - json={"model": llm_model, "messages": messages, "stream": False}, - headers=headers, - timeout=120.0, - ) - resp.raise_for_status() - return _extract_content(resp.json()) - except Exception as exc: - logger.warning( - "LLM hypothesizer failed (%s): %s", type(exc).__name__, exc - ) - return None - def _parse_response(self, raw: str) -> list[Hypothesis]: - """Parse the LLM JSON response into a list of Hypothesis objects.""" + """Parse the LLM JSON response into a list of Hypothesis objects. + + Strips markdown code fences before parsing — some LLMs wrap JSON in + triple-backtick fences despite being instructed not to. 
+ """ try: - data = json.loads(raw.strip()) + data = json.loads(strip_json_fences(raw)) except json.JSONDecodeError: logger.warning( "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw @@ -207,7 +153,9 @@ class RootCauseHypothesizer: title=str(item.get("title", "Unknown"))[:80], description=str(item.get("description", "")), confidence=_coerce_float(item.get("confidence"), 0.5), - supporting_cluster_ids=tuple(item.get("supporting_clusters") or []), + supporting_cluster_ids=tuple( + str(x) for x in (item.get("supporting_clusters") or []) + ), runbook_refs=(), severity=severity, ) diff --git a/app/services/diagnose/models.py b/app/services/diagnose/models.py index 2831d30..ed454d1 100644 --- a/app/services/diagnose/models.py +++ b/app/services/diagnose/models.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass +from types import MappingProxyType from typing import Literal SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] @@ -40,10 +41,14 @@ class TimelineResult: @dataclass(frozen=True) class ClassifiedTimeline: - """Timeline annotated with ML-assigned severity per cluster.""" + """Timeline annotated with ML-assigned severity per cluster. + + ``cluster_severities`` is a ``MappingProxyType`` so the mapping is + fully immutable — consistent with the ``frozen=True`` intent. 
+ """ timeline: TimelineResult - cluster_severities: dict[str, SeverityLabel] + cluster_severities: MappingProxyType # MappingProxyType[str, SeverityLabel] classifier_used: Literal["ml", "pattern_tags", "regex"] model_id: str | None diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index f902610..a60952f 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -53,9 +53,15 @@ async def run_pipeline( """ # Stage 1: Timeline reconstruction yield {"type": "status", "message": "Building timeline…"} - timeline = await asyncio.to_thread( - TimelineReconstructor().reconstruct, entries - ) + try: + timeline = await asyncio.to_thread( + TimelineReconstructor().reconstruct, entries + ) + except Exception as exc: + logger.exception("Stage 1 (timeline) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"} + yield {"type": "done"} + return n_clusters = len(timeline.clusters) burst = timeline.burst_count yield { @@ -66,9 +72,15 @@ async def run_pipeline( } # Stage 2: Severity classification - classified = await asyncio.to_thread( - SeverityClassifier().classify, timeline - ) + try: + classified = await asyncio.to_thread( + SeverityClassifier().classify, timeline + ) + except Exception as exc: + logger.exception("Stage 2 (classifier) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"} + yield {"type": "done"} + return sev_counts: dict[str, int] = {} for sev in classified.cluster_severities.values(): sev_counts[sev] = sev_counts.get(sev, 0) + 1 @@ -81,15 +93,21 @@ async def run_pipeline( } # Stage 3: Root-cause hypotheses - hypotheses = await asyncio.to_thread( - RootCauseHypothesizer().hypothesize, - classified, - ctx, - query, - llm_url, - llm_model, - llm_api_key, - ) + try: + hypotheses = await asyncio.to_thread( + RootCauseHypothesizer().hypothesize, + classified, + ctx, + query, + llm_url, + llm_model, + llm_api_key, + ) + 
except Exception as exc: + logger.exception("Stage 3 (hypothesizer) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"} + yield {"type": "done"} + return yield { "type": "pipeline_stage", "stage": 3, @@ -98,9 +116,15 @@ async def run_pipeline( } # Stage 4: False-positive suppression - ranked = await asyncio.to_thread( - FalsePositiveSuppressor().suppress, hypotheses, db_path - ) + try: + ranked = await asyncio.to_thread( + FalsePositiveSuppressor().suppress, hypotheses, db_path + ) + except Exception as exc: + logger.exception("Stage 4 (suppressor) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"} + yield {"type": "done"} + return suppressed = sum(1 for rh in ranked if rh.suppress) active = len(ranked) - suppressed yield { @@ -116,16 +140,22 @@ async def run_pipeline( # Stage 5: Summary synthesis yield {"type": "status", "message": "Synthesizing…"} - synthesis_text = await asyncio.to_thread( - SummarySynthesizer().synthesize, - ranked, - timeline, - ctx, - query, - llm_url, - llm_model, - llm_api_key, - ) + try: + synthesis_text = await asyncio.to_thread( + SummarySynthesizer().synthesize, + ranked, + timeline, + ctx, + query, + llm_url, + llm_model, + llm_api_key, + ) + except Exception as exc: + logger.exception("Stage 5 (synthesizer) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"} + yield {"type": "done"} + return if synthesis_text: yield {"type": "reasoning", "text": synthesis_text} diff --git a/app/services/diagnose/synthesizer.py b/app/services/diagnose/synthesizer.py index ce07625..edbf29a 100644 --- a/app/services/diagnose/synthesizer.py +++ b/app/services/diagnose/synthesizer.py @@ -7,9 +7,8 @@ from __future__ import annotations import logging -import httpx - from app.context.retriever import RetrievedContext +from app.services.diagnose._llm_client import call_llm from app.services.diagnose.models import 
RankedHypothesis, TimelineResult logger = logging.getLogger(__name__) @@ -29,14 +28,6 @@ _SYSTEM_PROMPT = ( ) -def _extract_content(resp_json: dict) -> str | None: - """Pull text content from an OpenAI-compat chat completion response.""" - choices = resp_json.get("choices") or [] - if not choices: - return None - return (choices[0].get("message", {}).get("content") or "").strip() or None - - def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: """Build the hypothesis block for the prompt (non-suppressed only, top 3).""" active = [rh for rh in ranked if not rh.suppress][:3] @@ -46,15 +37,10 @@ def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: for rh in active: h = rh.hypothesis conf_pct = int(h.confidence * 100) - similar = ( - f"Yes — suppressed, {rh.suppression_reason}" - if rh.suppress and rh.suppression_reason - else "No" - ) novelty = f"{rh.novelty_score:.2f}" lines.append( f"- [{h.severity}, {conf_pct}%] {h.title}\n" - f" Similar resolved incident? {similar} (novelty {novelty})" + f" Novelty: {novelty}" ) return "\n".join(lines) @@ -149,62 +135,10 @@ class SummarySynthesizer: {"role": "user", "content": user_message}, ] - result = self._call_llm( + result = call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, messages=messages, ) return result if result else fallback - - def _call_llm( - self, - llm_url: str, - llm_model: str, - llm_api_key: str | None, - messages: list[dict], - ) -> str | None: - """Send messages to the LLM and return raw text content. - - Tries the cf-orch task endpoint first, falls back to direct OpenAI-compat. 
- """ - headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} - - task_url = f"{llm_url.rstrip('/')}/api/inference/task" - try: - resp = httpx.post( - task_url, - json={ - "product": "turnstone", - "task": "log_analysis", - "payload": {"messages": messages, "stream": False}, - }, - headers=headers, - timeout=120.0, - ) - if resp.status_code == 200: - return _extract_content(resp.json()) - if resp.status_code != 404: - resp.raise_for_status() - logger.debug( - "No task assignment for turnstone.log_analysis — falling back to direct model" - ) - except Exception as exc: - logger.debug( - "Task endpoint unavailable (%s) — falling back to direct model", exc - ) - - try: - resp = httpx.post( - f"{llm_url.rstrip('/')}/v1/chat/completions", - json={"model": llm_model, "messages": messages, "stream": False}, - headers=headers, - timeout=120.0, - ) - resp.raise_for_status() - return _extract_content(resp.json()) - except Exception as exc: - logger.warning( - "LLM synthesizer failed (%s): %s", type(exc).__name__, exc - ) - return None From 2375e073bada1334f753de1885b6ed1fb1aed4f9 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 19:11:32 -0700 Subject: [PATCH 2/3] feat(pipeline): add TURNSTONE_CLASSIFIER_MODEL env var for Stage 2 ML config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Makes the HuggingFace classifier model for Stage 2 configurable via TURNSTONE_CLASSIFIER_MODEL. When unset (default), Stage 2 falls back to pattern_tags then regex — no download required on first run. Also documents TURNSTONE_MULTI_AGENT_DIAGNOSE, TURNSTONE_CLASSIFIER_MODEL, TURNSTONE_EMBED_BACKEND/MODEL/DEVICE in .env.example. 
--- .env.example | 15 +++++++++++++++ app/services/diagnose/pipeline.py | 9 ++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/.env.example b/.env.example index c6a152c..5199a4d 100644 --- a/.env.example +++ b/.env.example @@ -26,3 +26,18 @@ # --- Periodic batch glean --- # Seconds between automatic glean runs from sources.yaml. Set to 0 to disable. # TURNSTONE_GLEAN_INTERVAL=900 + +# --- Multi-agent diagnose pipeline (experimental) --- +# Enable the 5-stage ML pipeline instead of the single-LLM summarize() call. +# TURNSTONE_MULTI_AGENT_DIAGNOSE=true + +# Stage 2 — ML severity classifier (optional; falls back to pattern_tags then regex). +# Recommended: byviz/bylastic_classification_logs (~300MB, downloaded from HuggingFace) +# TURNSTONE_CLASSIFIER_MODEL=byviz/bylastic_classification_logs + +# Stage 4 — Embedding backend for false-positive suppression. +# sentence_transformers: in-process local model (downloads on first use) +# ollama: uses a running Ollama instance (no download needed if model is already pulled) +# TURNSTONE_EMBED_BACKEND=sentence_transformers +# TURNSTONE_EMBED_MODEL=BAAI/bge-small-en-v1.5 +# TURNSTONE_EMBED_DEVICE=cpu diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index a60952f..6539b8f 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -5,10 +5,17 @@ from __future__ import annotations import asyncio import dataclasses import logging +import os from collections.abc import AsyncGenerator from pathlib import Path from typing import Any +# Optional ML classifier model for Stage 2. +# When empty (default), Stage 2 falls back to pattern_tags then regex. +# Set TURNSTONE_CLASSIFIER_MODEL to a HuggingFace model ID to enable ML classification. 
+# Recommended: byviz/bylastic_classification_logs (DistilBERT, ~300MB) +_CLASSIFIER_MODEL: str = os.environ.get("TURNSTONE_CLASSIFIER_MODEL", "") + from app.context.retriever import RetrievedContext from app.services.diagnose.classifier import SeverityClassifier from app.services.diagnose.hypothesizer import RootCauseHypothesizer @@ -74,7 +81,7 @@ async def run_pipeline( # Stage 2: Severity classification try: classified = await asyncio.to_thread( - SeverityClassifier().classify, timeline + SeverityClassifier(model_id=_CLASSIFIER_MODEL).classify, timeline ) except Exception as exc: logger.exception("Stage 2 (classifier) failed: %s", exc) From 39ef1320b00f58d62d666ccf3f6561fa382389fc Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 19:15:33 -0700 Subject: [PATCH 3/3] feat(manage): source .env before starting uvicorn Enables TURNSTONE_MULTI_AGENT_DIAGNOSE and other env vars set in .env to reach the running process without manual export. Note that values assigned in .env override variables already set in the caller's environment. --- manage.sh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/manage.sh b/manage.sh index 19a5b96..ec74fe4 100755 --- a/manage.sh +++ b/manage.sh @@ -38,6 +38,15 @@ PATTERN_DIR="${TURNSTONE_PATTERNS:-$([[ -d /devl/turnstone-cluster/patterns ]] & CONDA_BASE="${CONDA_BASE:-/devl/miniconda3}" PYTHON="${CONDA_BASE}/envs/cf/bin/python" +# Source .env if present — loads TURNSTONE_MULTI_AGENT_DIAGNOSE, GPU_SERVER_URL, etc. +# NOTE: values assigned in .env override variables already set in the environment; set -a / set +a only auto-exports them. +if [[ -f "${SCRIPT_DIR}/.env" ]]; then + set -a + # shellcheck source=/dev/null + source "${SCRIPT_DIR}/.env" + set +a +fi + # ── Helpers ─────────────────────────────────────────────────────────────────── _is_alive() {