From b6b69e2150c791d97a712d61d246d84f2c8596ec Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 11 Jun 2026 18:37:53 -0700 Subject: [PATCH] feat(incidents): auto-incident detection + example-node Podman setup Auto-incident detector: - New app/tasks/incident_detector.py: post-glean error cluster detector - Sliding window algorithm: source + N errors within window_s seconds - Deduplication via issue_type='auto:{source_id}' + interval overlap check - Respects TURNSTONE_AUTO_INCIDENT_THRESHOLD (default 5) and TURNSTONE_AUTO_INCIDENT_WINDOW (default 600s) env vars - 20 tests all passing - Wired into glean_scheduler.run_once() and scheduler_loop() - TURNSTONE_AUTO_INCIDENT env var to disable (default enabled) Podman standalone improvements: - REPO_DIR auto-detected from script location (no longer hardcoded to /opt/turnstone) - DATA_DIR/PATTERNS_DIR/HF_CACHE_DIR configurable via env vars - Bootstrap step copies host-specific sources-.yaml on first run - Auto-incident env vars passed through example-node sources: - patterns/sources-example-node.yaml: Sonarr, Radarr, Bazarr, Prowlarr, Tautulli, autoscan, organizr, nextcloud, journal export --- app/rest.py | 3 + app/tasks/glean_scheduler.py | 15 ++ app/tasks/incident_detector.py | 188 +++++++++++++++++++++++++ patterns/sources-example.yaml | 49 +++++++ podman-standalone.sh | 35 +++-- tests/test_incident_detector.py | 238 ++++++++++++++++++++++++++++++++ 6 files changed, 520 insertions(+), 8 deletions(-) create mode 100644 app/tasks/incident_detector.py create mode 100644 patterns/sources-example.yaml create mode 100644 tests/test_incident_detector.py diff --git a/app/rest.py b/app/rest.py index 4101253..a62070f 100644 --- a/app/rest.py +++ b/app/rest.py @@ -119,6 +119,7 @@ ANOMALY_THRESHOLD = float(os.environ.get("TURNSTONE_ANOMALY_THRESHOLD", "0.75")) CYBERSEC_MODEL = os.environ.get("TURNSTONE_CYBERSEC_MODEL", "") CYBERSEC_DEVICE = os.environ.get("TURNSTONE_CYBERSEC_DEVICE", "cpu") CYBERSEC_THRESHOLD = float(os.environ.get("TURNSTONE_CYBERSEC_THRESHOLD", "0.60")) +AUTO_INCIDENT = os.environ.get("TURNSTONE_AUTO_INCIDENT", "true").lower() not in ("0", "false", "no") # When set, all /api/ routes require Authorization: Bearer . # Unset (default) means no authentication — suitable for local-only deployments. _API_KEY: str | None = os.environ.get("TURNSTONE_API_KEY") or None @@ -181,6 +182,8 @@ async def _lifespan(app: FastAPI): cybersec_model=CYBERSEC_MODEL, cybersec_device=CYBERSEC_DEVICE, cybersec_threshold=CYBERSEC_THRESHOLD, + incidents_db_path=INCIDENTS_DB_PATH, + auto_incident=AUTO_INCIDENT, ), name="glean-scheduler", ) diff --git a/app/tasks/glean_scheduler.py b/app/tasks/glean_scheduler.py index fa05040..edf9255 100644 --- a/app/tasks/glean_scheduler.py +++ b/app/tasks/glean_scheduler.py @@ -22,6 +22,7 @@ import httpx from app.glean.pipeline import glean_sources from app.tasks.anomaly_scorer import run_once as _run_scorer from app.tasks.cybersec_scorer import run_once as _run_cybersec +from app.tasks.incident_detector import run_once as _run_incident_detector logger = logging.getLogger(__name__) @@ -131,6 +132,8 @@ async def run_once( cybersec_model: str = "", cybersec_device: str = "cpu", cybersec_threshold: float = 0.60, + incidents_db_path: Path | None = None, + auto_incident: bool = True, ) -> dict[str, Any]: """Ingest all sources once, then submit matched entries if configured. @@ -177,6 +180,12 @@ async def run_once( if cybersec_model: await _run_cybersec(db_path, cybersec_model, cybersec_device, threshold=cybersec_threshold) + if auto_incident and incidents_db_path: + glean_started_iso = _state.last_run_at + result = await _run_incident_detector(db_path, incidents_db_path, since=glean_started_iso) + if result["created"]: + logger.info("Incident detector: %d incident(s) auto-created", result["created"]) + return {"ok": True, "stats": _state.last_stats, "duration_s": _state.last_duration_s} @@ -193,6 +202,8 @@ async def scheduler_loop( cybersec_model: str = "", cybersec_device: str = "cpu", cybersec_threshold: float = 0.60, + incidents_db_path: Path | None = None, + auto_incident: bool = True, ) -> None: """Run glean + optional submission + optional anomaly/cybersec scoring every interval_s seconds.""" logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file) @@ -202,6 +213,8 @@ async def scheduler_loop( logger.info("Anomaly scoring enabled — model: %s", anomaly_model) if cybersec_model: logger.info("Cybersec scoring enabled — model: %s", cybersec_model) + if auto_incident and incidents_db_path: + logger.info("Auto-incident detection enabled") while True: await run_once( sources_file, db_path, pattern_file, submit_endpoint, source_host, @@ -211,6 +224,8 @@ async def scheduler_loop( cybersec_model=cybersec_model, cybersec_device=cybersec_device, cybersec_threshold=cybersec_threshold, + incidents_db_path=incidents_db_path, + auto_incident=auto_incident, ) next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s) _state.next_run_at = next_run.isoformat() diff --git a/app/tasks/incident_detector.py b/app/tasks/incident_detector.py new file mode 100644 index 0000000..6a62b2f --- /dev/null +++ b/app/tasks/incident_detector.py @@ -0,0 +1,188 @@ +"""Post-glean automatic incident detection. + +After each batch glean, scan entries ingested since the last run for +ERROR/CRITICAL clusters. If a source produces >= threshold errors within +window_s seconds, auto-create an incident unless one already exists for +that source in that time window. + +Environment variables (all optional): + TURNSTONE_AUTO_INCIDENT_THRESHOLD integer, default 5 + TURNSTONE_AUTO_INCIDENT_WINDOW seconds, default 600 (10 min) +""" +from __future__ import annotations + +import asyncio +import logging +import os +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path + +from app.db import get_conn, resolve_tenant_id +from app.services.incidents import create_incident + +logger = logging.getLogger(__name__) + +_THRESHOLD = int(os.environ.get("TURNSTONE_AUTO_INCIDENT_THRESHOLD", "5")) +_WINDOW_S = int(os.environ.get("TURNSTONE_AUTO_INCIDENT_WINDOW", "600")) + +# Severity rank — used to pick the cluster's worst severity +_SEV_RANK = {"CRITICAL": 3, "ERROR": 2, "WARN": 1, "INFO": 0, "DEBUG": 0} + + +def _query_recent_errors(db_path: Path, since: str | None) -> list[dict]: + tid = resolve_tenant_id() + with get_conn(db_path) as conn: + if since: + rows = conn.execute( + """ + SELECT source_id, timestamp_iso, severity + FROM log_entries + WHERE severity IN ('ERROR', 'CRITICAL') + AND ingest_time > ? + AND (tenant_id = ? OR tenant_id = '') + ORDER BY source_id, timestamp_iso ASC + """, + (since, tid), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT source_id, timestamp_iso, severity + FROM log_entries + WHERE severity IN ('ERROR', 'CRITICAL') + AND (tenant_id = ? OR tenant_id = '') + ORDER BY source_id, timestamp_iso ASC + LIMIT 10000 + """, + (tid,), + ).fetchall() + return [dict(r) for r in rows] + + +def _parse_ts(iso: str | None) -> float | None: + """Parse ISO timestamp to epoch seconds; return None on failure.""" + if not iso: + return None + try: + dt = datetime.fromisoformat(iso.replace("Z", "+00:00")) + return dt.timestamp() + except (ValueError, TypeError): + return None + + +def _find_clusters( + events: list[dict], window_s: int, threshold: int +) -> list[tuple[str, str, str]]: + """Return (started_at_iso, ended_at_iso, worst_severity) for each cluster.""" + # Filter to events with parseable timestamps, sorted ascending + timed = [] + for e in events: + t = _parse_ts(e["timestamp_iso"]) + if t is not None: + timed.append((t, e["timestamp_iso"], e["severity"])) + timed.sort() + + clusters: list[tuple[str, str, str]] = [] + i = 0 + while i < len(timed): + j = i + while j < len(timed) and timed[j][0] - timed[i][0] <= window_s: + j += 1 + count = j - i + if count >= threshold: + worst = max((timed[k][2] for k in range(i, j)), key=lambda s: _SEV_RANK.get(s, 0)) + clusters.append((timed[i][1], timed[j - 1][1], worst)) + i = j # skip past the cluster to avoid overlap + else: + i += 1 + return clusters + + +def _incident_exists_for_cluster( + incidents_db_path: Path, source_id: str, started_at: str, ended_at: str +) -> bool: + """Return True if an auto-incident for this source already covers the window.""" + issue_type = f"auto:{source_id}" + start_ts = _parse_ts(started_at) + end_ts = _parse_ts(ended_at) + if start_ts is None or end_ts is None: + return False + tid = resolve_tenant_id() + with get_conn(incidents_db_path) as conn: + rows = conn.execute( + """ + SELECT started_at, ended_at FROM incidents + WHERE issue_type = ? + AND (tenant_id = ? OR tenant_id = '') + """, + (issue_type, tid), + ).fetchall() + for row in rows: + ex_start = _parse_ts(row["started_at"]) + ex_end = _parse_ts(row["ended_at"]) + if ex_start is None or ex_end is None: + continue + # Overlap check: two intervals [a,b] and [c,d] overlap when a<=d and b>=c + if ex_start <= end_ts and ex_end >= start_ts: + return True + return False + + +def detect_and_create( + db_path: Path, + incidents_db_path: Path, + since: str | None, + threshold: int = _THRESHOLD, + window_s: int = _WINDOW_S, +) -> dict[str, int]: + """Detect error clusters and create incidents. Returns {"created": N}.""" + entries = _query_recent_errors(db_path, since) + if not entries: + return {"created": 0} + + by_source: dict[str, list[dict]] = defaultdict(list) + for e in entries: + by_source[e["source_id"]].append(e) + + created = 0 + for source_id, events in by_source.items(): + clusters = _find_clusters(events, window_s, threshold) + for started_at, ended_at, worst_sev in clusters: + if _incident_exists_for_cluster(incidents_db_path, source_id, started_at, ended_at): + continue + n = len(events) # event count for this source in the glean window + sev_label = "critical" if worst_sev == "CRITICAL" else "high" + create_incident( + incidents_db_path, + label=f"Auto: {source_id} — {n} errors", + issue_type=f"auto:{source_id}", + started_at=started_at, + ended_at=ended_at, + notes="Auto-detected error cluster. Review and label as needed.", + severity=sev_label, + ) + logger.info( + "Auto-incident created: source=%s window=[%s, %s] severity=%s", + source_id, started_at, ended_at, sev_label, + ) + created += 1 + + if created: + logger.info("Incident detector: %d new incident(s) created", created) + return {"created": created} + + +async def run_once( + db_path: Path, + incidents_db_path: Path, + since: str | None, + threshold: int = _THRESHOLD, + window_s: int = _WINDOW_S, +) -> dict[str, int]: + """Async wrapper — runs detection in a thread to avoid blocking the event loop.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: detect_and_create(db_path, incidents_db_path, since, threshold, window_s), + ) diff --git a/patterns/sources-example.yaml b/patterns/sources-example.yaml new file mode 100644 index 0000000..3aefafc --- /dev/null +++ b/patterns/sources-example.yaml @@ -0,0 +1,49 @@ +# Turnstone log sources — example-node.tv +# +# Container paths: /opt and /var/log are bind-mounted read-only. +# journal-export.jsonl is written to /data/ by export_journal.sh (run via cron before glean). +# +# Add or remove sources freely. Missing paths are skipped with a warning. + +sources: + # ── System ──────────────────────────────────────────────────────────────── + # Requires: cron job to run export_journal.sh before each glean. + # Example cron (every 15 min, run as x, add via: crontab -e): + # */15 * * * * /Library/Development/CircuitForge/turnstone/scripts/export_journal.sh \ + # /opt/turnstone-data/ + - id: system-journal + path: /data/journal-export.jsonl + + - id: dmesg + path: /data/dmesg-export.txt + + # ── Servarr stack ───────────────────────────────────────────────────────── + - id: sonarr + path: /opt/sonarr/config/logs/sonarr.0.txt + + - id: radarr + path: /opt/radarr/config/logs/radarr.0.txt + + - id: bazarr + path: /opt/bazarr/config/log/bazarr.log + + - id: prowlarr + path: /opt/prowlarr/config/logs/prowlarr.0.txt + + # ── Media server / tracking ──────────────────────────────────────────────── + - id: tautulli + path: /opt/tautulli/config/logs/plex_websocket.log + + # ── Download automation ──────────────────────────────────────────────────── + - id: autoscan + path: /opt/autoscan/config/autoscan.log + + # ── Web / proxy ──────────────────────────────────────────────────────────── + - id: organizr-nginx + path: /opt/organizr/log/nginx/error.log + + - id: organizr-app + path: /opt/organizr/www/organizr/server.log + + - id: nextcloud-nginx + path: /opt/nextcloud/config/log/nginx/error.log diff --git a/podman-standalone.sh b/podman-standalone.sh index 469c490..72b0fd9 100755 --- a/podman-standalone.sh +++ b/podman-standalone.sh @@ -59,11 +59,14 @@ # set -euo pipefail -REPO_DIR=/opt/turnstone -DATA_DIR=/opt/turnstone/data -PATTERNS_DIR=/opt/turnstone/patterns -HF_CACHE_DIR=/opt/turnstone/hf-cache # persists downloaded ML models across restarts -TZ=America/Los_Angeles +# Auto-detect repo from script location — works whether cloned to /opt/turnstone +# or to /Library/Development/CircuitForge/turnstone or any other path. +REPO_DIR="${TURNSTONE_REPO_DIR:-$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)}" +# Data and patterns live OUTSIDE the repo so they survive git pulls. +DATA_DIR="${TURNSTONE_DATA_DIR:-/opt/turnstone-data}" +PATTERNS_DIR="${TURNSTONE_PATTERNS_DIR:-${DATA_DIR}/patterns}" +HF_CACHE_DIR="${TURNSTONE_HF_CACHE:-${DATA_DIR}/hf-cache}" +TZ="${TZ:-America/Los_Angeles}" # ── Bundle push configuration ──────────────────────────────────────────────── # Set TURNSTONE_BUNDLE_ENDPOINT before running this script to enable the @@ -114,13 +117,26 @@ TZ=America/Los_Angeles # Must be run as root (sudo bash podman-standalone.sh) — rootful Podman only. # +# Bootstrap data and patterns dirs if this is a first run +mkdir -p "${DATA_DIR}" "${PATTERNS_DIR}" "${HF_CACHE_DIR}" +# Copy default patterns if the dir is empty (first run only) +if [ -z "$(ls -A "${PATTERNS_DIR}")" ]; then + cp "${REPO_DIR}/patterns/default.yaml" "${PATTERNS_DIR}/" + # Copy host-specific sources if present, otherwise copy the generic template + HOST_SOURCES="${REPO_DIR}/patterns/sources-$(hostname).yaml" + if [ -f "${HOST_SOURCES}" ]; then + cp "${HOST_SOURCES}" "${PATTERNS_DIR}/sources.yaml" + echo "==> Installed host-specific sources: ${HOST_SOURCES}" + else + cp "${REPO_DIR}/patterns/sources.yaml" "${PATTERNS_DIR}/" + echo "==> Installed default sources.yaml — edit ${PATTERNS_DIR}/sources.yaml for this host" + fi +fi + # Build image from current source (bakes app/ code into the image) echo "Building Turnstone image..." podman build -t localhost/turnstone:latest "${REPO_DIR}" -# Create HF model cache dir if not present (persists across container rebuilds) -mkdir -p "${HF_CACHE_DIR}" - # Remove existing container if present (safe re-run) podman rm -f turnstone 2>/dev/null || true @@ -142,6 +158,9 @@ podman run -d \ -e TURNSTONE_MULTI_AGENT_DIAGNOSE="${TURNSTONE_MULTI_AGENT_DIAGNOSE:-false}" \ -e GPU_SERVER_URL="${GPU_SERVER_URL:-}" \ -e HF_HOME=/hf-cache \ + -e TURNSTONE_AUTO_INCIDENT="${TURNSTONE_AUTO_INCIDENT:-true}" \ + -e TURNSTONE_AUTO_INCIDENT_THRESHOLD="${TURNSTONE_AUTO_INCIDENT_THRESHOLD:-5}" \ + -e TURNSTONE_AUTO_INCIDENT_WINDOW="${TURNSTONE_AUTO_INCIDENT_WINDOW:-600}" \ -e TURNSTONE_CLASSIFIER_MODEL="${TURNSTONE_CLASSIFIER_MODEL:-byviz/bylastic_classification_logs}" \ -e TURNSTONE_EMBED_BACKEND="${TURNSTONE_EMBED_BACKEND:-sentence_transformers}" \ -e TURNSTONE_EMBED_MODEL="${TURNSTONE_EMBED_MODEL:-sentence-transformers/all-MiniLM-L6-v2}" \ diff --git a/tests/test_incident_detector.py b/tests/test_incident_detector.py new file mode 100644 index 0000000..c3a5e32 --- /dev/null +++ b/tests/test_incident_detector.py @@ -0,0 +1,238 @@ +"""Tests for app/tasks/incident_detector.py auto-incident detection.""" +from __future__ import annotations + +import sqlite3 +import tempfile +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pytest + +import sys +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from app.db import ensure_schema, ensure_incidents_schema +from app.services.incidents import create_incident, list_incidents +from app.tasks.incident_detector import ( + _find_clusters, + _incident_exists_for_cluster, + _parse_ts, + detect_and_create, +) + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _make_db(path: Path) -> None: + ensure_schema(path) + + +def _make_incidents_db(path: Path) -> None: + ensure_incidents_schema(path) + + +def _iso(base: datetime, offset_s: float) -> str: + return (base + timedelta(seconds=offset_s)).isoformat() + + +def _insert_entry(db: Path, source_id: str, ts_iso: str, severity: str, ingest_time: str) -> None: + with sqlite3.connect(db) as conn: + conn.execute( + "INSERT INTO log_entries (id, source_id, sequence, timestamp_iso, ingest_time, " + "severity, text, repeat_count, out_of_order, matched_patterns, tenant_id) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)", + ( + f"{source_id}-{ts_iso}", source_id, 0, ts_iso, ingest_time, + severity, "error text", 0, 0, "[]", "", + ), + ) + + +# ── _parse_ts ────────────────────────────────────────────────────────────────── + +class TestParseTs: + def test_parses_utc_iso(self) -> None: + ts = _parse_ts("2026-06-11T12:00:00+00:00") + assert ts is not None + assert ts > 0 + + def test_parses_z_suffix(self) -> None: + ts = _parse_ts("2026-06-11T12:00:00Z") + assert ts is not None + + def test_none_input(self) -> None: + assert _parse_ts(None) is None + + def test_invalid_input(self) -> None: + assert _parse_ts("not-a-date") is None + + +# ── _find_clusters ───────────────────────────────────────────────────────────── + +class TestFindClusters: + BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc) + + def _events(self, offsets: list[float], severity: str = "ERROR") -> list[dict]: + return [{"timestamp_iso": _iso(self.BASE, o), "severity": severity} for o in offsets] + + def test_dense_cluster_detected(self) -> None: + events = self._events([0, 60, 120, 180, 240]) # 5 errors in 4 min + clusters = _find_clusters(events, window_s=600, threshold=5) + assert len(clusters) == 1 + + def test_sparse_events_no_cluster(self) -> None: + events = self._events([0, 300, 600, 900, 1200]) # 5 errors, each 5 min apart + clusters = _find_clusters(events, window_s=60, threshold=5) + assert clusters == [] + + def test_threshold_not_met(self) -> None: + events = self._events([0, 10, 20, 30]) # only 4 events + clusters = _find_clusters(events, window_s=600, threshold=5) + assert clusters == [] + + def test_critical_wins_over_error(self) -> None: + events = self._events([0, 10, 20, 30, 40], "ERROR") + events[2]["severity"] = "CRITICAL" + clusters = _find_clusters(events, window_s=600, threshold=5) + assert clusters[0][2] == "CRITICAL" + + def test_two_non_overlapping_clusters(self) -> None: + # Dense cluster at 0-4 min, then another at 60-64 min + e1 = self._events([0, 60, 120, 180, 240]) + e2 = self._events([3600, 3660, 3720, 3780, 3840]) + clusters = _find_clusters(e1 + e2, window_s=600, threshold=5) + assert len(clusters) == 2 + + def test_no_timestamp_events_skipped(self) -> None: + events = [{"timestamp_iso": None, "severity": "ERROR"}] * 10 + clusters = _find_clusters(events, window_s=600, threshold=5) + assert clusters == [] + + +# ── _incident_exists_for_cluster ─────────────────────────────────────────────── + +class TestIncidentExists: + BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc) + + def test_no_existing_incidents(self, tmp_path: Path) -> None: + db = tmp_path / "inc.db" + _make_incidents_db(db) + assert not _incident_exists_for_cluster( + db, "nginx", _iso(self.BASE, 0), _iso(self.BASE, 600) + ) + + def test_exact_overlap_detected(self, tmp_path: Path) -> None: + db = tmp_path / "inc.db" + _make_incidents_db(db) + create_incident( + db, label="Auto: nginx — 5 errors", + issue_type="auto:nginx", + started_at=_iso(self.BASE, 0), + ended_at=_iso(self.BASE, 600), + severity="high", + ) + assert _incident_exists_for_cluster( + db, "nginx", _iso(self.BASE, 100), _iso(self.BASE, 400) + ) + + def test_different_source_not_matched(self, tmp_path: Path) -> None: + db = tmp_path / "inc.db" + _make_incidents_db(db) + create_incident( + db, label="Auto: caddy — 5 errors", + issue_type="auto:caddy", + started_at=_iso(self.BASE, 0), + ended_at=_iso(self.BASE, 600), + severity="high", + ) + assert not _incident_exists_for_cluster( + db, "nginx", _iso(self.BASE, 0), _iso(self.BASE, 600) + ) + + def test_non_overlapping_not_matched(self, tmp_path: Path) -> None: + db = tmp_path / "inc.db" + _make_incidents_db(db) + create_incident( + db, label="Auto: nginx — 5 errors", + issue_type="auto:nginx", + started_at=_iso(self.BASE, 0), + ended_at=_iso(self.BASE, 300), + severity="high", + ) + # Cluster starts after existing incident ends + assert not _incident_exists_for_cluster( + db, "nginx", _iso(self.BASE, 900), _iso(self.BASE, 1200) + ) + + +# ── detect_and_create ────────────────────────────────────────────────────────── + +class TestDetectAndCreate: + BASE = datetime(2026, 6, 11, 12, 0, 0, tzinfo=timezone.utc) + + def _setup(self, tmp_path: Path) -> tuple[Path, Path]: + db = tmp_path / "ts.db" + idb = tmp_path / "incidents.db" + _make_db(db) + _make_incidents_db(idb) + return db, idb + + def test_creates_incident_on_cluster(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + ingest = _iso(self.BASE, -60) + for i in range(6): + _insert_entry(db, "nginx", _iso(self.BASE, i * 30), "ERROR", ingest) + + result = detect_and_create(db, idb, since=_iso(self.BASE, -120)) + assert result["created"] == 1 + incidents = list_incidents(idb) + assert len(incidents) == 1 + assert "nginx" in incidents[0].label + assert incidents[0].issue_type == "auto:nginx" + + def test_no_incident_below_threshold(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + ingest = _iso(self.BASE, -60) + for i in range(4): # only 4 errors — below default threshold of 5 + _insert_entry(db, "nginx", _iso(self.BASE, i * 30), "ERROR", ingest) + + result = detect_and_create(db, idb, since=_iso(self.BASE, -120), threshold=5) + assert result["created"] == 0 + + def test_no_duplicate_incidents(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + ingest = _iso(self.BASE, -60) + for i in range(6): + _insert_entry(db, "nginx", _iso(self.BASE, i * 30), "ERROR", ingest) + + detect_and_create(db, idb, since=_iso(self.BASE, -120)) + detect_and_create(db, idb, since=_iso(self.BASE, -120)) # second run + + incidents = list_incidents(idb) + assert len(incidents) == 1 + + def test_critical_severity_mapped_to_critical_label(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + ingest = _iso(self.BASE, -60) + for i in range(6): + sev = "CRITICAL" if i == 0 else "ERROR" + _insert_entry(db, "sshd", _iso(self.BASE, i * 30), sev, ingest) + + detect_and_create(db, idb, since=_iso(self.BASE, -120)) + incidents = list_incidents(idb) + assert incidents[0].severity == "critical" + + def test_empty_db_returns_zero(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + result = detect_and_create(db, idb, since=None) + assert result["created"] == 0 + + def test_independent_sources_each_get_incident(self, tmp_path: Path) -> None: + db, idb = self._setup(tmp_path) + ingest = _iso(self.BASE, -60) + for src in ["caddy", "nginx"]: + for i in range(6): + _insert_entry(db, src, _iso(self.BASE, i * 30), "ERROR", ingest) + + result = detect_and_create(db, idb, since=_iso(self.BASE, -120)) + assert result["created"] == 2