"""Stage 4: False-Positive Suppressor — embedding cosine similarity. Compares each hypothesis against a corpus of resolved incidents using embedding cosine similarity. Hypotheses that closely match a previously resolved incident are suppressed as likely false positives. When no embedding model is configured or the service is unavailable, all hypotheses pass through with novelty_score=1.0 (full novelty assumed). """ from __future__ import annotations import logging import sqlite3 from pathlib import Path from typing import Any from app.services.diagnose.models import Hypothesis, RankedHypothesis logger = logging.getLogger(__name__) # Module-level corpus cache: db_path_str -> (corpus_texts, embeddings) # Invalidated when the corpus text list changes between calls. _corpus_cache: dict[str, tuple[list[str], Any]] = {} # --------------------------------------------------------------------------- # Cosine similarity helpers # --------------------------------------------------------------------------- try: import numpy as np def _cosine_similarities( query_emb: list[float], corpus_embs: list[list[float]] ) -> list[float]: """Batch cosine similarity of one query embedding against all corpus embeddings.""" q = np.array(query_emb, dtype=np.float32) c = np.array(corpus_embs, dtype=np.float32) q_norm = q / (np.linalg.norm(q) + 1e-10) c_norm = c / (np.linalg.norm(c, axis=1, keepdims=True) + 1e-10) return list(c_norm @ q_norm) _HAS_NUMPY = True except ImportError: # pragma: no cover import math _HAS_NUMPY = False def _dot(a: list[float], b: list[float]) -> float: return sum(x * y for x, y in zip(a, b)) def _norm(a: list[float]) -> float: return math.sqrt(sum(x * x for x in a)) + 1e-10 def _cosine(a: list[float], b: list[float]) -> float: return _dot(a, b) / (_norm(a) * _norm(b)) def _cosine_similarities( query_emb: list[float], corpus_embs: list[list[float]] ) -> list[float]: return [_cosine(query_emb, c) for c in corpus_embs] # 
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------


def _fetch_resolved_incidents(db_path: Path, limit: int = 200) -> list[str]:
    """Fetch resolved incident texts from SQLite.

    An incident counts as resolved when its ``ended_at`` column is not NULL.
    The most recently ended incidents are read first, capped at ``limit``
    rows. For each row, ``label`` and ``notes`` are stripped and combined as
    ``"label. notes"`` (or whichever one is non-empty). Rows where both are
    empty are skipped, so fewer than ``limit`` texts may be returned.

    Args:
        db_path: Path to the SQLite database containing an ``incidents`` table.
        limit: Maximum number of resolved incident rows to read.

    Returns:
        A list of non-empty combined strings, one per usable resolved incident.
        Returns an empty list when the database file does not exist or on any
        SQLite error (missing table, connection failure, etc.).
    """
    path = Path(db_path)
    # sqlite3.connect() silently creates a missing file. Bail out first so a
    # misconfigured path does not leave an empty database behind.
    if not path.is_file():
        logger.warning("Incident database %s not found — treating as empty corpus", path)
        return []

    try:
        conn = sqlite3.connect(str(path), timeout=30.0)
        try:
            # Without ORDER BY, SQLite guarantees no row order, so LIMIT would
            # pick an arbitrary (in practice usually the oldest) subset that
            # never includes newly resolved incidents once the cap is reached.
            rows = conn.execute(
                "SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL "
                "ORDER BY ended_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
        finally:
            # A sqlite3 connection's context manager only commits/rolls back;
            # it never closes the connection, so close it explicitly.
            conn.close()
    except sqlite3.OperationalError as exc:
        logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc)
        return []
    except sqlite3.Error as exc:
        # Catches all remaining SQLite-family errors (IntegrityError, DatabaseError, etc.)
        logger.warning("Unexpected SQLite error fetching resolved incidents (%s) — treating as empty corpus", exc)
        return []

    texts: list[str] = []
    for label, notes in rows:
        label = (label or "").strip()
        notes = (notes or "").strip()
        combined = f"{label}. {notes}" if label and notes else (label or notes)
        if combined:
            texts.append(combined)
    return texts


# ---------------------------------------------------------------------------
# Public class
# ---------------------------------------------------------------------------


