def ensure_schema(db_path: Path) -> None:
    """Create all tables and indexes if they don't exist.

    Intended to be called on every startup.  It is idempotent as long as
    every statement in ``_SCHEMA`` uses ``IF NOT EXISTS`` (the statements
    visible in this change do).

    Args:
        db_path: Location of the SQLite database file.  The file and any
            missing parent directories are created on demand.

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema
            script fails.  The connection is closed before the error
            propagates.
    """
    db_path = Path(db_path)
    # sqlite3.connect() creates the database file but not its directory, so a
    # fresh checkout without the data directory would fail with
    # "unable to open database file".
    db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
    finally:
        # Release the handle even when the schema script raises.
        conn.close()
@app.on_event("startup")
def _startup() -> None:
    """Run ``ensure_schema`` against ``DB_PATH`` on the FastAPI startup event."""
    ensure_schema(DB_PATH)


class IncidentCreate(BaseModel):
    """Request body for creating an incident.

    Only ``label`` is required; every other field falls back to the default
    declared below.
    """

    # Free-text description of the incident.
    label: str
    # Window bounds; ``None`` leaves that side of the window open.
    started_at: str | None = None
    ended_at: str | None = None
    # Optional additional context.
    notes: str = ""
    # User-assigned severity; not validated here.
    severity: str = "medium"
+ detected_source = source + if not detected_source: + known_sources = [s["source_id"] for s in _list_sources(DB_PATH)] + q_lower = q.lower() + for src in known_sources: + parts = [p for seg in src.split(":") for p in seg.replace("-", " ").replace("_", " ").split()] + for p in parts: + if len(p) > 3 and p in q_lower: + detected_source = p # use matched token, not full source_id + break + if detected_source: + break + + common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False) + # Broad pass uses OR so any symptom keyword surfaces evidence + broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common) critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common) errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common) + # When a source was auto-detected, also pull its most recent errors unconstrained — + # the user named a service, so show what's actually broken there even if their + # symptom keywords don't appear literally in the error text. 
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Persist a new incident built from the request body and return it as a dict."""
    created = create_incident(
        DB_PATH,
        label=body.label,
        started_at=body.started_at,
        ended_at=body.ended_at,
        notes=body.notes,
        severity=body.severity,
    )
    payload = dataclasses.asdict(created)
    return payload


@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """Return every stored incident under the ``"incidents"`` key."""
    serialised = []
    for incident in list_incidents(DB_PATH):
        serialised.append(dataclasses.asdict(incident))
    return {"incidents": serialised}


@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident plus its associated log entries.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    related = get_incident_entries(DB_PATH, incident)
    payload = dataclasses.asdict(incident)
    payload["entries"] = [dataclasses.asdict(entry) for entry in related]
    return payload


@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident by id and echo the id back.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    removed = delete_incident(DB_PATH, incident_id)
    if removed:
        return {"deleted": incident_id}
    raise HTTPException(status_code=404, detail="Incident not found")
# --- app/services/incidents.py: CRUD operations for user-tagged incidents ---


def _connect(db_path: Path, *, rows: bool = False) -> sqlite3.Connection:
    """Open *db_path* in WAL mode; with ``rows=True`` rows are ``sqlite3.Row``.

    The caller owns the returned connection and must close it.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
    except Exception:
        # Don't leak the handle if the pragma itself fails.
        conn.close()
        raise
    if rows:
        conn.row_factory = sqlite3.Row
    return conn


def _row_to_incident(row: sqlite3.Row) -> Incident:
    """Build an ``Incident`` from a row of the ``incidents`` table."""
    return Incident(
        id=row["id"],
        label=row["label"],
        started_at=row["started_at"],
        ended_at=row["ended_at"],
        notes=row["notes"],
        created_at=row["created_at"],
        severity=row["severity"],
    )


