refactor: extract _score_hypothesis helper, fix exception types, pass device in suppressor

This commit is contained in:
pyr0ball 2026-05-25 14:41:33 -07:00
parent 84e0cf5245
commit 54d4ec5325
2 changed files with 91 additions and 60 deletions

View file

@ -80,8 +80,9 @@ def _fetch_resolved_incidents(db_path: Path) -> list[str]:
except sqlite3.OperationalError as exc:
logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc)
return []
except Exception as exc:
logger.warning("Unexpected error fetching resolved incidents (%s) — treating as empty corpus", exc)
except sqlite3.Error as exc:
# Catches all remaining SQLite-family errors (IntegrityError, DatabaseError, etc.)
logger.warning("Unexpected SQLite error fetching resolved incidents (%s) — treating as empty corpus", exc)
return []
texts: list[str] = []
@ -116,6 +117,7 @@ class FalsePositiveSuppressor:
) -> None:
self._model_id = model_id
self._device = device
# _device stored for future use when get_embedder() supports device selection
self._suppress_threshold = suppress_threshold
def suppress(
@ -158,38 +160,11 @@ class FalsePositiveSuppressor:
# Embed corpus (with caching).
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
# Rank hypotheses using novelty score.
ranked: list[RankedHypothesis] = []
for h in hypotheses:
h_text = f"{h.title}. {h.description}"
try:
h_emb = embedder.embed(h_text)
# Convert numpy array to plain Python list for _cosine_similarities
h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
sims = _cosine_similarities(h_emb_list, corpus_embeddings)
max_sim = max(sims) if sims else 0.0
except Exception as exc:
logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", h.title, exc)
max_sim = 0.0
max_sim = float(max_sim)
novelty_score = 1.0 - max_sim
suppress = bool(novelty_score < self._suppress_threshold)
suppression_reason = (
f"Similar to resolved incident (similarity {max_sim:.2f})"
if suppress
else None
)
ranked.append(
RankedHypothesis(
hypothesis=h,
novelty_score=novelty_score,
similarity_to_known=max_sim,
suppress=suppress,
suppression_reason=suppression_reason,
)
)
# Score each hypothesis and sort by novelty * confidence descending.
ranked = [
self._score_hypothesis(h, embedder, corpus_embeddings)
for h in hypotheses
]
ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True)
return ranked
@ -197,12 +172,55 @@ class FalsePositiveSuppressor:
# Private helpers
# ------------------------------------------------------------------
def _score_hypothesis(
self,
hypothesis: Hypothesis,
embedder: Any,
corpus_embeddings: list[list[float]],
) -> RankedHypothesis:
"""Score a single hypothesis against the resolved incident corpus."""
try:
query_text = f"{hypothesis.title}. {hypothesis.description}"
h_emb = embedder.embed(query_text)
# Convert numpy array to plain Python list for _cosine_similarities
h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
sims = _cosine_similarities(h_emb_list, corpus_embeddings)
max_sim = float(max(sims)) if sims else 0.0
except Exception as exc:
# Broad catch is intentional: catches unknown embedder runtime errors
# (e.g. CUDA OOM, backend crashes) so one bad hypothesis never halts the pipeline.
logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", hypothesis.title, exc)
return RankedHypothesis(
hypothesis=hypothesis,
novelty_score=1.0,
similarity_to_known=0.0,
suppress=False,
suppression_reason=None,
)
novelty_score = 1.0 - max_sim
suppress = bool(novelty_score < self._suppress_threshold)
suppression_reason = (
f"Similar to resolved incident (similarity {max_sim:.2f})"
if suppress
else None
)
return RankedHypothesis(
hypothesis=hypothesis,
novelty_score=novelty_score,
similarity_to_known=max_sim,
suppress=suppress,
suppression_reason=suppression_reason,
)
def _load_embedder(self) -> Any | None:
"""Load the embedding service. Returns None if unavailable."""
try:
from app.services.embeddings import get_embedder
return get_embedder()
except Exception as exc:
# Broad catch is intentional: get_embedder() may raise on import or
# backend init failures from any number of third-party libraries.
logger.warning("Failed to import/initialise embedding service: %s", exc)
return None
@ -230,6 +248,9 @@ class FalsePositiveSuppressor:
for e in raw_embeddings
]
except Exception as exc:
# Broad catch is intentional: embed_batch() may raise from any backend
# (network timeout, CUDA error, etc.) — treat as empty corpus so the
# pipeline can continue without suppression.
logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc)
return []

