- diagnose_stream() async generator: status/summary/entries/reasoning/done events - POST /api/diagnose/stream SSE endpoint wired in rest.py - entries_in_window() gains per_source_cap to prevent high-volume sources crowding results - QuickCapture: severity filter pills, filtered entries view, pipeline status spinner - llm.py: remove overly broad HTTPStatusError re-raise
94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
import logging
|
|
|
|
import httpx
|
|
|
|
from app.services.search import SearchResult
|
|
|
|
# Module-level logger; summarize() reports fallbacks (debug) and failures (warning) here.
logger = logging.getLogger(__name__)

# Sort priority for log severity labels used when ranking entries for the
# prompt: lower values sort first. WARN and WARNING share a rank; labels not
# listed here sort after all of these.
_SEVERITY_RANK = {"CRITICAL": 0, "ERROR": 1, "WARN": 2, "WARNING": 2}

# Single-turn user prompt sent to the LLM. summarize() fills it with
# str.format(): {query} is the user's symptom text, {n} the number of log
# entries included, and {log_block} the rendered entries (one per line).
# The backslash after the opening quotes suppresses the leading newline.
_PROMPT_TEMPLATE = """\
You are a homelab diagnostic assistant. A user described a symptom and the system retrieved relevant log entries.

Analyze the log entries below and write a 2-4 sentence plain-language diagnosis. Focus on errors and their likely root cause. Be specific and concise — name the services involved, not generic platitudes.

User query: {query}

Log entries ({n} shown, highest severity first):
{log_block}

Diagnosis:"""
|
|
|
|
|
|
def _build_context(entries: list[SearchResult], max_entries: int = 25) -> str:
    """Render the most severe log entries as a one-entry-per-line text block.

    Entries are ordered by severity rank from ``_SEVERITY_RANK`` (matched
    case-insensitively; missing or unlisted severities rank last), then by
    ``timestamp_iso`` ascending, and the first ``max_entries`` are kept.

    Args:
        entries: Retrieved log entries to render.
        max_entries: Maximum number of entries to include; values <= 0 yield
            an empty block.

    Returns:
        Lines formatted as ``[timestamp] [SEVERITY] text`` joined by newlines.
        A missing timestamp renders as ``?`` and a missing severity as
        ``INFO``; the text is whitespace-collapsed and cut to 200 characters.
    """

    def _sort_key(entry):
        # Upper-case before lookup so "error"/"Warning" rank like "ERROR"/"WARNING".
        rank = _SEVERITY_RANK.get((entry.severity or "").upper(), 3)
        return rank, entry.timestamp_iso or ""

    # Clamp so a negative limit cannot turn into "drop the last N" slicing.
    ranked = sorted(entries, key=_sort_key)[: max(max_entries, 0)]

    lines = []
    for entry in ranked:
        # Collapse newlines and whitespace runs (e.g. multi-line stack traces)
        # so each entry occupies exactly one line of the prompt.
        text = " ".join((entry.text or "").split())[:200]
        lines.append(f"[{entry.timestamp_iso or '?'}] [{entry.severity or 'INFO'}] {text}")
    return "\n".join(lines)
|
|
|
|
|
|
def _extract_content(resp_json: dict) -> str | None:
|
|
"""Pull text content from an OpenAI-compat chat completion response."""
|
|
choices = resp_json.get("choices") or []
|
|
if not choices:
|
|
return None
|
|
return (choices[0].get("message", {}).get("content") or "").strip() or None
|
|
|
|
|
|
def summarize(
    query: str,
    entries: list[SearchResult],
    llm_url: str,
    llm_model: str,
    api_key: str | None = None,
    timeout: float = 120.0,
    max_entries: int = 25,
) -> str | None:
    """Ask an LLM for a short plain-language diagnosis of *entries*.

    First tries the cf-orch task endpoint
    (``POST {llm_url}/api/inference/task`` with product ``turnstone`` and task
    ``log_analysis``). If that returns 404, fails, or yields no usable
    content, falls back to the OpenAI-compatible
    ``POST {llm_url}/v1/chat/completions`` using *llm_model*.

    Args:
        query: The user's symptom description, inserted into the prompt.
        entries: Retrieved log entries to analyse.
        llm_url: Base URL of the LLM service; a trailing slash is ignored.
        llm_model: Model name for the fallback chat-completions request.
        api_key: Optional bearer token sent with both requests.
        timeout: Passed to each of the (up to two) HTTP requests separately.
        max_entries: Maximum number of log entries included in the prompt.

    Returns:
        The diagnosis text, or ``None`` if there are no entries or neither
        endpoint produced content. Errors are logged, never raised.
    """
    if not entries:
        return None

    # Use one limit for both rendering and the "{n} shown" count so the prompt
    # never claims a different number of entries than it contains.
    limit = max(max_entries, 0)
    log_block = _build_context(entries, max_entries=limit)
    prompt = _PROMPT_TEMPLATE.format(query=query, n=min(len(entries), limit), log_block=log_block)

    base_url = llm_url.rstrip("/")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    messages = [{"role": "user", "content": prompt}]

    # Try cf-orch task-based endpoint first (routes to the security reasoning model
    # assigned to turnstone.log_analysis without needing an explicit model name).
    try:
        resp = httpx.post(
            f"{base_url}/api/inference/task",
            json={
                "product": "turnstone",
                "task": "log_analysis",
                "payload": {"messages": messages, "stream": False},
            },
            headers=headers,
            timeout=timeout,
        )
        if resp.status_code == 404:
            # 404 means no assignment configured — fall through to direct model
            logger.debug("No task assignment for turnstone.log_analysis — falling back to direct model")
        else:
            # Any non-2xx status raises here and is handled by the fallback below.
            resp.raise_for_status()
            content = _extract_content(resp.json())
            if content is not None:
                return content
            logger.debug("Task endpoint returned no content — falling back to direct model")
    except Exception as exc:
        # Best-effort: any transport/HTTP/parse problem just triggers the fallback.
        logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)

    # Fallback: OpenAI-compat endpoint with explicit model name (local instances,
    # xanderland, or any cf-orch that doesn't have task assignments loaded).
    try:
        resp = httpx.post(
            f"{base_url}/v1/chat/completions",
            json={"model": llm_model, "messages": messages, "stream": False},
            headers=headers,
            timeout=timeout,
        )
        resp.raise_for_status()
        return _extract_content(resp.json())
    except Exception as exc:
        logger.warning("LLM summarization failed (%s): %s", type(exc).__name__, exc)
        return None
|