turnstone/app/services/diagnose/suppressor.py
pyr0ball 5b151c2509 fix: split incidents tables to dedicated turnstone-incidents.db (#60)
FTS5 bulk-insert write locks starved the incident API and bundle endpoints
during log bursts (sonarr/radarr, high-volume docker sources). Fix mirrors
the context_facts split (context -> turnstone-context.db):

- Add INCIDENTS_DB_PATH / TURNSTONE_INCIDENTS_DB env var in rest.py
- Add _INCIDENTS_SCHEMA, ensure_incidents_schema(), and
  migrate_incidents_to_dedicated_db() in glean/pipeline.py
- Stub out incidents/received_bundles/sent_bundles in _SCHEMA (no-op
  CREATE IF NOT EXISTS) so legacy single-file deployments still open
- Thread incidents_db_path through diagnose_stream -> run_pipeline ->
  FalsePositiveSuppressor.suppress -> _fetch_resolved_incidents
- One-shot migration on startup: copy existing rows from main DB to
  incidents DB via INSERT OR IGNORE (idempotent, safe to re-run)
- Fix test_blocklist_endpoints fixtures to patch CONTEXT_DB_PATH and
  INCIDENTS_DB_PATH alongside DB_PATH (worktree has no data/ dir)

372 tests passing.

Closes: #60
2026-06-01 15:54:23 -07:00

275 lines
11 KiB
Python

"""Stage 4: False-Positive Suppressor — embedding cosine similarity.
Compares each hypothesis against a corpus of resolved incidents using
embedding cosine similarity. Hypotheses that closely match a previously
resolved incident are suppressed as likely false positives.
When no embedding model is configured or the service is unavailable, all
hypotheses pass through with novelty_score=1.0 (full novelty assumed).
"""
from __future__ import annotations
import logging
import sqlite3
from pathlib import Path
from typing import Any
from app.services.diagnose.models import Hypothesis, RankedHypothesis
logger: logging.Logger = logging.getLogger(__name__)

# Process-wide memo of embedded corpora, keyed by str(incidents_db_path).
# Each value pairs the exact list of corpus texts that was embedded with the
# embeddings produced for it; an entry is reused only while a freshly fetched
# text list compares equal to the stored one, so any corpus change re-embeds.
_corpus_cache: dict[str, tuple[list[str], Any]] = dict()
# ---------------------------------------------------------------------------
# Cosine similarity helpers
# ---------------------------------------------------------------------------
try:
import numpy as np
def _cosine_similarities(
query_emb: list[float], corpus_embs: list[list[float]]
) -> list[float]:
"""Batch cosine similarity of one query embedding against all corpus embeddings."""
q = np.array(query_emb, dtype=np.float32)
c = np.array(corpus_embs, dtype=np.float32)
q_norm = q / (np.linalg.norm(q) + 1e-10)
c_norm = c / (np.linalg.norm(c, axis=1, keepdims=True) + 1e-10)
return list(c_norm @ q_norm)
_HAS_NUMPY = True
except ImportError: # pragma: no cover
import math
_HAS_NUMPY = False
def _dot(a: list[float], b: list[float]) -> float:
return sum(x * y for x, y in zip(a, b))
def _norm(a: list[float]) -> float:
return math.sqrt(sum(x * x for x in a)) + 1e-10
def _cosine(a: list[float], b: list[float]) -> float:
return _dot(a, b) / (_norm(a) * _norm(b))
def _cosine_similarities(
query_emb: list[float], corpus_embs: list[list[float]]
) -> list[float]:
return [_cosine(query_emb, c) for c in corpus_embs]
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def _fetch_resolved_incidents(incidents_db_path: Path) -> list[str]:
"""Fetch resolved incident texts from the incidents database.
Returns a list of non-empty combined strings for each resolved incident.
Returns an empty list on any error (missing table, connection failure, etc.).
"""
try:
with sqlite3.connect(str(incidents_db_path), timeout=30.0) as conn:
cursor = conn.execute(
"SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200"
)
rows = cursor.fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Could not query resolved incidents (%s) — treating as empty corpus", exc)
return []
except sqlite3.Error as exc:
# Catches all remaining SQLite-family errors (IntegrityError, DatabaseError, etc.)
logger.warning("Unexpected SQLite error fetching resolved incidents (%s) — treating as empty corpus", exc)
return []
texts: list[str] = []
for label, notes in rows:
label = (label or "").strip()
notes = (notes or "").strip()
combined = f"{label}. {notes}" if label and notes else (label or notes)
if combined:
texts.append(combined)
return texts
# ---------------------------------------------------------------------------
# Public class
# ---------------------------------------------------------------------------
class FalsePositiveSuppressor:
    """Stage 4 of the multi-agent diagnose pipeline.

    Uses embedding cosine similarity to detect hypotheses that closely match
    previously resolved incidents and flags them (``suppress=True``) as likely
    false positives. Flagged hypotheses are still returned, not dropped.

    Hypotheses pass through unflagged with novelty_score=1.0 when model_id is
    empty, the embedding service is unavailable, no resolved incidents exist,
    or the resolved-incident corpus yields no embeddings.
    """

    def __init__(
        self,
        model_id: str = "",
        device: str = "cpu",
        similarity_threshold: float = 0.85,
    ) -> None:
        """Configure the suppressor.

        Args:
            model_id: Embedding model identifier. An empty string disables
                suppression entirely. Within this class it is otherwise only
                used in log messages; the embedder comes from get_embedder().
            device: Stored for future use when get_embedder() supports device
                selection; currently unused.
            similarity_threshold: Cosine similarity at or above which a
                hypothesis is flagged as matching a resolved incident.
        """
        self._model_id = model_id
        # _device stored for future use when get_embedder() supports device selection
        self._device = device
        # Suppress when cosine similarity to a known resolved incident >= threshold.
        # A threshold of 0.85 means "suppress if 85%+ similar to something already resolved."
        self._similarity_threshold = similarity_threshold

    def suppress(
        self,
        hypotheses: list[Hypothesis],
        incidents_db_path: Path,
    ) -> list[RankedHypothesis]:
        """Rank hypotheses by novelty, flagging those matching resolved incidents.

        Args:
            hypotheses: Candidate hypotheses from Stage 3.
            incidents_db_path: Path to the dedicated incidents SQLite database.

        Returns:
            One RankedHypothesis per input hypothesis. On the scoring path the
            list is sorted by ``novelty_score * confidence`` descending. On the
            passthrough paths it is sorted by confidence alone. Suppressed
            hypotheses are kept: their novelty is at most
            ``1 - similarity_threshold``, so they usually (but not necessarily)
            sort after non-suppressed ones.
        """
        if not hypotheses:
            return []

        # No model configured — full passthrough, rank by confidence only.
        if not self._model_id:
            return self._passthrough(hypotheses)

        # Attempt to obtain an embedder; fall back to passthrough on failure.
        embedder = self._load_embedder()
        if embedder is None:
            logger.warning(
                "Embedding service unavailable for model %r — skipping suppression",
                self._model_id,
            )
            return self._passthrough(hypotheses)

        # Fetch corpus texts from incidents DB; fall back to passthrough if empty.
        corpus_texts = _fetch_resolved_incidents(incidents_db_path)
        if not corpus_texts:
            logger.debug("No resolved incidents found — all hypotheses treated as novel")
            return self._passthrough(hypotheses)

        # Embed corpus (with caching).
        corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, incidents_db_path)
        if not corpus_embeddings:
            # Corpus embedding failed (logged in _get_corpus_embeddings) or
            # produced nothing. With nothing to compare against, embedding each
            # hypothesis would be wasted work; every hypothesis is novel anyway.
            return self._passthrough(hypotheses)

        # Score each hypothesis and sort by novelty * confidence descending.
        ranked = [
            self._score_hypothesis(h, embedder, corpus_embeddings)
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.novelty_score * rh.hypothesis.confidence, reverse=True)
        return ranked

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _score_hypothesis(
        self,
        hypothesis: Hypothesis,
        embedder: Any,
        corpus_embeddings: list[list[float]],
    ) -> RankedHypothesis:
        """Score a single hypothesis against the resolved incident corpus.

        The hypothesis is embedded as ``"<title>. <description>"``. Its highest
        cosine similarity to any corpus embedding becomes similarity_to_known,
        and novelty is ``1 - similarity`` clamped to [0, 1]. Any embedding or
        scoring error marks the hypothesis as fully novel instead of raising.
        """
        try:
            query_text = f"{hypothesis.title}. {hypothesis.description}"
            h_emb = embedder.embed(query_text)
            # Convert numpy array to plain Python list for _cosine_similarities
            h_emb_list: list[float] = h_emb.tolist() if hasattr(h_emb, "tolist") else list(h_emb)
            sims = _cosine_similarities(h_emb_list, corpus_embeddings)
            max_sim = float(max(sims)) if sims else 0.0
        except Exception as exc:
            # Broad catch is intentional: catches unknown embedder runtime errors
            # (e.g. CUDA OOM, backend crashes) so one bad hypothesis never halts the pipeline.
            logger.warning("Embedding failed for hypothesis %r: %s — treating as novel", hypothesis.title, exc)
            return RankedHypothesis(
                hypothesis=hypothesis,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )

        # Float rounding can push max_sim marginally above 1.0, and a negative
        # cosine similarity would push novelty above the 1.0 that the
        # passthrough path uses for "fully novel". Clamp so both paths rank on
        # the same [0, 1] scale.
        novelty_score = min(1.0, max(0.0, 1.0 - max_sim))
        suppress = bool(max_sim >= self._similarity_threshold)
        suppression_reason = (
            f"Similar to resolved incident (similarity {max_sim:.2f})"
            if suppress
            else None
        )
        return RankedHypothesis(
            hypothesis=hypothesis,
            novelty_score=novelty_score,
            similarity_to_known=max_sim,
            suppress=suppress,
            suppression_reason=suppression_reason,
        )

    def _load_embedder(self) -> Any | None:
        """Load the embedding service. Returns None if unavailable."""
        try:
            from app.services.embeddings import get_embedder
            return get_embedder()
        except Exception as exc:
            # Broad catch is intentional: get_embedder() may raise on import or
            # backend init failures from any number of third-party libraries.
            logger.warning("Failed to import/initialise embedding service: %s", exc)
            return None

    def _get_corpus_embeddings(
        self,
        embedder: Any,
        corpus_texts: list[str],
        incidents_db_path: Path,
    ) -> list[list[float]]:
        """Return corpus embeddings, reusing the module cache when the corpus is unchanged.

        The cache is keyed by ``str(incidents_db_path)``. A cached entry is
        reused only when its stored text list equals *corpus_texts*; otherwise
        the corpus is re-embedded with ``embedder.embed_batch`` and the cache
        updated. An embedding failure is logged and returns ``[]``, and the
        failure is not cached, so the next call retries.
        """
        cache_key = str(incidents_db_path)
        cached = _corpus_cache.get(cache_key)
        if cached is not None:
            cached_texts, cached_embeddings = cached
            if cached_texts == corpus_texts:
                return cached_embeddings

        logger.debug("Embedding corpus of %d resolved incidents", len(corpus_texts))
        try:
            raw_embeddings = embedder.embed_batch(corpus_texts)
            # Normalise each embedding to a plain Python list for portability
            corpus_embeddings: list[list[float]] = [
                e.tolist() if hasattr(e, "tolist") else list(e)
                for e in raw_embeddings
            ]
        except Exception as exc:
            # Broad catch is intentional: embed_batch() may raise from any backend
            # (network timeout, CUDA error, etc.) — treat as empty corpus so the
            # pipeline can continue without suppression.
            logger.warning("Corpus embedding failed: %s — treating as empty corpus", exc)
            return []

        _corpus_cache[cache_key] = (corpus_texts, corpus_embeddings)
        return corpus_embeddings

    def _passthrough(self, hypotheses: list[Hypothesis]) -> list[RankedHypothesis]:
        """Return all hypotheses as non-suppressed, ranked by confidence descending."""
        ranked = [
            RankedHypothesis(
                hypothesis=h,
                novelty_score=1.0,
                similarity_to_known=0.0,
                suppress=False,
                suppression_reason=None,
            )
            for h in hypotheses
        ]
        ranked.sort(key=lambda rh: rh.hypothesis.confidence, reverse=True)
        return ranked