"""LLM-backed plain-language summarization of retrieved log entries."""

import logging

import httpx

from app.services.search import SearchResult

logger = logging.getLogger(__name__)

# Lower rank sorts first; severities not listed here rank last (3).
_SEVERITY_RANK = {"CRITICAL": 0, "ERROR": 1, "WARN": 2, "WARNING": 2}
_UNRANKED_SEVERITY = 3

# Single source of truth for how many log entries go into the prompt, so the
# "{n} shown" figure in the prompt always agrees with the rendered log block.
_MAX_CONTEXT_ENTRIES = 25

# Maximum number of characters of each entry's text included in the prompt.
_MAX_TEXT_CHARS = 200

_PROMPT_TEMPLATE = """\
You are a homelab diagnostic assistant. A user described a symptom and the system
retrieved relevant log entries. Analyze the log entries below and write a 2-4 sentence
plain-language diagnosis. Focus on errors and their likely root cause. Be specific and
concise — name the services involved, not generic platitudes.

User query: {query}
{context_section}
Log entries ({n} shown, highest severity first):
{log_block}

Diagnosis:"""


def _build_context(
    entries: list[SearchResult], max_entries: int = _MAX_CONTEXT_ENTRIES
) -> str:
    """Render the most severe entries as one ``[timestamp] [severity] text`` line each.

    Entries are ordered by severity rank (CRITICAL, ERROR, WARN/WARNING, then
    everything else) and, within a rank, by ascending ``timestamp_iso`` string.
    Only the first ``max_entries`` are kept and each entry's text is truncated
    to ``_MAX_TEXT_CHARS`` characters.

    Args:
        entries: Search results to render.
        max_entries: Upper bound on the number of lines produced.

    Returns:
        The rendered lines joined with newlines (empty string for no entries).
    """

    def sort_key(entry: SearchResult) -> tuple[int, str]:
        # Upper-case before the lookup so "error" ranks the same as "ERROR".
        severity = (entry.severity or "").upper()
        rank = _SEVERITY_RANK.get(severity, _UNRANKED_SEVERITY)
        return rank, entry.timestamp_iso or ""

    ranked = sorted(entries, key=sort_key)[:max_entries]
    return "\n".join(
        f"[{e.timestamp_iso or '?'}] [{e.severity or 'INFO'}] "
        f"{(e.text or '')[:_MAX_TEXT_CHARS]}"
        for e in ranked
    )


def _extract_content(resp_json: dict) -> str | None:
    """Pull text content from an OpenAI-compat chat completion response.

    Returns the stripped ``choices[0].message.content`` string, or ``None``
    when the response has no choices, the message or content is missing or
    null, the content is not a string, or the content is blank.
    """
    if not isinstance(resp_json, dict):
        return None
    choices = resp_json.get("choices") or []
    if not choices:
        return None
    first_choice = choices[0]
    # ``.get("message", {})`` would still hand back None for an explicit
    # ``"message": null``, so each level is type-checked before descending.
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, str):
        return None
    return content.strip() or None


def summarize(
    query: str,
    entries: list[SearchResult],
    llm_url: str,
    llm_model: str,
    api_key: str | None = None,
    timeout: float = 120.0,
    context_block: str | None = None,
) -> str | None:
    """Ask an LLM for a short diagnosis of ``entries`` in light of ``query``.

    The task-based endpoint (``/api/inference/task``) is tried first. If it
    answers 404, fails, or returns 200 without usable content, the
    OpenAI-compatible ``/v1/chat/completions`` endpoint is called with the
    explicit ``llm_model``.

    Args:
        query: The user's symptom description, embedded in the prompt.
        entries: Retrieved log entries; an empty list short-circuits to None.
        llm_url: Base URL of the LLM service (a trailing slash is tolerated).
        llm_model: Model name used for the direct fallback call.
        api_key: Optional bearer token sent in the ``Authorization`` header.
        timeout: Per-request timeout passed to ``httpx.post``.
        context_block: Optional environment description added to the prompt.

    Returns:
        The diagnosis text, or ``None`` when there are no entries or no
        endpoint produced content. Errors are logged, never raised.
    """
    if not entries:
        return None

    log_block = _build_context(entries, _MAX_CONTEXT_ENTRIES)
    context_section = (
        f"\nEnvironment context:\n{context_block}\n" if context_block else ""
    )
    prompt = _PROMPT_TEMPLATE.format(
        query=query,
        n=min(len(entries), _MAX_CONTEXT_ENTRIES),
        log_block=log_block,
        context_section=context_section,
    )

    base_url = llm_url.rstrip("/")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    messages = [{"role": "user", "content": prompt}]

    # Try cf-orch task-based endpoint first (routes to the security reasoning model
    # assigned to turnstone.log_analysis without needing an explicit model name).
    try:
        resp = httpx.post(
            f"{base_url}/api/inference/task",
            json={
                "product": "turnstone",
                "task": "log_analysis",
                "payload": {"messages": messages, "stream": False},
            },
            headers=headers,
            timeout=timeout,
        )
        if resp.status_code == 200:
            content = _extract_content(resp.json())
            if content is not None:
                return content
            # An empty or malformed 200 is treated like any other task
            # endpoint failure: give the direct model call a chance.
            logger.debug(
                "Task endpoint returned no content — falling back to direct model"
            )
        elif resp.status_code == 404:
            # 404 means no assignment configured — fall through to direct model call
            logger.debug(
                "No task assignment for turnstone.log_analysis — falling back to direct model"
            )
        else:
            resp.raise_for_status()
    except Exception as exc:
        # Deliberately broad: the task endpoint is best-effort and any failure
        # (transport, HTTP status, invalid JSON) should only trigger the fallback.
        logger.debug(
            "Task endpoint unavailable (%s) — falling back to direct model", exc
        )

    # Fallback: OpenAI-compat endpoint with explicit model name (local instances,
    # example-node, or any cf-orch that doesn't have task assignments loaded).
    try:
        resp = httpx.post(
            f"{base_url}/v1/chat/completions",
            json={"model": llm_model, "messages": messages, "stream": False},
            headers=headers,
            timeout=timeout,
        )
        resp.raise_for_status()
        return _extract_content(resp.json())
    except Exception as exc:
        # Summarization is optional for callers, so failures degrade to None.
        logger.warning(
            "LLM summarization failed (%s): %s", type(exc).__name__, exc
        )
        return None