feat(diagnose): 5-stage multi-agent diagnose pipeline (#29) #39

Merged
pyr0ball merged 17 commits from feat/29-multi-agent-diagnose into main 2026-05-25 19:59:35 -07:00
2 changed files with 91 additions and 60 deletions
Showing only changes of commit 9bfae16b54 - Show all commits

View file

@ -80,8 +80,9 @@ def _fetch_resolved_incidents(db_path: Path) -> list[str]:
except sqlite3.OperationalError as exc: except sqlite3.OperationalError as exc:
logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc) logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc)
return [] return []
except Exception as exc: except sqlite3.Error as exc:
logger.warning("Unexpected error fetching resolved incidents (%s) — treating as empty corpus", exc) # Catches all remaining SQLite-family errors (IntegrityError, DatabaseError, etc.)
logger.warning("Unexpected SQLite error fetching resolved incidents (%s) — treating as empty corpus", exc)
return [] return []
texts: list[str] = [] texts: list[str] = []
@ -116,6 +117,7 @@ class FalsePositiveSuppressor:
) -> None: ) -> None:
self._model_id = model_id self._model_id = model_id
self._device = device self._device = device
# _device stored for future use when get_embedder() supports device selection
self._suppress_threshold = suppress_threshold self._suppress_threshold = suppress_threshold
def suppress( def suppress(
@ -158,38 +160,11 @@ class FalsePositiveSuppressor:
# Embed corpus (with caching). # Embed corpus (with caching).
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path) corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
# Rank hypotheses using novelty score. # Score each hypothesis and sort by novelty * confidence descending.
ranked: list[RankedHypothesis] = [] ranked = [
for h in hypotheses: self._score_hypothesis(h, embedder, corpus_embeddings)
h_text = f"{h.title}. {h.description}" for h in hypotheses
try: ]
h_emb = embedder.embed(h_text)
# Convert numpy array to plain Python list for _cosine_similarities
h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
sims = _cosine_similarities(h_emb_list, corpus_embeddings)
max_sim = max(sims) if sims else 0.0
except Exception as exc:
logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", h.title, exc)
max_sim = 0.0
max_sim = float(max_sim)
novelty_score = 1.0 - max_sim
suppress = bool(novelty_score < self._suppress_threshold)
suppression_reason = (
f"Similar to resolved incident (similarity {max_sim:.2f})"
if suppress
else None
)
ranked.append(
RankedHypothesis(
hypothesis=h,
novelty_score=novelty_score,
similarity_to_known=max_sim,
suppress=suppress,
suppression_reason=suppression_reason,
)
)
ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True) ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True)
return ranked return ranked
@ -197,12 +172,55 @@ class FalsePositiveSuppressor:
# Private helpers # Private helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _score_hypothesis(
self,
hypothesis: Hypothesis,
embedder: Any,
corpus_embeddings: list[list[float]],
) -> RankedHypothesis:
"""Score a single hypothesis against the resolved incident corpus."""
try:
query_text = f"{hypothesis.title}. {hypothesis.description}"
h_emb = embedder.embed(query_text)
# Convert numpy array to plain Python list for _cosine_similarities
h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
sims = _cosine_similarities(h_emb_list, corpus_embeddings)
max_sim = float(max(sims)) if sims else 0.0
except Exception as exc:
# Broad catch is intentional: catches unknown embedder runtime errors
# (e.g. CUDA OOM, backend crashes) so one bad hypothesis never halts the pipeline.
logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", hypothesis.title, exc)
return RankedHypothesis(
hypothesis=hypothesis,
novelty_score=1.0,
similarity_to_known=0.0,
suppress=False,
suppression_reason=None,
)
novelty_score = 1.0 - max_sim
suppress = bool(novelty_score < self._suppress_threshold)
suppression_reason = (
f"Similar to resolved incident (similarity {max_sim:.2f})"
if suppress
else None
)
return RankedHypothesis(
hypothesis=hypothesis,
novelty_score=novelty_score,
similarity_to_known=max_sim,
suppress=suppress,
suppression_reason=suppression_reason,
)
def _load_embedder(self) -> Any | None: def _load_embedder(self) -> Any | None:
"""Load the embedding service. Returns None if unavailable.""" """Load the embedding service. Returns None if unavailable."""
try: try:
from app.services.embeddings import get_embedder from app.services.embeddings import get_embedder
return get_embedder() return get_embedder()
except Exception as exc: except Exception as exc:
# Broad catch is intentional: get_embedder() may raise on import or
# backend init failures from any number of third-party libraries.
logger.warning("Failed to import/initialise embedding service: %s", exc) logger.warning("Failed to import/initialise embedding service: %s", exc)
return None return None
@ -230,6 +248,9 @@ class FalsePositiveSuppressor:
for e in raw_embeddings for e in raw_embeddings
] ]
except Exception as exc: except Exception as exc:
# Broad catch is intentional: embed_batch() may raise from any backend
# (network timeout, CUDA error, etc.) — treat as empty corpus so the
# pipeline can continue without suppression.
logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc) logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc)
return [] return []

