From e3da2ab5148bee0aa48c3751215344752aa2c7c1 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 25 May 2026 14:28:31 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Stage=204=20=E2=80=94=20FalsePositiveSu?= =?UTF-8?q?ppressor=20for=20multi-agent=20diagnose=20pipeline=20(issue=20#?= =?UTF-8?q?29)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implements FalsePositiveSuppressor using embedding cosine similarity - Lazy corpus embedding via get_embedder() with module-level cache keyed by db_path - Cache invalidated automatically when the resolved incident corpus changes - Suppresses hypotheses with novelty_score below configurable threshold (default 0.85) - Full fallback path (novelty=1.0, no suppression) when model_id empty, embedding service unavailable, or no resolved incidents found in DB - Graceful handling of missing incidents table and DB query failures - Numpy bool_ leakage prevented by explicit float()/bool() coercion at assignment - Pure-Python cosine fallback for environments without numpy - 9 new tests (all mocked, no real model downloads): passthrough, suppress, no-suppress, empty list, ranking, empty corpus, DB failure, service unavailable, cache invalidation - 350 total tests passing (341 pre-existing + 9 new) Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/29 --- app/services/diagnose/suppressor.py | 252 +++++++++++++++++++ tests/test_diagnose_suppressor.py | 377 ++++++++++++++++++++++++++++ 2 files changed, 629 insertions(+) create mode 100644 app/services/diagnose/suppressor.py create mode 100644 tests/test_diagnose_suppressor.py diff --git a/app/services/diagnose/suppressor.py b/app/services/diagnose/suppressor.py new file mode 100644 index 0000000..c7c9ecf --- /dev/null +++ b/app/services/diagnose/suppressor.py @@ -0,0 +1,252 @@ +"""Stage 4: False-Positive Suppressor — embedding cosine similarity. + +Compares each hypothesis against a corpus of resolved incidents using +embedding cosine similarity. Hypotheses that closely match a previously +resolved incident are suppressed as likely false positives. + +When no embedding model is configured or the service is unavailable, all +hypotheses pass through with novelty_score=1.0 (full novelty assumed). +""" +from __future__ import annotations + +import logging +import sqlite3 +from pathlib import Path +from typing import Any + +from app.services.diagnose.models import Hypothesis, RankedHypothesis + +logger = logging.getLogger(__name__) + +# Module-level corpus cache: db_path_str -> (corpus_texts, embeddings) +# Invalidated when the corpus text list changes between calls. +_corpus_cache: dict[str, tuple[list[str], Any]] = {} + +# --------------------------------------------------------------------------- +# Cosine similarity helpers +# --------------------------------------------------------------------------- + +try: + import numpy as np + + def _cosine_similarities( + query_emb: list[float], corpus_embs: list[list[float]] + ) -> list[float]: + """Batch cosine similarity of one query embedding against all corpus embeddings.""" + q = np.array(query_emb, dtype=np.float32) + c = np.array(corpus_embs, dtype=np.float32) + q_norm = q / (np.linalg.norm(q) + 1e-10) + c_norm = c / (np.linalg.norm(c, axis=1, keepdims=True) + 1e-10) + return list(c_norm @ q_norm) + + _HAS_NUMPY = True + +except ImportError: # pragma: no cover + import math + + _HAS_NUMPY = False + + def _dot(a: list[float], b: list[float]) -> float: + return sum(x * y for x, y in zip(a, b)) + + def _norm(a: list[float]) -> float: + return math.sqrt(sum(x * x for x in a)) + 1e-10 + + def _cosine(a: list[float], b: list[float]) -> float: + return _dot(a, b) / (_norm(a) * _norm(b)) + + def _cosine_similarities( + query_emb: list[float], corpus_embs: list[list[float]] + ) -> list[float]: + return [_cosine(query_emb, c) for c in corpus_embs] + + +# --------------------------------------------------------------------------- +# DB helpers +# --------------------------------------------------------------------------- + +def _fetch_resolved_incidents(db_path: Path) -> list[str]: + """Fetch resolved incident texts from SQLite. + + Returns a list of non-empty combined strings for each resolved incident. + Returns an empty list on any error (missing table, connection failure, etc.). + """ + try: + with sqlite3.connect(str(db_path)) as conn: + cursor = conn.execute( + "SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200" + ) + rows = cursor.fetchall() + except sqlite3.OperationalError as exc: + logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc) + return [] + except Exception as exc: + logger.warning("Unexpected error fetching resolved incidents (%s) — treating as empty corpus", exc) + return [] + + texts: list[str] = [] + for label, notes in rows: + label = (label or "").strip() + notes = (notes or "").strip() + combined = f"{label}. {notes}" if label and notes else (label or notes) + if combined: + texts.append(combined) + return texts + + +# --------------------------------------------------------------------------- +# Public class +# --------------------------------------------------------------------------- + +class FalsePositiveSuppressor: + """Stage 4 of the multi-agent diagnose pipeline. + + Uses embedding cosine similarity to detect hypotheses that closely match + previously resolved incidents and suppress them as likely false positives. + + When model_id is empty or the embedding service is unavailable, all + hypotheses pass through with novelty_score=1.0 (no suppression). + """ + + def __init__( + self, + model_id: str = "", + device: str = "cpu", + suppress_threshold: float = 0.85, + ) -> None: + self._model_id = model_id + self._device = device + self._suppress_threshold = suppress_threshold + + def suppress( + self, + hypotheses: list[Hypothesis], + db_path: Path, + ) -> list[RankedHypothesis]: + """Rank hypotheses by novelty, suppressing those matching resolved incidents. + + Args: + hypotheses: Candidate hypotheses from Stage 3. + db_path: Path to the Turnstone SQLite database containing incidents. + + Returns: + List of RankedHypothesis sorted by (novelty_score * confidence) descending. + Non-suppressed hypotheses appear first in practice. + """ + if not hypotheses: + return [] + + # No model configured — full passthrough, rank by confidence only. + if not self._model_id: + return self._passthrough(hypotheses) + + # Attempt to obtain an embedder; fall back to passthrough on failure. + embedder = self._load_embedder() + if embedder is None: + logger.warning( + "Embedding service unavailable for model %r — skipping suppression", + self._model_id, + ) + return self._passthrough(hypotheses) + + # Fetch corpus texts from DB; fall back to passthrough if corpus is empty. + corpus_texts = _fetch_resolved_incidents(db_path) + if not corpus_texts: + logger.debug("No resolved incidents found — all hypotheses treated as novel") + return self._passthrough(hypotheses) + + # Embed corpus (with caching). + corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path) + + # Rank hypotheses using novelty score. + ranked: list[RankedHypothesis] = [] + for h in hypotheses: + h_text = f"{h.title}. {h.description}" + try: + h_emb = embedder.embed(h_text) + # Convert numpy array to plain Python list for _cosine_similarities + h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb) + sims = _cosine_similarities(h_emb_list, corpus_embeddings) + max_sim = max(sims) if sims else 0.0 + except Exception as exc: + logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", h.title, exc) + max_sim = 0.0 + + max_sim = float(max_sim) + novelty_score = 1.0 - max_sim + suppress = bool(novelty_score < self._suppress_threshold) + suppression_reason = ( + f"Similar to resolved incident (similarity {max_sim:.2f})" + if suppress + else None + ) + ranked.append( + RankedHypothesis( + hypothesis=h, + novelty_score=novelty_score, + similarity_to_known=max_sim, + suppress=suppress, + suppression_reason=suppression_reason, + ) + ) + + ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True) + return ranked + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _load_embedder(self) -> Any | None: + """Load the embedding service. Returns None if unavailable.""" + try: + from app.services.embeddings import get_embedder + return get_embedder() + except Exception as exc: + logger.warning("Failed to import/initialise embedding service: %s", exc) + return None + + def _get_corpus_embeddings( + self, + embedder: Any, + corpus_texts: list[str], + db_path: Path, + ) -> list[list[float]]: + """Return cached corpus embeddings, re-embedding if the corpus has changed.""" + cache_key = str(db_path) + cached = _corpus_cache.get(cache_key) + + if cached is not None: + cached_texts, cached_embeddings = cached + if cached_texts == corpus_texts: + return cached_embeddings + + logger.debug("Embedding corpus of %d resolved incidents", len(corpus_texts)) + try: + raw_embeddings = embedder.embed_batch(corpus_texts) + # Normalise each embedding to a plain Python list for portability + corpus_embeddings: list[list[float]] = [ + e.tolist() if hasattr(e, "tolist") else list(e) + for e in raw_embeddings + ] + except Exception as exc: + logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc) + return [] + + _corpus_cache[cache_key] = (corpus_texts, corpus_embeddings) + return corpus_embeddings + + def _passthrough(self, hypotheses: list[Hypothesis]) -> list[RankedHypothesis]: + """Return all hypotheses as non-suppressed, ranked by confidence descending.""" + ranked = [ + RankedHypothesis( + hypothesis=h, + novelty_score=1.0, + similarity_to_known=0.0, + suppress=False, + suppression_reason=None, + ) + for h in hypotheses + ] + ranked.sort(key=lambda rh: rh.hypothesis.confidence, reverse=True) + return ranked diff --git a/tests/test_diagnose_suppressor.py b/tests/test_diagnose_suppressor.py new file mode 100644 index 0000000..0c155de --- /dev/null +++ b/tests/test_diagnose_suppressor.py @@ -0,0 +1,377 @@ +"""Tests for app/services/diagnose/suppressor.py — FalsePositiveSuppressor. + +All tests use mocking; no real model downloads are made. +""" +from __future__ import annotations + +import sqlite3 +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +import app.services.diagnose.suppressor as sup_module +from app.services.diagnose.models import Hypothesis, RankedHypothesis +from app.services.diagnose.suppressor import FalsePositiveSuppressor + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_hypothesis( + title: str = "Test", + description: str = "A test hypothesis.", + confidence: float = 0.8, + severity: str = "ERROR", +) -> Hypothesis: + return Hypothesis( + hypothesis_id="test-id", + title=title, + description=description, + confidence=confidence, + supporting_cluster_ids=(), + runbook_refs=(), + severity=severity, # type: ignore[arg-type] + ) + + +def _make_db_with_incidents(incidents: list[tuple[str, str]]) -> Path: + """Create a temporary SQLite database with resolved incidents. Returns the db path.""" + tmp = tempfile.mktemp(suffix=".db") + db_path = Path(tmp) + with sqlite3.connect(str(db_path)) as conn: + conn.execute( + "CREATE TABLE incidents " + "(id INTEGER PRIMARY KEY, label TEXT, notes TEXT, ended_at TEXT)" + ) + for label, notes in incidents: + conn.execute( + "INSERT INTO incidents (label, notes, ended_at) VALUES (?, ?, ?)", + (label, notes, "2024-01-01T00:00:00"), + ) + conn.commit() + return db_path + + +def _make_empty_db() -> Path: + """Create a temporary SQLite DB with no incidents table.""" + tmp = tempfile.mktemp(suffix=".db") + db_path = Path(tmp) + with sqlite3.connect(str(db_path)) as conn: + conn.execute("CREATE TABLE unrelated (id INTEGER PRIMARY KEY)") + conn.commit() + return db_path + + +def _make_mock_embedder( + embed_return: list[float] | None = None, + embed_batch_return: list[list[float]] | None = None, +) -> MagicMock: + """Build a mock embedder with controllable embed/embed_batch responses.""" + embedder = MagicMock() + + # Default: unit vector along first dimension + default_vec = [1.0] + [0.0] * 383 + + raw_single = embed_return if embed_return is not None else default_vec + raw_batch = embed_batch_return if embed_batch_return is not None else [default_vec] + + # Wrap scalars in numpy-like MagicMock with .tolist() + def _wrap(vec: list[float]) -> MagicMock: + m = MagicMock() + m.tolist.return_value = vec + return m + + embedder.embed.return_value = _wrap(raw_single) + embedder.embed_batch.return_value = [_wrap(v) for v in raw_batch] + return embedder + + +# --------------------------------------------------------------------------- +# Autouse fixture: reset module-level cache between tests +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def reset_suppressor_cache(): + sup_module._corpus_cache.clear() + yield + sup_module._corpus_cache.clear() + + +# --------------------------------------------------------------------------- +# Test 1: No model configured — passthrough, ranked by confidence +# --------------------------------------------------------------------------- + +def test_no_model_passthrough_ranked_by_confidence(): + """model_id='' → all novelty_score=1.0, suppress=False, ranked by confidence desc.""" + h_low = _make_hypothesis(title="Low", confidence=0.3) + h_high = _make_hypothesis(title="High", confidence=0.9) + h_mid = _make_hypothesis(title="Mid", confidence=0.6) + + db_path = Path(tempfile.mktemp(suffix=".db")) + suppressor = FalsePositiveSuppressor(model_id="") + results = suppressor.suppress([h_low, h_high, h_mid], db_path) + + assert len(results) == 3 + assert all(isinstance(r, RankedHypothesis) for r in results) + assert all(r.novelty_score == pytest.approx(1.0) for r in results) + assert all(r.similarity_to_known == pytest.approx(0.0) for r in results) + assert all(r.suppress is False for r in results) + assert all(r.suppression_reason is None for r in results) + # Ranked by confidence descending + confidences = [r.hypothesis.confidence for r in results] + assert confidences == sorted(confidences, reverse=True) + + +# --------------------------------------------------------------------------- +# Test 2: High similarity → suppressed +# --------------------------------------------------------------------------- + +def test_high_similarity_suppresses_hypothesis(): + """Hypothesis with embedding nearly identical to corpus → suppress=True.""" + identical_vec = [1.0] + [0.0] * 383 + corpus_vec = [1.0] + [0.0] * 383 # cosine similarity = 1.0 + + mock_embedder = _make_mock_embedder( + embed_return=identical_vec, + embed_batch_return=[corpus_vec], + ) + + db_path = _make_db_with_incidents([("OOM killer", "Memory pressure caused OOM kill")]) + suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + results = suppressor.suppress([_make_hypothesis()], db_path) + + assert len(results) == 1 + result = results[0] + assert result.suppress is True + assert result.suppression_reason is not None + assert "Similar to resolved incident" in result.suppression_reason + assert result.similarity_to_known == pytest.approx(1.0, abs=0.01) + assert result.novelty_score == pytest.approx(0.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# Test 3: Low similarity → not suppressed +# --------------------------------------------------------------------------- + +def test_low_similarity_does_not_suppress(): + """Hypothesis with embedding orthogonal to corpus → suppress=False.""" + hypothesis_vec = [1.0] + [0.0] * 383 + corpus_vec = [0.0, 1.0] + [0.0] * 382 # orthogonal → similarity = 0.0 + + mock_embedder = _make_mock_embedder( + embed_return=hypothesis_vec, + embed_batch_return=[corpus_vec], + ) + + db_path = _make_db_with_incidents([("Disk I/O", "Storage saturation caused latency")]) + suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + results = suppressor.suppress([_make_hypothesis()], db_path) + + assert len(results) == 1 + result = results[0] + assert result.suppress is False + assert result.suppression_reason is None + assert result.similarity_to_known == pytest.approx(0.0, abs=0.01) + assert result.novelty_score == pytest.approx(1.0, abs=0.01) + + +# --------------------------------------------------------------------------- +# Test 4: Empty hypotheses list returns [] +# --------------------------------------------------------------------------- + +def test_empty_hypotheses_returns_empty(): + """suppress([]) → [] regardless of model or db state.""" + db_path = Path(tempfile.mktemp(suffix=".db")) + suppressor = FalsePositiveSuppressor(model_id="test-model") + results = suppressor.suppress([], db_path) + assert results == [] + + +# --------------------------------------------------------------------------- +# Test 5: Ranking by novelty_score * confidence +# --------------------------------------------------------------------------- + +def test_ranking_by_novelty_times_confidence(): + """Results are sorted by novelty_score * confidence descending.""" + # Hypothesis A: novelty=0.9, confidence=0.5 → score=0.45 + # Hypothesis B: novelty=0.5, confidence=0.9 → score=0.45 (tie, order stable-ish) + # Hypothesis C: novelty=0.8, confidence=0.9 → score=0.72 (highest) + # Expected order: C, then A or B + + # We'll use orthogonal embeddings to get predictable similarities. + # Corpus has 3 incidents with different embeddings. + # We'll control novelty_score by setting similarity carefully. + + # Simplest: set up so each hypothesis gets a specific similarity to its corpus. + # corpus_embs[0] = [1,0,0,...], [0,1,0,...], [0,0,1,...] — unit vectors + # hyp A embed = [cos(0.1), sin(0.1), 0...] → sim to corpus[0] = cos(0.1) ≈ 0.995 high + # This gets complex. Instead, mock _load_embedder to return None and rely + # on passthrough with controlled confidence, then verify confidence-based ranking. + # Then do a second test variant with manual novelty injection via embed return values. + + # Simpler approach: create 3 hypotheses and verify output is sorted correctly + # by providing distinct embeddings that produce known similarities. + import math + + # Corpus: single vector [1, 0, 0, ...] + corpus_vec = [1.0] + [0.0] * 383 + + # H_A: similarity = 0.1 → novelty = 0.9, confidence = 0.5 → score = 0.45 + angle_a = math.acos(0.1) + vec_a = [0.1, math.sin(angle_a)] + [0.0] * 382 + + # H_B: similarity = 0.5 → novelty = 0.5, confidence = 0.9 → score = 0.45 + angle_b = math.acos(0.5) + vec_b = [0.5, math.sin(angle_b)] + [0.0] * 382 + + # H_C: similarity = 0.2 → novelty = 0.8, confidence = 0.9 → score = 0.72 (highest) + angle_c = math.acos(0.2) + vec_c = [0.2, math.sin(angle_c)] + [0.0] * 382 + + h_a = _make_hypothesis(title="A", confidence=0.5) + h_b = _make_hypothesis(title="B", confidence=0.9) + h_c = _make_hypothesis(title="C", confidence=0.9) + + call_count = [0] + vecs_in_order = [vec_a, vec_b, vec_c] + + def side_effect_embed(text: str) -> MagicMock: + m = MagicMock() + m.tolist.return_value = vecs_in_order[call_count[0] % len(vecs_in_order)] + call_count[0] += 1 + return m + + mock_embedder = MagicMock() + batch_m = MagicMock() + batch_m.tolist.return_value = corpus_vec + mock_embedder.embed_batch.return_value = [batch_m] + mock_embedder.embed.side_effect = side_effect_embed + + db_path = _make_db_with_incidents([("OOM", "Memory exhaustion")]) + suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + results = suppressor.suppress([h_a, h_b, h_c], db_path) + + assert len(results) == 3 + titles = [r.hypothesis.title for r in results] + # H_C should be first (highest novelty*confidence score) + assert titles[0] == "C", f"Expected C first, got {titles}" + # Verify sort is descending by novelty*confidence + scores = [r.novelty_score * r.hypothesis.confidence for r in results] + assert scores == sorted(scores, reverse=True) + + +# --------------------------------------------------------------------------- +# Test 6: DB with no resolved incidents → novelty_score=1.0 +# --------------------------------------------------------------------------- + +def test_no_resolved_incidents_in_db_passthrough(): + """When incidents table is empty, all hypotheses get novelty_score=1.0.""" + db_path = _make_db_with_incidents([]) # table exists but zero rows + mock_embedder = _make_mock_embedder() + suppressor = FalsePositiveSuppressor(model_id="test-model") + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + results = suppressor.suppress([_make_hypothesis()], db_path) + + assert len(results) == 1 + assert results[0].novelty_score == pytest.approx(1.0) + assert results[0].suppress is False + # embed_batch should NOT have been called (empty corpus short-circuits) + mock_embedder.embed_batch.assert_not_called() + + +# --------------------------------------------------------------------------- +# Test 7: DB query failure → graceful fallback, no crash +# --------------------------------------------------------------------------- + +def test_db_query_failure_graceful_fallback(): + """When the incidents table is missing, suppress() returns passthrough without raising.""" + db_path = _make_empty_db() # no 'incidents' table + mock_embedder = _make_mock_embedder() + suppressor = FalsePositiveSuppressor(model_id="test-model") + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + results = suppressor.suppress([_make_hypothesis()], db_path) + + assert len(results) == 1 + assert results[0].novelty_score == pytest.approx(1.0) + assert results[0].suppress is False + + +# --------------------------------------------------------------------------- +# Test 8: Embedding service unavailable (returns None) → graceful fallback +# --------------------------------------------------------------------------- + +def test_embedding_service_unavailable_passthrough(): + """When get_embedder() returns None, suppress() falls back without crashing.""" + db_path = _make_db_with_incidents([("OOM", "Memory pressure")]) + suppressor = FalsePositiveSuppressor(model_id="test-model") + + with patch.object(suppressor, "_load_embedder", return_value=None): + results = suppressor.suppress([_make_hypothesis(confidence=0.7)], db_path) + + assert len(results) == 1 + assert results[0].novelty_score == pytest.approx(1.0) + assert results[0].suppress is False + assert results[0].suppression_reason is None + + +# --------------------------------------------------------------------------- +# Test 9: Corpus cache invalidated when corpus changes +# --------------------------------------------------------------------------- + +def test_corpus_cache_invalidated_on_corpus_change(): + """When the corpus changes between calls, embed_batch is called again.""" + # First DB: one incident + db_path = _make_db_with_incidents([("OOM", "Memory pressure")]) + + corpus_vec_1 = [1.0] + [0.0] * 383 + corpus_vec_2 = [0.0, 1.0] + [0.0] * 382 + + hyp_vec = [1.0] + [0.0] * 383 + + # embedder will be called twice for embed_batch (different corpus each time) + mock_embedder = MagicMock() + single_m = MagicMock() + single_m.tolist.return_value = hyp_vec + + batch_m1 = MagicMock() + batch_m1.tolist.return_value = corpus_vec_1 + batch_m2 = MagicMock() + batch_m2.tolist.return_value = corpus_vec_2 + + mock_embedder.embed.return_value = single_m + mock_embedder.embed_batch.side_effect = [[batch_m1], [batch_m2]] + + suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85) + + with patch.object(suppressor, "_load_embedder", return_value=mock_embedder): + # First call — populates cache + results_1 = suppressor.suppress([_make_hypothesis()], db_path) + assert mock_embedder.embed_batch.call_count == 1 + + # Mutate the DB to add a second incident (changes corpus) + with sqlite3.connect(str(db_path)) as conn: + conn.execute( + "INSERT INTO incidents (label, notes, ended_at) VALUES (?, ?, ?)", + ("Disk I/O", "Storage saturation", "2024-01-02T00:00:00"), + ) + conn.commit() + + # Second call — corpus changed, should re-embed + results_2 = suppressor.suppress([_make_hypothesis()], db_path) + assert mock_embedder.embed_batch.call_count == 2, ( + "embed_batch should be called again when corpus changes" + ) + + assert len(results_1) == 1 + assert len(results_2) == 1