"""Post-glean automatic incident detection.

After each batch glean, scan entries ingested since the last run for
ERROR/CRITICAL clusters.  If a source produces >= threshold errors within
window_s seconds, auto-create an incident unless one already exists for that
source in that time window.

Environment variables (all optional):
  TURNSTONE_AUTO_INCIDENT_THRESHOLD   integer, default 5
  TURNSTONE_AUTO_INCIDENT_WINDOW      seconds, default 600 (10 min)
"""
from __future__ import annotations

import asyncio
import logging
import os
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

from app.db import get_conn, resolve_tenant_id
from app.services.incidents import create_incident

logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Read an integer from environment variable *name*.

    Returns *default* when the variable is unset or does not parse as an
    integer, so a malformed value cannot break importing this module.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            "Ignoring non-integer %s=%r; using default %d", name, raw, default
        )
        return default


_THRESHOLD = _env_int("TURNSTONE_AUTO_INCIDENT_THRESHOLD", 5)
_WINDOW_S = _env_int("TURNSTONE_AUTO_INCIDENT_WINDOW", 600)

# Severity rank — used to pick the cluster's worst severity
_SEV_RANK = {"CRITICAL": 3, "ERROR": 2, "WARN": 1, "INFO": 0, "DEBUG": 0}


def _query_recent_errors(db_path: Path, since: str | None) -> list[dict]:
    """Fetch ERROR/CRITICAL log entries visible to the current tenant.

    Args:
        db_path: Database passed to ``get_conn``.
        since: When truthy, only rows with ``ingest_time > since`` are
            returned (no row limit).  When falsy, the query is not filtered
            by ingest time and is capped at 10000 rows.

    Returns:
        One dict per row with keys ``source_id``, ``timestamp_iso`` and
        ``severity``, ordered by source and then timestamp.
    """
    tid = resolve_tenant_id()
    with get_conn(db_path) as conn:
        if since:
            rows = conn.execute(
                """
                SELECT source_id, timestamp_iso, severity
                FROM log_entries
                WHERE severity IN ('ERROR', 'CRITICAL')
                  AND ingest_time > ?
                  AND (tenant_id = ? OR tenant_id = '')
                ORDER BY source_id, timestamp_iso ASC
                """,
                (since, tid),
            ).fetchall()
        else:
            rows = conn.execute(
                """
                SELECT source_id, timestamp_iso, severity
                FROM log_entries
                WHERE severity IN ('ERROR', 'CRITICAL')
                  AND (tenant_id = ? OR tenant_id = '')
                ORDER BY source_id, timestamp_iso ASC
                LIMIT 10000
                """,
                (tid,),
            ).fetchall()
    # NOTE(review): dict(row) assumes get_conn yields mapping-style rows
    # (e.g. sqlite3.Row) — confirm against app.db.
    return [dict(r) for r in rows]


def _parse_ts(iso: str | None) -> float | None:
    """Parse ISO timestamp to epoch seconds; return None on failure.

    A trailing ``Z`` is accepted as UTC.  Timestamps without an offset are
    treated as UTC so the result does not depend on the host's local
    timezone or DST rules.
    """
    if not iso:
        return None
    try:
        dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers non-string values that lack .replace().
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


def _find_clusters(
    events: list[dict], window_s: int, threshold: int
) -> list[tuple[str, str, str]]:
    """Return (started_at_iso, ended_at_iso, worst_severity) for each cluster.

    A cluster is a run of at least *threshold* events whose timestamps all
    lie within *window_s* seconds of the run's first event.  Clusters are
    found greedily from the earliest event and never overlap.  Events whose
    timestamp cannot be parsed are ignored.
    """
    if window_s < 0:
        # A negative window can never contain an event; bailing out here
        # also prevents a non-advancing scan when threshold <= 0.
        return []

    # Filter to events with parseable timestamps, sorted ascending
    timed = []
    for e in events:
        t = _parse_ts(e["timestamp_iso"])
        if t is not None:
            timed.append((t, e["timestamp_iso"], e["severity"]))
    timed.sort()

    clusters: list[tuple[str, str, str]] = []
    total = len(timed)
    start = 0
    end = 0
    while start < total:
        # Timestamps are sorted, so the window's end never moves backwards
        # as the start advances; resume scanning from the previous end.
        end = max(end, start)
        while end < total and timed[end][0] - timed[start][0] <= window_s:
            end += 1
        if end - start >= threshold:
            worst = max(
                (sev for _, _, sev in timed[start:end]),
                key=lambda s: _SEV_RANK.get(s, 0),
            )
            clusters.append((timed[start][1], timed[end - 1][1], worst))
            start = end  # skip past the cluster to avoid overlap
        else:
            start += 1
    return clusters


