turnstone/app/rest.py
pyr0ball f9ab4e5bb0 feat: incident tagging — DB schema, CRUD service, REST API (#1)
- Add `incidents` table to SQLite schema (id, label, started_at, ended_at,
  notes, created_at, severity)
- Extract `ensure_schema()` from ingest pipeline so tables are always
  created at startup, not only during ingest
- New `app/services/incidents.py`: create/list/get/delete + time-window
  entry association (FTS keyword search + raw window fallback)
- New `entries_in_window()` in search.py: plain SQL scan for incident
  detail when keyword FTS returns nothing
- REST endpoints: POST/GET /api/incidents, GET/DELETE /api/incidents/{id}
- Incident detail returns up to 100 associated log entries sorted by
  timestamp, prioritising FTS keyword hits then ERROR/CRITICAL then all
2026-05-09 15:37:14 -07:00

216 lines
7.7 KiB
Python

"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
All routes (API + static files) are mounted at /turnstone so the app works
identically whether accessed directly (http://host:8534/turnstone/) or through
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
"""
from __future__ import annotations
import dataclasses
import os
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema
from app.services.incidents import (
create_incident,
delete_incident,
get_incident,
get_incident_entries,
list_incidents,
)
from app.services.search import search as _search, list_sources as _list_sources, format_results
# Filesystem locations: the SQLite database (overridable through the
# TURNSTONE_DB environment variable) and the built Vue bundle.
DB_PATH = Path(
    os.getenv(
        "TURNSTONE_DB",
        Path(__file__).parent.parent / "data" / "turnstone.db",
    )
)
DIST_DIR = Path(__file__).parent.parent.joinpath("web", "dist")

# Interactive docs live under the same /turnstone prefix as every other
# route; ReDoc is switched off.
app = FastAPI(
    title="Turnstone API",
    version="0.1.0",
    docs_url="/turnstone/docs",
    redoc_url=None,
)

# CORS: any origin and any request header, restricted to the three HTTP
# methods listed below.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST", "DELETE"],
    allow_origins=["*"],
)
@app.on_event("startup")
def _startup() -> None:
    """Run ``ensure_schema`` against the configured database at app startup."""
    ensure_schema(DB_PATH)
# Request body accepted by POST /api/incidents. Documented with comments
# rather than a docstring so the generated schema is left as it was.
class IncidentCreate(BaseModel):
    # The only required field.
    label: str
    # Optional incident window bounds.
    # NOTE(review): presumably ISO timestamps like the search endpoints
    # accept — confirm against app.services.incidents.
    started_at: str | None = None
    ended_at: str | None = None
    # Free-form text; empty when omitted.
    notes: str = ""
    # Defaults to "medium"; no validation of the value happens here.
    severity: str = "medium"
# The built Vue bundle references its assets at /turnstone/assets (the path
# Vite embeds in index.html), so expose them there when a build is present.
if DIST_DIR.joinpath("assets").exists():
    app.mount(
        "/turnstone/assets",
        StaticFiles(directory=str(DIST_DIR.joinpath("assets"))),
        name="assets",
    )

# API router — every route registered on it is reachable under /turnstone
# (/turnstone/api/* and /turnstone/health).
router = APIRouter(prefix="/turnstone")
# Health probe: a fixed "ok" status plus the database path currently in use.
@router.get("/health")
def health() -> dict:
    return dict(status="ok", db=str(DB_PATH))
# Log search endpoint. An empty query short-circuits to an empty result set
# without calling the search service.
@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    hits = (
        _search(
            DB_PATH,
            query=q,
            source_filter=source,
            severity=severity,
            since=since,
            until=until,
            limit=limit,
        )
        if q
        else []
    )
    # Search results are dataclass instances; flatten them to plain dicts.
    return {
        "count": len(hits),
        "results": [dataclasses.asdict(hit) for hit in hits],
    }
def _detect_source_hint(q: str, source_ids: list[str]) -> str | None:
    """Return the first source-ID token that occurs in the query text.

    Each source ID is split into tokens on ``:``, ``-``, ``_`` and
    whitespace; tokens of three characters or fewer are ignored. Source IDs
    are checked in the order given and the first matching token wins.

    The comparison is case-insensitive on both sides (the query was already
    lower-cased, so a mixed-case token could otherwise never match). The
    token is returned in its original spelling so it still matches the
    source ID it came from when used as a filter.

    Returns ``None`` when no token matches.
    """
    q_lower = q.lower()
    for source_id in source_ids:
        normalised = source_id.replace(":", " ").replace("-", " ").replace("_", " ")
        for token in normalised.split():
            if len(token) > 3 and token.lower() in q_lower:
                return token
    return None


@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Gather log evidence for a service name or problem description.

    Runs a broad OR-mode search plus CRITICAL-only and ERROR-only searches,
    de-duplicates the hits by entry ID, orders them by timestamp (entries
    without a timestamp sort last) and then sequence, and returns at most
    20 entries together with a formatted rendering.

    When ``source`` is not supplied, a token of a known source ID found in
    the query is used as the source filter. If the ERROR search for the
    query then finds nothing, a generic "error warning fail" search is run
    against that source (ERROR first, CRITICAL as fallback).

    An empty query returns an empty result without searching.
    """
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: the matched *token* (not the full source_id)
    # becomes the filter so every matching source — e.g. all rotated logs of
    # one service — is included, not just the first matched rotation.
    detected_source = source
    if not detected_source:
        hint = _detect_source_hint(
            q, [s["source_id"] for s in _list_sources(DB_PATH)]
        )
        if hint is not None:
            detected_source = hint

    common: dict = dict(
        source_filter=detected_source,
        since=since,
        until=until,
        include_repeats=False,
    )

    # Broad pass uses OR so any symptom keyword surfaces evidence.
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected, also pull its errors unconstrained by
    # the user's keywords — they named a service, so show what is broken
    # there even if their symptom words don't appear in the error text.
    source_errors: list = []
    if detected_source and not source and not errors:
        for level, cap in (("ERROR", 10), ("CRITICAL", 5)):
            source_errors = _search(
                DB_PATH,
                query="error warning fail",
                severity=level,
                limit=cap,
                or_mode=True,
                **common,
            )
            if source_errors:
                break

    # Merge the passes, keeping the first occurrence of each entry.
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # "\xff" sorts after any ISO timestamp, pushing undated entries last.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]
    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }
# Returns whatever the search service reports as known log sources,
# wrapped under a "sources" key.
@router.get("/api/sources")
def list_sources() -> dict:
    known = _list_sources(DB_PATH)
    return dict(sources=known)
# Creates an incident from the request body and echoes the stored record
# back as a plain dict.
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    return dataclasses.asdict(
        create_incident(
            DB_PATH,
            label=body.label,
            started_at=body.started_at,
            ended_at=body.ended_at,
            notes=body.notes,
            severity=body.severity,
        )
    )
# Lists every incident returned by the incidents service, each flattened to
# a plain dict, under an "incidents" key.
@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    records = list_incidents(DB_PATH)
    return {"incidents": [dataclasses.asdict(record) for record in records]}
# Incident detail: the incident's own fields plus its associated log entries
# under "entries". Responds with 404 when the lookup finds nothing.
@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    related = get_incident_entries(DB_PATH, incident)
    payload = dataclasses.asdict(incident)
    payload["entries"] = [dataclasses.asdict(entry) for entry in related]
    return payload
# Deletes an incident by ID. A falsy result from the service is reported as
# 404; otherwise the deleted ID is echoed back.
@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    removed = delete_incident(DB_PATH, incident_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Incident not found")
    return {"deleted": incident_id}
# Attach the /turnstone-prefixed routes defined on `router` to the app.
app.include_router(router)
# The bare "/" path has no content of its own; send visitors on to the
# /turnstone/ prefix where the app is mounted.
@app.get("/")
def root_redirect() -> RedirectResponse:
    return RedirectResponse("/turnstone/")
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a file from the built SPA, falling back to ``index.html``.

    ``path`` is the client-supplied remainder of the URL after
    ``/turnstone/``. It is resolved against ``DIST_DIR`` and served only
    when the resolved location is a regular file *inside* that directory.
    ``..`` segments, absolute paths and symlinks that lead outside the
    build directory therefore fall through to ``index.html`` instead of
    exposing arbitrary files.

    Raises:
        HTTPException: 503 when the frontend build (``index.html``) is
            missing. This replaces a ``FileResponse`` of ``/dev/null``,
            which is not a regular file and does not exist on every
            platform.
    """
    index_file = DIST_DIR / "index.html"
    if not index_file.is_file():
        raise HTTPException(status_code=503, detail="Frontend build not found")

    dist_root = DIST_DIR.resolve()
    try:
        candidate = (dist_root / path).resolve()
    except (OSError, RuntimeError, ValueError):
        # Unresolvable input (e.g. embedded NUL byte, symlink loop): treat it
        # like any other unknown client-side route.
        return FileResponse(str(index_file))

    # Containment check: the resolved file must live below the build root.
    if dist_root in candidate.parents and candidate.is_file():
        return FileResponse(str(candidate))
    return FileResponse(str(index_file))