turnstone/tests/test_incident_detector.py
pyr0ball a9d8171fe8 feat(incidents): auto-incident detection + example-node Podman setup
Auto-incident detector:
- New app/tasks/incident_detector.py: post-glean error cluster detector
  - Sliding window algorithm: source + N errors within window_s seconds
  - Deduplication via issue_type='auto:{source_id}' + interval overlap check
  - Respects TURNSTONE_AUTO_INCIDENT_THRESHOLD (default 5) and
    TURNSTONE_AUTO_INCIDENT_WINDOW (default 600s) env vars
  - 20 tests all passing
- Wired into glean_scheduler.run_once() and scheduler_loop()
- TURNSTONE_AUTO_INCIDENT env var to disable (default enabled)

Podman standalone improvements:
- REPO_DIR auto-detected from script location (no longer hardcoded to /opt/turnstone)
- DATA_DIR/PATTERNS_DIR/HF_CACHE_DIR configurable via env vars
- Bootstrap step copies host-specific sources-<hostname>.yaml on first run
- Auto-incident env vars passed through

example-node sources:
- patterns/sources-example-node.yaml: Sonarr, Radarr, Bazarr, Prowlarr,
  Tautulli, autoscan, organizr, nextcloud, journal export
2026-06-11 18:37:53 -07:00

238 lines
9.2 KiB
Python

"""Tests for app/tasks/incident_detector.py auto-incident detection."""
from __future__ import annotations
import sqlite3
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.db import ensure_schema, ensure_incidents_schema
from app.services.incidents import create_incident, list_incidents
from app.tasks.incident_detector import (
_find_clusters,
_incident_exists_for_cluster,
_parse_ts,
detect_and_create,
)
# ── Helpers ────────────────────────────────────────────────────────────────────
def _make_db(path: Path) -> None:
    """Initialise the database file at *path* by delegating to ``ensure_schema``."""
    ensure_schema(path)
    return None
def _make_incidents_db(path: Path) -> None:
    """Initialise the incidents database at *path* via ``ensure_incidents_schema``."""
    ensure_incidents_schema(path)
    return None
def _iso(base: datetime, offset_s: float) -> str:
return (base + timedelta(seconds=offset_s)).isoformat()
def _insert_entry(db: Path, source_id: str, ts_iso: str, severity: str, ingest_time: str) -> None:
with sqlite3.connect(db) as conn:
conn.execute(
"INSERT INTO log_entries (id, source_id, sequence, timestamp_iso, ingest_time, "
"severity, text, repeat_count, out_of_order, matched_patterns, tenant_id) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?)",
(
f"{source_id}-{ts_iso}", source_id, 0, ts_iso, ingest_time,
severity, "error text", 0, 0, "[]", "",
),
)
# ── _parse_ts ──────────────────────────────────────────────────────────────────
class TestParseTs:
    """Checks ``_parse_ts`` against well-formed, missing and malformed input."""

    def test_parses_utc_iso(self) -> None:
        """A timestamp with an explicit ``+00:00`` offset parses to a positive value."""
        parsed = _parse_ts("2026-06-11T12:00:00+00:00")
        assert parsed is not None
        assert parsed > 0

    def test_parses_z_suffix(self) -> None:
        """A timestamp ending in ``Z`` is accepted."""
        parsed = _parse_ts("2026-06-11T12:00:00Z")
        assert parsed is not None

    def test_none_input(self) -> None:
        """``None`` in gives ``None`` out."""
        outcome = _parse_ts(None)
        assert outcome is None

    def test_invalid_input(self) -> None:
        """A string that is not a date gives ``None``."""
        outcome = _parse_ts("not-a-date")
        assert outcome is None
