Previously, hypotheses were suppressed when novelty_score < 0.85 (i.e. similarity > 0.15), which would suppress nearly every hypothesis once embeddings are active. They are now suppressed only when max_sim >= similarity_threshold (default 0.85), i.e. when a hypothesis has a cosine similarity of at least 0.85 to a resolved incident. Also renames suppress_threshold → similarity_threshold for clarity and adds a boundary test (similarity 0.85 is suppressed, 0.84 is not). Closes: #29
275 lines
11 KiB
Python
275 lines
11 KiB
Python
"""Stage 4: False-Positive Suppressor — embedding cosine similarity.
|
|
|
|
Compares each hypothesis against a corpus of resolved incidents using
|
|
embedding cosine similarity. Hypotheses that closely match a previously
|
|
resolved incident are suppressed as likely false positives.
|
|
|
|
When no embedding model is configured or the service is unavailable, all
|
|
hypotheses pass through with novelty_score=1.0 (full novelty assumed).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.services.diagnose.models import Hypothesis, RankedHypothesis
|
|
|
|
# Module logger; records are emitted under this module's dotted name.
logger = logging.getLogger(name=__name__)

# Process-wide memo of embedded corpora, keyed by str(db_path).
# Each value pairs the exact list of corpus texts that was embedded with the
# resulting embeddings. An entry is reused only while that text list is
# unchanged; otherwise the corpus is re-embedded and the entry replaced.
_corpus_cache: dict[str, tuple[list[str], Any]] = dict()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cosine similarity helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
try:
    import numpy as np

    def _cosine_similarities(
        query_emb: list[float], corpus_embs: list[list[float]]
    ) -> list[float]:
        """Return the cosine similarity of *query_emb* against each corpus embedding.

        Vectorised NumPy implementation. Arithmetic is done in float64 so the
        results agree with the pure-Python fallback. This matters for scores
        that land right on the suppression threshold.

        Args:
            query_emb: The query embedding vector.
            corpus_embs: Corpus embedding vectors, each the same length as
                *query_emb*.

        Returns:
            One plain Python float per corpus embedding, in corpus order, or an
            empty list when *corpus_embs* is empty.

        Raises:
            ValueError: If the embedding dimensions are inconsistent.
        """
        # np.asarray([]) is 1-D, so the axis=1 norm below would raise on an
        # empty corpus. Short-circuit instead, matching the fallback.
        # len() rather than truthiness: corpus_embs may itself be an ndarray.
        if len(corpus_embs) == 0:
            return []
        q = np.asarray(query_emb, dtype=np.float64)
        c = np.asarray(corpus_embs, dtype=np.float64)
        # The epsilon keeps zero vectors from dividing by zero; they score 0.
        q_norm = q / (np.linalg.norm(q) + 1e-10)
        c_norm = c / (np.linalg.norm(c, axis=1, keepdims=True) + 1e-10)
        # .tolist() yields native Python floats, as the annotation promises.
        return (c_norm @ q_norm).tolist()

    _HAS_NUMPY = True

except ImportError:  # pragma: no cover
    import math

    _HAS_NUMPY = False

    def _dot(a: list[float], b: list[float]) -> float:
        """Dot product of two equal-length vectors."""
        return sum(x * y for x, y in zip(a, b))

    def _norm(a: list[float]) -> float:
        """Euclidean norm plus the same epsilon the NumPy path uses."""
        return math.sqrt(sum(x * x for x in a)) + 1e-10

    def _cosine(a: list[float], b: list[float]) -> float:
        """Cosine similarity of *a* and *b*; zero vectors score 0."""
        # zip() would silently truncate mismatched vectors; raise like NumPy does.
        if len(a) != len(b):
            raise ValueError(f"embedding dimension mismatch: {len(a)} != {len(b)}")
        return _dot(a, b) / (_norm(a) * _norm(b))

    def _cosine_similarities(
        query_emb: list[float], corpus_embs: list[list[float]]
    ) -> list[float]:
        """Pure-Python fallback with the same contract as the NumPy version."""
        return [_cosine(query_emb, c) for c in corpus_embs]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _fetch_resolved_incidents(db_path: Path, limit: int = 200) -> list[str]:
    """Fetch resolved incident texts from the SQLite incidents table.

    An incident counts as resolved when its ``ended_at`` column is not NULL.
    Each row's ``label`` and ``notes`` are stripped and joined as
    ``"<label>. <notes>"``, or whichever one is non-empty. Rows where both
    are empty are skipped.

    The database is opened read-only. A missing *db_path* therefore yields an
    empty corpus instead of silently creating an empty database file.

    Args:
        db_path: Path to the SQLite database containing the ``incidents`` table.
        limit: Maximum number of resolved incident rows to read (default 200).
            Rows skipped for being empty still count toward this limit.

    Returns:
        Non-empty combined strings, ordered by ``ended_at`` descending.
        Returns an empty list on any SQLite error (missing file or table,
        connection failure, etc.).
    """
    from contextlib import closing

    # mode=ro: this is a read path and must never create or modify the database.
    uri = f"{Path(db_path).resolve().as_uri()}?mode=ro"
    # A deterministic order keeps the corpus, and therefore the text-keyed
    # embedding cache, stable between calls. It also prefers the newest
    # incidents once the table outgrows *limit*.
    # NOTE(review): assumes ended_at sorts chronologically (e.g. ISO-8601 text
    # or epoch numbers). Confirm against the schema.
    query = (
        "SELECT label, notes FROM incidents "
        "WHERE ended_at IS NOT NULL "
        "ORDER BY ended_at DESC "
        "LIMIT ?"
    )
    try:
        # sqlite3's Connection context manager only commits or rolls back; it
        # does not close the connection. closing() guarantees it is released.
        with closing(sqlite3.connect(uri, uri=True)) as conn:
            rows = conn.execute(query, (limit,)).fetchall()
    except sqlite3.OperationalError as exc:
        logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc)
        return []
    except sqlite3.Error as exc:
        # Catches all remaining SQLite-family errors (IntegrityError, DatabaseError, etc.)
        logger.warning("Unexpected SQLite error fetching resolved incidents (%s) — treating as empty corpus", exc)
        return []

    texts: list[str] = []
    for label, notes in rows:
        label = (label or "").strip()
        notes = (notes or "").strip()
        combined = f"{label}. {notes}" if label and notes else (label or notes)
        if combined:
            texts.append(combined)
    return texts
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public class
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class FalsePositiveSuppressor:
    """Stage 4 of the multi-agent diagnose pipeline.

    Uses embedding cosine similarity to detect hypotheses that closely match
    previously resolved incidents, and marks them as likely false positives.

    All hypotheses pass through with novelty_score=1.0 (no suppression) in any
    of these cases:

    - model_id is empty;
    - the embedding service is unavailable;
    - no resolved incidents exist;
    - the corpus cannot be embedded.
    """

    def __init__(
        self,
        model_id: str = "",
        device: str = "cpu",
        similarity_threshold: float = 0.85,
    ) -> None:
        """Configure the suppressor.

        Args:
            model_id: Embedding model identifier. An empty string disables
                suppression. NOTE(review): it is only checked for emptiness
                here and is not passed to get_embedder().
            device: Stored for future use; not currently passed to the embedder.
            similarity_threshold: A hypothesis whose best cosine similarity to
                a resolved incident is >= this value is suppressed.
        """
        self._model_id = model_id
        # Stored for future use when get_embedder() supports device selection.
        self._device = device
        # Suppress when cosine similarity to a known resolved incident >= threshold.
        # A threshold of 0.85 means "suppress if 85%+ similar to something already resolved."
        self._similarity_threshold = similarity_threshold

    def suppress(
        self,
        hypotheses: list[Hypothesis],
        db_path: Path,
    ) -> list[RankedHypothesis]:
        """Rank hypotheses by novelty, suppressing those matching resolved incidents.

        Args:
            hypotheses: Candidate hypotheses from Stage 3.
            db_path: Path to the Turnstone SQLite database containing incidents.

        Returns:
            List of RankedHypothesis sorted by (novelty_score * confidence)
            descending. Suppressed hypotheses have low novelty and so usually
            rank low. Ordering alone does not guarantee they come last, so
            filter on ``suppress`` when that matters.
        """
        if not hypotheses:
            return []

        # No model configured: full passthrough, ranked by confidence only.
        if not self._model_id:
            return self._passthrough(hypotheses)

        # Attempt to obtain an embedder; fall back to passthrough on failure.
        embedder = self._load_embedder()
        if embedder is None:
            logger.warning(
                "Embedding service unavailable for model %r — skipping suppression",
                self._model_id,
            )
            return self._passthrough(hypotheses)

        # Fetch corpus texts from the DB; fall back to passthrough if it is empty.
        corpus_texts = _fetch_resolved_incidents(db_path)
        if not corpus_texts:
            logger.debug("No resolved incidents found — all hypotheses treated as novel")
            return self._passthrough(hypotheses)

        # Embed the corpus (with caching).
        corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
        if len(corpus_embeddings) == 0:
            # Corpus embedding failed (already logged). With nothing to compare
            # against, skip embedding each hypothesis and pass everything through.
            return self._passthrough(hypotheses)

        # Score each hypothesis and sort by novelty * confidence descending.
        ranked = [
            self._score_hypothesis(h, embedder, corpus_embeddings)
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True)
        return ranked

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _score_hypothesis(
        self,
        hypothesis: Hypothesis,
        embedder: Any,
        corpus_embeddings: list[list[float]],
    ) -> RankedHypothesis:
        """Score a single hypothesis against the resolved-incident corpus.

        The hypothesis text is "<title>. <description>". Its best cosine
        similarity to any corpus embedding is clamped to [0, 1]. This value
        becomes similarity_to_known, and novelty_score is 1 minus it. If
        embedding or scoring raises, the hypothesis is treated as fully novel.
        """
        try:
            query_text = f"{hypothesis.title}. {hypothesis.description}"
            h_emb = embedder.embed(query_text)
            # Convert a numpy array to a plain Python list for _cosine_similarities.
            h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
            sims = _cosine_similarities(h_emb_list, corpus_embeddings)
            max_sim = float(max(sims)) if sims else 0.0
        except Exception as exc:
            # Broad catch is intentional: catches unknown embedder runtime errors
            # (e.g. CUDA OOM, backend crashes) so one bad hypothesis never halts the pipeline.
            logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", hypothesis.title, exc)
            return RankedHypothesis(
                hypothesis=hypothesis,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )

        # Cosine similarity lies in [-1, 1], plus float noise. Clamp it so that
        # novelty_score stays within [0, 1], the range passthrough results use.
        # Otherwise an anti-correlated hypothesis would get novelty > 1 and
        # outrank genuinely novel ones.
        max_sim = min(max(max_sim, 0.0), 1.0)
        novelty_score = 1.0 - max_sim
        suppress = bool(max_sim >= self._similarity_threshold)
        suppression_reason = (
            f"Similar to resolved incident (similarity {max_sim:.2f})"
            if suppress
            else None
        )
        return RankedHypothesis(
            hypothesis=hypothesis,
            novelty_score=novelty_score,
            similarity_to_known=max_sim,
            suppress=suppress,
            suppression_reason=suppression_reason,
        )

    def _load_embedder(self) -> Any | None:
        """Load the embedding service. Returns None if unavailable."""
        try:
            from app.services.embeddings import get_embedder
            return get_embedder()
        except Exception as exc:
            # Broad catch is intentional: get_embedder() may raise on import or
            # backend init failures from any number of third-party libraries.
            logger.warning("Failed to import/initialise embedding service: %s", exc)
            return None

    def _get_corpus_embeddings(
        self,
        embedder: Any,
        corpus_texts: list[str],
        db_path: Path,
    ) -> list[list[float]]:
        """Return cached corpus embeddings, re-embedding if the corpus has changed.

        The module-level cache is keyed by str(db_path). A cached entry is
        reused only when its stored text list equals *corpus_texts*. Returns an
        empty list (and caches nothing) if embed_batch() raises.
        """
        cache_key = str(db_path)
        cached = _corpus_cache.get(cache_key)
        if cached is not None:
            cached_texts, cached_embeddings = cached
            if cached_texts == corpus_texts:
                return cached_embeddings

        logger.debug("Embedding corpus of %d resolved incidents", len(corpus_texts))
        try:
            raw_embeddings = embedder.embed_batch(corpus_texts)
            # Normalise each embedding to a plain Python list for portability.
            corpus_embeddings: list[list[float]] = [
                e.tolist() if hasattr(e, "tolist") else list(e)
                for e in raw_embeddings
            ]
        except Exception as exc:
            # Broad catch is intentional: embed_batch() may raise from any backend
            # (network timeout, CUDA error, etc.). Treat it as an empty corpus so
            # the pipeline can continue without suppression.
            logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc)
            return []

        _corpus_cache[cache_key] = (corpus_texts, corpus_embeddings)
        return corpus_embeddings

    def _passthrough(self, hypotheses: list[Hypothesis]) -> list[RankedHypothesis]:
        """Return all hypotheses as non-suppressed, ranked by confidence descending."""
        ranked = [
            RankedHypothesis(
                hypothesis=h,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.hypothesis.confidence, reverse=True)
        return ranked
|