FTS5 bulk-insert write locks starved the incident API and bundle endpoints during log bursts (sonarr/radarr, high-volume docker sources).

The fix mirrors the context_facts split (context -> turnstone-context.db):

- Add INCIDENTS_DB_PATH / TURNSTONE_INCIDENTS_DB env var in rest.py
- Add _INCIDENTS_SCHEMA, ensure_incidents_schema(), and migrate_incidents_to_dedicated_db() in glean/pipeline.py
- Stub out incidents/received_bundles/sent_bundles in _SCHEMA (no-op CREATE IF NOT EXISTS) so legacy single-file deployments still open
- Thread incidents_db_path through diagnose_stream -> run_pipeline -> FalsePositiveSuppressor.suppress -> _fetch_resolved_incidents
- One-shot migration on startup: copy existing rows from the main DB to the incidents DB via INSERT OR IGNORE (idempotent, safe to re-run)
- Fix test_blocklist_endpoints fixtures to patch CONTEXT_DB_PATH and INCIDENTS_DB_PATH alongside DB_PATH (the worktree has no data/ dir)

372 tests passing.

Closes: #60
173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
"""Multi-agent diagnose pipeline orchestrator — Stage 1–5 wiring."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
import dataclasses
import logging
import os
from collections import Counter
from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any
|
||
|
||
# Stage 2 can optionally use an ML log classifier.
# Leave TURNSTONE_CLASSIFIER_MODEL unset (the default, empty string) to have
# Stage 2 fall back to pattern_tags and then regex; set it to a HuggingFace
# model ID to turn ML classification on.
# Recommended: byviz/bylastic_classification_logs (DistilBERT, ~300MB)
_CLASSIFIER_MODEL: str = os.getenv("TURNSTONE_CLASSIFIER_MODEL", "")
|
||
|
||
from app.context.retriever import RetrievedContext
|
||
from app.services.diagnose.classifier import SeverityClassifier
|
||
from app.services.diagnose.hypothesizer import RootCauseHypothesizer
|
||
from app.services.diagnose.suppressor import FalsePositiveSuppressor
|
||
from app.services.diagnose.synthesizer import SummarySynthesizer
|
||
from app.services.diagnose.timeline import TimelineReconstructor
|
||
from app.services.search import SearchResult
|
||
|
||
# Module-scoped logger; stage failures are reported through it.
logger: logging.Logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _stage_failure(
    stage: int, name: str, exc: BaseException
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Log a failed pipeline stage and return the terminal events to emit.

    Call this from inside the ``except`` block so ``logger.exception`` can
    attach the active traceback.

    Args:
        stage: 1-based stage number.
        name: Short stage label used in both the log line and the error event.
        exc: The exception raised by the stage.

    Returns:
        An ``error`` event followed by a ``done`` event, in emission order.
    """
    logger.exception("Stage %d (%s) failed: %s", stage, name, exc)
    return (
        {"type": "error", "message": f"Pipeline error in stage {stage} ({name})"},
        {"type": "done"},
    )


