turnstone/app/services/diagnose/_llm_client.py
pyr0ball 85e7a70536 refactor: pipeline cleanup — 6 follow-up fixes (#33-#38)
- #33: Wrap ClassifiedTimeline.cluster_severities in MappingProxyType for
  true immutability (frozen=True only blocks field reassignment, not dict
  mutation).

- #34: Remove dead suppression branch in synthesizer._build_hypothesis_block.
  active[] is already filtered to not rh.suppress, so the 'Yes — suppressed'
  branch was unreachable. Now shows novelty score only.

- #35: Extract shared _llm_client.py with call_llm() + extract_content() +
  strip_json_fences(). Both RootCauseHypothesizer and SummarySynthesizer
  now import from one source. Also strips JSON fences from LLM output before
  parsing in hypothesizer._parse_response.

- #36: Add per-stage try/except in pipeline.run_pipeline(). Unhandled
  stage exceptions now emit {type: 'error'} + {type: 'done'} SSE events
  instead of silently closing the stream.

- #37: Move format_context_block() call inside the legacy LLM branch in
  diagnose/__init__.py — it was being computed unconditionally but only
  used in the non-pipeline path.

- #38: Coerce supporting_cluster_ids items to str() in hypothesizer
  _parse_response to guard against LLMs returning integers instead of
  string cluster IDs.
2026-05-25 19:05:56 -07:00

116 lines
4 KiB
Python

"""Shared LLM client for the multi-agent diagnose pipeline.
Both Stage 3 (RootCauseHypothesizer) and Stage 5 (SummarySynthesizer) send
messages to the same LLM backend using the same two-step pattern:
1. Try the cf-orch task endpoint → product-scoped inference routing.
2. Fall back to OpenAI-compat → direct model call by name.
Centralising here means changes to auth headers, timeouts, retry logic, or
cf-orch payload structure only need to be made once.
"""
from __future__ import annotations
import logging
import re
import httpx
logger = logging.getLogger(__name__)
# Matches the markdown fence markers some LLMs wrap around JSON output:
#   * an opening fence at the start of a line, with an optional "json"
#     language tag and any whitespace that follows it, or
#   * a closing fence at the end of a line, with any whitespace before it.
# MULTILINE lets ^ and $ anchor on every line, not just the whole string.
# IGNORECASE accepts tags such as "JSON" or "Json"; without it the tag would
# survive the substitution and corrupt the payload handed to the JSON parser.
_JSON_FENCE_RE = re.compile(
    r"^```(?:json)?\s*|\s*```$",
    re.MULTILINE | re.IGNORECASE,
)
def extract_content(resp_json: dict) -> str | None:
    """Pull text content from an OpenAI-compat chat completion response.

    Only the first entry of ``choices`` is inspected; its
    ``message.content`` is returned with surrounding whitespace removed.

    Args:
        resp_json: Decoded JSON body of a chat completion response.

    Returns:
        The stripped content string, or None when there are no choices, the
        first choice or its message is missing/null, or the content is
        missing, not a string, or blank.
    """
    choices = resp_json.get("choices") or []
    if not choices:
        return None

    first_choice = choices[0]
    if not isinstance(first_choice, dict):
        return None

    # A null "message" would otherwise raise AttributeError on the chained
    # .get(); treat it as a response without content.
    message = first_choice.get("message")
    if not isinstance(message, dict):
        return None

    content = message.get("content")
    if not isinstance(content, str):
        return None

    # Whitespace-only content counts as empty.
    return content.strip() or None
def strip_json_fences(raw: str) -> str:
    """Return *raw* with markdown code-fence markers removed.

    Some LLMs wrap JSON output in a fenced block. Every match of the
    module-level ``_JSON_FENCE_RE`` pattern is deleted, then leading and
    trailing whitespace is trimmed from what remains.

    Example: a payload ``[...]`` wrapped between an opening "json" fence
    line and a closing fence line comes back as just ``[...]``.
    """
    without_fences = _JSON_FENCE_RE.sub("", raw)
    return without_fences.strip()
def call_llm(
    llm_url: str,
    llm_model: str,
    llm_api_key: str | None,
    messages: list[dict],
    task_name: str = "log_analysis",
    timeout: float = 120.0,
) -> str | None:
    """Send *messages* to the LLM backend and return the reply text.

    Two attempts are made, in order:

    1. POST to the cf-orch task endpoint (``/api/inference/task``) for
       product-routed inference. A 200 response is final: whatever
       ``extract_content`` yields from it is returned, even if that is None.
    2. POST to the OpenAI-compat ``/v1/chat/completions`` endpoint, naming
       the model directly. This runs when the task endpoint answers 404
       (no assignment for the task), when ``raise_for_status()`` rejects its
       status, or when the request raises for any other reason (connection
       error, timeout, undecodable body, ...).

    Args:
        llm_url: Base URL of the LLM backend; trailing slashes are ignored.
        llm_model: Model identifier sent in the OpenAI-compat fallback call.
        llm_api_key: Optional bearer token; when set, it is sent as an
            ``Authorization`` header on both requests.
        messages: OpenAI-style message list (system + user turns).
        task_name: cf-orch task name for product-routed inference.
        timeout: Per-request timeout in seconds, applied to each attempt.

    Returns:
        The extracted text content, or None when no content was obtained.
    """
    base_url = llm_url.rstrip("/")
    auth_headers: dict[str, str] = (
        {"Authorization": f"Bearer {llm_api_key}"} if llm_api_key else {}
    )

    # --- Attempt 1: cf-orch task endpoint ---
    task_request = {
        "product": "turnstone",
        "task": task_name,
        "payload": {"messages": messages, "stream": False},
    }
    try:
        task_resp = httpx.post(
            f"{base_url}/api/inference/task",
            json=task_request,
            headers=auth_headers,
            timeout=timeout,
        )
        status = task_resp.status_code
        if status == 200:
            return extract_content(task_resp.json())
        if status != 404:
            # Error statuses raise here and are handled by the except below.
            task_resp.raise_for_status()
        logger.debug(
            "No task assignment for turnstone.%s — falling back to direct model",
            task_name,
        )
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: whatever goes wrong with the task endpoint,
        # the direct model call below should still get its chance.
        logger.debug(
            "Task endpoint unavailable (%s) — falling back to direct model", exc
        )

    # --- Attempt 2: OpenAI-compat fallback ---
    chat_request = {"model": llm_model, "messages": messages, "stream": False}
    try:
        chat_resp = httpx.post(
            f"{base_url}/v1/chat/completions",
            json=chat_request,
            headers=auth_headers,
            timeout=timeout,
        )
        chat_resp.raise_for_status()
        return extract_content(chat_resp.json())
    except Exception as exc:  # noqa: BLE001
        logger.warning("LLM call failed (%s): %s", type(exc).__name__, exc)
        return None