refactor: convert diagnose module to package for multi-agent pipeline (issue #29)

- Move app/services/diagnose.py verbatim to app/services/diagnose/legacy.py
- Create app/services/diagnose/__init__.py with full implementation so that
  patch('app.services.diagnose._HAS_DATEPARSER') targets the correct namespace
  and all 303 existing tests continue to pass without modification
- Add app/services/diagnose/models.py with 5 pipeline dataclasses:
  EventCluster, TimelineResult, ClassifiedTimeline, Hypothesis, RankedHypothesis
- Add app/services/diagnose/pipeline.py with run_pipeline() stub (Task 6)
- Add MULTI_AGENT_ENABLED feature flag (off by default via env var)
- Zero behavior change; ruff clean

Closes: #29
This commit is contained in:
pyr0ball 2026-05-25 11:12:39 -07:00
parent 5f32a6678d
commit 664ab50433
4 changed files with 366 additions and 0 deletions

View file

@ -0,0 +1,294 @@
"""Frictionless diagnose service — NL time extraction + layered log search.
This module is the public interface for the diagnose package.
Full implementation lives here so that patch("app.services.diagnose._HAS_DATEPARSER")
and patch("app.services.diagnose._search_dates") continue to target the correct
namespace, preserving backward compatibility with existing tests.
The verbatim original is preserved in legacy.py for reference.
"""
from __future__ import annotations
import asyncio
import dataclasses
import logging
import os
import re
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from app.context.retriever import retrieve_context, format_context_block
from app.services.llm import summarize
from app.services.search import SearchResult, entries_in_window, search
logger = logging.getLogger(__name__)
try:
from dateparser.search import search_dates as _search_dates # type: ignore[import]
_HAS_DATEPARSER = True
except ImportError:
_search_dates = None # type: ignore[assignment]
_HAS_DATEPARSER = False
_RELATIVE_RE = re.compile(
r"\b(?:last|past)\s+(?:(?P<n>\d+)|(?P<approx>a\s+few|few|couple(?:\s+of)?|several))?\s*(?P<unit>minute|hour|day|week)s?\b",
re.IGNORECASE,
)
_RELATIVE_UNITS = {"minute": 1, "hour": 60, "day": 1440, "week": 10080}
# Fuzzy quantifiers map to a reasonable span so "last few hours" → 3h window
_APPROX_N = 3
def _relative_window(match: re.Match) -> tuple[str, str]:
"""Convert a relative time match to (since_iso, until_iso)."""
n_str = match.group("n")
approx = match.group("approx")
unit = match.group("unit").lower()
n = int(n_str) if n_str else (_APPROX_N if approx else 1)
minutes = n * _RELATIVE_UNITS[unit]
return _last_n_minutes(minutes), _now_iso()
def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
"""Extract a time window from a natural-language query string.
Returns (since_iso, until_iso, keywords) where keywords is the query with
the matched time phrase stripped. Falls back to last-60-min window.
"""
# Handle relative expressions first ("last hour", "past 30 minutes", etc.)
# dateparser misinterprets these as absolute times.
m = _RELATIVE_RE.search(query)
if m:
since, until = _relative_window(m)
keywords = re.sub(r"\s{2,}", " ", query[:m.start()] + query[m.end():]).strip()
return since, until, keywords or query
if _HAS_DATEPARSER and _search_dates is not None:
# Tell dateparser what timezone the user is in so "3:35 am" means local time.
# PREFER_DAY_OF_MONTH is unused here but PREFER_DATES_FROM=past ensures
# "3:35 am" resolves to the most recent past occurrence, not a future one.
local_offset = datetime.now().astimezone().utcoffset()
offset_h = int((local_offset.total_seconds() if local_offset else 0) / 3600)
tz_str = f"UTC{'+' if offset_h >= 0 else ''}{offset_h}"
try:
results = _search_dates(
query,
languages=["en"],
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True},
)
except Exception:
logger.warning("dateparser failed on query %r — falling back to 60-min window", query)
results = None
if results:
phrase, dt = results[0]
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare
since = (dt - timedelta(minutes=30)).isoformat()
until = (dt + timedelta(minutes=30)).isoformat()
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
return since, until, keywords or query
return _last_n_minutes(60), _now_iso(), query
def diagnose(
db_path: Path,
query: str,
since: str | None = None,
until: str | None = None,
source_filter: str | None = None,
llm_url: str | None = None,
llm_model: str | None = None,
llm_api_key: str | None = None,
) -> dict[str, Any]:
"""Run layered log search with NL time extraction. Returns summary + entries."""
time_detected = since is not None and until is not None
if not time_detected:
parsed_since, parsed_until, keywords = parse_time_window(query)
since = since or parsed_since
until = until or parsed_until
time_detected = keywords != query
else:
keywords = query
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True)
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15)
seen: set[str] = set()
merged: list[SearchResult] = []
for r in keyword_hits + window_hits:
if r.entry_id not in seen:
seen.add(r.entry_id)
merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {}
for r in combined:
sev = (r.severity or "INFO").upper()
if sev in by_severity:
by_severity[sev] += 1
by_source[r.source_id] = by_source.get(r.source_id, 0) + 1
reasoning: str | None = None
if llm_url and llm_model:
reasoning = summarize(query, combined, llm_url=llm_url, llm_model=llm_model, api_key=llm_api_key)
return {
"summary": {
"total": len(combined),
"window_start": since,
"window_end": until,
"time_detected": time_detected,
"by_severity": by_severity,
"by_source": by_source,
},
"reasoning": reasoning,
"entries": combined,
}
async def diagnose_stream(
db_path: Path,
query: str,
since: str | None = None,
until: str | None = None,
source_filter: str | None = None,
llm_url: str | None = None,
llm_model: str | None = None,
llm_api_key: str | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""Async generator yielding SSE event dicts for the diagnose pipeline.
Yields events in order:
{"type":"status","message":""} pipeline progress
{"type":"summary","data":{}} window + severity counts (fast, from DB)
{"type":"entries","data":[]} log entries (fast, from DB)
{"type":"reasoning","text":""} LLM analysis (slow, optional)
{"type":"done"}
"""
keywords = query.strip()
source_browse = not keywords and source_filter is not None
if source_browse:
# No keyword — browsing a source directly. Use 24h window; skip FTS entirely.
yield {"type": "status", "message": f"Loading {source_filter}"}
since = since or _last_n_minutes(60 * 24)
until = until or _now_iso()
time_detected = False
else:
yield {"type": "status", "message": "Parsing time window…"}
time_detected = since is not None and until is not None
if not time_detected:
parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query)
since = since or parsed_since
until = until or parsed_until
time_detected = keywords != query
yield {"type": "status", "message": "Loading environment context…"}
ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query))
context_block = format_context_block(ctx)
yield {
"type": "context",
"facts": ctx.facts,
"chunks": ctx.chunks,
}
yield {"type": "status", "message": "Searching logs…"}
if source_browse:
keyword_hits: list[SearchResult] = []
window_hits = await asyncio.to_thread(
lambda: entries_in_window(
db_path, since, until,
source_filter=source_filter, limit=200,
)
)
else:
keyword_hits, window_hits = await asyncio.gather(
asyncio.to_thread(
lambda: search(
db_path, keywords,
source_filter=source_filter, since=since, until=until,
limit=150, or_mode=True,
)
),
asyncio.to_thread(
lambda: entries_in_window(
db_path, since, until,
source_filter=source_filter, limit=50, per_source_cap=15,
)
),
)
seen: set[str] = set()
merged: list[SearchResult] = []
for r in keyword_hits + window_hits:
if r.entry_id not in seen:
seen.add(r.entry_id)
merged.append(r)
combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]
by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
by_source: dict[str, int] = {}
for r in combined:
sev = (r.severity or "INFO").upper()
if sev in by_severity:
by_severity[sev] += 1
by_source[r.source_id] = by_source.get(r.source_id, 0) + 1
yield {
"type": "summary",
"data": {
"total": len(combined),
"window_start": since,
"window_end": until,
"time_detected": time_detected,
"by_severity": by_severity,
"by_source": by_source,
},
}
yield {"type": "entries", "data": [dataclasses.asdict(r) for r in combined]}
if llm_url and llm_model and combined:
yield {"type": "status", "message": "Analyzing with LLM…"}
reasoning = await asyncio.to_thread(
lambda: summarize(query, combined, llm_url, llm_model, llm_api_key, context_block=context_block)
)
if reasoning:
yield {"type": "reasoning", "text": reasoning}
yield {"type": "done"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _last_n_minutes(n: int) -> str:
return (datetime.now(timezone.utc) - timedelta(minutes=n)).isoformat()
__all__ = [
"diagnose",
"diagnose_stream",
"parse_time_window",
"_now_iso",
"_last_n_minutes",
"_HAS_DATEPARSER",
"_search_dates",
"_RELATIVE_RE",
"_RELATIVE_UNITS",
"_APPROX_N",
"_relative_window",
]
# Feature flag for Task 6
MULTI_AGENT_ENABLED = os.getenv("TURNSTONE_MULTI_AGENT_DIAGNOSE", "false").lower() == "true"

View file

@ -0,0 +1,61 @@
"""Pipeline data types for the multi-agent diagnose pipeline."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
SeverityLabel = Literal["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "UNKNOWN"]
@dataclass
class EventCluster:
cluster_id: str
entries: list[str] # entry_id refs
start_iso: str | None
end_iso: str | None
duration_seconds: float
source_ids: list[str]
pattern_tags: list[str]
severity: SeverityLabel # highest severity from raw text
burst: bool
gap_before_seconds: float
representative_text: str
@dataclass
class TimelineResult:
clusters: list[EventCluster]
total_entries: int
window_start: str | None
window_end: str | None
gap_count: int
burst_count: int
dominant_sources: list[str]
@dataclass
class ClassifiedTimeline:
timeline: TimelineResult
cluster_severities: dict[str, SeverityLabel]
classifier_used: Literal["ml", "pattern_tags", "regex"]
model_id: str | None
@dataclass
class Hypothesis:
hypothesis_id: str
title: str
description: str
confidence: float
supporting_cluster_ids: list[str]
runbook_refs: list[str]
severity: SeverityLabel
@dataclass
class RankedHypothesis:
hypothesis: Hypothesis
novelty_score: float
similarity_to_known: float
suppress: bool
suppression_reason: str | None

View file

@ -0,0 +1,11 @@
"""Multi-agent diagnose pipeline orchestrator — stub (Task 1)."""
from __future__ import annotations
from typing import Any
# run_pipeline() will be implemented in Task 6
async def run_pipeline(*args: Any, **kwargs: Any) -> None:
"""Placeholder — implemented in Task 6."""
return None