turnstone/app/services/incidents.py
pyr0ball f9ab4e5bb0 feat: incident tagging — DB schema, CRUD service, REST API (#1)
- Add `incidents` table to SQLite schema (id, label, started_at, ended_at,
  notes, created_at, severity)
- Extract `ensure_schema()` from ingest pipeline so tables are always
  created at startup, not only during ingest
- New `app/services/incidents.py`: create/list/get/delete + time-window
  entry association (FTS keyword search + raw window fallback)
- New `entries_in_window()` in search.py: plain SQL scan for incident
  detail when keyword FTS returns nothing
- REST endpoints: POST/GET /api/incidents, GET/DELETE /api/incidents/{id}
- Incident detail returns up to 100 associated log entries sorted by
  timestamp, prioritising FTS keyword hits then ERROR/CRITICAL then all
2026-05-09 15:37:14 -07:00

125 lines
4.1 KiB
Python

"""CRUD operations for user-tagged incidents."""
from __future__ import annotations
import sqlite3
import uuid
from pathlib import Path
from app.ingest.base import now_iso
from app.services.models import Incident
from app.services.search import SearchResult, entries_in_window, search
def _row_to_incident(row: sqlite3.Row) -> Incident:
    """Build an ``Incident`` from one row of the ``incidents`` table.

    The row must expose every column named below by key, which is the case
    for ``SELECT *`` results fetched with ``sqlite3.Row`` as row factory.
    """
    columns = (
        "id",
        "label",
        "started_at",
        "ended_at",
        "notes",
        "created_at",
        "severity",
    )
    # Column names and Incident field names are identical, so the row can be
    # forwarded as keyword arguments.
    return Incident(**{column: row[column] for column in columns})
def create_incident(
    db_path: Path,
    label: str,
    started_at: str | None = None,
    ended_at: str | None = None,
    notes: str = "",
    severity: str = "medium",
) -> Incident:
    """Create a new incident, persist it, and return it.

    A random UUID4 string is used as the incident id and ``now_iso()``
    supplies ``created_at``.

    Args:
        db_path: Path of the SQLite database file.
        label: Label stored with the incident.
        started_at: Start of the incident window, or ``None`` if not set.
        ended_at: End of the incident window, or ``None`` if not set.
        notes: Free-form notes stored with the incident.
        severity: Severity label stored with the incident.

    Returns:
        The ``Incident`` that was inserted.

    Raises:
        sqlite3.Error: Propagated unchanged if the insert fails. The
            connection is closed in that case as well.
    """
    incident = Incident(
        id=str(uuid.uuid4()),
        label=label,
        started_at=started_at,
        ended_at=ended_at,
        notes=notes,
        created_at=now_iso(),
        severity=severity,
    )
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute(
            "INSERT INTO incidents (id, label, started_at, ended_at, notes, created_at, severity) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                incident.id,
                incident.label,
                incident.started_at,
                incident.ended_at,
                incident.notes,
                incident.created_at,
                incident.severity,
            ),
        )
        conn.commit()
    finally:
        # Always release the handle; closing without a commit discards the
        # pending insert, so a failure never leaves a half-written row.
        conn.close()
    return incident
def list_incidents(db_path: Path) -> list[Incident]:
    """Return every stored incident, newest ``created_at`` first.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        A list of ``Incident`` objects; empty when the table has no rows.

    Raises:
        sqlite3.Error: Propagated unchanged if the query fails. The
            connection is closed in that case as well.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Row objects allow the column-name lookups done by _row_to_incident.
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT * FROM incidents ORDER BY created_at DESC"
        ).fetchall()
    finally:
        conn.close()
    return [_row_to_incident(row) for row in rows]
def get_incident(db_path: Path, incident_id: str) -> Incident | None:
    """Look up a single incident by id.

    Args:
        db_path: Path of the SQLite database file.
        incident_id: Value of the ``id`` column to match.

    Returns:
        The matching ``Incident``, or ``None`` when no row has that id.

    Raises:
        sqlite3.Error: Propagated unchanged if the query fails. The
            connection is closed in that case as well.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Row objects allow the column-name lookups done by _row_to_incident.
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT * FROM incidents WHERE id = ?", (incident_id,)
        ).fetchone()
    finally:
        conn.close()
    return _row_to_incident(row) if row else None