View file

@ -4,8 +4,8 @@ All tests use mocking; no real model downloads are made.
""" """
from __future__ import annotations from __future__ import annotations
import math
import sqlite3 import sqlite3
import tempfile
from pathlib import Path from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
@ -37,10 +37,8 @@ def _make_hypothesis(
) )
def _make_db_with_incidents(incidents: list[tuple[str, str]]) -> Path: def _make_db_with_incidents(incidents: list[tuple[str, str]], db_path: Path) -> Path:
"""Create a temporary SQLite database with resolved incidents. Returns the db path.""" """Create a temporary SQLite database with resolved incidents. Returns the db path."""
tmp = tempfile.mktemp(suffix=".db")
db_path = Path(tmp)
with sqlite3.connect(str(db_path)) as conn: with sqlite3.connect(str(db_path)) as conn:
conn.execute( conn.execute(
"CREATE TABLE incidents " "CREATE TABLE incidents "
@ -55,10 +53,8 @@ def _make_db_with_incidents(incidents: list[tuple[str, str]]) -> Path:
return db_path return db_path
def _make_empty_db() -> Path: def _make_empty_db(db_path: Path) -> Path:
"""Create a temporary SQLite DB with no incidents table.""" """Create a temporary SQLite DB with no incidents table."""
tmp = tempfile.mktemp(suffix=".db")
db_path = Path(tmp)
with sqlite3.connect(str(db_path)) as conn: with sqlite3.connect(str(db_path)) as conn:
conn.execute("CREATE TABLE unrelated (id INTEGER PRIMARY KEY)") conn.execute("CREATE TABLE unrelated (id INTEGER PRIMARY KEY)")
conn.commit() conn.commit()
@ -104,13 +100,13 @@ def reset_suppressor_cache():
# Test 1: No model configured — passthrough, ranked by confidence # Test 1: No model configured — passthrough, ranked by confidence
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_no_model_passthrough_ranked_by_confidence(): def test_no_model_passthrough_ranked_by_confidence(tmp_path):
"""model_id='' → all novelty_score=1.0, suppress=False, ranked by confidence desc.""" """model_id='' → all novelty_score=1.0, suppress=False, ranked by confidence desc."""
h_low = _make_hypothesis(title="Low", confidence=0.3) h_low = _make_hypothesis(title="Low", confidence=0.3)
h_high = _make_hypothesis(title="High", confidence=0.9) h_high = _make_hypothesis(title="High", confidence=0.9)
h_mid = _make_hypothesis(title="Mid", confidence=0.6) h_mid = _make_hypothesis(title="Mid", confidence=0.6)
db_path = Path(tempfile.mktemp(suffix=".db")) db_path = tmp_path / "turnstone.db"
suppressor = FalsePositiveSuppressor(model_id="") suppressor = FalsePositiveSuppressor(model_id="")
results = suppressor.suppress([h_low, h_high, h_mid], db_path) results = suppressor.suppress([h_low, h_high, h_mid], db_path)
@ -129,7 +125,7 @@ def test_no_model_passthrough_ranked_by_confidence():
# Test 2: High similarity → suppressed # Test 2: High similarity → suppressed
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_high_similarity_suppresses_hypothesis(): def test_high_similarity_suppresses_hypothesis(tmp_path):
"""Hypothesis with embedding nearly identical to corpus → suppress=True.""" """Hypothesis with embedding nearly identical to corpus → suppress=True."""
identical_vec = [1.0] + [0.0] * 383 identical_vec = [1.0] + [0.0] * 383
corpus_vec = [1.0] + [0.0] * 383 # cosine similarity = 1.0 corpus_vec = [1.0] + [0.0] * 383 # cosine similarity = 1.0
@ -139,7 +135,10 @@ def test_high_similarity_suppresses_hypothesis():
embed_batch_return=[corpus_vec], embed_batch_return=[corpus_vec],
) )
db_path = _make_db_with_incidents([("OOM killer", "Memory pressure caused OOM kill")]) db_path = _make_db_with_incidents(
[("OOM killer", "Memory pressure caused OOM kill")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -158,7 +157,7 @@ def test_high_similarity_suppresses_hypothesis():
# Test 3: Low similarity → not suppressed # Test 3: Low similarity → not suppressed
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_low_similarity_does_not_suppress(): def test_low_similarity_does_not_suppress(tmp_path):
"""Hypothesis with embedding orthogonal to corpus → suppress=False.""" """Hypothesis with embedding orthogonal to corpus → suppress=False."""
hypothesis_vec = [1.0] + [0.0] * 383 hypothesis_vec = [1.0] + [0.0] * 383
corpus_vec = [0.0, 1.0] + [0.0] * 382 # orthogonal → similarity = 0.0 corpus_vec = [0.0, 1.0] + [0.0] * 382 # orthogonal → similarity = 0.0
@ -168,7 +167,10 @@ def test_low_similarity_does_not_suppress():
embed_batch_return=[corpus_vec], embed_batch_return=[corpus_vec],
) )
db_path = _make_db_with_incidents([("Disk I/O", "Storage saturation caused latency")]) db_path = _make_db_with_incidents(
[("Disk I/O", "Storage saturation caused latency")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -186,9 +188,9 @@ def test_low_similarity_does_not_suppress():
# Test 4: Empty hypotheses list returns [] # Test 4: Empty hypotheses list returns []
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_empty_hypotheses_returns_empty(): def test_empty_hypotheses_returns_empty(tmp_path):
"""suppress([]) → [] regardless of model or db state.""" """suppress([]) → [] regardless of model or db state."""
db_path = Path(tempfile.mktemp(suffix=".db")) db_path = tmp_path / "turnstone.db"
suppressor = FalsePositiveSuppressor(model_id="test-model") suppressor = FalsePositiveSuppressor(model_id="test-model")
results = suppressor.suppress([], db_path) results = suppressor.suppress([], db_path)
assert results == [] assert results == []
@ -198,7 +200,7 @@ def test_empty_hypotheses_returns_empty():
# Test 5: Ranking by novelty_score * confidence # Test 5: Ranking by novelty_score * confidence
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_ranking_by_novelty_times_confidence(): def test_ranking_by_novelty_times_confidence(tmp_path):
"""Results are sorted by novelty_score * confidence descending.""" """Results are sorted by novelty_score * confidence descending."""
# Hypothesis A: novelty=0.9, confidence=0.5 → score=0.45 # Hypothesis A: novelty=0.9, confidence=0.5 → score=0.45
# Hypothesis B: novelty=0.5, confidence=0.9 → score=0.45 (tie, order stable-ish) # Hypothesis B: novelty=0.5, confidence=0.9 → score=0.45 (tie, order stable-ish)
@ -218,7 +220,6 @@ def test_ranking_by_novelty_times_confidence():
# Simpler approach: create 3 hypotheses and verify output is sorted correctly # Simpler approach: create 3 hypotheses and verify output is sorted correctly
# by providing distinct embeddings that produce known similarities. # by providing distinct embeddings that produce known similarities.
import math
# Corpus: single vector [1, 0, 0, ...] # Corpus: single vector [1, 0, 0, ...]
corpus_vec = [1.0] + [0.0] * 383 corpus_vec = [1.0] + [0.0] * 383
@ -254,7 +255,10 @@ def test_ranking_by_novelty_times_confidence():
mock_embedder.embed_batch.return_value = [batch_m] mock_embedder.embed_batch.return_value = [batch_m]
mock_embedder.embed.side_effect = side_effect_embed mock_embedder.embed.side_effect = side_effect_embed
db_path = _make_db_with_incidents([("OOM", "Memory exhaustion")]) db_path = _make_db_with_incidents(
[("OOM", "Memory exhaustion")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -273,9 +277,9 @@ def test_ranking_by_novelty_times_confidence():
# Test 6: DB with no resolved incidents → novelty_score=1.0 # Test 6: DB with no resolved incidents → novelty_score=1.0
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_no_resolved_incidents_in_db_passthrough(): def test_no_resolved_incidents_in_db_passthrough(tmp_path):
"""When incidents table is empty, all hypotheses get novelty_score=1.0.""" """When incidents table is empty, all hypotheses get novelty_score=1.0."""
db_path = _make_db_with_incidents([]) # table exists but zero rows db_path = _make_db_with_incidents([], tmp_path / "turnstone.db") # table exists but zero rows
mock_embedder = _make_mock_embedder() mock_embedder = _make_mock_embedder()
suppressor = FalsePositiveSuppressor(model_id="test-model") suppressor = FalsePositiveSuppressor(model_id="test-model")
@ -293,9 +297,9 @@ def test_no_resolved_incidents_in_db_passthrough():
# Test 7: DB query failure → graceful fallback, no crash # Test 7: DB query failure → graceful fallback, no crash
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_db_query_failure_graceful_fallback(): def test_db_query_failure_graceful_fallback(tmp_path):
"""When the incidents table is missing, suppress() returns passthrough without raising.""" """When the incidents table is missing, suppress() returns passthrough without raising."""
db_path = _make_empty_db() # no 'incidents' table db_path = _make_empty_db(tmp_path / "turnstone.db") # no 'incidents' table
mock_embedder = _make_mock_embedder() mock_embedder = _make_mock_embedder()
suppressor = FalsePositiveSuppressor(model_id="test-model") suppressor = FalsePositiveSuppressor(model_id="test-model")
@ -311,9 +315,12 @@ def test_db_query_failure_graceful_fallback():
# Test 8: Embedding service unavailable (returns None) → graceful fallback # Test 8: Embedding service unavailable (returns None) → graceful fallback
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_embedding_service_unavailable_passthrough(): def test_embedding_service_unavailable_passthrough(tmp_path):
"""When get_embedder() returns None, suppress() falls back without crashing.""" """When get_embedder() returns None, suppress() falls back without crashing."""
db_path = _make_db_with_incidents([("OOM", "Memory pressure")]) db_path = _make_db_with_incidents(
[("OOM", "Memory pressure")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model") suppressor = FalsePositiveSuppressor(model_id="test-model")
with patch.object(suppressor, "_load_embedder", return_value=None): with patch.object(suppressor, "_load_embedder", return_value=None):
@ -329,10 +336,13 @@ def test_embedding_service_unavailable_passthrough():
# Test 9: Corpus cache invalidated when corpus changes # Test 9: Corpus cache invalidated when corpus changes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def test_corpus_cache_invalidated_on_corpus_change(): def test_corpus_cache_invalidated_on_corpus_change(tmp_path):
"""When the corpus changes between calls, embed_batch is called again.""" """When the corpus changes between calls, embed_batch is called again."""
# First DB: one incident # First DB: one incident
db_path = _make_db_with_incidents([("OOM", "Memory pressure")]) db_path = _make_db_with_incidents(
[("OOM", "Memory pressure")],
tmp_path / "turnstone.db",
)
corpus_vec_1 = [1.0] + [0.0] * 383 corpus_vec_1 = [1.0] + [0.0] * 383
corpus_vec_2 = [0.0, 1.0] + [0.0] * 382 corpus_vec_2 = [0.0, 1.0] + [0.0] * 382