turnstone/app/services/diagnose/pipeline.py

132 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Multi-agent diagnose pipeline orchestrator — Stage 15 wiring."""
from __future__ import annotations
import asyncio
import dataclasses
import logging
from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any
from app.context.retriever import RetrievedContext
from app.services.diagnose.classifier import SeverityClassifier
from app.services.diagnose.hypothesizer import RootCauseHypothesizer
from app.services.diagnose.suppressor import FalsePositiveSuppressor
from app.services.diagnose.synthesizer import SummarySynthesizer
from app.services.diagnose.timeline import TimelineReconstructor
from app.services.search import SearchResult
logger = logging.getLogger(__name__)
async def run_pipeline(
db_path: Path,
entries: list[SearchResult],
ctx: RetrievedContext,
query: str,
since: str | None, # reserved for future range-filtering in stage queries (#29 follow-up)
until: str | None, # reserved for future range-filtering in stage queries (#29 follow-up)
llm_url: str | None,
llm_model: str | None,
llm_api_key: str | None,
) -> AsyncGenerator[dict[str, Any], None]:
"""Async generator that runs all 5 pipeline stages and yields SSE event dicts.
Stages:
1. TimelineReconstructor — cluster log entries by time
2. SeverityClassifier — annotate clusters with severity
3. RootCauseHypothesizer — generate hypotheses via LLM
4. FalsePositiveSuppressor — rank and suppress known patterns
5. SummarySynthesizer — produce a narrative diagnosis
Yields events in order:
{"type": "status", "message": "Building timeline…"}
{"type": "pipeline_stage", "stage": 1, ...}
{"type": "pipeline_stage", "stage": 2, ...}
{"type": "pipeline_stage", "stage": 3, ...}
{"type": "pipeline_stage", "stage": 4, ...}
{"type": "hypotheses", "data": [...]}
{"type": "status", "message": "Synthesizing…"}
{"type": "reasoning", "text": "..."} — only when synthesis produces text
{"type": "done"}
"""
# Stage 1: Timeline reconstruction
yield {"type": "status", "message": "Building timeline…"}
timeline = await asyncio.to_thread(
TimelineReconstructor().reconstruct, entries
)
n_clusters = len(timeline.clusters)
burst = timeline.burst_count
yield {
"type": "pipeline_stage",
"stage": 1,
"name": "timeline",
"message": f"Built {n_clusters} clusters, {burst} bursts",
}
# Stage 2: Severity classification
classified = await asyncio.to_thread(
SeverityClassifier().classify, timeline
)
sev_counts: dict[str, int] = {}
for sev in classified.cluster_severities.values():
sev_counts[sev] = sev_counts.get(sev, 0) + 1
counts_str = ", ".join(f"{k}:{v}" for k, v in sorted(sev_counts.items()))
yield {
"type": "pipeline_stage",
"stage": 2,
"name": "classifier",
"message": f"{classified.classifier_used} classifier: {counts_str}",
}
# Stage 3: Root-cause hypotheses
hypotheses = await asyncio.to_thread(
RootCauseHypothesizer().hypothesize,
classified,
ctx,
query,
llm_url,
llm_model,
llm_api_key,
)
yield {
"type": "pipeline_stage",
"stage": 3,
"name": "hypotheses",
"message": f"{len(hypotheses)} hypotheses generated",
}
# Stage 4: False-positive suppression
ranked = await asyncio.to_thread(
FalsePositiveSuppressor().suppress, hypotheses, db_path
)
suppressed = sum(1 for rh in ranked if rh.suppress)
active = len(ranked) - suppressed
yield {
"type": "pipeline_stage",
"stage": 4,
"name": "suppressor",
"message": f"{suppressed} suppressed, {active} active",
}
yield {
"type": "hypotheses",
"data": [dataclasses.asdict(rh) for rh in ranked],
}
# Stage 5: Summary synthesis
yield {"type": "status", "message": "Synthesizing…"}
synthesis_text = await asyncio.to_thread(
SummarySynthesizer().synthesize,
ranked,
timeline,
ctx,
query,
llm_url,
llm_model,
llm_api_key,
)
if synthesis_text:
yield {"type": "reasoning", "text": synthesis_text}
yield {"type": "done"}