feat(diagnose): 5-stage multi-agent diagnose pipeline (#29) #39

Merged
pyr0ball merged 17 commits from feat/29-multi-agent-diagnose into main 2026-05-25 19:59:35 -07:00
4 changed files with 169 additions and 59 deletions
Showing only changes of commit 959a6cbf1c - Show all commits

View file

@@ -7,6 +7,7 @@ namespace, preserving backward compatibility with existing tests.
The verbatim original is preserved in legacy.py for reference. The verbatim original is preserved in legacy.py for reference.
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@@ -27,6 +28,7 @@ logger = logging.getLogger(__name__)
try: try:
from dateparser.search import search_dates as _search_dates # type: ignore[import] from dateparser.search import search_dates as _search_dates # type: ignore[import]
_HAS_DATEPARSER = True _HAS_DATEPARSER = True
except ImportError: except ImportError:
_search_dates = None # type: ignore[assignment] _search_dates = None # type: ignore[assignment]
@@ -63,7 +65,7 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
m = _RELATIVE_RE.search(query) m = _RELATIVE_RE.search(query)
if m: if m:
since, until = _relative_window(m) since, until = _relative_window(m)
keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() keywords = re.sub(r"\s{2,}", " ", query[: m.start()] + query[m.end() :]).strip()
return since, until, keywords or query return since, until, keywords or query
if _HAS_DATEPARSER and _search_dates is not None: if _HAS_DATEPARSER and _search_dates is not None:
@@ -77,17 +79,27 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
results = _search_dates( results = _search_dates(
query, query,
languages=["en"], languages=["en"],
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, settings={
"PREFER_DATES_FROM": "past",
"TIMEZONE": tz_str,
"RETURN_AS_TIMEZONE_AWARE": True,
},
)
except Exception as e:
logger.warning(
"dateparser failed (%s) on query %r — falling back to 60-min window",
type(e).__name__,
query,
) )
except Exception:
logger.warning("dateparser failed on query %r — falling back to 60-min window", query)
results = None results = None
if results: if results:
phrase, dt = results[0] phrase, dt = results[0]
if dt.tzinfo is None: if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc) dt = dt.replace(tzinfo=timezone.utc)
else: else:
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare dt = dt.astimezone(
timezone.utc
) # normalise to UTC for SQLite string compare
since = (dt - timedelta(minutes=30)).isoformat() since = (dt - timedelta(minutes=30)).isoformat()
until = (dt + timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat()
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
@@ -116,8 +128,23 @@ def diagnose(
else: else:
keywords = query keywords = query
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) keyword_hits = search(
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) db_path,
query=keywords,
since=since,
until=until,
source_filter=source_filter,
limit=150,
or_mode=True,
)
window_hits = entries_in_window(
db_path,
since=since,
until=until,
source_filter=source_filter,
limit=50,
per_source_cap=15,
)
seen: set[str] = set() seen: set[str] = set()
merged: list[SearchResult] = [] merged: list[SearchResult] = []
@@ -126,7 +153,9 @@ def diagnose(
seen.add(r.entry_id) seen.add(r.entry_id)
merged.append(r) merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
:200
]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {} by_source: dict[str, int] = {}
@@ -138,7 +167,9 @@ def diagnose(
reasoning: str | None = None reasoning: str | None = None
if llm_url and llm_model: if llm_url and llm_model:
reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) reasoning = summarize(
query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key
)
return { return {
"summary": { "summary": {
@@ -186,7 +217,9 @@ async def diagnose_stream(
yield {"type": "status", "message": "Parsing time window…"} yield {"type": "status", "message": "Parsing time window…"}
time_detected = since is not None and until is not None time_detected = since is not None and until is not None
if not time_detected: if not time_detected:
parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query) parsed_since, parsed_until, keywords = await asyncio.to_thread(
parse_time_window, query
)
since = since or parsed_since since = since or parsed_since
until = until or parsed_until until = until or parsed_until
time_detected = keywords != query time_detected = keywords != query
@@ -206,23 +239,34 @@ async def diagnose_stream(
keyword_hits: list[SearchResult] = [] keyword_hits: list[SearchResult] = []
window_hits = await asyncio.to_thread( window_hits = await asyncio.to_thread(
lambda: entries_in_window( lambda: entries_in_window(
db_path, since, until, db_path,
source_filter=source_filter, limit=200, since,
until,
source_filter=source_filter,
limit=200,
) )
) )
else: else:
keyword_hits, window_hits = await asyncio.gather( keyword_hits, window_hits = await asyncio.gather(
asyncio.to_thread( asyncio.to_thread(
lambda: search( lambda: search(
db_path, keywords, db_path,
source_filter=source_filter, since=since, until=until, keywords,
limit=150, or_mode=True, source_filter=source_filter,
since=since,
until=until,
limit=150,
or_mode=True,
) )
), ),
asyncio.to_thread( asyncio.to_thread(
lambda: entries_in_window( lambda: entries_in_window(
db_path, since, until, db_path,
source_filter=source_filter, limit=50, per_source_cap=15, since,
until,
source_filter=source_filter,
limit=50,
per_source_cap=15,
) )
), ),
) )
@@ -234,7 +278,9 @@ async def diagnose_stream(
seen.add(r.entry_id) seen.add(r.entry_id)
merged.append(r) merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
:200
]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {} by_source: dict[str, int] = {}
@@ -260,7 +306,14 @@ async def diagnose_stream(
if llm_url and llm_model and combined: if llm_url and llm_model and combined:
yield {"type": "status", "message": "Analyzing with LLM…"} yield {"type": "status", "message": "Analyzing with LLM…"}
reasoning = await asyncio.to_thread( reasoning = await asyncio.to_thread(
lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block) lambda: summarize(
query,
combined,
llm_url,
llm_model,
llm_api_key,
context_block=context_block,
)
) )
if reasoning: if reasoning:
yield {"type": "reasoning", "text": reasoning} yield {"type": "reasoning", "text": reasoning}
@@ -280,15 +333,9 @@ __all__ = [
"diagnose", "diagnose",
"diagnose_stream", "diagnose_stream",
"parse_time_window", "parse_time_window",
"_now_iso",
"_last_n_minutes",
"_HAS_DATEPARSER",
"_search_dates",
"_RELATIVE_RE",
"_RELATIVE_UNITS",
"_APPROX_N",
"_relative_window",
] ]
# Feature flag for Task 6 # Feature flag for Task 6
MULTI_AGENT_ENABLED = os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true" MULTI_AGENT_ENABLED = (
os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true"
)

View file

@@ -1,4 +1,5 @@
"""Frictionless diagnose service — NL time extraction + layered log search.""" """Frictionless diagnose service — NL time extraction + layered log search."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@@ -18,6 +19,7 @@ logger = logging.getLogger(__name__)
try: try:
from dateparser.search import search_dates as _search_dates # type: ignore[import] from dateparser.search import search_dates as _search_dates # type: ignore[import]
_HAS_DATEPARSER = True _HAS_DATEPARSER = True
except ImportError: except ImportError:
_search_dates = None # type: ignore[assignment] _search_dates = None # type: ignore[assignment]
@@ -54,7 +56,7 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
m = _RELATIVE_RE.search(query) m = _RELATIVE_RE.search(query)
if m: if m:
since, until = _relative_window(m) since, until = _relative_window(m)
keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip() keywords = re.sub(r"\s{2,}", " ", query[: m.start()] + query[m.end() :]).strip()
return since, until, keywords or query return since, until, keywords or query
if _HAS_DATEPARSER and _search_dates is not None: if _HAS_DATEPARSER and _search_dates is not None:
@@ -68,17 +70,25 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
results = _search_dates( results = _search_dates(
query, query,
languages=["en"], languages=["en"],
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, settings={
"PREFER_DATES_FROM": "past",
"TIMEZONE": tz_str,
"RETURN_AS_TIMEZONE_AWARE": True,
},
) )
except Exception: except Exception:
logger.warning("dateparser failed on query %r — falling back to 60-min window", query) logger.warning(
"dateparser failed on query %r — falling back to 60-min window", query
)
results = None results = None
if results: if results:
phrase, dt = results[0] phrase, dt = results[0]
if dt.tzinfo is None: if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc) dt = dt.replace(tzinfo=timezone.utc)
else: else:
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare dt = dt.astimezone(
timezone.utc
) # normalise to UTC for SQLite string compare
since = (dt - timedelta(minutes=30)).isoformat() since = (dt - timedelta(minutes=30)).isoformat()
until = (dt + timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat()
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
@@ -107,8 +117,23 @@ def diagnose(
else: else:
keywords = query keywords = query
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) keyword_hits = search(
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) db_path,
query=keywords,
since=since,
until=until,
source_filter=source_filter,
limit=150,
or_mode=True,
)
window_hits = entries_in_window(
db_path,
since=since,
until=until,
source_filter=source_filter,
limit=50,
per_source_cap=15,
)
seen: set[str] = set() seen: set[str] = set()
merged: list[SearchResult] = [] merged: list[SearchResult] = []
@@ -117,7 +142,9 @@ def diagnose(
seen.add(r.entry_id) seen.add(r.entry_id)
merged.append(r) merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
:200
]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {} by_source: dict[str, int] = {}
@@ -129,7 +156,9 @@ def diagnose(
reasoning: str | None = None reasoning: str | None = None
if llm_url and llm_model: if llm_url and llm_model:
reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key) reasoning = summarize(
query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key
)
return { return {
"summary": { "summary": {
@@ -177,7 +206,9 @@ async def diagnose_stream(
yield {"type": "status", "message": "Parsing time window…"} yield {"type": "status", "message": "Parsing time window…"}
time_detected = since is not None and until is not None time_detected = since is not None and until is not None
if not time_detected: if not time_detected:
parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query) parsed_since, parsed_until, keywords = await asyncio.to_thread(
parse_time_window, query
)
since = since or parsed_since since = since or parsed_since
until = until or parsed_until until = until or parsed_until
time_detected = keywords != query time_detected = keywords != query
@@ -197,23 +228,34 @@ async def diagnose_stream(
keyword_hits: list[SearchResult] = [] keyword_hits: list[SearchResult] = []
window_hits = await asyncio.to_thread( window_hits = await asyncio.to_thread(
lambda: entries_in_window( lambda: entries_in_window(
db_path, since, until, db_path,
source_filter=source_filter, limit=200, since,
until,
source_filter=source_filter,
limit=200,
) )
) )
else: else:
keyword_hits, window_hits = await asyncio.gather( keyword_hits, window_hits = await asyncio.gather(
asyncio.to_thread( asyncio.to_thread(
lambda: search( lambda: search(
db_path, keywords, db_path,
source_filter=source_filter, since=since, until=until, keywords,
limit=150, or_mode=True, source_filter=source_filter,
since=since,
until=until,
limit=150,
or_mode=True,
) )
), ),
asyncio.to_thread( asyncio.to_thread(
lambda: entries_in_window( lambda: entries_in_window(
db_path, since, until, db_path,
source_filter=source_filter, limit=50, per_source_cap=15, since,
until,
source_filter=source_filter,
limit=50,
per_source_cap=15,
) )
), ),
) )
@@ -225,7 +267,9 @@ async def diagnose_stream(
seen.add(r.entry_id) seen.add(r.entry_id)
merged.append(r) merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200] combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[
:200
]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0} by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {} by_source: dict[str, int] = {}
@@ -251,7 +295,14 @@ async def diagnose_stream(
if llm_url and llm_model and combined: if llm_url and llm_model and combined:
yield {"type": "status", "message": "Analyzing with LLM…"} yield {"type": "status", "message": "Analyzing with LLM…"}
reasoning = await asyncio.to_thread( reasoning = await asyncio.to_thread(
lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block) lambda: summarize(
query,
combined,
llm_url,
llm_model,
llm_api_key,
context_block=context_block,
)
) )
if reasoning: if reasoning:
yield {"type": "reasoning", "text": reasoning} yield {"type": "reasoning", "text": reasoning}

View file

@@ -1,4 +1,5 @@
"""Pipeline data types for the multi-agent diagnose pipeline.""" """Pipeline data types for the multi-agent diagnose pipeline."""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@@ -7,53 +8,63 @@ from typing import Literal
SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"] SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"]
@dataclass @dataclass(frozen=True)
class EventCluster: class EventCluster:
"""A time-correlated group of log entries within the timeline."""
cluster_id: str cluster_id: str
entries: list[str] # entry_id refs entries: tuple[str, ...] # entry_id refs
start_iso: str | None start_iso: str | None
end_iso: str | None end_iso: str | None
duration_seconds: float duration_seconds: float
source_ids: list[str] source_ids: tuple[str, ...]
pattern_tags: list[str] pattern_tags: tuple[str, ...]
severity: SeverityLabel # highest severity from raw text severity: SeverityLabel
burst: bool burst: bool
gap_before_seconds: float gap_before_seconds: float
representative_text: str representative_text: str
@dataclass @dataclass(frozen=True)
class TimelineResult: class TimelineResult:
clusters: list[EventCluster] """Structured timeline of event clusters built from log entries."""
clusters: tuple[EventCluster, ...]
total_entries: int total_entries: int
window_start: str | None window_start: str | None
window_end: str | None window_end: str | None
gap_count: int gap_count: int
burst_count: int burst_count: int
dominant_sources: list[str] dominant_sources: tuple[str, ...]
@dataclass @dataclass(frozen=True)
class ClassifiedTimeline: class ClassifiedTimeline:
"""Timeline annotated with ML-assigned severity per cluster."""
timeline: TimelineResult timeline: TimelineResult
cluster_severities: dict[str, SeverityLabel] cluster_severities: dict[str, SeverityLabel]
classifier_used: Literal["ml", "pattern_tags", "regex"] classifier_used: Literal["ml", "pattern_tags", "regex"]
model_id: str | None model_id: str | None
@dataclass @dataclass(frozen=True)
class Hypothesis: class Hypothesis:
"""A root-cause hypothesis generated by Stage 3."""
hypothesis_id: str hypothesis_id: str
title: str title: str
description: str description: str
confidence: float confidence: float
supporting_cluster_ids: list[str] supporting_cluster_ids: tuple[str, ...]
runbook_refs: list[str] runbook_refs: tuple[str, ...]
severity: SeverityLabel severity: SeverityLabel
@dataclass @dataclass(frozen=True)
class RankedHypothesis: class RankedHypothesis:
"""A hypothesis enriched by Stage 4 false-positive suppression."""
hypothesis: Hypothesis hypothesis: Hypothesis
novelty_score: float novelty_score: float
similarity_to_known: float similarity_to_known: float

View file

@@ -1,4 +1,5 @@
"""Multi-agent diagnose pipeline orchestrator — stub (Task 1).""" """Multi-agent diagnose pipeline orchestrator — stub (Task 1)."""
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any