turnstone/app/services/diagnose/pipeline.py
pyr0ball bd3923e163 fix: split incidents tables to dedicated turnstone-incidents.db (#60)
FTS5 bulk-insert write locks starved the incident API and bundle endpoints
during log bursts (sonarr/radarr, high-volume docker sources). Fix mirrors
the context_facts split (context -> turnstone-context.db):

- Add INCIDENTS_DB_PATH / TURNSTONE_INCIDENTS_DB env var in rest.py
- Add _INCIDENTS_SCHEMA, ensure_incidents_schema(), and
  migrate_incidents_to_dedicated_db() in glean/pipeline.py
- Stub out incidents/received_bundles/sent_bundles in _SCHEMA (no-op
  CREATE IF NOT EXISTS) so legacy single-file deployments still open
- Thread incidents_db_path through diagnose_stream -> run_pipeline ->
  FalsePositiveSuppressor.suppress -> _fetch_resolved_incidents
- One-shot migration on startup: copy existing rows from main DB to
  incidents DB via INSERT OR IGNORE (idempotent, safe to re-run)
- Fix test_blocklist_endpoints fixtures to patch CONTEXT_DB_PATH and
  INCIDENTS_DB_PATH alongside DB_PATH (worktree has no data/ dir)

372 tests passing.

Closes: #60
2026-06-01 15:54:23 -07:00

173 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Multi-agent diagnose pipeline orchestrator — Stage 15 wiring."""
from __future__ import annotations
import asyncio
import dataclasses
import logging
import os
from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any
# Optional ML classifier model for Stage 2.
# When empty (default), Stage 2 falls back to pattern_tags then regex.
# Set TURNSTONE_CLASSIFIER_MODEL to a HuggingFace model ID to enable ML classification.
# Recommended: byviz/bylastic_classification_logs (DistilBERT, ~300MB)
_CLASSIFIER_MODEL: str = os.environ.get("TURNSTONE_CLASSIFIER_MODEL", "")
from app.context.retriever import RetrievedContext
from app.services.diagnose.classifier import SeverityClassifier
from app.services.diagnose.hypothesizer import RootCauseHypothesizer
from app.services.diagnose.suppressor import FalsePositiveSuppressor
from app.services.diagnose.synthesizer import SummarySynthesizer
from app.services.diagnose.timeline import TimelineReconstructor
from app.services.search import SearchResult
logger = logging.getLogger(__name__)
async def run_pipeline(
db_path: Path,
entries: list[SearchResult],
ctx: RetrievedContext,
query: str,
since: str | None, # reserved for future range-filtering in stage queries (#29 follow-up)
until: str | None, # reserved for future range-filtering in stage queries (#29 follow-up)
llm_url: str | None,
llm_model: str | None,
llm_api_key: str | None,
tech_level: str = "sysadmin",
incidents_db_path: Path | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""Async generator that runs all 5 pipeline stages and yields SSE event dicts.
Stages:
1. TimelineReconstructor — cluster log entries by time
2. SeverityClassifier — annotate clusters with severity
3. RootCauseHypothesizer — generate hypotheses via LLM
4. FalsePositiveSuppressor — rank and suppress known patterns
5. SummarySynthesizer — produce a narrative diagnosis
Yields events in order:
{"type": "status", "message": "Building timeline…"}
{"type": "pipeline_stage", "stage": 1, ...}
{"type": "pipeline_stage", "stage": 2, ...}
{"type": "pipeline_stage", "stage": 3, ...}
{"type": "pipeline_stage", "stage": 4, ...}
{"type": "hypotheses", "data": [...]}
{"type": "status", "message": "Synthesizing…"}
{"type": "reasoning", "text": "..."} — only when synthesis produces text
{"type": "done"}
"""
# Stage 1: Timeline reconstruction
yield {"type": "status", "message": "Building timeline…"}
try:
timeline = await asyncio.to_thread(
TimelineReconstructor().reconstruct, entries
)
except Exception as exc:
logger.exception("Stage 1 (timeline) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 1 (timeline)"}
yield {"type": "done"}
return
n_clusters = len(timeline.clusters)
burst = timeline.burst_count
yield {
"type": "pipeline_stage",
"stage": 1,
"name": "timeline",
"message": f"Built {n_clusters} clusters, {burst} bursts",
}
# Stage 2: Severity classification
try:
classified = await asyncio.to_thread(
SeverityClassifier(model_id=_CLASSIFIER_MODEL).classify, timeline
)
except Exception as exc:
logger.exception("Stage 2 (classifier) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 2 (classifier)"}
yield {"type": "done"}
return
sev_counts: dict[str, int] = {}
for sev in classified.cluster_severities.values():
sev_counts[sev] = sev_counts.get(sev, 0) + 1
counts_str = ", ".join(f"{k}:{v}" for k, v in sorted(sev_counts.items()))
yield {
"type": "pipeline_stage",
"stage": 2,
"name": "classifier",
"message": f"{classified.classifier_used} classifier: {counts_str}",
}
# Stage 3: Root-cause hypotheses
try:
hypotheses = await asyncio.to_thread(
RootCauseHypothesizer().hypothesize,
classified,
ctx,
query,
llm_url,
llm_model,
llm_api_key,
)
except Exception as exc:
logger.exception("Stage 3 (hypothesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 3 (hypothesizer)"}
yield {"type": "done"}
return
yield {
"type": "pipeline_stage",
"stage": 3,
"name": "hypotheses",
"message": f"{len(hypotheses)} hypotheses generated",
}
# Stage 4: False-positive suppression
_incidents_db = incidents_db_path or db_path
try:
ranked = await asyncio.to_thread(
FalsePositiveSuppressor().suppress, hypotheses, _incidents_db
)
except Exception as exc:
logger.exception("Stage 4 (suppressor) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 4 (suppressor)"}
yield {"type": "done"}
return
suppressed = sum(1 for rh in ranked if rh.suppress)
active = len(ranked) - suppressed
yield {
"type": "pipeline_stage",
"stage": 4,
"name": "suppressor",
"message": f"{suppressed} suppressed, {active} active",
}
yield {
"type": "hypotheses",
"data": [dataclasses.asdict(rh) for rh in ranked],
}
# Stage 5: Summary synthesis
yield {"type": "status", "message": "Synthesizing…"}
try:
synthesis_text = await asyncio.to_thread(
SummarySynthesizer().synthesize,
ranked,
timeline,
ctx,
query,
llm_url,
llm_model,
llm_api_key,
tech_level,
)
except Exception as exc:
logger.exception("Stage 5 (synthesizer) failed: %s", exc)
yield {"type": "error", "message": "Pipeline error in stage 5 (synthesizer)"}
yield {"type": "done"}
return
if synthesis_text:
yield {"type": "reasoning", "text": synthesis_text}
yield {"type": "done"}