def create_incident(
    db_path: Path,
    label: str,
    started_at: str | None = None,
    ended_at: str | None = None,
    notes: str = "",
    severity: str = "medium",
) -> Incident:
    """Insert a new incident and return it.

    The id is a fresh UUID4 string and ``created_at`` comes from
    ``now_iso()``.  The connection is closed even when the INSERT fails.
    """
    incident = Incident(
        id=str(uuid.uuid4()),
        label=label,
        started_at=started_at,
        ended_at=ended_at,
        notes=notes,
        created_at=now_iso(),
        severity=severity,
    )
    conn = _connect(db_path)
    try:
        conn.execute(
            "INSERT INTO incidents (id, label, started_at, ended_at, notes, created_at, severity) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (incident.id, incident.label, incident.started_at, incident.ended_at,
             incident.notes, incident.created_at, incident.severity),
        )
        conn.commit()
    finally:
        conn.close()
    return incident


def list_incidents(db_path: Path) -> list[Incident]:
    """Return all incidents, most recently created first."""
    conn = _connect(db_path, rows=True)
    try:
        rows = conn.execute(
            "SELECT * FROM incidents ORDER BY created_at DESC"
        ).fetchall()
    finally:
        conn.close()
    return [_row_to_incident(r) for r in rows]


def get_incident(db_path: Path, incident_id: str) -> Incident | None:
    """Return the incident with *incident_id*, or ``None`` if there is none."""
    conn = _connect(db_path, rows=True)
    try:
        row = conn.execute(
            "SELECT * FROM incidents WHERE id = ?", (incident_id,)
        ).fetchone()
    finally:
        conn.close()
    return _row_to_incident(row) if row else None


def delete_incident(db_path: Path, incident_id: str) -> bool:
    """Delete the incident with *incident_id*; return ``True`` if a row was removed."""
    conn = _connect(db_path)
    try:
        cur = conn.execute("DELETE FROM incidents WHERE id = ?", (incident_id,))
        conn.commit()
        deleted = cur.rowcount > 0
    finally:
        conn.close()
    return deleted


def get_incident_entries(
    db_path: Path,
    incident: Incident,
    limit: int = 100,
) -> list[SearchResult]:
    """Return log entries associated with an incident's time window.

    Strategy: keyword search first (the incident label is the query, run
    unfiltered and again for ERROR and CRITICAL severities), then fill
    remaining slots with a raw timestamp-window scan so the incident shows
    something even when no keywords match.  The merged, de-duplicated list
    is sorted by ``(timestamp_iso, sequence)`` with missing timestamps last
    and truncated to *limit*.
    """
    half = limit // 2
    window: dict = dict(since=incident.started_at, until=incident.ended_at, limit=half)

    keyword_hits = search(db_path, query=incident.label, include_repeats=False, **window)
    error_hits = search(db_path, query=incident.label, severity="ERROR", include_repeats=False, **window)
    critical_hits = search(db_path, query=incident.label, severity="CRITICAL", include_repeats=False, **window)

    seen: set[str] = set()
    combined: list[SearchResult] = []

    def _absorb(entries: list[SearchResult]) -> None:
        # Append entries not collected yet, preserving first-seen order.
        for entry in entries:
            if entry.entry_id not in seen:
                seen.add(entry.entry_id)
                combined.append(entry)

    _absorb(keyword_hits + critical_hits + error_hits)

    # Fallback: fill remaining slots from a raw window scan (errors first, then all).
    if len(combined) < limit:
        _absorb(entries_in_window(
            db_path, incident.started_at, incident.ended_at, severity="ERROR", limit=half,
        ))
    if len(combined) < limit:
        _absorb(entries_in_window(
            db_path, incident.started_at, incident.ended_at, limit=limit - len(combined),
        ))

    # "\xff" makes entries without a timestamp sort after ordinary ISO strings.
    combined.sort(key=lambda e: (e.timestamp_iso or "\xff", e.sequence))
    return combined[:limit]


# --- app/services/models.py ---