View file

@ -4,8 +4,8 @@ All tests use mocking; no real model downloads are made.
"""
from __future__ import annotations
import math
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
@ -37,10 +37,8 @@ def _make_hypothesis(
)
def _make_db_with_incidents(incidents: list[tuple[str, str]]) -> Path:
def _make_db_with_incidents(incidents: list[tuple[str, str]], db_path: Path) -> Path:
"""Create a temporary SQLite database with resolved incidents. Returns the db path."""
tmp = tempfile.mktemp(suffix=".db")
db_path = Path(tmp)
with sqlite3.connect(str(db_path)) as conn:
conn.execute(
"CREATE TABLE incidents "
@ -55,10 +53,8 @@ def _make_db_with_incidents(incidents: list[tuple[str, str]]) -> Path:
return db_path
def _make_empty_db() -> Path:
def _make_empty_db(db_path: Path) -> Path:
"""Create a temporary SQLite DB with no incidents table."""
tmp = tempfile.mktemp(suffix=".db")
db_path = Path(tmp)
with sqlite3.connect(str(db_path)) as conn:
conn.execute("CREATE TABLE unrelated (id INTEGER PRIMARY KEY)")
conn.commit()
@ -104,13 +100,13 @@ def reset_suppressor_cache():
# Test 1: No model configured — passthrough, ranked by confidence
# ---------------------------------------------------------------------------
def test_no_model_passthrough_ranked_by_confidence():
def test_no_model_passthrough_ranked_by_confidence(tmp_path):
"""model_id='' → all novelty_score=1.0, suppress=False, ranked by confidence desc."""
h_low = _make_hypothesis(title="Low", confidence=0.3)
h_high = _make_hypothesis(title="High", confidence=0.9)
h_mid = _make_hypothesis(title="Mid", confidence=0.6)
db_path = Path(tempfile.mktemp(suffix=".db"))
db_path = tmp_path / "turnstone.db"
suppressor = FalsePositiveSuppressor(model_id="")
results = suppressor.suppress([h_low, h_high, h_mid], db_path)
@ -129,7 +125,7 @@ def test_no_model_passthrough_ranked_by_confidence():
# Test 2: High similarity → suppressed
# ---------------------------------------------------------------------------
def test_high_similarity_suppresses_hypothesis():
def test_high_similarity_suppresses_hypothesis(tmp_path):
"""Hypothesis with embedding nearly identical to corpus → suppress=True."""
identical_vec = [1.0] + [0.0] * 383
corpus_vec = [1.0] + [0.0] * 383 # cosine similarity = 1.0
@ -139,7 +135,10 @@ def test_high_similarity_suppresses_hypothesis():
embed_batch_return=[corpus_vec],
)
db_path = _make_db_with_incidents([("OOM killer", "Memory pressure caused OOM kill")])
db_path = _make_db_with_incidents(
[("OOM killer", "Memory pressure caused OOM kill")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -158,7 +157,7 @@ def test_high_similarity_suppresses_hypothesis():
# Test 3: Low similarity → not suppressed
# ---------------------------------------------------------------------------
def test_low_similarity_does_not_suppress():
def test_low_similarity_does_not_suppress(tmp_path):
"""Hypothesis with embedding orthogonal to corpus → suppress=False."""
hypothesis_vec = [1.0] + [0.0] * 383
corpus_vec = [0.0, 1.0] + [0.0] * 382 # orthogonal → similarity = 0.0
@ -168,7 +167,10 @@ def test_low_similarity_does_not_suppress():
embed_batch_return=[corpus_vec],
)
db_path = _make_db_with_incidents([("Disk I/O", "Storage saturation caused latency")])
db_path = _make_db_with_incidents(
[("Disk I/O", "Storage saturation caused latency")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -186,9 +188,9 @@ def test_low_similarity_does_not_suppress():
# Test 4: Empty hypotheses list returns []
# ---------------------------------------------------------------------------
def test_empty_hypotheses_returns_empty():
def test_empty_hypotheses_returns_empty(tmp_path):
"""suppress([]) → [] regardless of model or db state."""
db_path = Path(tempfile.mktemp(suffix=".db"))
db_path = tmp_path / "turnstone.db"
suppressor = FalsePositiveSuppressor(model_id="test-model")
results = suppressor.suppress([], db_path)
assert results == []
@ -198,7 +200,7 @@ def test_empty_hypotheses_returns_empty():
# Test 5: Ranking by novelty_score * confidence
# ---------------------------------------------------------------------------
def test_ranking_by_novelty_times_confidence():
def test_ranking_by_novelty_times_confidence(tmp_path):
"""Results are sorted by novelty_score * confidence descending."""
# Hypothesis A: novelty=0.9, confidence=0.5 → score=0.45
# Hypothesis B: novelty=0.5, confidence=0.9 → score=0.45 (tie, order stable-ish)
@ -218,7 +220,6 @@ def test_ranking_by_novelty_times_confidence():
# Simpler approach: create 3 hypotheses and verify output is sorted correctly
# by providing distinct embeddings that produce known similarities.
import math
# Corpus: single vector [1, 0, 0, ...]
corpus_vec = [1.0] + [0.0] * 383
@ -254,7 +255,10 @@ def test_ranking_by_novelty_times_confidence():
mock_embedder.embed_batch.return_value = [batch_m]
mock_embedder.embed.side_effect = side_effect_embed
db_path = _make_db_with_incidents([("OOM", "Memory exhaustion")])
db_path = _make_db_with_incidents(
[("OOM", "Memory exhaustion")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model", suppress_threshold=0.85)
with patch.object(suppressor, "_load_embedder", return_value=mock_embedder):
@ -273,9 +277,9 @@ def test_ranking_by_novelty_times_confidence():
# Test 6: DB with no resolved incidents → novelty_score=1.0
# ---------------------------------------------------------------------------
def test_no_resolved_incidents_in_db_passthrough():
def test_no_resolved_incidents_in_db_passthrough(tmp_path):
"""When incidents table is empty, all hypotheses get novelty_score=1.0."""
db_path = _make_db_with_incidents([]) # table exists but zero rows
db_path = _make_db_with_incidents([], tmp_path / "turnstone.db") # table exists but zero rows
mock_embedder = _make_mock_embedder()
suppressor = FalsePositiveSuppressor(model_id="test-model")
@ -293,9 +297,9 @@ def test_no_resolved_incidents_in_db_passthrough():
# Test 7: DB query failure → graceful fallback, no crash
# ---------------------------------------------------------------------------
def test_db_query_failure_graceful_fallback():
def test_db_query_failure_graceful_fallback(tmp_path):
"""When the incidents table is missing, suppress() returns passthrough without raising."""
db_path = _make_empty_db() # no 'incidents' table
db_path = _make_empty_db(tmp_path / "turnstone.db") # no 'incidents' table
mock_embedder = _make_mock_embedder()
suppressor = FalsePositiveSuppressor(model_id="test-model")
@ -311,9 +315,12 @@ def test_db_query_failure_graceful_fallback():
# Test 8: Embedding service unavailable (returns None) → graceful fallback
# ---------------------------------------------------------------------------
def test_embedding_service_unavailable_passthrough():
def test_embedding_service_unavailable_passthrough(tmp_path):
"""When get_embedder() returns None, suppress() falls back without crashing."""
db_path = _make_db_with_incidents([("OOM", "Memory pressure")])
db_path = _make_db_with_incidents(
[("OOM", "Memory pressure")],
tmp_path / "turnstone.db",
)
suppressor = FalsePositiveSuppressor(model_id="test-model")
with patch.object(suppressor, "_load_embedder", return_value=None):
@ -329,10 +336,13 @@ def test_embedding_service_unavailable_passthrough():
# Test 9: Corpus cache invalidated when corpus changes
# ---------------------------------------------------------------------------
def test_corpus_cache_invalidated_on_corpus_change():
def test_corpus_cache_invalidated_on_corpus_change(tmp_path):
"""When the corpus changes between calls, embed_batch is called again."""
# First DB: one incident
db_path = _make_db_with_incidents([("OOM", "Memory pressure")])
db_path = _make_db_with_incidents(
[("OOM", "Memory pressure")],
tmp_path / "turnstone.db",
)
corpus_vec_1 = [1.0] + [0.0] * 383
corpus_vec_2 = [0.0, 1.0] + [0.0] * 382