diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py new file mode 100644 index 0000000..ce6f6c4 --- /dev/null +++ b/app/services/diagnose/__init__.py @@ -0,0 +1,294 @@ +"""Frictionless diagnose service — NL time extraction + layered log search. + +This module is the public interface for the diagnose package. +Full implementation lives here so that patch("app.services.diagnose._HAS_DATEPARSER") +and patch("app.services.diagnose._search_dates") continue to target the correct +namespace, preserving backward compatibility with existing tests. + +The verbatim original is preserved in legacy.py for reference. +""" +from __future__ import annotations + +import asyncio +import dataclasses +import logging +import os +import re +from collections.abc import AsyncGenerator +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +from app.context.retriever import retrieve_context, format_context_block +from app.services.llm import summarize +from app.services.search import SearchResult, entries_in_window, search + +logger = logging.getLogger(__name__) + +try: + from dateparser.search import search_dates as _search_dates # type: ignore[import] + _HAS_DATEPARSER = True +except ImportError: + _search_dates = None # type: ignore[assignment] + _HAS_DATEPARSER = False + + +_RELATIVE_RE = re.compile( + r"\b(?:last|past)\s+(?:(?P\d+)|(?Pa\s+few|few|couple(?:\s+of)?|several))?\s*(?Pminute|hour|day|week)s?\b", + re.IGNORECASE, +) +_RELATIVE_UNITS = {"minute": 1, "hour": 60, "day": 1440, "week": 10080} +# Fuzzy quantifiers map to a reasonable span so "last few hours" → 3h window +_APPROX_N = 3 + + +def _relative_window(match: re.Match) -> tuple[str, str]: + """Convert a relative time match to (since_iso, until_iso).""" + n_str = match.group("n") + approx = match.group("approx") + unit = match.group("unit").lower() + n = int(n_str) if n_str else (_APPROX_N if approx else 1) + minutes = n * _RELATIVE_UNITS[unit] + return _last_n_minutes(minutes), _now_iso() + + +def parse_time_window(query: str) -> tuple[str | None, str | None, str]: + """Extract a time window from a natural-language query string. + + Returns (since_iso, until_iso, keywords) where keywords is the query with + the matched time phrase stripped. Falls back to last-60-min window. + """ + # Handle relative expressions first ("last hour", "past 30 minutes", etc.) + # dateparser misinterprets these as absolute times. + m = _RELATIVE_RE.search(query) + if m: + since, until = _relative_window(m) + keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() + return since, until, keywords or query + + if _HAS_DATEPARSER and _search_dates is not None: + # Tell dateparser what timezone the user is in so "3:35 am" means local time. + # PREFER_DAY_OF_MONTH is unused here but PREFER_DATES_FROM=past ensures + # "3:35 am" resolves to the most recent past occurrence, not a future one. + local_offset = datetime.now().astimezone().utcoffset() + offset_h = int((local_offset.total_seconds() if local_offset else 0) / 3600) + tz_str = f"UTC{'+' if offset_h >= 0 else ''}{offset_h}" + try: + results = _search_dates( + query, + languages=["en"], + settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, + ) + except Exception: + logger.warning("dateparser failed on query %r — falling back to 60-min window", query) + results = None + if results: + phrase, dt = results[0] + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare + since = (dt - timedelta(minutes=30)).isoformat() + until = (dt + timedelta(minutes=30)).isoformat() + keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) + return since, until, keywords or query + + return _last_n_minutes(60), _now_iso(), query + + +def diagnose( + db_path: Path, + query: str, + since: str | None = None, + until: str | None = None, + source_filter: str | None = None, + llm_url: str | None = None, + llm_model: str | None = None, + llm_api_key: str | None = None, +) -> dict[str, Any]: + """Run layered log search with NL time extraction. Returns summary + entries.""" + time_detected = since is not None and until is not None + if not time_detected: + parsed_since, parsed_until, keywords = parse_time_window(query) + since = since or parsed_since + until = until or parsed_until + time_detected = keywords != query + else: + keywords = query + + keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) + window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) + + seen: set[str] = set() + merged: list[SearchResult] = [] + for r in keyword_hits + window_hits: + if r.entry_id not in seen: + seen.add(r.entry_id) + merged.append(r) + + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + + by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} + by_source: dict[str, int] = {} + for r in combined: + sev = (r.severity or "INFO").upper() + if sev in by_severity: + by_severity[sev] += 1 + by_source[r.source_id] = by_source.get(r.source_id, 0) + 1 + + reasoning: str | None = None + if llm_url and llm_model: + reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) + + return { + "summary": { + "total": len(combined), + "window_start": since, + "window_end": until, + "time_detected": time_detected, + "by_severity": by_severity, + "by_source": by_source, + }, + "reasoning": reasoning, + "entries": combined, + } + + +async def diagnose_stream( + db_path: Path, + query: str, + since: str | None = None, + until: str | None = None, + source_filter: str | None = None, + llm_url: str | None = None, + llm_model: str | None = None, + llm_api_key: str | None = None, +) -> AsyncGenerator[dict[str, Any], None]: + """Async generator yielding SSE event dicts for the diagnose pipeline. + + Yields events in order: + {"type":"status","message":"…"} — pipeline progress + {"type":"summary","data":{…}} — window + severity counts (fast, from DB) + {"type":"entries","data":[…]} — log entries (fast, from DB) + {"type":"reasoning","text":"…"} — LLM analysis (slow, optional) + {"type":"done"} + """ + keywords = query.strip() + source_browse = not keywords and source_filter is not None + + if source_browse: + # No keyword — browsing a source directly. Use 24h window; skip FTS entirely. + yield {"type": "status", "message": f"Loading {source_filter}…"} + since = since or _last_n_minutes(60 * 24) + until = until or _now_iso() + time_detected = False + else: + yield {"type": "status", "message": "Parsing time window…"} + time_detected = since is not None and until is not None + if not time_detected: + parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query) + since = since or parsed_since + until = until or parsed_until + time_detected = keywords != query + + yield {"type": "status", "message": "Loading environment context…"} + ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) + context_block = format_context_block(ctx) + yield { + "type": "context", + "facts": ctx.facts, + "chunks": ctx.chunks, + } + + yield {"type": "status", "message": "Searching logs…"} + + if source_browse: + keyword_hits: list[SearchResult] = [] + window_hits = await asyncio.to_thread( + lambda: entries_in_window( + db_path, since, until, + source_filter=source_filter, limit=200, + ) + ) + else: + keyword_hits, window_hits = await asyncio.gather( + asyncio.to_thread( + lambda: search( + db_path, keywords, + source_filter=source_filter, since=since, until=until, + limit=150, or_mode=True, + ) + ), + asyncio.to_thread( + lambda: entries_in_window( + db_path, since, until, + source_filter=source_filter, limit=50, per_source_cap=15, + ) + ), + ) + + seen: set[str] = set() + merged: list[SearchResult] = [] + for r in keyword_hits + window_hits: + if r.entry_id not in seen: + seen.add(r.entry_id) + merged.append(r) + + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + + by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} + by_source: dict[str, int] = {} + for r in combined: + sev = (r.severity or "INFO").upper() + if sev in by_severity: + by_severity[sev] += 1 + by_source[r.source_id] = by_source.get(r.source_id, 0) + 1 + + yield { + "type": "summary", + "data": { + "total": len(combined), + "window_start": since, + "window_end": until, + "time_detected": time_detected, + "by_severity": by_severity, + "by_source": by_source, + }, + } + yield {"type": "entries", "data": [dataclasses.asdict(r) for r in combined]} + + if llm_url and llm_model and combined: + yield {"type": "status", "message": "Analyzing with LLM…"} + reasoning = await asyncio.to_thread( + lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block) + ) + if reasoning: + yield {"type": "reasoning", "text": reasoning} + + yield {"type": "done"} + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _last_n_minutes(n: int) -> str: + return (datetime.now(timezone.utc) - timedelta(minutes=n)).isoformat() + + +__all__ = [ + "diagnose", + "diagnose_stream", + "parse_time_window", + "_now_iso", + "_last_n_minutes", + "_HAS_DATEPARSER", + "_search_dates", + "_RELATIVE_RE", + "_RELATIVE_UNITS", + "_APPROX_N", + "_relative_window", +] + +# Feature flag for Task 6 +MULTI_AGENT_ENABLED = os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true" diff --git a/app/services/diagnose.py b/app/services/diagnose/legacy.py similarity index 100% rename from app/services/diagnose.py rename to app/services/diagnose/legacy.py diff --git a/app/services/diagnose/models.py b/app/services/diagnose/models.py new file mode 100644 index 0000000..b32b3a4 --- /dev/null +++ b/app/services/diagnose/models.py @@ -0,0 +1,61 @@ +"""Pipeline data types for the multi-agent diagnose pipeline.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal + +SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] + + +@dataclass +class EventCluster: + cluster_id: str + entries: list[str] # entry_id refs + start_iso: str | None + end_iso: str | None + duration_seconds: float + source_ids: list[str] + pattern_tags: list[str] + severity: SeverityLabel # highest severity from raw text + burst: bool + gap_before_seconds: float + representative_text: str + + +@dataclass +class TimelineResult: + clusters: list[EventCluster] + total_entries: int + window_start: str | None + window_end: str | None + gap_count: int + burst_count: int + dominant_sources: list[str] + + +@dataclass +class ClassifiedTimeline: + timeline: TimelineResult + cluster_severities: dict[str, SeverityLabel] + classifier_used: Literal["ml", "pattern_tags", "regex"] + model_id: str | None + + +@dataclass +class Hypothesis: + hypothesis_id: str + title: str + description: str + confidence: float + supporting_cluster_ids: list[str] + runbook_refs: list[str] + severity: SeverityLabel + + +@dataclass +class RankedHypothesis: + hypothesis: Hypothesis + novelty_score: float + similarity_to_known: float + suppress: bool + suppression_reason: str | None diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py new file mode 100644 index 0000000..38208a7 --- /dev/null +++ b/app/services/diagnose/pipeline.py @@ -0,0 +1,11 @@ +"""Multi-agent diagnose pipeline orchestrator — stub (Task 1).""" +from __future__ import annotations + +from typing import Any + +# run_pipeline() will be implemented in Task 6 + + +async def run_pipeline(*args: Any, **kwargs: Any) -> None: + """Placeholder — implemented in Task 6.""" + return None