@dataclass(frozen=True)
class Incident:
    """A user-tagged time window marking a known event or failure."""

    id: str                 # UUID
    label: str              # free-text description ("plex crash", "audio broken")
    started_at: str | None  # ISO timestamp; None = open-ended start
    ended_at: str | None    # ISO timestamp; None = open-ended end
    notes: str              # additional context
    created_at: str         # wall-clock when this was tagged
    severity: str           # user-assigned: low / medium / high / critical


# --- app/services/search.py ---


def _sanitize_fts_query(raw: str, or_mode: bool = False) -> str:
    """Turn free text into a safe FTS5 MATCH expression.

    Every character that is not a word character or whitespace is replaced
    by a space (this removes the FTS5 operator characters
    ``" * + - ( ) ^ ~ : ?``), and each remaining token is wrapped in double
    quotes.  Quoting matters because the bare upper-case words ``AND``,
    ``OR`` and ``NOT`` are FTS5 operators: left unquoted, a query such as
    ``"disk NOT mounted"`` changes meaning and a lone ``"NOT"`` is a syntax
    error.  FTS5 treats a quoted string exactly like the same bareword, so
    ordinary queries match the same rows as before.

    Args:
        raw: User-supplied search text.
        or_mode: Join tokens with ``OR`` (any-of) instead of the implicit
            AND (all-of).

    Returns:
        The MATCH expression, or ``'""'`` (an empty phrase) when *raw*
        contains no usable token.
    """
    # \w is Unicode-aware, so non-ASCII search terms survive intact.
    cleaned = re.sub(r"[^\w\s]", " ", raw)
    tokens = cleaned.split()
    if not tokens:
        return '""'
    quoted = [f'"{token}"' for token in tokens]
    return (" OR " if or_mode else " ").join(quoted)
def entries_in_window(
    db_path: Path,
    since: str | None,
    until: str | None,
    severity: str | None = None,
    limit: int = 100,
) -> list[SearchResult]:
    """Return log entries within a time window using a plain SQL scan (no FTS).

    Used as a fallback when keyword search returns nothing, so incident
    detail can still show the raw log activity in the window.

    Args:
        db_path: SQLite database containing the ``log_entries`` table.
        since: Inclusive lower bound on ``timestamp_iso``; ``None`` or empty
            leaves the window open at the start.
        until: Inclusive upper bound on ``timestamp_iso``; ``None`` or empty
            leaves the window open at the end.
        severity: Optional exact severity filter; upper-cased before use.
        limit: Maximum number of rows.  Values below zero are treated as 0.

    Returns:
        Matching entries with ``repeat_count = 1``, ordered by ascending
        ``timestamp_iso``; ``rank`` is always ``0.0`` because no relevance
        ranking is involved.
    """
    conditions: list[str] = ["repeat_count = 1"]
    params: list = []

    if since:
        conditions.append("timestamp_iso >= ?")
        params.append(since)
    if until:
        conditions.append("timestamp_iso <= ?")
        params.append(until)
    if severity:
        conditions.append("severity = ?")
        params.append(severity.upper())

    where = " AND ".join(conditions)
    # SQLite reads a negative LIMIT as "no upper bound"; clamp so a bad
    # caller value cannot turn this fallback into a full-table read.
    params.append(max(limit, 0))

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        # Only the fixed condition fragments above are interpolated; every
        # caller-supplied value is bound as a parameter.
        rows = conn.execute(
            f"""
            SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
                   repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
            FROM log_entries
            WHERE {where}
            ORDER BY timestamp_iso ASC
            LIMIT ?
            """,
            params,
        ).fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()

    return [
        SearchResult(
            entry_id=r["entry_id"],
            source_id=r["source_id"],
            sequence=r["sequence"],
            timestamp_iso=r["timestamp_iso"],
            severity=r["severity"],
            repeat_count=r["repeat_count"],
            out_of_order=bool(r["out_of_order"]),
            matched_patterns=json.loads(r["matched_patterns"] or "[]"),
            text=r["text"],
            rank=r["rank"],
        )
        for r in rows
    ]