Was suppressing when novelty_score < 0.85 (i.e. similarity > 0.15), which would suppress nearly every hypothesis once embeddings are active. Now suppresses when max_sim >= similarity_threshold (0.85), meaning only hypotheses that are 85%+ similar to a resolved incident are suppressed. Also renames suppress_threshold → similarity_threshold for clarity and adds a borderline boundary test (0.85 suppressed, 0.84 not suppressed). Closes: #29
432 lines
17 KiB
Python
432 lines
17 KiB
Python
"""Tests for app/services/diagnose/suppressor.py — FalsePositiveSuppressor.

All tests use mocking; no real model downloads are made.
"""
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import app.services.diagnose.suppressor as sup_module
|
|
from app.services.diagnose.models import Hypothesis, RankedHypothesis
|
|
from app.services.diagnose.suppressor import FalsePositiveSuppressor
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_hypothesis(
    title: str = "Test",
    description: str = "A test hypothesis.",
    confidence: float = 0.8,
    severity: str = "ERROR",
) -> Hypothesis:
    """Build a ``Hypothesis`` for tests, overriding only the fields a test cares about.

    The id is always ``"test-id"`` and the cluster/runbook references are
    empty tuples; everything else comes from the keyword arguments.
    """
    fields = {
        "hypothesis_id": "test-id",
        "title": title,
        "description": description,
        "confidence": confidence,
        "supporting_cluster_ids": (),
        "runbook_refs": (),
        # Passed through as a plain str; callers are expected to supply a valid value.
        "severity": severity,
    }
    return Hypothesis(**fields)  # type: ignore[arg-type]
|
|
|
|
|
|
def _make_db_with_incidents(incidents: list[tuple[str, str]], db_path: Path) -> Path:
    """Create a SQLite database at *db_path* holding the given resolved incidents.

    Args:
        incidents: ``(label, notes)`` pairs; one row is inserted per pair, each
            with ``ended_at`` set to ``"2024-01-01T00:00:00"``. May be empty,
            in which case the ``incidents`` table is created with zero rows.
        db_path: Location of the database file to create.

    Returns:
        *db_path*, unchanged, so the call can be used inline.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        # ``with conn`` only scopes the transaction (commit on success,
        # rollback on error); it does NOT close the connection, hence the
        # explicit close() in ``finally``.
        with conn:
            conn.execute(
                "CREATE TABLE incidents "
                "(id INTEGER PRIMARY KEY, label TEXT, notes TEXT, ended_at TEXT)"
            )
            conn.executemany(
                "INSERT INTO incidents (label, notes, ended_at) VALUES (?, ?, ?)",
                [(label, notes, "2024-01-01T00:00:00") for label, notes in incidents],
            )
    finally:
        conn.close()
    return db_path
|
|
|
|
|
|
def _make_empty_db(db_path: Path) -> Path:
    """Create a SQLite database at *db_path* that has no ``incidents`` table.

    Only a table named ``unrelated`` is created, so any query against
    ``incidents`` fails. Returns *db_path* unchanged.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        # ``with conn`` commits or rolls back but leaves the connection open;
        # close it explicitly so the handle is not leaked.
        with conn:
            conn.execute("CREATE TABLE unrelated (id INTEGER PRIMARY KEY)")
    finally:
        conn.close()
    return db_path
|
|
|
|
|
|
def _make_mock_embedder(
    embed_return: list[float] | None = None,
    embed_batch_return: list[list[float]] | None = None,
) -> MagicMock:
    """Return a mock embedder whose ``embed`` / ``embed_batch`` results are fixed.

    Args:
        embed_return: Vector reported by ``embed(...)``. Defaults to a
            384-element unit vector along the first dimension.
        embed_batch_return: Vectors reported by ``embed_batch(...)``. Defaults
            to a single-element list holding that same unit vector.

    Each vector is exposed through a mock object whose ``.tolist()`` returns
    the plain Python list, mimicking an array-like return value.
    """
    unit_vec = [1.0] + [0.0] * 383

    single_vec = unit_vec if embed_return is None else embed_return
    batch_vecs = [unit_vec] if embed_batch_return is None else embed_batch_return

    def _as_array_like(values: list[float]) -> MagicMock:
        # Stand-in for an array object: only ``.tolist()`` is configured.
        array_like = MagicMock()
        array_like.tolist.return_value = values
        return array_like

    mock_embedder = MagicMock()
    mock_embedder.embed.return_value = _as_array_like(single_vec)
    mock_embedder.embed_batch.return_value = list(map(_as_array_like, batch_vecs))
    return mock_embedder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Autouse fixture: reset module-level cache between tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(autouse=True)
