diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py index 51613cf..80d8a0f 100644 --- a/app/services/diagnose/__init__.py +++ b/app/services/diagnose/__init__.py @@ -227,7 +227,6 @@ async def diagnose_stream( yield {"type": "status", "message": "Loading environment context…"} ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) - context_block = format_context_block(ctx) yield { "type": "context", "facts": ctx.facts, @@ -320,6 +319,8 @@ async def diagnose_stream( return # pipeline emits its own "done" event if llm_url and llm_model and combined: + # Only compute context_block in the legacy path — pipeline uses ctx directly. + context_block = format_context_block(ctx) yield {"type": "status", "message": "Analyzing with LLM…"} reasoning = await asyncio.to_thread( lambda: summarize( diff --git a/app/services/diagnose/_llm_client.py b/app/services/diagnose/_llm_client.py new file mode 100644 index 0000000..a9e1481 --- /dev/null +++ b/app/services/diagnose/_llm_client.py @@ -0,0 +1,116 @@ +"""Shared LLM client for the multi-agent diagnose pipeline. + +Both Stage 3 (RootCauseHypothesizer) and Stage 5 (SummarySynthesizer) send +messages to the same LLM backend using the same two-step pattern: + 1. Try the cf-orch task endpoint → product-scoped inference routing. + 2. Fall back to OpenAI-compat → direct model call by name. + +Centralising here means changes to auth headers, timeouts, retry logic, or +cf-orch payload structure only need to be made once. +""" +from __future__ import annotations + +import logging +import re + +import httpx + +logger = logging.getLogger(__name__) + +# Regex that strips ```json … ``` or ``` … ``` fences from LLM output. +_JSON_FENCE_RE = re.compile( + r"^```(?:json)?\s*|\s*```$", + re.MULTILINE, +) + + +def extract_content(resp_json: dict) -> str | None: + """Pull text content from an OpenAI-compat chat completion response. + + Returns None when the response has no choices or empty content. + """ + choices = resp_json.get("choices") or [] + if not choices: + return None + return (choices[0].get("message", {}).get("content") or "").strip() or None + + +def strip_json_fences(raw: str) -> str: + """Remove markdown code fences that some LLMs wrap around JSON output. + + Example: '```json\\n[...]\\n```' → '[...]' + """ + return _JSON_FENCE_RE.sub("", raw).strip() + + +def call_llm( + llm_url: str, + llm_model: str, + llm_api_key: str | None, + messages: list[dict], + task_name: str = "log_analysis", + timeout: float = 120.0, +) -> str | None: + """Send messages to the LLM; return raw text or None on failure. + + Tries the cf-orch task endpoint first (product-routed inference). + Falls back to a direct OpenAI-compat ``/v1/chat/completions`` call when: + - The task endpoint returns 404 (no assignment for this task). + - The task endpoint is unreachable (connection error, timeout, etc.). + + Args: + llm_url: Base URL of the LLM backend (e.g. ``http://10.1.10.71:7700``). + llm_model: Model identifier used in the OpenAI-compat fallback call. + llm_api_key: Optional bearer token for authenticated endpoints. + messages: OpenAI-style message list (system + user turns). + task_name: cf-orch task name for product-routed inference (default: ``log_analysis``). + timeout: Request timeout in seconds (default: 120). + + Returns: + Raw text content string, or None if both paths fail. + """ + headers: dict[str, str] = {} + if llm_api_key: + headers["Authorization"] = f"Bearer {llm_api_key}" + + # --- Path 1: cf-orch task endpoint --- + task_url = f"{llm_url.rstrip('/')}/api/inference/task" + try: + resp = httpx.post( + task_url, + json={ + "product": "turnstone", + "task": task_name, + "payload": {"messages": messages, "stream": False}, + }, + headers=headers, + timeout=timeout, + ) + if resp.status_code == 200: + return extract_content(resp.json()) + if resp.status_code != 404: + resp.raise_for_status() + logger.debug( + "No task assignment for turnstone.%s — falling back to direct model", + task_name, + ) + except Exception as exc: # noqa: BLE001 + # Broad catch is intentional: captures network errors, timeouts, and + # any backend-specific exceptions so the pipeline can fall back. + logger.debug( + "Task endpoint unavailable (%s) — falling back to direct model", exc + ) + + # --- Path 2: OpenAI-compat fallback --- + try: + resp = httpx.post( + f"{llm_url.rstrip('/')}/v1/chat/completions", + json={"model": llm_model, "messages": messages, "stream": False}, + headers=headers, + timeout=timeout, + ) + resp.raise_for_status() + return extract_content(resp.json()) + except Exception as exc: # noqa: BLE001 + logger.warning("LLM call failed (%s): %s", type(exc).__name__, exc) + return None diff --git a/app/services/diagnose/classifier.py b/app/services/diagnose/classifier.py index b7aa8ed..b340c39 100644 --- a/app/services/diagnose/classifier.py +++ b/app/services/diagnose/classifier.py @@ -19,6 +19,8 @@ import os from pathlib import Path from typing import Any +from types import MappingProxyType + from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, @@ -243,7 +245,7 @@ class SeverityClassifier: return ClassifiedTimeline( timeline=timeline, - cluster_severities=cluster_severities, + cluster_severities=MappingProxyType(cluster_severities), classifier_used=classifier_used, # type: ignore[arg-type] model_id=self._model_id if ml_available else None, ) diff --git a/app/services/diagnose/hypothesizer.py b/app/services/diagnose/hypothesizer.py index 7c5c3e6..827332f 100644 --- a/app/services/diagnose/hypothesizer.py +++ b/app/services/diagnose/hypothesizer.py @@ -5,9 +5,8 @@ import json import logging from uuid import uuid4 -import httpx - from app.context.retriever import RetrievedContext +from app.services.diagnose._llm_client import call_llm, strip_json_fences from app.services.diagnose.models import ( ClassifiedTimeline, EventCluster, @@ -60,14 +59,6 @@ def _cluster_summary(cluster: EventCluster, severity: str) -> str: return summary -def _extract_content(resp_json: dict) -> str | None: - """Pull text content from an OpenAI-compat chat completion response.""" - choices = resp_json.get("choices") or [] - if not choices: - return None - return (choices[0].get("message", {}).get("content") or "").strip() or None - - class RootCauseHypothesizer: """Generate ranked root-cause hypotheses from a classified log timeline.""" @@ -120,7 +111,7 @@ class RootCauseHypothesizer: {"role": "user", "content": user_message}, ] - raw_response = self._call_llm( + raw_response = call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, @@ -131,59 +122,14 @@ class RootCauseHypothesizer: return self._parse_response(raw_response) - def _call_llm( - self, - llm_url: str, - llm_model: str, - llm_api_key: str | None, - messages: list[dict], - ) -> str | None: - """Send messages to the LLM and return raw text content.""" - headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} - - # Try cf-orch task-based endpoint first. - task_url = f"{llm_url.rstrip('/')}/api/inference/task" - try: - resp = httpx.post( - task_url, - json={ - "product": "turnstone", - "task": "log_analysis", - "payload": {"messages": messages, "stream": False}, - }, - headers=headers, - timeout=120.0, - ) - if resp.status_code == 200: - return _extract_content(resp.json()) - if resp.status_code != 404: - resp.raise_for_status() - logger.debug( - "No task assignment for turnstone.log_analysis — falling back to direct model" - ) - except Exception as exc: - logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc) - - # Fallback: OpenAI-compat endpoint with explicit model name. - try: - resp = httpx.post( - f"{llm_url.rstrip('/')}/v1/chat/completions", - json={"model": llm_model, "messages": messages, "stream": False}, - headers=headers, - timeout=120.0, - ) - resp.raise_for_status() - return _extract_content(resp.json()) - except Exception as exc: - logger.warning( - "LLM hypothesizer failed (%s): %s", type(exc).__name__, exc - ) - return None - def _parse_response(self, raw: str) -> list[Hypothesis]: - """Parse the LLM JSON response into a list of Hypothesis objects.""" + """Parse the LLM JSON response into a list of Hypothesis objects. + + Strips markdown code fences before parsing — some LLMs wrap JSON in + triple-backtick fences despite being instructed not to. + """ try: - data = json.loads(raw.strip()) + data = json.loads(strip_json_fences(raw)) except json.JSONDecodeError: logger.warning( "Hypothesizer: invalid JSON from LLM (truncated): %.120s", raw @@ -207,7 +153,9 @@ class RootCauseHypothesizer: title=str(item.get("title", "Unknown"))[:80], description=str(item.get("description", "")), confidence=_coerce_float(item.get("confidence"), 0.5), - supporting_cluster_ids=tuple(item.get("supporting_clusters") or []), + supporting_cluster_ids=tuple( + str(x) for x in (item.get("supporting_clusters") or []) + ), runbook_refs=(), severity=severity, ) diff --git a/app/services/diagnose/models.py b/app/services/diagnose/models.py index 2831d30..ed454d1 100644 --- a/app/services/diagnose/models.py +++ b/app/services/diagnose/models.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass +from types import MappingProxyType from typing import Literal SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] @@ -40,10 +41,14 @@ class TimelineResult: @dataclass(frozen=True) class ClassifiedTimeline: - """Timeline annotated with ML-assigned severity per cluster.""" + """Timeline annotated with ML-assigned severity per cluster. + + ``cluster_severities`` is a ``MappingProxyType`` so the mapping is + fully immutable — consistent with the ``frozen=True`` intent. + """ timeline: TimelineResult - cluster_severities: dict[str, SeverityLabel] + cluster_severities: MappingProxyType # MappingProxyType[str, SeverityLabel] classifier_used: Literal["ml", "pattern_tags", "regex"] model_id: str | None diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index f902610..a60952f 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -53,9 +53,15 @@ async def run_pipeline( """ # Stage 1: Timeline reconstruction yield {"type": "status", "message": "Building timeline…"} - timeline = await asyncio.to_thread( - TimelineReconstructor().reconstruct, entries - ) + try: + timeline = await asyncio.to_thread( + TimelineReconstructor().reconstruct, entries + ) + except Exception as exc: + logger.exception("Stage 1 (timeline) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"} + yield {"type": "done"} + return n_clusters = len(timeline.clusters) burst = timeline.burst_count yield { @@ -66,9 +72,15 @@ async def run_pipeline( } # Stage 2: Severity classification - classified = await asyncio.to_thread( - SeverityClassifier().classify, timeline - ) + try: + classified = await asyncio.to_thread( + SeverityClassifier().classify, timeline + ) + except Exception as exc: + logger.exception("Stage 2 (classifier) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"} + yield {"type": "done"} + return sev_counts: dict[str, int] = {} for sev in classified.cluster_severities.values(): sev_counts[sev] = sev_counts.get(sev, 0) + 1 @@ -81,15 +93,21 @@ async def run_pipeline( } # Stage 3: Root-cause hypotheses - hypotheses = await asyncio.to_thread( - RootCauseHypothesizer().hypothesize, - classified, - ctx, - query, - llm_url, - llm_model, - llm_api_key, - ) + try: + hypotheses = await asyncio.to_thread( + RootCauseHypothesizer().hypothesize, + classified, + ctx, + query, + llm_url, + llm_model, + llm_api_key, + ) + except Exception as exc: + logger.exception("Stage 3 (hypothesizer) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"} + yield {"type": "done"} + return yield { "type": "pipeline_stage", "stage": 3, @@ -98,9 +116,15 @@ async def run_pipeline( } # Stage 4: False-positive suppression - ranked = await asyncio.to_thread( - FalsePositiveSuppressor().suppress, hypotheses, db_path - ) + try: + ranked = await asyncio.to_thread( + FalsePositiveSuppressor().suppress, hypotheses, db_path + ) + except Exception as exc: + logger.exception("Stage 4 (suppressor) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"} + yield {"type": "done"} + return suppressed = sum(1 for rh in ranked if rh.suppress) active = len(ranked) - suppressed yield { @@ -116,16 +140,22 @@ async def run_pipeline( # Stage 5: Summary synthesis yield {"type": "status", "message": "Synthesizing…"} - synthesis_text = await asyncio.to_thread( - SummarySynthesizer().synthesize, - ranked, - timeline, - ctx, - query, - llm_url, - llm_model, - llm_api_key, - ) + try: + synthesis_text = await asyncio.to_thread( + SummarySynthesizer().synthesize, + ranked, + timeline, + ctx, + query, + llm_url, + llm_model, + llm_api_key, + ) + except Exception as exc: + logger.exception("Stage 5 (synthesizer) failed: %s", exc) + yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"} + yield {"type": "done"} + return if synthesis_text: yield {"type": "reasoning", "text": synthesis_text} diff --git a/app/services/diagnose/synthesizer.py b/app/services/diagnose/synthesizer.py index ce07625..edbf29a 100644 --- a/app/services/diagnose/synthesizer.py +++ b/app/services/diagnose/synthesizer.py @@ -7,9 +7,8 @@ from __future__ import annotations import logging -import httpx - from app.context.retriever import RetrievedContext +from app.services.diagnose._llm_client import call_llm from app.services.diagnose.models import RankedHypothesis, TimelineResult logger = logging.getLogger(__name__) @@ -29,14 +28,6 @@ _SYSTEM_PROMPT = ( ) -def _extract_content(resp_json: dict) -> str | None: - """Pull text content from an OpenAI-compat chat completion response.""" - choices = resp_json.get("choices") or [] - if not choices: - return None - return (choices[0].get("message", {}).get("content") or "").strip() or None - - def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: """Build the hypothesis block for the prompt (non-suppressed only, top 3).""" active = [rh for rh in ranked if not rh.suppress][:3] @@ -46,15 +37,10 @@ def _build_hypothesis_block(ranked: list[RankedHypothesis]) -> str: for rh in active: h = rh.hypothesis conf_pct = int(h.confidence * 100) - similar = ( - f"Yes — suppressed, {rh.suppression_reason}" - if rh.suppress and rh.suppression_reason - else "No" - ) novelty = f"{rh.novelty_score:.2f}" lines.append( f"- [{h.severity}, {conf_pct}%] {h.title}\n" - f" Similar resolved incident? {similar} (novelty {novelty})" + f" Novelty: {novelty}" ) return "\n".join(lines) @@ -149,62 +135,10 @@ class SummarySynthesizer: {"role": "user", "content": user_message}, ] - result = self._call_llm( + result = call_llm( llm_url=llm_url, llm_model=llm_model, llm_api_key=llm_api_key, messages=messages, ) return result if result else fallback - - def _call_llm( - self, - llm_url: str, - llm_model: str, - llm_api_key: str | None, - messages: list[dict], - ) -> str | None: - """Send messages to the LLM and return raw text content. - - Tries the cf-orch task endpoint first, falls back to direct OpenAI-compat. - """ - headers = {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {} - - task_url = f"{llm_url.rstrip('/')}/api/inference/task" - try: - resp = httpx.post( - task_url, - json={ - "product": "turnstone", - "task": "log_analysis", - "payload": {"messages": messages, "stream": False}, - }, - headers=headers, - timeout=120.0, - ) - if resp.status_code == 200: - return _extract_content(resp.json()) - if resp.status_code != 404: - resp.raise_for_status() - logger.debug( - "No task assignment for turnstone.log_analysis — falling back to direct model" - ) - except Exception as exc: - logger.debug( - "Task endpoint unavailable (%s) — falling back to direct model", exc - ) - - try: - resp = httpx.post( - f"{llm_url.rstrip('/')}/v1/chat/completions", - json={"model": llm_model, "messages": messages, "stream": False}, - headers=headers, - timeout=120.0, - ) - resp.raise_for_status() - return _extract_content(resp.json()) - except Exception as exc: - logger.warning( - "LLM synthesizer failed (%s): %s", type(exc).__name__, exc - ) - return None