async def run_pipeline(
    db_path: Path,
    entries: list[SearchResult],
    ctx: RetrievedContext,
    query: str,
    since: str | None,  # reserved for future range-filtering in stage queries (#29 follow-up)
    until: str | None,  # reserved for future range-filtering in stage queries (#29 follow-up)
    llm_url: str | None,
    llm_model: str | None,
    llm_api_key: str | None,
    tech_level: str = "sysadmin",
    incidents_db_path: Path | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    """Async generator that runs all 5 pipeline stages and yields SSE event dicts.

    Stages (each runs in a worker thread via ``asyncio.to_thread``):
        1. TimelineReconstructor — cluster log entries by time
        2. SeverityClassifier — annotate clusters with severity
        3. RootCauseHypothesizer — generate hypotheses via LLM
        4. FalsePositiveSuppressor — rank and suppress known patterns
        5. SummarySynthesizer — produce a narrative diagnosis

    Args:
        db_path: Main database path. Used here only as the fallback
            incidents database when ``incidents_db_path`` is None.
        entries: Log entries fed to stage 1.
        ctx: Retrieved context, passed to stages 3 and 5.
        query: The user's diagnose query, passed to stages 3 and 5.
        since: Reserved; currently unused.
        until: Reserved; currently unused.
        llm_url: LLM endpoint, passed to stages 3 and 5.
        llm_model: LLM model name, passed to stages 3 and 5.
        llm_api_key: LLM API key, passed to stages 3 and 5.
        tech_level: Audience level, passed to the stage-5 synthesizer.
        incidents_db_path: Database holding resolved incidents, passed to the
            stage-4 suppressor. Defaults to ``db_path`` when None.

    Yields events in order:
        {"type": "status", "message": "Building timeline…"}
        {"type": "pipeline_stage", "stage": 1, ...}
        {"type": "pipeline_stage", "stage": 2, ...}
        {"type": "pipeline_stage", "stage": 3, ...}
        {"type": "pipeline_stage", "stage": 4, ...}
        {"type": "hypotheses", "data": [...]}
        {"type": "status", "message": "Synthesizing…"}
        {"type": "reasoning", "text": "..."}  — only when synthesis produces text
        {"type": "done"}

    If any stage raises, the exception is logged, then
    {"type": "error", "message": "Pipeline error in stage N (name)"} and
    {"type": "done"} are yielded and the generator stops.
    """
    # Stage 1: Timeline reconstruction
    yield {"type": "status", "message": "Building timeline…"}
    try:
        timeline = await asyncio.to_thread(
            TimelineReconstructor().reconstruct, entries
        )
    except Exception as exc:
        for event in _stage_failure(1, "timeline", exc):
            yield event
        return
    yield {
        "type": "pipeline_stage",
        "stage": 1,
        "name": "timeline",
        "message": f"Built {len(timeline.clusters)} clusters, {timeline.burst_count} bursts",
    }

    # Stage 2: Severity classification
    try:
        # Build the classifier inside the worker thread as well: when
        # _CLASSIFIER_MODEL is set, SeverityClassifier may load that model at
        # construction time, and that work must not block the event loop.
        classified = await asyncio.to_thread(
            lambda: SeverityClassifier(model_id=_CLASSIFIER_MODEL).classify(timeline)
        )
    except Exception as exc:
        for event in _stage_failure(2, "classifier", exc):
            yield event
        return
    sev_counts = Counter(classified.cluster_severities.values())
    counts_str = ", ".join(f"{sev}:{count}" for sev, count in sorted(sev_counts.items()))
    yield {
        "type": "pipeline_stage",
        "stage": 2,
        "name": "classifier",
        "message": f"{classified.classifier_used} classifier: {counts_str}",
    }

    # Stage 3: Root-cause hypotheses
    try:
        hypotheses = await asyncio.to_thread(
            RootCauseHypothesizer().hypothesize,
            classified,
            ctx,
            query,
            llm_url,
            llm_model,
            llm_api_key,
        )
    except Exception as exc:
        for event in _stage_failure(3, "hypothesizer", exc):
            yield event
        return
    yield {
        "type": "pipeline_stage",
        "stage": 3,
        "name": "hypotheses",
        "message": f"{len(hypotheses)} hypotheses generated",
    }

    # Stage 4: False-positive suppression
    # Resolved incidents live in the dedicated incidents DB when one is
    # configured; fall back to the main DB for legacy single-file deployments.
    incidents_db = incidents_db_path if incidents_db_path is not None else db_path
    try:
        ranked = await asyncio.to_thread(
            FalsePositiveSuppressor().suppress, hypotheses, incidents_db
        )
    except Exception as exc:
        for event in _stage_failure(4, "suppressor", exc):
            yield event
        return
    suppressed = sum(1 for rh in ranked if rh.suppress)
    active = len(ranked) - suppressed
    yield {
        "type": "pipeline_stage",
        "stage": 4,
        "name": "suppressor",
        "message": f"{suppressed} suppressed, {active} active",
    }
    yield {
        "type": "hypotheses",
        "data": [dataclasses.asdict(rh) for rh in ranked],
    }

    # Stage 5: Summary synthesis
    yield {"type": "status", "message": "Synthesizing…"}
    try:
        synthesis_text = await asyncio.to_thread(
            SummarySynthesizer().synthesize,
            ranked,
            timeline,
            ctx,
            query,
            llm_url,
            llm_model,
            llm_api_key,
            tech_level,
        )
    except Exception as exc:
        for event in _stage_failure(5, "synthesizer", exc):
            yield event
        return
    if synthesis_text:
        yield {"type": "reasoning", "text": synthesis_text}

    yield {"type": "done"}
|