"""Multi-agent diagnose pipeline orchestrator — Stage 1–5 wiring.""" from __future__ import annotations import asyncio import dataclasses import logging import os from collections.abc import AsyncGenerator from pathlib import Path from typing import Any # Optional ML classifier model for Stage 2. # When empty (default), Stage 2 falls back to pattern_tags then regex. # Set TURNSTONE_CLASSIFIER_MODEL to a HuggingFace model ID to enable ML classification. # Recommended: byviz/bylastic_classification_logs (DistilBERT, ~300MB) _CLASSIFIER_MODEL: str = os.environ.get("TURNSTONE_CLASSIFIER_MODEL", "") from app.context.retriever import RetrievedContext from app.services.diagnose.classifier import SeverityClassifier from app.services.diagnose.hypothesizer import RootCauseHypothesizer from app.services.diagnose.suppressor import FalsePositiveSuppressor from app.services.diagnose.synthesizer import SummarySynthesizer from app.services.diagnose.timeline import TimelineReconstructor from app.services.search import SearchResult logger = logging.getLogger(__name__) async def run_pipeline( db_path: Path, entries: list[SearchResult], ctx: RetrievedContext, query: str, since: str | None, # reserved for future range-filtering in stage queries (#29 follow-up) until: str | None, # reserved for future range-filtering in stage queries (#29 follow-up) llm_url: str | None, llm_model: str | None, llm_api_key: str | None, ) -> AsyncGenerator[dict[str, Any], None]: """Async generator that runs all 5 pipeline stages and yields SSE event dicts. Stages: 1. TimelineReconstructor — cluster log entries by time 2. SeverityClassifier — annotate clusters with severity 3. RootCauseHypothesizer — generate hypotheses via LLM 4. FalsePositiveSuppressor — rank and suppress known patterns 5. SummarySynthesizer — produce a narrative diagnosis Yields events in order: {"type": "status", "message": "Building timeline…"} {"type": "pipeline_stage", "stage": 1, ...} {"type": "pipeline_stage", "stage": 2, ...} {"type": "pipeline_stage", "stage": 3, ...} {"type": "pipeline_stage", "stage": 4, ...} {"type": "hypotheses", "data": [...]} {"type": "status", "message": "Synthesizing…"} {"type": "reasoning", "text": "..."} — only when synthesis produces text {"type": "done"} """ # Stage 1: Timeline reconstruction yield {"type": "status", "message": "Building timeline…"} try: timeline = await asyncio.to_thread( TimelineReconstructor().reconstruct, entries ) except Exception as exc: logger.exception("Stage 1 (timeline) failed: %s", exc) yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"} yield {"type": "done"} return n_clusters = len(timeline.clusters) burst = timeline.burst_count yield { "type": "pipeline_stage", "stage": 1, "name": "timeline", "message": f"Built {n_clusters} clusters, {burst} bursts", } # Stage 2: Severity classification try: classified = await asyncio.to_thread( SeverityClassifier(model_id=_CLASSIFIER_MODEL).classify, timeline ) except Exception as exc: logger.exception("Stage 2 (classifier) failed: %s", exc) yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"} yield {"type": "done"} return sev_counts: dict[str, int] = {} for sev in classified.cluster_severities.values(): sev_counts[sev] = sev_counts.get(sev, 0) + 1 counts_str = ", ".join(f"{k}:{v}" for k, v in sorted(sev_counts.items())) yield { "type": "pipeline_stage", "stage": 2, "name": "classifier", "message": f"{classified.classifier_used} classifier: {counts_str}", } # Stage 3: Root-cause hypotheses try: hypotheses = await asyncio.to_thread( RootCauseHypothesizer().hypothesize, classified, ctx, query, llm_url, llm_model, llm_api_key, ) except Exception as exc: logger.exception("Stage 3 (hypothesizer) failed: %s", exc) yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"} yield {"type": "done"} return yield { "type": "pipeline_stage", "stage": 3, "name": "hypotheses", "message": f"{len(hypotheses)} hypotheses generated", } # Stage 4: False-positive suppression try: ranked = await asyncio.to_thread( FalsePositiveSuppressor().suppress, hypotheses, db_path ) except Exception as exc: logger.exception("Stage 4 (suppressor) failed: %s", exc) yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"} yield {"type": "done"} return suppressed = sum(1 for rh in ranked if rh.suppress) active = len(ranked) - suppressed yield { "type": "pipeline_stage", "stage": 4, "name": "suppressor", "message": f"{suppressed} suppressed, {active} active", } yield { "type": "hypotheses", "data": [dataclasses.asdict(rh) for rh in ranked], } # Stage 5: Summary synthesis yield {"type": "status", "message": "Synthesizing…"} try: synthesis_text = await asyncio.to_thread( SummarySynthesizer().synthesize, ranked, timeline, ctx, query, llm_url, llm_model, llm_api_key, ) except Exception as exc: logger.exception("Stage 5 (synthesizer) failed: %s", exc) yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"} yield {"type": "done"} return if synthesis_text: yield {"type": "reasoning", "text": synthesis_text} yield {"type": "done"}