# ── _find_clusters ─────────────────────────────────────────────────────────────
class TestFindClusters:
    """Checks ``_find_clusters`` on dense, sparse and degenerate event lists."""

    # Reference instant; every event timestamp is an offset from this.
    BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc)

    def _events(self, offsets: list[float], severity: str = "ERROR") -> list[dict]:
        """Build one event dict per offset (seconds after ``BASE``) with *severity*."""
        built = []
        for offset in offsets:
            built.append({"timestamp_iso": _iso(self.BASE, offset), "severity": severity})
        return built

    def test_dense_cluster_detected(self) -> None:
        """Five errors within four minutes form exactly one cluster."""
        burst = self._events([0, 60, 120, 180, 240])
        found = _find_clusters(burst, window_s=600, threshold=5)
        assert len(found) == 1

    def test_sparse_events_no_cluster(self) -> None:
        """Five errors spaced five minutes apart never fit a 60 s window."""
        spread = self._events([0, 300, 600, 900, 1200])
        found = _find_clusters(spread, window_s=60, threshold=5)
        assert found == []

    def test_threshold_not_met(self) -> None:
        """Four events are one short of a threshold of five."""
        too_few = self._events([0, 10, 20, 30])
        found = _find_clusters(too_few, window_s=600, threshold=5)
        assert found == []

    def test_critical_wins_over_error(self) -> None:
        """One CRITICAL among ERRORs sets the cluster's third element to CRITICAL."""
        mixed = self._events([0, 10, 20, 30, 40], "ERROR")
        mixed[2]["severity"] = "CRITICAL"
        found = _find_clusters(mixed, window_s=600, threshold=5)
        first_cluster = found[0]
        assert first_cluster[2] == "CRITICAL"

    def test_two_non_overlapping_clusters(self) -> None:
        """Two dense bursts an hour apart are reported as two clusters."""
        early = self._events([0, 60, 120, 180, 240])  # minutes 0-4
        late = self._events([3600, 3660, 3720, 3780, 3840])  # minutes 60-64
        found = _find_clusters([*early, *late], window_s=600, threshold=5)
        assert len(found) == 2

    def test_no_timestamp_events_skipped(self) -> None:
        """Events whose timestamp is ``None`` produce no clusters."""
        blank = {"timestamp_iso": None, "severity": "ERROR"}
        # The same dict object is repeated ten times.
        repeated = [blank] * 10
        found = _find_clusters(repeated, window_s=600, threshold=5)
        assert found == []
# ── _incident_exists_for_cluster ───────────────────────────────────────────────
class TestIncidentExists:
    """Checks ``_incident_exists_for_cluster`` against stored incidents."""

    # Reference instant; incident and cluster bounds are offsets from this.
    BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc)

    def test_no_existing_incidents(self, tmp_path: Path) -> None:
        """With an empty incidents database nothing matches."""
        incidents_db = tmp_path / "inc.db"
        _make_incidents_db(incidents_db)
        window_start = _iso(self.BASE, 0)
        window_end = _iso(self.BASE, 600)
        found = _incident_exists_for_cluster(incidents_db, "nginx", window_start, window_end)
        assert not found

    def test_exact_overlap_detected(self, tmp_path: Path) -> None:
        """A cluster lying inside a stored ``auto:nginx`` incident is matched."""
        incidents_db = tmp_path / "inc.db"
        _make_incidents_db(incidents_db)
        create_incident(
            incidents_db,
            label="Auto: nginx — 5 errors",
            issue_type="auto:nginx",
            started_at=_iso(self.BASE, 0),
            ended_at=_iso(self.BASE, 600),
            severity="high",
        )
        window_start = _iso(self.BASE, 100)
        window_end = _iso(self.BASE, 400)
        found = _incident_exists_for_cluster(incidents_db, "nginx", window_start, window_end)
        assert found

    def test_different_source_not_matched(self, tmp_path: Path) -> None:
        """An incident stored for caddy does not match an nginx cluster."""
        incidents_db = tmp_path / "inc.db"
        _make_incidents_db(incidents_db)
        create_incident(
            incidents_db,
            label="Auto: caddy — 5 errors",
            issue_type="auto:caddy",
            started_at=_iso(self.BASE, 0),
            ended_at=_iso(self.BASE, 600),
            severity="high",
        )
        window_start = _iso(self.BASE, 0)
        window_end = _iso(self.BASE, 600)
        found = _incident_exists_for_cluster(incidents_db, "nginx", window_start, window_end)
        assert not found

    def test_non_overlapping_not_matched(self, tmp_path: Path) -> None:
        """A cluster that starts after the stored incident ends is not matched."""
        incidents_db = tmp_path / "inc.db"
        _make_incidents_db(incidents_db)
        create_incident(
            incidents_db,
            label="Auto: nginx — 5 errors",
            issue_type="auto:nginx",
            started_at=_iso(self.BASE, 0),
            ended_at=_iso(self.BASE, 300),
            severity="high",
        )
        # Stored incident covers 0-300 s; the cluster covers 900-1200 s.
        window_start = _iso(self.BASE, 900)
        window_end = _iso(self.BASE, 1200)
        found = _incident_exists_for_cluster(incidents_db, "nginx", window_start, window_end)
        assert not found
