- Diagnose: add source_filter param threaded through entries_in_window, search, _diagnose, and DiagnoseRequest — clicking diagnose on a dashboard source now scopes both keyword and window hits to that source - QuickCapture: read route.query.source; show scope badge with clear ✕; auto-run when source param is present without a query - DashboardView: pass source= (not q=) when navigating to diagnose - collect_cluster_logs.sh: auto-discover Docker containers on all nodes (Heimdall non-watched, Navi, Strahl via SSH); collect Cass Plex logs via SSH; write to per-node dirs for directory-mode ingest - turnstone-cluster.service: add --reload for hot-reload during dev
133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
"""Frictionless diagnose service — NL time extraction + layered log search."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.services.llm import summarize
|
|
from app.services.search import SearchResult, entries_in_window, search
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# dateparser is an optional dependency. When the import fails, _search_dates is
# None and _HAS_DATEPARSER is False; parse_time_window checks both before it
# attempts absolute-date extraction.
try:
    from dateparser.search import search_dates as _search_dates  # type: ignore[import]

    _HAS_DATEPARSER = True
except ImportError:
    _search_dates = None  # type: ignore[assignment]
    _HAS_DATEPARSER = False
|
|
|
|
|
|
# Matches relative time phrases such as "last hour" or "past 30 minutes".
# Group 1 is the optional count; group 2 is the unit name without a plural "s".
_RELATIVE_RE = re.compile(
    r"\b(?:last|past)\s+(\d+)?\s*(minute|hour|day|week)s?\b",
    flags=re.IGNORECASE,
)

# Length of each supported unit, expressed in minutes.
_RELATIVE_UNITS = {
    "minute": 1,
    "hour": 60,
    "day": 24 * 60,
    "week": 7 * 24 * 60,
}
|
|
|
|
|
|
def _relative_window(match: re.Match) -> tuple[str, str]:
    """Translate a ``_RELATIVE_RE`` match into a ``(since_iso, until_iso)`` pair.

    Group 1 of the match is the optional count (treated as 1 when absent) and
    group 2 is the unit name, looked up case-insensitively in
    ``_RELATIVE_UNITS`` to get the unit length in minutes. The window ends now
    and starts ``count * unit`` minutes earlier.
    """
    count_text, unit_text = match.group(1), match.group(2)
    count = int(count_text) if count_text else 1
    span_minutes = count * _RELATIVE_UNITS[unit_text.lower()]
    window_start = _last_n_minutes(span_minutes)
    window_end = _now_iso()
    return window_start, window_end
|
|
|
|
|
|
def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
    """Extract a time window from a natural-language query string.

    Resolution order:

    1. A relative phrase matched by ``_RELATIVE_RE`` ("last hour",
       "past 30 minutes") yields a window that ends now.
    2. Otherwise, when dateparser is importable, the first date phrase it
       finds becomes the centre of a +/-30 minute window.
    3. Otherwise the window is the last 60 minutes.

    Args:
        query: Free-text query that may contain a time phrase.

    Returns:
        ``(since_iso, until_iso, keywords)``. ``keywords`` is ``query`` with the
        matched time phrase removed and runs of whitespace collapsed. When no
        phrase was found, or nothing remains after stripping it, ``keywords``
        is ``query`` unchanged.
    """
    # Handle relative expressions first ("last hour", "past 30 minutes", etc.):
    # dateparser misinterprets these as absolute times.
    m = _RELATIVE_RE.search(query)
    if m:
        since, until = _relative_window(m)
        remainder = query[:m.start()] + query[m.end():]
        keywords = re.sub(r"\s{2,}", " ", remainder).strip()
        return since, until, keywords or query

    if _HAS_DATEPARSER and _search_dates is not None:
        try:
            results = _search_dates(query, languages=["en"], settings={"PREFER_DATES_FROM": "past"})
        except Exception:
            # Deliberately best-effort: a dateparser failure must not break the
            # diagnose request. Keep the traceback in the log so the failure
            # can actually be investigated.
            logger.warning(
                "dateparser failed on query %r — falling back to 60-min window",
                query,
                exc_info=True,
            )
            results = None
        if results:
            # Only the first detected phrase is used.
            phrase, dt = results[0]
            if dt.tzinfo is None:
                # NOTE(review): naive results are labelled as UTC without
                # conversion — confirm this matches the timezone of stored logs.
                dt = dt.replace(tzinfo=timezone.utc)
            since = (dt - timedelta(minutes=30)).isoformat()
            until = (dt + timedelta(minutes=30)).isoformat()
            keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
            return since, until, keywords or query

    return _last_n_minutes(60), _now_iso(), query
|
|
|
|
|
|
def diagnose(
    db_path: Path,
    query: str,
    since: str | None = None,
    until: str | None = None,
    source_filter: str | None = None,
    llm_url: str | None = None,
    llm_model: str | None = None,
    llm_api_key: str | None = None,
) -> dict[str, Any]:
    """Run a layered log search over a time window and summarise the hits.

    When both ``since`` and ``until`` are given they are used as-is and the
    whole ``query`` is searched. Otherwise ``parse_time_window`` supplies any
    missing bound and the keyword string with the time phrase stripped.

    Two searches run with the same window and ``source_filter``: a keyword
    search (limit 150, OR mode) and a plain window scan (limit 50). Their
    results are de-duplicated by ``entry_id``, ordered by
    ``(timestamp_iso, sequence)`` and capped at 200 entries.

    Args:
        db_path: Database path handed to the search functions.
        query: Natural-language query, possibly containing a time phrase.
        since: Optional explicit window start.
        until: Optional explicit window end.
        source_filter: Optional source restriction passed to both searches.
        llm_url: LLM endpoint; ``summarize`` runs only when this and
            ``llm_model`` are both truthy.
        llm_model: LLM model name.
        llm_api_key: Optional API key forwarded to ``summarize``.

    Returns:
        A dict with ``"summary"`` (total, window bounds, ``time_detected``,
        per-severity and per-source counts), ``"reasoning"`` (the ``summarize``
        result or ``None``) and ``"entries"`` (the combined result list).
    """
    if since is not None and until is not None:
        # Both bounds were supplied by the caller: search the raw query.
        keywords = query
        time_detected = True
    else:
        parsed_since, parsed_until, keywords = parse_time_window(query)
        since = since or parsed_since
        until = until or parsed_until
        # The parser hands back the query untouched when it stripped nothing.
        time_detected = keywords != query

    keyword_hits = search(
        db_path,
        query=keywords,
        since=since,
        until=until,
        source_filter=source_filter,
        limit=150,
        or_mode=True,
    )
    window_hits = entries_in_window(
        db_path,
        since=since,
        until=until,
        source_filter=source_filter,
        limit=50,
    )

    # De-duplicate by entry id; the first occurrence (keyword hits come first)
    # wins and insertion order is preserved.
    unique: dict[str, SearchResult] = {}
    for hit in keyword_hits + window_hits:
        unique.setdefault(hit.entry_id, hit)

    # Entries without a timestamp use "\xff" as their sort key so they land
    # after ISO-formatted timestamps.
    combined = list(unique.values())
    combined.sort(key=lambda hit: (hit.timestamp_iso or "\xff", hit.sequence))
    combined = combined[:200]

    by_severity: dict[str, int] = dict.fromkeys(("CRITICAL", "ERROR", "WARN", "INFO"), 0)
    by_source: dict[str, int] = {}
    for hit in combined:
        # A missing severity counts as INFO; unknown levels are not counted.
        level = (hit.severity or "INFO").upper()
        if level in by_severity:
            by_severity[level] += 1
        by_source.setdefault(hit.source_id, 0)
        by_source[hit.source_id] += 1

    reasoning: str | None = (
        summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key)
        if llm_url and llm_model
        else None
    )

    summary = {
        "total": len(combined),
        "window_start": since,
        "window_end": until,
        "time_detected": time_detected,
        "by_severity": by_severity,
        "by_source": by_source,
    }
    return {"summary": summary, "reasoning": reasoning, "entries": combined}
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _last_n_minutes(n: int) -> str:
    """Return the ISO 8601 UTC timestamp for the instant *n* minutes before now."""
    offset = timedelta(minutes=n)
    moment = datetime.now(tz=timezone.utc) - offset
    return moment.isoformat()
|