feat: add diagnose service with NL time extraction via dateparser
Adds app/services/diagnose.py with parse_time_window() (dateparser-backed NL time phrase extraction with 60-min fallback) and diagnose() (layered FTS + window search returning severity/source summary). Includes 5 TDD tests.
This commit is contained in:
parent
c7f6acf913
commit
21b988fd66
3 changed files with 146 additions and 0 deletions
93
app/services/diagnose.py
Normal file
93
app/services/diagnose.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
"""Frictionless diagnose service — NL time extraction + layered log search."""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from app.services.search import SearchResult, entries_in_window, search
|
||||
|
||||
try:
|
||||
from dateparser.search import search_dates as _search_dates # type: ignore[import]
|
||||
_HAS_DATEPARSER = True
|
||||
except ImportError:
|
||||
_search_dates = None # type: ignore[assignment]
|
||||
_HAS_DATEPARSER = False
|
||||
|
||||
|
||||
def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
    """Extract a time window from a natural-language query string.

    When dateparser is importable and finds at least one date/time phrase in
    *query*, the first match becomes the centre of a 60-minute window
    (30 minutes either side) and that phrase is removed from the query text.
    Otherwise the window is the 60 minutes ending now and the query is
    returned unchanged.

    Args:
        query: Free-text query that may contain a time phrase.

    Returns:
        ``(since_iso, until_iso, keywords)``. The bounds are ISO-8601 strings.
        ``keywords`` is the query with the matched phrase stripped and runs of
        whitespace collapsed; if stripping leaves nothing, the original query
        is returned instead.
    """
    if _HAS_DATEPARSER and _search_dates is not None:
        results = _search_dates(
            query,
            languages=["en"],
            settings={"PREFER_DATES_FROM": "past"},
        )
        if results:
            # Only the first detected phrase is used; later matches are ignored.
            phrase, dt = results[0]
            if dt.tzinfo is None:
                # NOTE(review): a naive result is labelled UTC without any
                # conversion -- confirm this matches the timezone dateparser
                # resolves relative phrases against.
                dt = dt.replace(tzinfo=timezone.utc)
            half_window = timedelta(minutes=30)
            since = (dt - half_window).isoformat()
            until = (dt + half_window).isoformat()
            # Replace the phrase with a space so neighbouring words do not
            # fuse together, then collapse the whitespace left behind.
            stripped = query.replace(phrase, " ").strip()
            keywords = re.sub(r"\s{2,}", " ", stripped)
            return since, until, keywords or query

    # Read the clock once so the fallback window is exactly 60 minutes wide
    # instead of drifting by the time between two separate now() calls.
    now = datetime.now(timezone.utc)
    return (now - timedelta(minutes=60)).isoformat(), now.isoformat(), query
|
||||
|
||||
|
||||
def diagnose(
    db_path: Path,
    query: str,
    since: str | None = None,
    until: str | None = None,
) -> dict:
    """Run a keyword search plus a time-window scan and summarise the results.

    If either bound is missing, ``parse_time_window`` is applied to *query* to
    fill in the missing bound(s) and to strip any time phrase from the
    keywords. Explicit bounds passed by the caller always take precedence over
    parsed ones.

    Args:
        db_path: Path handed to ``search`` and ``entries_in_window``.
        query: Free-text query, optionally containing a time phrase.
        since: Optional explicit lower bound of the window.
        until: Optional explicit upper bound of the window.

    Returns:
        A dict with two keys:

        * ``"summary"``: ``total``, ``window_start``, ``window_end``,
          ``time_detected``, ``by_severity`` (counts for CRITICAL / ERROR /
          WARN / INFO) and ``by_source`` (counts keyed by ``source_id``).
        * ``"entries"``: the de-duplicated results, sorted by timestamp then
          sequence and capped at 200 items.
    """
    # An explicit bound supplied by the caller counts as a detected time.
    time_detected = since is not None or until is not None
    if since is None or until is None:
        parsed_since, parsed_until, keywords = parse_time_window(query)
        since = since or parsed_since
        until = until or parsed_until
        # Keep the explicit-bound signal; previously it was overwritten here,
        # so a caller passing only one bound was reported as "no time detected"
        # whenever the query itself carried no time phrase.
        time_detected = time_detected or keywords != query
    else:
        keywords = query

    keyword_hits = search(
        db_path, query=keywords, since=since, until=until, limit=150, or_mode=True
    )
    window_hits = entries_in_window(db_path, since=since, until=until, limit=50)

    # Merge both result sets, keeping the first occurrence of each entry_id
    # (keyword hits come first, so they win over window hits).
    seen: set[str] = set()
    combined: list[SearchResult] = []
    for r in keyword_hits + window_hits:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # Entries without a timestamp use "\xff" as their sort key so they are
    # placed after timestamped ones (presumably all timestamps are ASCII ISO
    # strings -- TODO confirm).
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:200]

    by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
    by_source: dict[str, int] = {}
    for r in combined:
        # A missing severity is counted as INFO; severities outside the four
        # known buckets are left out of by_severity but still count toward
        # total and by_source.
        sev = (r.severity or "INFO").upper()
        if sev in by_severity:
            by_severity[sev] += 1
        by_source[r.source_id] = by_source.get(r.source_id, 0) + 1

    return {
        "summary": {
            "total": len(combined),
            "window_start": since,
            "window_end": until,
            "time_detected": time_detected,
            "by_severity": by_severity,
            "by_source": by_source,
        },
        "entries": combined,
    }
