"""CRUD operations for user-tagged incidents.""" from __future__ import annotations import sqlite3 import uuid from pathlib import Path from app.ingest.base import now_iso from app.services.models import Incident from app.services.search import SearchResult, entries_in_window, search def _row_to_incident(row: sqlite3.Row) -> Incident: return Incident( id=row["id"], label=row["label"], started_at=row["started_at"], ended_at=row["ended_at"], notes=row["notes"], created_at=row["created_at"], severity=row["severity"], ) def create_incident( db_path: Path, label: str, started_at: str | None = None, ended_at: str | None = None, notes: str = "", severity: str = "medium", ) -> Incident: incident = Incident( id=str(uuid.uuid4()), label=label, started_at=started_at, ended_at=ended_at, notes=notes, created_at=now_iso(), severity=severity, ) conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.execute( "INSERT INTO incidents (id, label, started_at, ended_at, notes, created_at, severity) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (incident.id, incident.label, incident.started_at, incident.ended_at, incident.notes, incident.created_at, incident.severity), ) conn.commit() conn.close() return incident def list_incidents(db_path: Path) -> list[Incident]: conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row rows = conn.execute( "SELECT * FROM incidents ORDER BY created_at DESC" ).fetchall() conn.close() return [_row_to_incident(r) for r in rows] def get_incident(db_path: Path, incident_id: str) -> Incident | None: conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row row = conn.execute( "SELECT * FROM incidents WHERE id = ?", (incident_id,) ).fetchone() conn.close() return _row_to_incident(row) if row else None def delete_incident(db_path: Path, incident_id: str) -> bool: conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") cur = conn.execute("DELETE FROM incidents WHERE id = ?", (incident_id,)) conn.commit() conn.close() return cur.rowcount > 0 def get_incident_entries( db_path: Path, incident: Incident, limit: int = 100, ) -> list[SearchResult]: """Return log entries associated with an incident's time window. Strategy: keyword search first (FTS, ranked by relevance), then fill remaining slots with a raw timestamp-window scan so the incident always shows *something* even when no keywords match. """ half = limit // 2 common: dict = dict(since=incident.started_at, until=incident.ended_at, limit=half) keyword_hits = search(db_path, query=incident.label, include_repeats=False, **common) error_hits = search(db_path, query=incident.label, severity="ERROR", include_repeats=False, **common) critical_hits = search(db_path, query=incident.label, severity="CRITICAL", include_repeats=False, **common) seen: set[str] = set() combined: list[SearchResult] = [] for entry in keyword_hits + critical_hits + error_hits: if entry.entry_id not in seen: seen.add(entry.entry_id) combined.append(entry) # Fallback: fill remaining slots from raw window scan (errors first, then all) if len(combined) < limit: for entry in entries_in_window(db_path, incident.started_at, incident.ended_at, severity="ERROR", limit=half): if entry.entry_id not in seen: seen.add(entry.entry_id) combined.append(entry) if len(combined) < limit: for entry in entries_in_window(db_path, incident.started_at, incident.ended_at, limit=limit - len(combined)): if entry.entry_id not in seen: seen.add(entry.entry_id) combined.append(entry) combined.sort(key=lambda e: (e.timestamp_iso or "\xff", e.sequence)) return combined[:limit]