turnstone/app/rest.py
pyr0ball 457b4fd7ae feat: incident labeling, bundle export, and push/receive flow
Turnstone incidents now carry an issue_type tag (free-text with datalist
suggestions) used to categorize patterns for signature building.

Backend:
- Incident model gains issue_type; additive ALTER TABLE migration keeps
  existing DBs working without a full schema rebuild
- New received_bundles table stores incoming JSON bundles with indexes on
  bundled_at and issue_type
- build_bundle() assembles incident + related log entries into a versioned
  bundle dict; store_bundle()/list_bundles()/get_bundle() for the receiver
- POST /api/incidents/{id}/send — pushes bundle to TURNSTONE_BUNDLE_ENDPOINT
- GET  /api/incidents/{id}/bundle — export without sending
- POST /api/bundles — receive and store an incoming bundle
- GET  /api/bundles — list all received bundles
- TURNSTONE_SOURCE_HOST and TURNSTONE_BUNDLE_ENDPOINT env vars; auto-set
  source host from hostname in podman-standalone.sh

Frontend:
- Incidents form: issue_type field with datalist suggestions; Type column
  in the table; Send Bundle button + status feedback in the detail drawer
- New BundlesView: collapsible bundle rows, inline JSON parse (no extra
  round-trip), Export JSON download button
- Router and nav updated with /bundles route
2026-05-11 05:23:55 -07:00

290 lines
10 KiB
Python

"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
All routes (API + static files) are mounted at /turnstone so the app works
identically whether accessed directly (http://host:8534/turnstone/) or through
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
"""
from __future__ import annotations
import dataclasses
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema
from app.services.incidents import (
build_bundle,
create_incident,
delete_incident,
get_bundle,
get_incident,
get_incident_entries,
list_bundles,
list_incidents,
store_bundle,
)
from app.services.search import (
search as _search,
list_sources as _list_sources,
recent_source_errors as _source_errors,
stats_summary as _stats,
format_results,
)
# Runtime configuration. TURNSTONE_DB, TURNSTONE_SOURCE_HOST and
# TURNSTONE_BUNDLE_ENDPOINT are read from the environment once, at import time.
DB_PATH = Path(
    os.getenv("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")
)
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
SOURCE_HOST = os.getenv("TURNSTONE_SOURCE_HOST", "unknown")
# An empty endpoint makes the "send bundle" route answer 503.
BUNDLE_ENDPOINT = os.getenv("TURNSTONE_BUNDLE_ENDPOINT", "")

# Interactive docs are served under the same /turnstone prefix as everything
# else; ReDoc is switched off.
app = FastAPI(
    title="Turnstone API",
    version="0.1.0",
    docs_url="/turnstone/docs",
    redoc_url=None,
)
# CORS: any origin and any header, restricted to the three HTTP methods the
# API actually uses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["*"],
)
@app.on_event("startup")
def _startup() -> None:
    """Run ``ensure_schema`` against the configured database when the app starts."""
    ensure_schema(DB_PATH)
class IncidentCreate(BaseModel):
    # Request body for creating an incident (consumed by the POST
    # /api/incidents handler). Only ``label`` is required; every other field
    # has a default.
    label: str
    # Category tag for the incident; empty string when not supplied.
    issue_type: str = ""
    # Optional start / end of the incident window (kept as plain strings here;
    # presumably ISO timestamps — confirm against the incidents service).
    started_at: str | None = None
    ended_at: str | None = None
    notes: str = ""
    # No validation is applied to this value at the API layer.
    severity: str = "medium"
# Serve built Vue assets at the path Vite embeds in index.html.
# The mount is skipped when the frontend has not been built.
_ASSETS_DIR = DIST_DIR / "assets"
if _ASSETS_DIR.exists():
    app.mount(
        "/turnstone/assets",
        StaticFiles(directory=str(_ASSETS_DIR)),
        name="assets",
    )

# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
router = APIRouter(prefix="/turnstone")
@router.get("/health")
def health() -> dict:
    """Report a static ``ok`` status together with the configured database path."""
    return dict(status="ok", db=str(DB_PATH))
@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    """Search log entries and return them as plain dicts.

    Returns ``{"count": <n>, "results": [...]}``. An empty ``q`` yields an
    empty result set without touching the search service.
    """
    if q:
        hits = _search(
            DB_PATH,
            query=q,
            source_filter=source,
            severity=severity,
            since=since,
            until=until,
            limit=limit,
        )
        return {
            "count": len(hits),
            "results": [dataclasses.asdict(hit) for hit in hits],
        }
    # Nothing to search for.
    return {"count": 0, "results": []}
def _detect_source_token(query: str, source_ids: list[str]) -> str | None:
    """Return the first source-ID token that appears in *query*, or ``None``.

    Each source ID is split on ``:``, ``-``, ``_`` and whitespace. Tokens of
    three characters or fewer are ignored. The comparison is case-insensitive
    on both sides, so a source ID containing upper-case letters can still be
    matched; the token is returned in its original casing.
    """
    needle = query.lower()
    for source_id in source_ids:
        tokens = (
            source_id.replace(":", " ").replace("-", " ").replace("_", " ").split()
        )
        for token in tokens:
            if len(token) > 3 and token.lower() in needle:
                return token
    return None


@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Collect evidence for a problem description from several search passes.

    Runs a broad OR-mode search plus CRITICAL- and ERROR-restricted searches,
    de-duplicates the hits by ``entry_id``, orders them by timestamp and
    sequence, and keeps at most 20.

    Returns ``{"count", "results", "formatted"}``; an empty ``q`` yields an
    empty payload without querying anything.
    """
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: if a query token matches part of a known source_id,
    # use that token as the source_filter so all matching sources (e.g. all
    # rotated plex logs) are included — not just the first matched rotation.
    detected_source = source
    if not detected_source:
        known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
        hint = _detect_source_token(q, known_sources)
        if hint is not None:
            detected_source = hint  # use matched token, not full source_id

    common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)

    # Broad pass uses OR so any symptom keyword surfaces evidence
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected, also pull its most recent errors via plain SQL —
    # FTS ranking can bury real errors from the named service if their text doesn't
    # match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
    source_errors: list = []
    if detected_source and not source and not errors:
        source_errors = _source_errors(
            DB_PATH, source_filter=detected_source, severity="ERROR",
            limit=10, since=since, until=until,
        )
        if not source_errors:
            source_errors = _source_errors(
                DB_PATH, source_filter=detected_source, severity="CRITICAL",
                limit=5, since=since, until=until,
            )

    # Merge the passes, keeping the first occurrence of each entry.
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # Entries without a timestamp get the "\xff" sentinel so they sort after
    # entries that carry one.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]
    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }
