diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py index ce6f6c4..a1ee55f 100644 --- a/app/services/diagnose/__init__.py +++ b/app/services/diagnose/__init__.py @@ -7,6 +7,7 @@ namespace, preserving backward compatibility with existing tests. The verbatim original is preserved in legacy.py for reference. """ + from __future__ import annotations import asyncio @@ -27,6 +28,7 @@ logger = logging.getLogger(__name__) try: from dateparser.search import search_dates as _search_dates # type: ignore[import] + _HAS_DATEPARSER = True except ImportError: _search_dates = None # type: ignore[assignment] @@ -63,7 +65,7 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: m = _RELATIVE_RE.search(query) if m: since, until = _relative_window(m) - keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() + keywords = re.sub(r"\s{2,}", " ", query[: m.start()] + query[m.end() :]).strip() return since, until, keywords or query if _HAS_DATEPARSER and _search_dates is not None: @@ -77,17 +79,27 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: results = _search_dates( query, languages=["en"], - settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, + settings={ + "PREFER_DATES_FROM": "past", + "TIMEZONE": tz_str, + "RETURN_AS_TIMEZONE_AWARE": True, + }, + ) + except Exception as e: + logger.warning( + "dateparser failed (%s) on query %r — falling back to 60-min window", + type(e).__name__, + query, ) - except Exception: - logger.warning("dateparser failed on query %r — falling back to 60-min window", query) results = None if results: phrase, dt = results[0] if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: - dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare + dt = dt.astimezone( + timezone.utc + ) # normalise to UTC for SQLite string compare since = (dt - timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat() keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) @@ -116,8 +128,23 @@ def diagnose( else: keywords = query - keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) - window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) + keyword_hits = search( + db_path, + query=keywords, + since=since, + until=until, + source_filter=source_filter, + limit=150, + or_mode=True, + ) + window_hits = entries_in_window( + db_path, + since=since, + until=until, + source_filter=source_filter, + limit=50, + per_source_cap=15, + ) seen: set[str] = set() merged: list[SearchResult] = [] @@ -126,7 +153,9 @@ def diagnose( seen.add(r.entry_id) merged.append(r) - combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[ + :200 + ] by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_source: dict[str, int] = {} @@ -138,7 +167,9 @@ def diagnose( reasoning: str | None = None if llm_url and llm_model: - reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) + reasoning = summarize( + query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key + ) return { "summary": { @@ -186,7 +217,9 @@ async def diagnose_stream( yield {"type": "status", "message": "Parsing time window…"} time_detected = since is not None and until is not None if not time_detected: - parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query) + parsed_since, parsed_until, keywords = await asyncio.to_thread( + parse_time_window, query + ) since = since or parsed_since until = until or parsed_until time_detected = keywords != query @@ -206,23 +239,34 @@ async def diagnose_stream( keyword_hits: list[SearchResult] = [] window_hits = await asyncio.to_thread( lambda: entries_in_window( - db_path, since, until, - source_filter=source_filter, limit=200, + db_path, + since, + until, + source_filter=source_filter, + limit=200, ) ) else: keyword_hits, window_hits = await asyncio.gather( asyncio.to_thread( lambda: search( - db_path, keywords, - source_filter=source_filter, since=since, until=until, - limit=150, or_mode=True, + db_path, + keywords, + source_filter=source_filter, + since=since, + until=until, + limit=150, + or_mode=True, ) ), asyncio.to_thread( lambda: entries_in_window( - db_path, since, until, - source_filter=source_filter, limit=50, per_source_cap=15, + db_path, + since, + until, + source_filter=source_filter, + limit=50, + per_source_cap=15, ) ), ) @@ -234,7 +278,9 @@ async def diagnose_stream( seen.add(r.entry_id) merged.append(r) - combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[ + :200 + ] by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_source: dict[str, int] = {} @@ -260,7 +306,14 @@ async def diagnose_stream( if llm_url and llm_model and combined: yield {"type": "status", "message": "Analyzing with LLM…"} reasoning = await asyncio.to_thread( - lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block) + lambda: summarize( + query, + combined, + llm_url, + llm_model, + llm_api_key, + context_block=context_block, + ) ) if reasoning: yield {"type": "reasoning", "text": reasoning} @@ -280,15 +333,9 @@ __all__ = [ "diagnose", "diagnose_stream", "parse_time_window", - "_now_iso", - "_last_n_minutes", - "_HAS_DATEPARSER", - "_search_dates", - "_RELATIVE_RE", - "_RELATIVE_UNITS", - "_APPROX_N", - "_relative_window", ] # Feature flag for Task 6 -MULTI_AGENT_ENABLED = os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true" +MULTI_AGENT_ENABLED = ( + os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true" +) diff --git a/app/services/diagnose/legacy.py b/app/services/diagnose/legacy.py index 2f0c4c7..ccbe6d8 100644 --- a/app/services/diagnose/legacy.py +++ b/app/services/diagnose/legacy.py @@ -1,4 +1,5 @@ """Frictionless diagnose service — NL time extraction + layered log search.""" + from __future__ import annotations import asyncio @@ -18,6 +19,7 @@ logger = logging.getLogger(__name__) try: from dateparser.search import search_dates as _search_dates # type: ignore[import] + _HAS_DATEPARSER = True except ImportError: _search_dates = None # type: ignore[assignment] @@ -54,7 +56,7 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: m = _RELATIVE_RE.search(query) if m: since, until = _relative_window(m) - keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() + keywords = re.sub(r"\s{2,}", " ", query[: m.start()] + query[m.end() :]).strip() return since, until, keywords or query if _HAS_DATEPARSER and _search_dates is not None: @@ -68,17 +70,25 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: results = _search_dates( query, languages=["en"], - settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, + settings={ + "PREFER_DATES_FROM": "past", + "TIMEZONE": tz_str, + "RETURN_AS_TIMEZONE_AWARE": True, + }, ) except Exception: - logger.warning("dateparser failed on query %r — falling back to 60-min window", query) + logger.warning( + "dateparser failed on query %r — falling back to 60-min window", query + ) results = None if results: phrase, dt = results[0] if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: - dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare + dt = dt.astimezone( + timezone.utc + ) # normalise to UTC for SQLite string compare since = (dt - timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat() keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) @@ -107,8 +117,23 @@ def diagnose( else: keywords = query - keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) - window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) + keyword_hits = search( + db_path, + query=keywords, + since=since, + until=until, + source_filter=source_filter, + limit=150, + or_mode=True, + ) + window_hits = entries_in_window( + db_path, + since=since, + until=until, + source_filter=source_filter, + limit=50, + per_source_cap=15, + ) seen: set[str] = set() merged: list[SearchResult] = [] @@ -117,7 +142,9 @@ def diagnose( seen.add(r.entry_id) merged.append(r) - combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[ + :200 + ] by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_source: dict[str, int] = {} @@ -129,7 +156,9 @@ def diagnose( reasoning: str | None = None if llm_url and llm_model: - reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) + reasoning = summarize( + query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key + ) return { "summary": { @@ -177,7 +206,9 @@ async def diagnose_stream( yield {"type": "status", "message": "Parsing time window…"} time_detected = since is not None and until is not None if not time_detected: - parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query) + parsed_since, parsed_until, keywords = await asyncio.to_thread( + parse_time_window, query + ) since = since or parsed_since until = until or parsed_until time_detected = keywords != query @@ -197,23 +228,34 @@ async def diagnose_stream( keyword_hits: list[SearchResult] = [] window_hits = await asyncio.to_thread( lambda: entries_in_window( - db_path, since, until, - source_filter=source_filter, limit=200, + db_path, + since, + until, + source_filter=source_filter, + limit=200, ) ) else: keyword_hits, window_hits = await asyncio.gather( asyncio.to_thread( lambda: search( - db_path, keywords, - source_filter=source_filter, since=since, until=until, - limit=150, or_mode=True, + db_path, + keywords, + source_filter=source_filter, + since=since, + until=until, + limit=150, + or_mode=True, ) ), asyncio.to_thread( lambda: entries_in_window( - db_path, since, until, - source_filter=source_filter, limit=50, per_source_cap=15, + db_path, + since, + until, + source_filter=source_filter, + limit=50, + per_source_cap=15, ) ), ) @@ -225,7 +267,9 @@ async def diagnose_stream( seen.add(r.entry_id) merged.append(r) - combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] + combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[ + :200 + ] by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_source: dict[str, int] = {} @@ -251,7 +295,14 @@ async def diagnose_stream( if llm_url and llm_model and combined: yield {"type": "status", "message": "Analyzing with LLM…"} reasoning = await asyncio.to_thread( - lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block) + lambda: summarize( + query, + combined, + llm_url, + llm_model, + llm_api_key, + context_block=context_block, + ) ) if reasoning: yield {"type": "reasoning", "text": reasoning} diff --git a/app/services/diagnose/models.py b/app/services/diagnose/models.py index b32b3a4..2831d30 100644 --- a/app/services/diagnose/models.py +++ b/app/services/diagnose/models.py @@ -1,4 +1,5 @@ """Pipeline data types for the multi-agent diagnose pipeline.""" + from __future__ import annotations from dataclasses import dataclass @@ -7,53 +8,63 @@ from typing import Literal SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] -@dataclass +@dataclass(frozen=True) class EventCluster: + """A time-correlated group of log entries within the timeline.""" + cluster_id: str - entries: list[str] # entry_id refs + entries: tuple[str, ...] # entry_id refs start_iso: str | None end_iso: str | None duration_seconds: float - source_ids: list[str] - pattern_tags: list[str] - severity: SeverityLabel # highest severity from raw text + source_ids: tuple[str, ...] + pattern_tags: tuple[str, ...] + severity: SeverityLabel burst: bool gap_before_seconds: float representative_text: str -@dataclass +@dataclass(frozen=True) class TimelineResult: - clusters: list[EventCluster] + """Structured timeline of event clusters built from log entries.""" + + clusters: tuple[EventCluster, ...] total_entries: int window_start: str | None window_end: str | None gap_count: int burst_count: int - dominant_sources: list[str] + dominant_sources: tuple[str, ...] -@dataclass +@dataclass(frozen=True) class ClassifiedTimeline: + """Timeline annotated with ML-assigned severity per cluster.""" + timeline: TimelineResult cluster_severities: dict[str, SeverityLabel] classifier_used: Literal["ml", "pattern_tags", "regex"] model_id: str | None -@dataclass +@dataclass(frozen=True) class Hypothesis: + """A root-cause hypothesis generated by Stage 3.""" + hypothesis_id: str title: str description: str confidence: float - supporting_cluster_ids: list[str] - runbook_refs: list[str] + supporting_cluster_ids: tuple[str, ...] + runbook_refs: tuple[str, ...] severity: SeverityLabel -@dataclass +@dataclass(frozen=True) class RankedHypothesis: + """A hypothesis enriched by Stage 4 false-positive suppression.""" + hypothesis: Hypothesis novelty_score: float similarity_to_known: float diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index 38208a7..6c713b4 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -1,4 +1,5 @@ """Multi-agent diagnose pipeline orchestrator — stub (Task 1).""" + from __future__ import annotations from typing import Any