# ── detect_and_create ──────────────────────────────────────────────────────────
class TestDetectAndCreate:
    """Checks ``detect_and_create`` against temporary log and incident databases."""

    # Reference instant; entry timestamps and ``since`` are offsets from this.
    BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc)

    def _setup(self, tmp_path: Path) -> tuple[Path, Path]:
        """Create both databases under *tmp_path*; return ``(log_db, incidents_db)``."""
        log_db = tmp_path / "ts.db"
        incidents_db = tmp_path / "incidents.db"
        _make_db(log_db)
        _make_incidents_db(incidents_db)
        return log_db, incidents_db

    def test_creates_incident_on_cluster(self, tmp_path: Path) -> None:
        """Six nginx errors 30 s apart yield one ``auto:nginx`` incident."""
        log_db, incidents_db = self._setup(tmp_path)
        ingested_at = _iso(self.BASE, -60)
        for offset in range(0, 180, 30):
            _insert_entry(log_db, "nginx", _iso(self.BASE, offset), "ERROR", ingested_at)

        outcome = detect_and_create(log_db, incidents_db, since=_iso(self.BASE, -120))

        assert outcome["created"] == 1
        stored = list_incidents(incidents_db)
        assert len(stored) == 1
        only = stored[0]
        assert "nginx" in only.label
        assert only.issue_type == "auto:nginx"

    def test_no_incident_below_threshold(self, tmp_path: Path) -> None:
        """Four errors with an explicit threshold of five create nothing."""
        log_db, incidents_db = self._setup(tmp_path)
        ingested_at = _iso(self.BASE, -60)
        for offset in range(0, 120, 30):  # four entries only
            _insert_entry(log_db, "nginx", _iso(self.BASE, offset), "ERROR", ingested_at)

        outcome = detect_and_create(
            log_db, incidents_db, since=_iso(self.BASE, -120), threshold=5
        )

        assert outcome["created"] == 0

    def test_no_duplicate_incidents(self, tmp_path: Path) -> None:
        """Running detection twice over the same data leaves a single incident."""
        log_db, incidents_db = self._setup(tmp_path)
        ingested_at = _iso(self.BASE, -60)
        for offset in range(0, 180, 30):
            _insert_entry(log_db, "nginx", _iso(self.BASE, offset), "ERROR", ingested_at)

        for _ in range(2):  # the second pass must not add another incident
            detect_and_create(log_db, incidents_db, since=_iso(self.BASE, -120))

        stored = list_incidents(incidents_db)
        assert len(stored) == 1

    def test_critical_severity_mapped_to_critical_label(self, tmp_path: Path) -> None:
        """One CRITICAL entry in the cluster gives the incident severity ``critical``."""
        log_db, incidents_db = self._setup(tmp_path)
        ingested_at = _iso(self.BASE, -60)
        for offset in range(0, 180, 30):
            # Only the first entry is CRITICAL; the other five are ERROR.
            level = "CRITICAL" if offset == 0 else "ERROR"
            _insert_entry(log_db, "sshd", _iso(self.BASE, offset), level, ingested_at)

        detect_and_create(log_db, incidents_db, since=_iso(self.BASE, -120))

        stored = list_incidents(incidents_db)
        assert stored[0].severity == "critical"

    def test_empty_db_returns_zero(self, tmp_path: Path) -> None:
        """With no log entries and ``since=None`` nothing is created."""
        log_db, incidents_db = self._setup(tmp_path)
        outcome = detect_and_create(log_db, incidents_db, since=None)
        assert outcome["created"] == 0

    def test_independent_sources_each_get_incident(self, tmp_path: Path) -> None:
        """Clusters from caddy and nginx are counted separately."""
        log_db, incidents_db = self._setup(tmp_path)
        ingested_at = _iso(self.BASE, -60)
        for source in ("caddy", "nginx"):
            for offset in range(0, 180, 30):
                _insert_entry(log_db, source, _iso(self.BASE, offset), "ERROR", ingested_at)

        outcome = detect_and_create(log_db, incidents_db, since=_iso(self.BASE, -120))

        assert outcome["created"] == 2