|
||||
|
||||
|
||||
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
||||
|
||||
|
||||
def _last_n_minutes(n: int) -> str:
    """Return the UTC instant *n* minutes before now as an ISO-8601 string."""
    offset = timedelta(minutes=n)
    moment = datetime.now(timezone.utc) - offset
    return moment.isoformat()
|
||||
|
|
@ -4,3 +4,4 @@ pydantic>=2.0.0
|
|||
pyyaml>=6.0
|
||||
aiofiles>=23.0.0
|
||||
python-multipart>=0.0.9
|
||||
dateparser>=1.2.0
|
||||
|
|
|
|||
52
tests/test_services_diagnose.py
Normal file
52
tests/test_services_diagnose.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
"""Tests for app/services/diagnose.py — parse_time_window."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
from app.services.diagnose import parse_time_window
|
||||
|
||||
|
||||
def test_no_time_phrase_falls_back_to_last_60_min():
    """With no match from the date search, the last-60-minute window is used."""
    query = "plex stopped playing audio"
    dateparser_on = patch("app.services.diagnose._HAS_DATEPARSER", True)
    no_match = patch("app.services.diagnose._search_dates", return_value=None)
    with dateparser_on, no_match:
        since, until, keywords = parse_time_window(query)

    assert since is not None
    assert until is not None
    assert keywords == query
    window = datetime.fromisoformat(until) - datetime.fromisoformat(since)
    assert abs(window.total_seconds() - 3600) < 5
|
||||
|
||||
|
||||
def test_time_phrase_detected_produces_30min_window():
    """A detected time phrase yields a window of 30 minutes either side of it.

    The total width is therefore 60 minutes; the bounds are checked exactly so
    an off-centre window of the right width cannot pass.
    """
    dt = datetime(2026, 5, 11, 14, 0, tzinfo=timezone.utc)
    with patch("app.services.diagnose._HAS_DATEPARSER", True), \
            patch("app.services.diagnose._search_dates", return_value=[("around 2pm", dt)]):
        since, until, keywords = parse_time_window("plex stopped audio around 2pm")

    start = datetime.fromisoformat(since)
    end = datetime.fromisoformat(until)
    assert abs((end - start).total_seconds() - 3600) < 5
    # The window must be centred on the parsed instant.
    assert start == datetime(2026, 5, 11, 13, 30, tzinfo=timezone.utc)
    assert end == datetime(2026, 5, 11, 14, 30, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def test_time_phrase_stripped_from_keywords():
    """The matched time phrase is removed from the returned keywords."""
    matched_at = datetime(2026, 5, 11, 14, 0, tzinfo=timezone.utc)
    fake_matches = [("around 2pm", matched_at)]
    with patch("app.services.diagnose._HAS_DATEPARSER", True):
        with patch("app.services.diagnose._search_dates", return_value=fake_matches):
            result = parse_time_window("plex stopped audio around 2pm")

    keywords = result[2]
    assert "around 2pm" not in keywords
    assert "plex" in keywords
|
||||
|
||||
|
||||
def test_no_dateparser_falls_back_to_60min():
    """Without dateparser the query is untouched and the window is 60 minutes."""
    query = "plex stopped playing audio"
    with patch("app.services.diagnose._HAS_DATEPARSER", False):
        since, until, keywords = parse_time_window(query)

    assert keywords == query
    start = datetime.fromisoformat(since)
    end = datetime.fromisoformat(until)
    assert abs((end - start).total_seconds() - 3600) < 5
|
||||
|
||||
|
||||
def test_keywords_cleaned_of_extra_spaces():
    """Removing a mid-query time phrase must not leave doubled spaces behind."""
    dt = datetime(2026, 5, 11, 14, 0, tzinfo=timezone.utc)
    with patch("app.services.diagnose._HAS_DATEPARSER", True), \
            patch("app.services.diagnose._search_dates", return_value=[("around 2pm", dt)]):
        _, _, keywords = parse_time_window("plex stopped audio around 2pm extra")

    # Check for a run of two spaces: a single-space check could never pass
    # for a multi-word result.
    assert "  " not in keywords
    assert keywords == "plex stopped audio extra"
|
||||
Loading…
Reference in a new issue