def _incident_exists_for_cluster(
    incidents_db_path: Path, source_id: str, started_at: str, ended_at: str
) -> bool:
    """Return True if an auto-incident for this source already covers the window.

    Only incidents with ``issue_type == f"auto:{source_id}"`` visible to the
    current tenant are considered.  Returns False when either bound of the
    cluster cannot be parsed; stored incidents with unparseable bounds are
    skipped.
    """
    issue_type = f"auto:{source_id}"
    start_ts = _parse_ts(started_at)
    end_ts = _parse_ts(ended_at)
    if start_ts is None or end_ts is None:
        return False

    tid = resolve_tenant_id()
    with get_conn(incidents_db_path) as conn:
        rows = conn.execute(
            """
            SELECT started_at, ended_at FROM incidents
            WHERE issue_type = ?
              AND (tenant_id = ? OR tenant_id = '')
            """,
            (issue_type, tid),
        ).fetchall()

    for row in rows:
        ex_start = _parse_ts(row["started_at"])
        ex_end = _parse_ts(row["ended_at"])
        if ex_start is None or ex_end is None:
            continue
        # Overlap check: two intervals [a,b] and [c,d] overlap when a<=d and b>=c
        if ex_start <= end_ts and ex_end >= start_ts:
            return True
    return False


def detect_and_create(
    db_path: Path,
    incidents_db_path: Path,
    since: str | None,
    threshold: int = _THRESHOLD,
    window_s: int = _WINDOW_S,
) -> dict[str, int]:
    """Detect error clusters and create incidents. Returns {"created": N}.

    Args:
        db_path: Log database to scan for ERROR/CRITICAL entries.
        incidents_db_path: Database in which incidents are looked up and
            created.
        since: Lower bound on ``ingest_time`` for the scan; falsy scans
            without that bound (see ``_query_recent_errors``).
        threshold: Minimum number of events that form a cluster.
        window_s: Maximum span of a cluster, in seconds.
    """
    entries = _query_recent_errors(db_path, since)
    if not entries:
        return {"created": 0}

    by_source: dict[str, list[dict]] = defaultdict(list)
    for e in entries:
        by_source[e["source_id"]].append(e)

    created = 0
    for source_id, events in by_source.items():
        clusters = _find_clusters(events, window_s, threshold)
        for started_at, ended_at, worst_sev in clusters:
            if _incident_exists_for_cluster(
                incidents_db_path, source_id, started_at, ended_at
            ):
                continue
            # Event count for this source in the glean window.
            # NOTE(review): this is the source's total, not the size of this
            # particular cluster — confirm that is what the label should show.
            n = len(events)
            sev_label = "critical" if worst_sev == "CRITICAL" else "high"
            create_incident(
                incidents_db_path,
                label=f"Auto: {source_id} — {n} errors",
                issue_type=f"auto:{source_id}",
                started_at=started_at,
                ended_at=ended_at,
                notes="Auto-detected error cluster. Review and label as needed.",
                severity=sev_label,
            )
            logger.info(
                "Auto-incident created: source=%s window=[%s, %s] severity=%s",
                source_id,
                started_at,
                ended_at,
                sev_label,
            )
            created += 1

    if created:
        logger.info("Incident detector: %d new incident(s) created", created)
    return {"created": created}


async def run_once(
    db_path: Path,
    incidents_db_path: Path,
    since: str | None,
    threshold: int = _THRESHOLD,
    window_s: int = _WINDOW_S,
) -> dict[str, int]:
    """Async wrapper — runs detection in a thread to avoid blocking the event loop.

    Takes the same arguments as ``detect_and_create`` and returns its result.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        detect_and_create,
        db_path,
        incidents_db_path,
        since,
        threshold,
        window_s,
    )