turnstone/app/services/llm.py
pyr0ball 2c408907ac feat: inject environment context into diagnose pipeline and LLM prompt
- Add context_block param to summarize() and thread it into _PROMPT_TEMPLATE
- Wire retrieve_context/format_context_block into diagnose_stream() before
  log search; emit context SSE event (facts + chunks) to the client
- 3 new tests covering prompt injection and SSE event emission (155 total, all pass)
2026-05-13 16:29:26 -07:00

103 lines
3.5 KiB
Python

import logging
import httpx
from app.services.search import SearchResult
logger = logging.getLogger(__name__)
_SEVERITY_RANK = {"CRITICAL": 0, "ERROR": 1, "WARN": 2, "WARNING": 2}
_PROMPT_TEMPLATE = """\
You are a homelab diagnostic assistant. A user described a symptom and the system retrieved relevant log entries.
Analyze the log entries below and write a 2-4 sentence plain-language diagnosis. Focus on errors and their likely root cause. Be specific and concise — name the services involved, not generic platitudes.
User query: {query}
{context_section}
Log entries ({n} shown, highest severity first):
{log_block}
Diagnosis:"""
def _build_context(entries: list[SearchResult], max_entries: int = 25) -> str:
ranked = sorted(
entries,
key=lambda e: (_SEVERITY_RANK.get(e.severity or "", 3), e.timestamp_iso or ""),
)[:max_entries]
return "\n".join(
f"[{e.timestamp_iso or '?'}] [{e.severity or 'INFO'}] {e.text[:200]}"
for e in ranked
)
def _extract_content(resp_json: dict) -> str | None:
"""Pull text content from an OpenAI-compat chat completion response."""
choices = resp_json.get("choices") or []
if not choices:
return None
return (choices[0].get("message", {}).get("content") or "").strip() or None
def summarize(
query: str,
entries: list[SearchResult],
llm_url: str,
llm_model: str,
api_key: str | None = None,
timeout: float = 120.0,
context_block: str | None = None,
) -> str | None:
if not entries:
return None
log_block = _build_context(entries)
context_section = (
f"\nEnvironment context:\n{context_block}\n" if context_block else ""
)
prompt = _PROMPT_TEMPLATE.format(
query=query,
n=min(len(entries), 25),
log_block=log_block,
context_section=context_section,
)
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
messages = [{"role": "user", "content": prompt}]
# Try cf-orch task-based endpoint first (routes to the security reasoning model
# assigned to turnstone.log_analysis without needing an explicit model name).
task_url = f"{llm_url.rstrip('/')}/api/inference/task"
try:
resp = httpx.post(
task_url,
json={
"product": "turnstone",
"task": "log_analysis",
"payload": {"messages": messages, "stream": False},
},
headers=headers,
timeout=timeout,
)
if resp.status_code == 200:
return _extract_content(resp.json())
if resp.status_code != 404:
resp.raise_for_status()
# 404 means no assignment configured — fall through to direct model call
logger.debug("No task assignment for turnstone.log_analysis — falling back to direct model")
except Exception as exc:
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
# Fallback: OpenAI-compat endpoint with explicit model name (local instances,
# example-node, or any cf-orch that doesn't have task assignments loaded).
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",
json={"model": llm_model, "messages": messages, "stream": False},
headers=headers,
timeout=timeout,
)
resp.raise_for_status()
return _extract_content(resp.json())
except Exception as exc:
logger.warning("LLM summarization failed (%s): %s", type(exc).__name__, exc)
return None