Auto-incident detector:
- New app/tasks/incident_detector.py: post-glean error cluster detector
- Sliding window algorithm: flags a source that logs N or more errors within window_s seconds
- Deduplication via issue_type='auto:{source_id}' + interval overlap check
- Respects TURNSTONE_AUTO_INCIDENT_THRESHOLD (default 5) and
  TURNSTONE_AUTO_INCIDENT_WINDOW (default 600s) env vars
- 20 tests all passing
- Wired into glean_scheduler.run_once() and scheduler_loop()
- TURNSTONE_AUTO_INCIDENT env var to disable (default enabled)
Podman standalone improvements:
- REPO_DIR auto-detected from script location (no longer hardcoded to /opt/turnstone)
- DATA_DIR/PATTERNS_DIR/HF_CACHE_DIR configurable via env vars
- Bootstrap step copies host-specific sources-<hostname>.yaml on first run
- Auto-incident env vars passed through
example-node sources:
- patterns/sources-example-node.yaml: Sonarr, Radarr, Bazarr, Prowlarr,
Tautulli, autoscan, organizr, nextcloud, journal export
188 lines
6.4 KiB
Python
188 lines
6.4 KiB
Python
"""Post-glean automatic incident detection.

After each batch glean, scan entries ingested since the last run for
ERROR/CRITICAL clusters. If a source produces >= threshold errors within
window_s seconds, auto-create an incident unless one already exists for
that source in that time window.

Environment variables (all optional):
    TURNSTONE_AUTO_INCIDENT_THRESHOLD   integer, default 5
    TURNSTONE_AUTO_INCIDENT_WINDOW      seconds, default 600 (10 min)
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from app.db import get_conn, resolve_tenant_id
|
|
from app.services.incidents import create_incident
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_THRESHOLD = int(os.environ.get("TURNSTONE_AUTO_INCIDENT_THRESHOLD", "5"))
|
|
_WINDOW_S = int(os.environ.get("TURNSTONE_AUTO_INCIDENT_WINDOW", "600"))
|
|
|
|
# Severity rank — used to pick the cluster's worst severity
|
|
_SEV_RANK = {"CRITICAL": 3, "ERROR": 2, "WARN": 1, "INFO": 0, "DEBUG": 0}
|
|
|
|
|
|
def _query_recent_errors(db_path: Path, since: str | None) -> list[dict]:
    """Fetch ERROR/CRITICAL log entries visible to the current tenant.

    When *since* is truthy, only rows whose ``ingest_time`` is greater than
    it are selected. Otherwise every matching row is eligible, capped at
    10000 rows. Rows belonging to the resolved tenant or carrying an empty
    ``tenant_id`` are included, ordered by source and then timestamp.

    Returns one plain dict per row with the keys ``source_id``,
    ``timestamp_iso`` and ``severity``.
    """
    tenant = resolve_tenant_id()

    # Pick the statement and its parameters first, then run it exactly once.
    if since:
        sql = """
            SELECT source_id, timestamp_iso, severity
            FROM log_entries
            WHERE severity IN ('ERROR', 'CRITICAL')
            AND ingest_time > ?
            AND (tenant_id = ? OR tenant_id = '')
            ORDER BY source_id, timestamp_iso ASC
            """
        params = (since, tenant)
    else:
        sql = """
            SELECT source_id, timestamp_iso, severity
            FROM log_entries
            WHERE severity IN ('ERROR', 'CRITICAL')
            AND (tenant_id = ? OR tenant_id = '')
            ORDER BY source_id, timestamp_iso ASC
            LIMIT 10000
            """
        params = (tenant,)

    with get_conn(db_path) as conn:
        fetched = conn.execute(sql, params).fetchall()
        return [dict(record) for record in fetched]
|
|
|
|
|
|
def _parse_ts(iso: str | None) -> float | None:
|
|
"""Parse ISO timestamp to epoch seconds; return None on failure."""
|
|
if not iso:
|
|
return None
|
|
try:
|
|
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
|
|
return dt.timestamp()
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _find_clusters(
    events: list[dict], window_s: int, threshold: int
) -> list[tuple[str, str, str]]:
    """Locate non-overlapping error clusters among one source's events.

    Events whose ``timestamp_iso`` cannot be parsed are ignored. Starting
    from each candidate event, all events no more than *window_s* seconds
    after it are gathered; if at least *threshold* were gathered they form a
    cluster and scanning resumes after the last gathered event.

    Returns a list of ``(started_at_iso, ended_at_iso, worst_severity)``
    tuples, where the ISO strings are taken from the first and last event of
    the cluster and the severity is the highest-ranked one per ``_SEV_RANK``.
    """
    # Pair each event with its epoch time, drop the unparseable ones, and
    # order the rest chronologically.
    stamped = ((_parse_ts(ev["timestamp_iso"]), ev) for ev in events)
    points = sorted(
        (epoch, ev["timestamp_iso"], ev["severity"])
        for epoch, ev in stamped
        if epoch is not None
    )

    found: list[tuple[str, str, str]] = []
    total = len(points)
    start = 0
    while start < total:
        # Advance ``stop`` to one past the last event inside the window
        # anchored at ``start``.
        stop = start
        while stop < total and points[stop][0] - points[start][0] <= window_s:
            stop += 1

        if stop - start < threshold:
            # Too few events here; retry with the next event as the anchor.
            start += 1
            continue

        members = points[start:stop]
        worst = max(
            (sev for _, _, sev in members),
            key=lambda sev: _SEV_RANK.get(sev, 0),
        )
        found.append((members[0][1], members[-1][1], worst))
        start = stop  # resume after the cluster so clusters never overlap
    return found
|
|
|
|
|
|
def _incident_exists_for_cluster(
    incidents_db_path: Path, source_id: str, started_at: str, ended_at: str
) -> bool:
    """Tell whether an auto-incident for *source_id* overlaps the given window.

    Looks up incidents whose ``issue_type`` is ``auto:<source_id>`` for the
    resolved tenant (or with an empty ``tenant_id``) and compares their
    ``[started_at, ended_at]`` interval with the cluster's.

    Returns False when either cluster bound cannot be parsed; stored
    incidents with an unparseable bound are skipped.
    """
    issue_type = f"auto:{source_id}"
    cluster_start = _parse_ts(started_at)
    cluster_end = _parse_ts(ended_at)
    if cluster_start is None or cluster_end is None:
        return False

    tenant = resolve_tenant_id()
    with get_conn(incidents_db_path) as conn:
        existing = conn.execute(
            """
            SELECT started_at, ended_at FROM incidents
            WHERE issue_type = ?
            AND (tenant_id = ? OR tenant_id = '')
            """,
            (issue_type, tenant),
        ).fetchall()

        spans = (
            (_parse_ts(record["started_at"]), _parse_ts(record["ended_at"]))
            for record in existing
        )
        # Closed intervals [a, b] and [c, d] overlap exactly when a <= d and b >= c.
        return any(
            lo is not None
            and hi is not None
            and lo <= cluster_end
            and hi >= cluster_start
            for lo, hi in spans
        )
|
|
|
|
|
|
def detect_and_create(
    db_path: Path,
    incidents_db_path: Path,
    since: str | None,
    threshold: int = _THRESHOLD,
    window_s: int = _WINDOW_S,
) -> dict[str, int]:
    """Scan recent errors for clusters and open an incident for each new one.

    Errors are fetched with ``_query_recent_errors``, grouped by
    ``source_id``, and clustered per source with ``_find_clusters``. A
    cluster is skipped when ``_incident_exists_for_cluster`` reports an
    overlapping auto-incident; otherwise ``create_incident`` is called.

    The count shown in the incident label is the number of error entries
    fetched for that source in this run, not the size of the cluster.

    Returns ``{"created": N}`` with the number of incidents created.
    """
    rows = _query_recent_errors(db_path, since)
    if not rows:
        return {"created": 0}

    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        grouped[row["source_id"]].append(row)

    new_incidents = 0
    for source_id, source_events in grouped.items():
        # Both values are constant for every cluster of this source.
        error_total = len(source_events)
        issue_type = f"auto:{source_id}"

        for first_ts, last_ts, worst in _find_clusters(source_events, window_s, threshold):
            if _incident_exists_for_cluster(incidents_db_path, source_id, first_ts, last_ts):
                continue

            if worst == "CRITICAL":
                severity = "critical"
            else:
                severity = "high"

            create_incident(
                incidents_db_path,
                label=f"Auto: {source_id} — {error_total} errors",
                issue_type=issue_type,
                started_at=first_ts,
                ended_at=last_ts,
                notes="Auto-detected error cluster. Review and label as needed.",
                severity=severity,
            )
            logger.info(
                "Auto-incident created: source=%s window=[%s, %s] severity=%s",
                source_id, first_ts, last_ts, severity,
            )
            new_incidents += 1

    if new_incidents:
        logger.info("Incident detector: %d new incident(s) created", new_incidents)
    return {"created": new_incidents}
|
|
|
|
|
|
async def run_once(
    db_path: Path,
    incidents_db_path: Path,
    since: str | None,
    threshold: int = _THRESHOLD,
    window_s: int = _WINDOW_S,
) -> dict[str, int]:
    """Run ``detect_and_create`` off the event loop and await its result.

    The synchronous detector is submitted to the running loop's default
    executor so the event loop is not blocked while it works. Arguments are
    forwarded unchanged, and the detector's ``{"created": N}`` dict is returned.
    """
    # Passing ``None`` selects the loop's default executor; the remaining
    # positional arguments are handed to the callable as-is.
    return await asyncio.get_running_loop().run_in_executor(
        None,
        detect_and_create,
        db_path,
        incidents_db_path,
        since,
        threshold,
        window_s,
    )
|