turnstone/app/rest.py

371 lines
12 KiB
Python

"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
All routes (API + static files) are mounted at /turnstone so the app works
identically whether accessed directly (http://host:8534/turnstone/) or through
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
"""
from __future__ import annotations
import dataclasses
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema
from app.services.incidents import (
build_bundle,
create_incident,
delete_incident,
get_bundle,
get_incident,
get_incident_entries,
list_bundles,
list_incidents,
store_bundle,
)
from app.services.search import (
search as _search,
list_sources as _list_sources,
recent_source_errors as _source_errors,
stats_summary as _stats,
format_results,
)
from app.services.diagnose import diagnose as _diagnose
# SQLite database location. Overridable via the TURNSTONE_DB environment
# variable; otherwise data/turnstone.db two directories above this file.
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
# User preferences are kept as a JSON file in the same directory as the database.
PREFS_PATH = DB_PATH.parent / "preferences.json"
# Directory expected to contain the built Vue SPA (index.html and assets/).
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
# Host label handed to build_bundle(); "unknown" when TURNSTONE_SOURCE_HOST is unset.
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
# URL incident bundles are POSTed to. An empty value makes the send endpoint answer 503.
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
# Application object. Swagger UI is moved under the /turnstone prefix and ReDoc is disabled.
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None)
# NOTE(review): CORS accepts any origin and any request header — confirm this is
# intended for every deployment this service runs in.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE", "PATCH"],
    allow_headers=["*"],
)
# Startup hook: runs ensure_schema() against DB_PATH before requests are served.
# NOTE(review): FastAPI documents on_event as deprecated in favour of lifespan
# handlers — consider migrating together with the FastAPI(...) construction.
@app.on_event("startup")
def _startup() -> None:
    ensure_schema(DB_PATH)
# Baseline preferences. Values from the preferences file are layered on top of
# these, so any key missing from the file falls back to the value here.
_PREFS_DEFAULTS: dict[str, str] = {
    # UI entry point style; the settings endpoint accepts "topbar" or "fab".
    "entry_point_style": "topbar",
    # LLM server URL and model name passed to the diagnose service.
    "llm_url": "http://localhost:11434",
    "llm_model": "llama3.1:8b",
}
def _load_prefs() -> dict[str, str]:
    """Return the effective preferences: defaults overlaid with saved values.

    A fresh copy of ``_PREFS_DEFAULTS`` is returned when the preferences file
    is missing, unreadable, not decodable, not valid JSON, or holds valid JSON
    that is not an object (for example a list or a bare string).
    """
    try:
        # EAFP: a missing file surfaces as FileNotFoundError (an OSError).
        saved = json.loads(PREFS_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return dict(_PREFS_DEFAULTS)
    if not isinstance(saved, dict):
        # Unpacking a non-mapping with ** would raise TypeError; use the defaults.
        return dict(_PREFS_DEFAULTS)
    return {**_PREFS_DEFAULTS, **saved}
def _save_prefs(data: dict[str, str]) -> None:
    """Serialize *data* to JSON and overwrite the preferences file with it."""
    serialized = json.dumps(data)
    PREFS_PATH.write_text(serialized)
# Request body for POST /turnstone/api/diagnose.
class DiagnoseRequest(BaseModel):
    # Free-text query; the endpoint treats a blank or whitespace-only value as "no query".
    query: str
    # Optional time-window bounds, forwarded unchanged to the diagnose service.
    # presumably ISO timestamps like the GET endpoints' since/until — confirm.
    since: str | None = None
    until: str | None = None
# Request body for PATCH /turnstone/api/settings. Every field is optional;
# a field left as None leaves the stored preference untouched.
class SettingsBody(BaseModel):
    # Must be "topbar" or "fab" when provided (validated by the endpoint).
    entry_point_style: str | None = None
    # LLM server URL and model name; stored as given.
    llm_url: str | None = None
    llm_model: str | None = None
# Request body for POST /turnstone/api/incidents. All fields are passed
# through to create_incident() as keyword arguments.
class IncidentCreate(BaseModel):
    # Only required field.
    label: str
    issue_type: str = ""
    # Optional incident window bounds.
    # presumably ISO timestamps — confirm against create_incident().
    started_at: str | None = None
    ended_at: str | None = None
    notes: str = ""
    # Defaults to "medium" when the client does not specify a severity.
    severity: str = "medium"
# Serve built Vue assets at the path Vite embeds in index.html.
# The mount is skipped when the assets directory does not exist.
if (DIST_DIR / "assets").exists():
    app.mount("/turnstone/assets", StaticFiles(directory=str(DIST_DIR / "assets")), name="assets")
# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
router = APIRouter(prefix="/turnstone")
# Liveness probe: reports a fixed "ok" status plus the configured database path.
@router.get("/health")
def health() -> dict:
    db_location = str(DB_PATH)
    return {"status": "ok", "db": db_location}
# Log search. An empty query short-circuits to an empty result set without
# touching the database; otherwise all filters are forwarded to the search
# service and each hit is serialized with dataclasses.asdict.
@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    if not q:
        return {"count": 0, "results": []}

    filters = {
        "query": q,
        "source_filter": source,
        "severity": severity,
        "since": since,
        "until": until,
        "limit": limit,
    }
    hits = _search(DB_PATH, **filters)
    serialized = [dataclasses.asdict(hit) for hit in hits]
    return {"count": len(hits), "results": serialized}
def _detect_source_token(q: str, source_ids: list[str]) -> str | None:
    # Return the first source-id token (longer than 3 characters) that occurs
    # in the query, or None when nothing matches. Source ids are tokenized on
    # ":", "-", "_" and whitespace. The comparison is case-insensitive so that
    # a source such as "Plex-Media-Server" is found by the query "plex crash";
    # the token is returned in its original case so it remains a literal
    # substring of the source id it came from.
    q_lower = q.lower()
    for source_id in source_ids:
        normalized = source_id.replace(":", " ").replace("-", " ").replace("_", " ")
        for token in normalized.split():
            if len(token) > 3 and token.lower() in q_lower:
                return token
    return None


# Heuristic diagnosis over the log index (no LLM involved).
#
# Runs a broad OR search plus CRITICAL- and ERROR-filtered searches, optionally
# adds recent errors from an auto-detected source, de-duplicates by entry_id,
# orders chronologically and returns at most 20 entries together with a
# formatted text rendering. An empty query returns an empty result.
@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: if a query token matches part of a known source_id,
    # use that token as the source_filter so all matching sources (e.g. all
    # rotated plex logs) are included — not just the first matched rotation.
    detected_source = source
    if not detected_source:
        known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
        # Fall back to the caller's value (None or "") when nothing is detected.
        detected_source = _detect_source_token(q, known_sources) or source

    common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)

    # Broad pass uses OR so any symptom keyword surfaces evidence
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected, also pull its most recent errors via plain SQL —
    # FTS ranking can bury real errors from the named service if their text doesn't
    # match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
    # Only done when the ERROR-filtered search above came back empty.
    source_errors: list = []
    if detected_source and not source and not errors:
        source_errors = _source_errors(
            DB_PATH, source_filter=detected_source, severity="ERROR",
            limit=10, since=since, until=until,
        )
        if not source_errors:
            source_errors = _source_errors(
                DB_PATH, source_filter=detected_source, severity="CRITICAL",
                limit=5, since=since, until=until,
            )

    # De-duplicate across the passes, keeping the first occurrence of each entry.
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # Chronological order; entries without a timestamp sort after timestamped
    # ones ("\xff" compares greater than any ASCII timestamp string).
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]

    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }
# Diagnosis via the diagnose service, using the LLM URL/model from the stored
# preferences. A blank query yields an empty summary without calling the service.
@router.post("/api/diagnose")
def diagnose_post(body: DiagnoseRequest) -> dict:
    if not body.query.strip():
        empty_summary = {
            "total": 0, "window_start": None, "window_end": None,
            "time_detected": False, "by_severity": {}, "by_source": {},
        }
        return {"summary": empty_summary, "entries": []}

    settings = _load_prefs()
    outcome = _diagnose(
        DB_PATH,
        query=body.query,
        since=body.since,
        until=body.until,
        # Empty-string preferences are passed on as None.
        llm_url=settings.get("llm_url") or None,
        llm_model=settings.get("llm_model") or None,
    )

    summary = outcome["summary"]
    reasoning = outcome.get("reasoning")
    entries = [dataclasses.asdict(entry) for entry in outcome["entries"]]
    return {"summary": summary, "reasoning": reasoning, "entries": entries}
# Returns the effective preferences (defaults merged with saved values).
@router.get("/api/settings")
def get_settings() -> dict:
    current = _load_prefs()
    return current
# Partial settings update: only fields present (non-None) in the body are
# changed. Responds 422 when entry_point_style is given but is neither
# "topbar" nor "fab"; otherwise persists and returns the merged preferences.
@router.patch("/api/settings")
def patch_settings(body: SettingsBody) -> dict:
    prefs = _load_prefs()

    requested_style = body.entry_point_style
    if requested_style is not None and requested_style not in ("topbar", "fab"):
        raise HTTPException(status_code=422, detail="entry_point_style must be 'topbar' or 'fab'")

    for field_name in ("entry_point_style", "llm_url", "llm_model"):
        new_value = getattr(body, field_name)
        if new_value is not None:
            prefs[field_name] = new_value

    _save_prefs(prefs)
    return prefs
# Lists the known log sources, wrapped under a "sources" key.
@router.get("/api/sources")
def list_sources() -> dict:
    sources = _list_sources(DB_PATH)
    return {"sources": sources}
# Summary statistics for the last `window` hours (1 to 168, default 24).
@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    summary = _stats(DB_PATH, window_hours=window)
    return summary
# Creates an incident from the request body and returns it as a plain dict.
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    fields = {
        "label": body.label,
        "issue_type": body.issue_type,
        "started_at": body.started_at,
        "ended_at": body.ended_at,
        "notes": body.notes,
        "severity": body.severity,
    }
    created = create_incident(DB_PATH, **fields)
    return dataclasses.asdict(created)
# Lists all incidents, each serialized to a plain dict.
@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    serialized = [dataclasses.asdict(incident) for incident in list_incidents(DB_PATH)]
    return {"incidents": serialized}
# Returns one incident's fields plus its log entries under "entries".
# Responds 404 when the incident lookup comes back empty.
@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    related = get_incident_entries(DB_PATH, incident)
    payload = dataclasses.asdict(incident)
    payload["entries"] = [dataclasses.asdict(entry) for entry in related]
    return payload
# Deletes an incident. Responds 404 when delete_incident() reports a falsy
# result; otherwise echoes the deleted id.
@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    was_deleted = delete_incident(DB_PATH, incident_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Incident not found")
    return {"deleted": incident_id}
# Builds and returns the bundle for an incident, tagged with this instance's
# SOURCE_HOST. Responds 404 for an unknown incident.
@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    return bundle
# Builds an incident bundle and POSTs it as JSON to BUNDLE_ENDPOINT.
#   503 — no endpoint configured
#   404 — incident not found
#   502 — receiver answered with an HTTP error, or the request failed
# On success returns the receiver's status code and the number of log entries sent.
@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")

    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    outgoing = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=json.dumps(bundle).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    try:
        with urllib.request.urlopen(outgoing, timeout=15) as resp:
            receiver_status = resp.status
            entry_count = len(bundle["log_entries"])
    except urllib.error.HTTPError as exc:
        # HTTPError is listed first: it is itself an OSError subclass.
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc

    return {"sent": True, "status": receiver_status, "entry_count": entry_count}
# Receiving side of bundle transfer: stores the posted JSON object and
# returns the stored record's id and entry count.
@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    stored = store_bundle(DB_PATH, bundle)
    receipt = {"id": stored.id, "entry_count": stored.entry_count}
    return receipt
# Lists stored bundles, each serialized to a plain dict.
@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    serialized = [dataclasses.asdict(record) for record in list_bundles(DB_PATH)]
    return {"bundles": serialized}
# Returns one stored bundle as a plain dict; responds 404 when it is not found.
@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    record = get_bundle(DB_PATH, bundle_id)
    if record:
        return dataclasses.asdict(record)
    raise HTTPException(status_code=404, detail="Bundle not found")
# Attach the /turnstone-prefixed API routes. This has to happen before the SPA
# catch-all route is declared so the API routes are matched first.
app.include_router(router)
# Root redirect → /turnstone/
# Sends visitors of the bare host root to the prefixed SPA entry point.
@app.get("/")
def root_redirect() -> RedirectResponse:
    target = "/turnstone/"
    return RedirectResponse(url=target)
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
#
# Security: `path` comes straight from the request URL. Joining it onto
# DIST_DIR unchecked allows escaping the dist directory — an absolute value
# (e.g. a request for /turnstone//etc/passwd gives path="/etc/passwd") replaces
# DIST_DIR entirely in a pathlib join, and ".." segments climb out of it. The
# candidate is therefore resolved and must remain inside the resolved dist
# root; anything else falls through to index.html like an ordinary SPA route.
# Symlinks inside dist that point outside it are no longer served either.
#
# Responds 503 when the SPA has not been built (DIST_DIR missing).
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    if not DIST_DIR.exists():
        # NOTE(review): "/dev/null" only exists on POSIX systems — kept as-is
        # to preserve the current response type and status.
        return FileResponse("/dev/null", status_code=503)

    dist_root = DIST_DIR.resolve()
    try:
        candidate = (dist_root / path).resolve()
        serve_candidate = candidate.is_relative_to(dist_root) and candidate.is_file()
    except (OSError, ValueError, RuntimeError):
        # Malformed paths (embedded NUL bytes, symlink loops, over-long names)
        # are treated as "no such file" instead of surfacing as a 500.
        serve_candidate = False

    if serve_candidate:
        return FileResponse(str(candidate))
    return FileResponse(str(DIST_DIR / "index.html"))