def reset_suppressor_cache():
    """Empty the suppressor module's ``_corpus_cache`` before and after each test."""
    # Before the test: drop whatever an earlier test left behind.
    sup_module._corpus_cache.clear()

    yield

    # After the test: leave nothing behind for the next one.
    sup_module._corpus_cache.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 1: No model configured — passthrough, ranked by confidence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_no_model_passthrough_ranked_by_confidence(tmp_path):
    """With ``model_id=''`` nothing is suppressed and output is ordered by confidence.

    Every result must carry novelty_score=1.0, similarity_to_known=0.0,
    suppress=False and no suppression reason.
    """
    # Supplied out of order on purpose so the ranking is actually exercised.
    hypotheses = [
        _make_hypothesis(title="Low", confidence=0.3),
        _make_hypothesis(title="High", confidence=0.9),
        _make_hypothesis(title="Mid", confidence=0.6),
    ]

    # The test itself never creates a database file at this path.
    db_path = tmp_path / "turnstone.db"
    suppressor = FalsePositiveSuppressor(model_id="")
    results = suppressor.suppress(hypotheses, db_path)

    assert len(results) == 3
    assert all(isinstance(ranked, RankedHypothesis) for ranked in results)
    for ranked in results:
        assert ranked.novelty_score == pytest.approx(1.0)
    for ranked in results:
        assert ranked.similarity_to_known == pytest.approx(0.0)
    for ranked in results:
        assert ranked.suppress is False
    for ranked in results:
        assert ranked.suppression_reason is None

    # Highest confidence first.
    ordered = [ranked.hypothesis.confidence for ranked in results]
    assert ordered == sorted(ordered, reverse=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 2: High similarity → suppressed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_high_similarity_suppresses_hypothesis(tmp_path):
    """A hypothesis whose embedding matches the corpus embedding is suppressed."""
    # Hypothesis and corpus share the same direction, so similarity should be 1.0.
    hypothesis_vec = [1.0] + [0.0] * 383
    corpus_vec = [1.0] + [0.0] * 383
    embedder = _make_mock_embedder(
        embed_return=hypothesis_vec,
        embed_batch_return=[corpus_vec],
    )

    db_path = _make_db_with_incidents(
        [("OOM killer", "Memory pressure caused OOM kill")],
        tmp_path / "turnstone.db",
    )
    suppressor = FalsePositiveSuppressor(model_id="test-model", similarity_threshold=0.85)

    with patch.object(suppressor, "_load_embedder", return_value=embedder):
        results = suppressor.suppress([_make_hypothesis()], db_path)

    assert len(results) == 1
    ranked = results[0]

    # Suppressed, with a reason that points at the matching incident.
    assert ranked.suppress is True
    assert ranked.suppression_reason is not None
    assert "Similar to resolved incident" in ranked.suppression_reason

    # Similarity and novelty are complementary: ~1.0 and ~0.0 respectively.
    assert ranked.similarity_to_known == pytest.approx(1.0, abs=0.01)
    assert ranked.novelty_score == pytest.approx(0.0, abs=0.01)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 3: Low similarity → not suppressed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_low_similarity_does_not_suppress(tmp_path):
    """A hypothesis whose embedding is orthogonal to the corpus is not suppressed."""
    # First-axis vs. second-axis unit vectors: orthogonal, so similarity should be 0.0.
    hypothesis_vec = [1.0] + [0.0] * 383
    corpus_vec = [0.0, 1.0] + [0.0] * 382
    embedder = _make_mock_embedder(
        embed_return=hypothesis_vec,
        embed_batch_return=[corpus_vec],
    )

    db_path = _make_db_with_incidents(
        [("Disk I/O", "Storage saturation caused latency")],
        tmp_path / "turnstone.db",
    )
    suppressor = FalsePositiveSuppressor(model_id="test-model", similarity_threshold=0.85)

    with patch.object(suppressor, "_load_embedder", return_value=embedder):
        results = suppressor.suppress([_make_hypothesis()], db_path)

    assert len(results) == 1
    ranked = results[0]

    # Not suppressed, and therefore no reason attached.
    assert ranked.suppress is False
    assert ranked.suppression_reason is None

    # Fully novel: similarity ~0.0, novelty ~1.0.
    assert ranked.similarity_to_known == pytest.approx(0.0, abs=0.01)
    assert ranked.novelty_score == pytest.approx(1.0, abs=0.01)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 3b: Borderline similarity — exactly at threshold vs. just below
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_similarity_threshold_boundary(tmp_path):
    """similarity == threshold is suppressed; similarity just below threshold is not.

    This test locks down the boundary semantics: suppress when max_sim >= threshold,
    not when novelty_score < threshold (the inverted form that was the original bug).
    With threshold=0.85:
      - similarity=0.85 → suppressed (at boundary, inclusive)
      - similarity=0.84 → NOT suppressed (just below)
    """
    db_path = _make_db_with_incidents(
        [("Disk I/O", "Storage saturation caused latency")],
        tmp_path / "turnstone.db",
    )

    # Corpus unit vector along first axis
    corpus_vec = [1.0] + [0.0] * 383

    for sim_value, expected_suppress in [(0.85, True), (0.84, False)]:
        # Build a unit-length hypothesis embedding whose cosine similarity to
        # corpus_vec is sim_value (up to float rounding):
        #   query = [sim, sqrt(1 - sim^2), 0, ...]
        # NOTE(review): the inclusive 0.85 case relies on the suppressor's
        # similarity computation not rounding below the threshold — confirm
        # if this case ever becomes flaky.
        hyp_vec = [sim_value, math.sqrt(max(0.0, 1.0 - sim_value ** 2))] + [0.0] * 382

        mock_embedder = _make_mock_embedder(
            embed_return=hyp_vec,
            embed_batch_return=[corpus_vec],
        )

        suppressor = FalsePositiveSuppressor(model_id="test-model", similarity_threshold=0.85)

        with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
            results = suppressor.suppress([_make_hypothesis()], db_path)

        assert len(results) == 1
        result = results[0]
        assert result.suppress is expected_suppress, (
            f"similarity={sim_value:.2f}: expected suppress={expected_suppress}, "
            f"got suppress={result.suppress} (similarity_to_known={result.similarity_to_known:.4f})"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 4: Empty hypotheses list returns []
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_empty_hypotheses_returns_empty(tmp_path):
    """An empty hypothesis list yields an empty result list.

    A model id is configured and the database path points at a file the test
    never creates, so neither should matter for empty input.
    """
    suppressor = FalsePositiveSuppressor(model_id="test-model")
    no_hypotheses = []

    ranked = suppressor.suppress(no_hypotheses, tmp_path / "turnstone.db")

    assert ranked == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 5: Ranking by novelty_score * confidence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_ranking_by_novelty_times_confidence(tmp_path):
    """Results are sorted by novelty_score * confidence descending.

    The corpus is a single vector along the first axis; each hypothesis gets
    an embedding with a chosen similarity to it:

      - A: similarity 0.1 → novelty 0.9, confidence 0.5 → score 0.45
      - B: similarity 0.5 → novelty 0.5, confidence 0.9 → score 0.45
      - C: similarity 0.2 → novelty 0.8, confidence 0.9 → score 0.72 (highest)

    C must rank first. A and B tie, so their relative order is not asserted.
    """
    corpus_vec = [1.0] + [0.0] * 383

    def _vec_with_similarity(similarity: float) -> list[float]:
        # [cos(t), sin(t), 0, ...] with cos(t) == similarity.
        return [similarity, math.sin(math.acos(similarity))] + [0.0] * 382

    hypotheses = [
        _make_hypothesis(title="A", confidence=0.5),
        _make_hypothesis(title="B", confidence=0.9),
        _make_hypothesis(title="C", confidence=0.9),
    ]
    # Same order as ``hypotheses``: the mock hands these out per embed() call,
    # so the pairing relies on hypotheses being embedded in input order.
    rotation = [
        _vec_with_similarity(0.1),
        _vec_with_similarity(0.5),
        _vec_with_similarity(0.2),
    ]
    calls_seen = 0

    def embed_in_call_order(text: str) -> MagicMock:
        nonlocal calls_seen
        array_like = MagicMock()
        array_like.tolist.return_value = rotation[calls_seen % len(rotation)]
        calls_seen += 1
        return array_like

    corpus_array = MagicMock()
    corpus_array.tolist.return_value = corpus_vec

    mock_embedder = MagicMock()
    mock_embedder.embed_batch.return_value = [corpus_array]
    mock_embedder.embed.side_effect = embed_in_call_order

    db_path = _make_db_with_incidents(
        [("OOM", "Memory exhaustion")],
        tmp_path / "turnstone.db",
    )
    suppressor = FalsePositiveSuppressor(model_id="test-model", similarity_threshold=0.85)

    with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
        results = suppressor.suppress(hypotheses, db_path)

    assert len(results) == 3
    titles = [ranked.hypothesis.title for ranked in results]
    # H_C should be first (highest novelty*confidence score)
    assert titles[0] == "C", f"Expected C first, got {titles}"

    # The whole list must be in descending novelty*confidence order.
    scores = [ranked.novelty_score * ranked.hypothesis.confidence for ranked in results]
    assert scores == sorted(scores, reverse=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 6: DB with no resolved incidents → novelty_score=1.0
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_no_resolved_incidents_in_db_passthrough(tmp_path):
    """An ``incidents`` table with zero rows leaves hypotheses fully novel and unsuppressed."""
    # Table exists, but no rows are inserted.
    db_path = _make_db_with_incidents([], tmp_path / "turnstone.db")
    embedder = _make_mock_embedder()
    suppressor = FalsePositiveSuppressor(model_id="test-model")

    with patch.object(suppressor, "_load_embedder", return_value=embedder):
        results = suppressor.suppress([_make_hypothesis()], db_path)

    assert len(results) == 1
    only = results[0]
    assert only.novelty_score == pytest.approx(1.0)
    assert only.suppress is False

    # An empty corpus must short-circuit before any batch embedding happens.
    embedder.embed_batch.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 7: DB query failure → graceful fallback, no crash
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_db_query_failure_graceful_fallback(tmp_path):
    """A database lacking the ``incidents`` table yields passthrough results, not an exception."""
    # The helper creates only an ``unrelated`` table.
    db_path = _make_empty_db(tmp_path / "turnstone.db")
    embedder = _make_mock_embedder()
    suppressor = FalsePositiveSuppressor(model_id="test-model")

    with patch.object(suppressor, "_load_embedder", return_value=embedder):
        results = suppressor.suppress([_make_hypothesis()], db_path)

    assert len(results) == 1
    only = results[0]
    assert only.novelty_score == pytest.approx(1.0)
    assert only.suppress is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 8: Embedding service unavailable (returns None) → graceful fallback
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_embedding_service_unavailable_passthrough(tmp_path):
    """When ``_load_embedder`` yields ``None``, suppress() falls back without crashing.

    The database does contain a resolved incident, so the passthrough result
    is attributable to the missing embedder rather than to an empty corpus.
    """
    db_path = _make_db_with_incidents(
        [("OOM", "Memory pressure")],
        tmp_path / "turnstone.db",
    )
    suppressor = FalsePositiveSuppressor(model_id="test-model")
    hypotheses = [_make_hypothesis(confidence=0.7)]

    with patch.object(suppressor, "_load_embedder", return_value=None):
        results = suppressor.suppress(hypotheses, db_path)

    assert len(results) == 1
    only = results[0]
    assert only.novelty_score == pytest.approx(1.0)
    assert only.suppress is False
    assert only.suppression_reason is None
|
|
assert results[0].suppression_reason is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 9: Corpus cache invalidated when corpus changes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_corpus_cache_invalidated_on_corpus_change(tmp_path):
    """When the corpus changes between calls, embed_batch is called again.

    The first suppress() call embeds a one-incident corpus. A second incident
    is then inserted into the same database, and the second suppress() call
    must trigger a fresh embed_batch rather than reusing cached embeddings.
    """
    # First DB state: one incident
    db_path = _make_db_with_incidents(
        [("OOM", "Memory pressure")],
        tmp_path / "turnstone.db",
    )

    corpus_vec_1 = [1.0] + [0.0] * 383
    corpus_vec_2 = [0.0, 1.0] + [0.0] * 382
    hyp_vec = [1.0] + [0.0] * 383

    mock_embedder = MagicMock()
    single_m = MagicMock()
    single_m.tolist.return_value = hyp_vec

    batch_m1 = MagicMock()
    batch_m1.tolist.return_value = corpus_vec_1
    batch_m2 = MagicMock()
    batch_m2.tolist.return_value = corpus_vec_2

    mock_embedder.embed.return_value = single_m
    # One return value per expected embed_batch call; a third call would
    # exhaust the side_effect list and raise.
    mock_embedder.embed_batch.side_effect = [[batch_m1], [batch_m2]]

    suppressor = FalsePositiveSuppressor(model_id="test-model", similarity_threshold=0.85)

    with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
        # First call — populates cache
        results_1 = suppressor.suppress([_make_hypothesis()], db_path)
        assert mock_embedder.embed_batch.call_count == 1

        # Mutate the DB to add a second incident (changes corpus).
        # ``with conn`` only commits/rolls back; close explicitly so the
        # connection is not left open while the suppressor reads the file.
        conn = sqlite3.connect(str(db_path))
        try:
            with conn:
                conn.execute(
                    "INSERT INTO incidents (label, notes, ended_at) VALUES (?, ?, ?)",
                    ("Disk I/O", "Storage saturation", "2024-01-02T00:00:00"),
                )
        finally:
            conn.close()

        # Second call — corpus changed, should re-embed
        results_2 = suppressor.suppress([_make_hypothesis()], db_path)
        assert mock_embedder.embed_batch.call_count == 2, (
            "embed_batch should be called again when corpus changes"
        )

    assert len(results_1) == 1
    assert len(results_2) == 1
|