turnstone/app/services/diagnose.py
pyr0ball abd142addf feat: add diagnose service with NL time extraction via dateparser
Adds app/services/diagnose.py with parse_time_window() (dateparser-backed
NL time phrase extraction with 60-min fallback) and diagnose() (layered
FTS + window search returning severity/source summary). Includes 5 TDD tests.
2026-05-11 09:04:50 -07:00

93 lines
3.2 KiB
Python

"""Frictionless diagnose service — NL time extraction + layered log search."""
from __future__ import annotations
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from app.services.search import SearchResult, entries_in_window, search
# dateparser is an optional dependency: when it is missing, the module still
# imports and parse_time_window() uses its fixed fallback window instead.
try:
    from dateparser.search import search_dates as _search_dates  # type: ignore[import]
except ImportError:
    _search_dates = None  # type: ignore[assignment]
    _HAS_DATEPARSER = False
else:
    _HAS_DATEPARSER = True
def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
    """Extract a time window from a natural-language query string.

    When dateparser is available and finds a date/time phrase in *query*, the
    window is the hour centred on the first match (30 minutes either side)
    and every occurrence of the matched phrase is removed from the keywords.
    Otherwise the window is the last 60 minutes and the query is returned
    unchanged.

    Args:
        query: Free-text query, possibly containing a time phrase.

    Returns:
        ``(since_iso, until_iso, keywords)``. Both bounds are ISO-8601
        strings expressed in UTC. ``keywords`` is the query with the matched
        phrase stripped, or the original query when stripping would leave
        nothing (or when no phrase was found).
    """
    if _HAS_DATEPARSER and _search_dates is not None:
        matches = _search_dates(
            query,
            languages=["en"],
            settings={"PREFER_DATES_FROM": "past"},
        )
        if matches:
            # Only the first detected phrase defines the window.
            phrase, moment = matches[0]
            if moment.tzinfo is None:
                # Naive results are labelled as UTC without shifting the clock.
                # NOTE(review): this treats a naive result as UTC wall-clock
                # time — confirm that matches how log timestamps are stored.
                moment = moment.replace(tzinfo=timezone.utc)
            else:
                # Aware results (e.g. "3pm EST") keep the same instant but are
                # re-expressed in UTC so the bounds use the same offset as the
                # fallback window below.
                moment = moment.astimezone(timezone.utc)
            half_window = timedelta(minutes=30)
            since = (moment - half_window).isoformat()
            until = (moment + half_window).isoformat()
            # Drop the time phrase, then collapse the gap it leaves behind.
            stripped = query.replace(phrase, " ").strip()
            keywords = re.sub(r"\s{2,}", " ", stripped)
            return since, until, keywords or query
    return _last_n_minutes(60), _now_iso(), query
def diagnose(
    db_path: Path,
    query: str,
    since: str | None = None,
    until: str | None = None,
) -> dict:
    """Run layered log search with NL time extraction. Returns summary + entries.

    Two result layers are merged: a keyword search (``search`` with
    ``or_mode=True``, up to 150 hits) and ``entries_in_window`` for the same
    window (up to 50 hits). Duplicates are dropped by ``entry_id``, the rest
    is sorted by timestamp then sequence, and at most 200 entries are kept.

    Args:
        db_path: Database path handed through to the search helpers.
        query: Free-text query; may contain a natural-language time phrase.
        since: Optional explicit lower bound. When either bound is missing,
            ``parse_time_window(query)`` fills the gap.
        until: Optional explicit upper bound, handled like ``since``.

    Returns:
        A dict with two keys:

        * ``"summary"``: ``total``, ``window_start``, ``window_end``,
          ``time_detected``, ``by_severity`` and ``by_source``.
        * ``"entries"``: the merged, sorted list of search results.
    """
    # An explicitly supplied bound already counts as a time constraint.
    time_detected = since is not None or until is not None
    if since is None or until is None:
        parsed_since, parsed_until, keywords = parse_time_window(query)
        since = since or parsed_since
        until = until or parsed_until
        # Keep the flag set by an explicit bound; a phrase stripped from the
        # query can only add to it, never clear it.
        # NOTE(review): a query consisting solely of a time phrase comes back
        # from parse_time_window() unchanged, so it is not flagged here.
        time_detected = time_detected or keywords != query
    else:
        # Both bounds are explicit: the query is searched verbatim.
        keywords = query

    keyword_hits = search(
        db_path,
        query=keywords,
        since=since,
        until=until,
        limit=150,
        or_mode=True,
    )
    window_hits = entries_in_window(db_path, since=since, until=until, limit=50)

    # Merge both layers, keeping the first occurrence of each entry_id so
    # keyword hits take precedence over window hits.
    seen: set[str] = set()
    combined: list[SearchResult] = []
    for r in keyword_hits + window_hits:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # "\xff" sorts after any ASCII timestamp, pushing entries without a
    # timestamp to the end.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:200]

    by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
    by_source: dict[str, int] = {}
    for r in combined:
        # A missing severity counts as INFO. Levels outside the four buckets
        # are left out of by_severity but still count in total and by_source.
        sev = (r.severity or "INFO").upper()
        if sev in by_severity:
            by_severity[sev] += 1
        by_source[r.source_id] = by_source.get(r.source_id, 0) + 1

    return {
        "summary": {
            "total": len(combined),
            "window_start": since,
            "window_end": until,
            "time_detected": time_detected,
            "by_severity": by_severity,
            "by_source": by_source,
        },
        "entries": combined,
    }
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def _last_n_minutes(n: int) -> str:
    """Return the UTC time *n* minutes before now as an ISO-8601 string."""
    lookback = timedelta(minutes=n)
    cutoff = datetime.now(tz=timezone.utc) - lookback
    return cutoff.isoformat()