def delete_incident(db_path: Path, incident_id: str) -> bool:
    """Delete the incident with the given id.

    Args:
        db_path: Path of the SQLite database file.
        incident_id: Value of the ``id`` column to delete.

    Returns:
        ``True`` if a row was removed, ``False`` if no row had that id.

    Raises:
        sqlite3.Error: Propagated unchanged if the statement fails. The
            connection is closed in that case as well.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        cur = conn.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
        # Read the affected-row count while the connection is still open.
        deleted = cur.rowcount > 0
        conn.commit()
    finally:
        conn.close()
    return deleted
def get_incident_entries(
    db_path: Path,
    incident: Incident,
    limit: int = 100,
) -> list[SearchResult]:
    """Return log entries associated with an incident's time window.

    Strategy: keyword search first (``search`` with the incident label as
    query), then fill remaining slots with a raw timestamp-window scan so the
    incident always shows *something* even when no keywords match.

    Entries are collected in priority order and collection stops as soon as
    ``limit`` entries are held:

    1. keyword hits,
    2. keyword hits with ``severity="CRITICAL"``,
    3. keyword hits with ``severity="ERROR"``,
    4. window rows with ``severity="ERROR"``,
    5. any window rows.

    Only after that is the selection sorted, so a lower-priority entry can
    never push a higher-priority one out of the result.

    Args:
        db_path: Path of the SQLite database file.
        incident: Incident whose ``label``, ``started_at`` and ``ended_at``
            drive the queries.
        limit: Maximum number of entries to return. Each of steps 1-4 asks
            for at most ``limit // 2`` rows.

    Returns:
        At most ``limit`` distinct entries (by ``entry_id``) sorted by
        ``(timestamp_iso, sequence)``; entries without a timestamp sort last.
        An empty list when ``limit`` is not positive.
    """
    if limit <= 0:
        # A negative value would otherwise reach the query helpers and the
        # final slice, where it does not mean "nothing".
        return []

    half = limit // 2
    since, until = incident.started_at, incident.ended_at
    seen: set[str] = set()
    combined: list[SearchResult] = []

    def _absorb(entries: list[SearchResult]) -> None:
        """Append not-yet-seen entries in order until ``combined`` is full."""
        for entry in entries:
            if len(combined) >= limit:
                return
            if entry.entry_id not in seen:
                seen.add(entry.entry_id)
                combined.append(entry)

    common: dict = dict(since=since, until=until, limit=half)
    _absorb(search(db_path, query=incident.label, include_repeats=False, **common))
    _absorb(
        search(db_path, query=incident.label, severity="CRITICAL", include_repeats=False, **common)
    )
    _absorb(
        search(db_path, query=incident.label, severity="ERROR", include_repeats=False, **common)
    )

    # Fallback: fill remaining slots from raw window scan (errors first, then all)
    if len(combined) < limit:
        _absorb(entries_in_window(db_path, since, until, severity="ERROR", limit=half))
    if len(combined) < limit:
        # Ask for ``limit`` rows, not just the number of free slots: some of
        # the returned rows may already be selected, and fewer than ``limit``
        # are selected so far, so this is enough to fill every free slot
        # whenever the window holds that many rows.
        _absorb(entries_in_window(db_path, since, until, limit=limit))

    # "\xff" sorts after any ASCII timestamp text, which places entries
    # lacking a timestamp at the end.
    combined.sort(key=lambda e: (e.timestamp_iso or "\xff", e.sequence))
    return combined