@router.get("/api/sources")
def list_sources() -> dict:
    """Return the known log sources wrapped as ``{"sources": [...]}``."""
    known = _list_sources(DB_PATH)
    return {"sources": known}
@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    """Return the stats summary for the last ``window`` hours (1-168, default 24)."""
    summary = _stats(DB_PATH, window_hours=window)
    return summary
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Create an incident from the request body and return it as a dict."""
    fields = dict(
        label=body.label,
        issue_type=body.issue_type,
        started_at=body.started_at,
        ended_at=body.ended_at,
        notes=body.notes,
        severity=body.severity,
    )
    created = create_incident(DB_PATH, **fields)
    return dataclasses.asdict(created)
@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """Return every incident, serialised to dicts, under the ``incidents`` key."""
    serialised = [dataclasses.asdict(incident) for incident in list_incidents(DB_PATH)]
    return {"incidents": serialised}
@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident with its related entries attached.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    related = get_incident_entries(DB_PATH, incident)
    payload = dataclasses.asdict(incident)
    # The incident's own fields come first; "entries" is added on top.
    payload["entries"] = [dataclasses.asdict(entry) for entry in related]
    return payload
@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident and echo its id back.

    Raises:
        HTTPException: 404 when ``delete_incident`` reports nothing was deleted.
    """
    removed = delete_incident(DB_PATH, incident_id)
    if removed:
        return {"deleted": incident_id}
    raise HTTPException(status_code=404, detail="Incident not found")
@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    """Build and return the bundle for an incident without sending it anywhere.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    found = get_incident(DB_PATH, incident_id)
    if found:
        return build_bundle(DB_PATH, found, source_host=SOURCE_HOST)
    raise HTTPException(status_code=404, detail="Incident not found")
@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    """Build an incident's bundle and POST it as JSON to ``BUNDLE_ENDPOINT``.

    Returns ``{"sent": True, "status": <receiver status>, "entry_count": <n>}``.

    Raises:
        HTTPException: 503 when no endpoint is configured, 404 when the
            incident does not exist, 502 when the receiver answers with an
            HTTP error or the request fails with an ``OSError``.
    """
    # Configuration is checked first, before the incident is looked up.
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")

    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    body_bytes = json.dumps(bundle).encode()
    outgoing = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=body_bytes,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(outgoing, timeout=15) as reply:
            outcome = {
                "sent": True,
                "status": reply.status,
                "entry_count": len(bundle["log_entries"]),
            }
    except urllib.error.HTTPError as exc:
        # Must precede the OSError handler: HTTPError is a subclass of it.
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc
    return outcome
@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    """Store an incoming bundle and return the stored record's id and entry count."""
    stored = store_bundle(DB_PATH, bundle)
    return dict(id=stored.id, entry_count=stored.entry_count)
@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    """Return every received bundle, serialised to dicts, under the ``bundles`` key."""
    return {"bundles": [dataclasses.asdict(record) for record in list_bundles(DB_PATH)]}
@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    """Return one received bundle as a dict.

    Raises:
        HTTPException: 404 when no bundle has the given id.
    """
    record = get_bundle(DB_PATH, bundle_id)
    if record:
        return dataclasses.asdict(record)
    raise HTTPException(status_code=404, detail="Bundle not found")
# Attach the /turnstone router to the app. This has to happen before the SPA
# catch-all route is registered so the API routes are matched first.
app.include_router(router)
# Root redirect → /turnstone/
@app.get("/")
def root_redirect() -> RedirectResponse:
    """Send requests for the bare root to the app's /turnstone/ prefix."""
    target = "/turnstone/"
    return RedirectResponse(url=target)
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a built frontend file, falling back to ``index.html``.

    ``path`` comes straight from the request URL, so it is resolved and must
    stay inside ``DIST_DIR`` before it is served. Joining an absolute path or
    ``..`` segments onto ``DIST_DIR`` would otherwise escape the build
    directory.

    Raises:
        HTTPException: 503 when the frontend build (``index.html``) is absent.
    """
    index_file = DIST_DIR / "index.html"
    if not index_file.is_file():
        # A JSON 503 replaces the former FileResponse("/dev/null"), which is
        # not a regular file and so could not be sent as a file response.
        raise HTTPException(status_code=503, detail="Frontend build not found")

    dist_root = DIST_DIR.resolve()
    try:
        candidate = (dist_root / path).resolve()
    except (OSError, ValueError):
        # Unresolvable input (e.g. an embedded NUL byte) is treated like any
        # other unknown route.
        candidate = None

    if (
        candidate is not None
        and candidate.is_relative_to(dist_root)
        and candidate.is_file()
    ):
        return FileResponse(str(candidate))
    # Unknown paths are client-side routes: let the SPA handle them.
    return FileResponse(str(index_file))