class FalsePositiveSuppressor:
    """Stage 4 of the multi-agent diagnose pipeline.

    Uses embedding cosine similarity to detect hypotheses that closely match
    previously resolved incidents and suppress them as likely false positives.

    When model_id is empty or the embedding service is unavailable, all
    hypotheses pass through with novelty_score=1.0 (no suppression).
    """

    def __init__(
        self,
        model_id: str = "",
        device: str = "cpu",
        similarity_threshold: float = 0.85,
    ) -> None:
        """Configure the suppressor.

        Args:
            model_id: Embedding model identifier. An empty string disables
                suppression entirely (every hypothesis passes through).
            device: Target device for embedding; stored but not yet used.
            similarity_threshold: Suppress a hypothesis when its cosine
                similarity to any resolved incident is >= this value.
        """
        self._model_id = model_id
        self._device = device  # _device stored for future use when get_embedder() supports device selection
        # Suppress when cosine similarity to a known resolved incident >= threshold.
        # A threshold of 0.85 means "suppress if 85%+ similar to something already resolved."
        self._similarity_threshold = similarity_threshold

    def suppress(
        self,
        hypotheses: list[Hypothesis],
        db_path: Path,
    ) -> list[RankedHypothesis]:
        """Rank hypotheses by novelty, suppressing those matching resolved incidents.

        Args:
            hypotheses: Candidate hypotheses from Stage 3.
            db_path: Path to the Turnstone SQLite database containing incidents.

        Returns:
            List of RankedHypothesis sorted by (novelty_score * confidence)
            descending. Suppression may be impossible: no model configured,
            embedder unavailable, no resolved incidents, or corpus embedding
            failed. In that case every hypothesis is returned unsuppressed with
            novelty_score=1.0, sorted by confidence. Suppressed hypotheses have
            low novelty, so they usually (but not necessarily) sort after
            non-suppressed ones.
        """
        if not hypotheses:
            return []

        # No model configured — full passthrough, rank by confidence only.
        if not self._model_id:
            return self._passthrough(hypotheses)

        # Attempt to obtain an embedder; fall back to passthrough on failure.
        embedder = self._load_embedder()
        if embedder is None:
            logger.warning(
                "Embedding service unavailable for model %r — skipping suppression",
                self._model_id,
            )
            return self._passthrough(hypotheses)

        # Fetch corpus texts from DB; fall back to passthrough if corpus is empty.
        corpus_texts = _fetch_resolved_incidents(db_path)
        if not corpus_texts:
            logger.debug("No resolved incidents found — all hypotheses treated as novel")
            return self._passthrough(hypotheses)

        # Embed corpus (with caching).
        corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
        if not corpus_embeddings:
            # _get_corpus_embeddings() returns [] when embed_batch() fails.
            # Scoring against an empty corpus cannot suppress anything, and it
            # may fail inside the similarity helper, so pass through directly.
            logger.debug("Corpus embeddings unavailable — all hypotheses treated as novel")
            return self._passthrough(hypotheses)

        # Score each hypothesis and sort by novelty * confidence descending.
        ranked = [
            self._score_hypothesis(h, embedder, corpus_embeddings)
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True)
        return ranked

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _score_hypothesis(
        self,
        hypothesis: Hypothesis,
        embedder: Any,
        corpus_embeddings: list[list[float]],
    ) -> RankedHypothesis:
        """Score a single hypothesis against the resolved incident corpus.

        The hypothesis is embedded as ``"<title>. <description>"``. Its highest
        cosine similarity to any corpus embedding, clamped to [0, 1], becomes
        ``similarity_to_known``, and ``novelty_score`` is ``1 - similarity``.
        If embedding fails, the hypothesis is treated as fully novel.
        """
        try:
            query_text = f"{hypothesis.title}. {hypothesis.description}"
            h_emb = embedder.embed(query_text)
            # Convert numpy array to plain Python list for _cosine_similarities
            h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
            sims = _cosine_similarities(h_emb_list, corpus_embeddings)
            max_sim = float(max(sims)) if sims else 0.0
        except Exception as exc:
            # Broad catch is intentional: catches unknown embedder runtime errors
            # (e.g. CUDA OOM, backend crashes) so one bad hypothesis never halts the pipeline.
            logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", hypothesis.title, exc)
            return RankedHypothesis(
                hypothesis=hypothesis,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )

        # Cosine similarity lies in [-1, 1], and float32 rounding can push it
        # marginally past 1.0. Clamp so novelty_score stays within [0, 1];
        # negative similarity is treated as "not similar at all".
        max_sim = min(max(max_sim, 0.0), 1.0)

        novelty_score = 1.0 - max_sim
        suppress = bool(max_sim >= self._similarity_threshold)
        suppression_reason = (
            f"Similar to resolved incident (similarity {max_sim:.2f})"
            if suppress
            else None
        )
        return RankedHypothesis(
            hypothesis=hypothesis,
            novelty_score=novelty_score,
            similarity_to_known=max_sim,
            suppress=suppress,
            suppression_reason=suppression_reason,
        )

    def _load_embedder(self) -> Any | None:
        """Load the embedding service. Returns None if unavailable."""
        # NOTE(review): get_embedder() is called without self._model_id or
        # self._device, so model_id currently only acts as an on/off switch —
        # confirm this matches get_embedder()'s configuration mechanism.
        try:
            from app.services.embeddings import get_embedder
            return get_embedder()
        except Exception as exc:
            # Broad catch is intentional: get_embedder() may raise on import or
            # backend init failures from any number of third-party libraries.
            logger.warning("Failed to import/initialise embedding service: %s", exc)
            return None

    def _get_corpus_embeddings(
        self,
        embedder: Any,
        corpus_texts: list[str],
        db_path: Path,
    ) -> list[list[float]]:
        """Return cached corpus embeddings, re-embedding if the corpus has changed.

        Returns an empty list (and caches nothing) if embedding the corpus fails.
        """
        # NOTE(review): the cache is keyed by db_path only, not by embedder or
        # model — if the embedding model can change at runtime, stale vectors
        # of a different dimension could be reused. Confirm before relying on it.
        cache_key = str(db_path)
        cached = _corpus_cache.get(cache_key)
        if cached is not None:
            cached_texts, cached_embeddings = cached
            if cached_texts == corpus_texts:
                return cached_embeddings

        logger.debug("Embedding corpus of %d resolved incidents", len(corpus_texts))
        try:
            raw_embeddings = embedder.embed_batch(corpus_texts)
            # Normalise each embedding to a plain Python list for portability
            corpus_embeddings: list[list[float]] = [
                e.tolist() if hasattr(e, "tolist") else list(e)
                for e in raw_embeddings
            ]
        except Exception as exc:
            # Broad catch is intentional: embed_batch() may raise from any backend
            # (network timeout, CUDA error, etc.) — treat as empty corpus so the
            # pipeline can continue without suppression.
            logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc)
            return []

        _corpus_cache[cache_key] = (corpus_texts, corpus_embeddings)
        return corpus_embeddings

    def _passthrough(self, hypotheses: list[Hypothesis]) -> list[RankedHypothesis]:
        """Return all hypotheses as non-suppressed, ranked by confidence descending."""
        ranked = [
            RankedHypothesis(
                hypothesis=h,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.hypothesis.confidence, reverse=True)
        return ranked