"""Multi-agent diagnose pipeline orchestrator — Stage 1–5 wiring.""" from __future__ import annotations import asyncio import dataclasses import logging from collections.abc import AsyncGenerator from pathlib import Path from typing import Any from app.context.retriever import RetrievedContext from app.services.diagnose.classifier import SeverityClassifier from app.services.diagnose.hypothesizer import RootCauseHypothesizer from app.services.diagnose.suppressor import FalsePositiveSuppressor from app.services.diagnose.synthesizer import SummarySynthesizer from app.services.diagnose.timeline import TimelineReconstructor from app.services.search import SearchResult logger = logging.getLogger(__name__) async def run_pipeline( db_path: Path, entries: list[SearchResult], ctx: RetrievedContext, query: str, since: str | None, # reserved for future range-filtering in stage queries (#29 follow-up) until: str | None, # reserved for future range-filtering in stage queries (#29 follow-up) llm_url: str | None, llm_model: str | None, llm_api_key: str | None, ) -> AsyncGenerator[dict[str, Any], None]: """Async generator that runs all 5 pipeline stages and yields SSE event dicts. Stages: 1. TimelineReconstructor — cluster log entries by time 2. SeverityClassifier — annotate clusters with severity 3. RootCauseHypothesizer — generate hypotheses via LLM 4. FalsePositiveSuppressor — rank and suppress known patterns 5. SummarySynthesizer — produce a narrative diagnosis Yields events in order: {"type": "status", "message": "Building timeline…"} {"type": "pipeline_stage", "stage": 1, ...} {"type": "pipeline_stage", "stage": 2, ...} {"type": "pipeline_stage", "stage": 3, ...} {"type": "pipeline_stage", "stage": 4, ...} {"type": "hypotheses", "data": [...]} {"type": "status", "message": "Synthesizing…"} {"type": "reasoning", "text": "..."} — only when synthesis produces text {"type": "done"} """ # Stage 1: Timeline reconstruction yield {"type": "status", "message": "Building timeline…"} timeline = await asyncio.to_thread( TimelineReconstructor().reconstruct, entries ) n_clusters = len(timeline.clusters) burst = timeline.burst_count yield { "type": "pipeline_stage", "stage": 1, "name": "timeline", "message": f"Built {n_clusters} clusters, {burst} bursts", } # Stage 2: Severity classification classified = await asyncio.to_thread( SeverityClassifier().classify, timeline ) sev_counts: dict[str, int] = {} for sev in classified.cluster_severities.values(): sev_counts[sev] = sev_counts.get(sev, 0) + 1 counts_str = ", ".join(f"{k}:{v}" for k, v in sorted(sev_counts.items())) yield { "type": "pipeline_stage", "stage": 2, "name": "classifier", "message": f"{classified.classifier_used} classifier: {counts_str}", } # Stage 3: Root-cause hypotheses hypotheses = await asyncio.to_thread( RootCauseHypothesizer().hypothesize, classified, ctx, query, llm_url, llm_model, llm_api_key, ) yield { "type": "pipeline_stage", "stage": 3, "name": "hypotheses", "message": f"{len(hypotheses)} hypotheses generated", } # Stage 4: False-positive suppression ranked = await asyncio.to_thread( FalsePositiveSuppressor().suppress, hypotheses, db_path ) suppressed = sum(1 for rh in ranked if rh.suppress) active = len(ranked) - suppressed yield { "type": "pipeline_stage", "stage": 4, "name": "suppressor", "message": f"{suppressed} suppressed, {active} active", } yield { "type": "hypotheses", "data": [dataclasses.asdict(rh) for rh in ranked], } # Stage 5: Summary synthesis yield {"type": "status", "message": "Synthesizing…"} synthesis_text = await asyncio.to_thread( SummarySynthesizer().synthesize, ranked, timeline, ctx, query, llm_url, llm_model, llm_api_key, ) if synthesis_text: yield {"type": "reasoning", "text": synthesis_text} yield {"type": "done"}