"""Frictionless diagnose service — NL time extraction + layered log search.""" from __future__ import annotations import logging import re from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any from app.services.llm import summarize from app.services.search import SearchResult, entries_in_window, search logger = logging.getLogger(__name__) try: from dateparser.search import search_dates as _search_dates # type: ignore[import] _HAS_DATEPARSER = True except ImportError: _search_dates = None # type: ignore[assignment] _HAS_DATEPARSER = False _RELATIVE_RE = re.compile( r"\b(?:last|past)\s+(\d+)?\s*(minute|hour|day|week)s?\b", re.IGNORECASE, ) _RELATIVE_UNITS = {"minute": 1, "hour": 60, "day": 1440, "week": 10080} def _relative_window(match: re.Match) -> tuple[str, str]: """Convert a relative time match to (since_iso, until_iso).""" n = int(match.group(1) or 1) unit = match.group(2).lower() minutes = n * _RELATIVE_UNITS[unit] return _last_n_minutes(minutes), _now_iso() def parse_time_window(query: str) -> tuple[str | None, str | None, str]: """Extract a time window from a natural-language query string. Returns (since_iso, until_iso, keywords) where keywords is the query with the matched time phrase stripped. Falls back to last-60-min window. """ # Handle relative expressions first ("last hour", "past 30 minutes", etc.) # dateparser misinterprets these as absolute times. m = _RELATIVE_RE.search(query) if m: since, until = _relative_window(m) keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() return since, until, keywords or query if _HAS_DATEPARSER and _search_dates is not None: try: results = _search_dates(query, languages=["en"], settings={"PREFER_DATES_FROM": "past"}) except Exception: logger.warning("dateparser failed on query %r — falling back to 60-min window", query) results = None if results: phrase, dt = results[0] if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) since = (dt - timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat() keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) return since, until, keywords or query return _last_n_minutes(60), _now_iso(), query def diagnose( db_path: Path, query: str, since: str | None = None, until: str | None = None, source_filter: str | None = None, llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, ) -> dict[str, Any]: """Run layered log search with NL time extraction. Returns summary + entries.""" time_detected = since is not None and until is not None if not time_detected: parsed_since, parsed_until, keywords = parse_time_window(query) since = since or parsed_since until = until or parsed_until time_detected = keywords != query else: keywords = query keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50) seen: set[str] = set() merged: list[SearchResult] = [] for r in keyword_hits + window_hits: if r.entry_id not in seen: seen.add(r.entry_id) merged.append(r) combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_source: dict[str, int] = {} for r in combined: sev = (r.severity or "INFO").upper() if sev in by_severity: by_severity[sev] += 1 by_source[r.source_id] = by_source.get(r.source_id, 0) + 1 reasoning: str | None = None if llm_url and llm_model: reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) return { "summary": { "total": len(combined), "window_start": since, "window_end": until, "time_detected": time_detected, "by_severity": by_severity, "by_source": by_source, }, "reasoning": reasoning, "entries": combined, } def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _last_n_minutes(n: int) -> str: return (datetime.now(timezone.utc) - timedelta(minutes=n)).isoformat()