Wires the context/RAG layer into FastAPI via a dedicated _ctx router (/turnstone/api/context/*): document upload (POST/GET/DELETE /docs), fact CRUD (POST/GET/DELETE /facts), wizard state machine (/wizard/schema, /wizard/step, /wizard/apply), and a debug search endpoint (/debug/search). All blocking DB calls are dispatched via asyncio.to_thread to keep the event loop free.
574 lines
19 KiB
Python
574 lines
19 KiB
Python
"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
|
|
|
|
All routes (API + static files) are mounted at /turnstone so the app works
|
|
identically whether accessed directly (http://host:8534/turnstone/) or through
|
|
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, FastAPI, HTTPException, Query, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from app.ingest.pipeline import ensure_schema
|
|
from app.services.incidents import (
|
|
build_bundle,
|
|
create_incident,
|
|
delete_incident,
|
|
get_bundle,
|
|
get_incident,
|
|
get_incident_entries,
|
|
list_bundles,
|
|
list_incidents,
|
|
store_bundle,
|
|
)
|
|
from app.services.search import (
|
|
search as _search,
|
|
list_sources as _list_sources,
|
|
recent_source_errors as _source_errors,
|
|
stats_summary as _stats,
|
|
format_results,
|
|
)
|
|
from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream
|
|
from app.watch.watcher import Watcher, load_watch_config
|
|
from app.context.store import (
|
|
add_fact as _add_fact,
|
|
list_facts as _list_facts,
|
|
delete_fact as _delete_fact,
|
|
list_documents as _list_documents,
|
|
delete_document as _delete_document,
|
|
)
|
|
from app.context.retriever import retrieve_context as _retrieve_context, format_context_block
|
|
from app.ingest.doc_upload import ingest_upload as _ingest_upload
|
|
from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session
|
|
from app.context.chunker import UnsupportedDocType, FileTooLarge
|
|
|
|
# Runtime configuration, resolved once at import time. Each environment
# variable overrides a default located two directories above this file.
DB_PATH = Path(os.getenv("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
# The preferences JSON file sits next to the database file.
PREFS_PATH = DB_PATH.parent / "preferences.json"
# Build output of the Vue SPA (see the static mount and SPA fallback below).
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
# Passed to build_bundle() as source_host when bundling incidents.
SOURCE_HOST = os.getenv("TURNSTONE_SOURCE_HOST", "unknown")
# Bundle receiver URL; an empty string makes the send endpoint return 503.
BUNDLE_ENDPOINT = os.getenv("TURNSTONE_BUNDLE_ENDPOINT", "")
# Directory holding default.yaml and watch.yaml.
PATTERN_DIR = Path(os.getenv("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))

# Single module-wide watcher; started and stopped by the lifespan handler.
_watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml")
|
|
|
|
|
|
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Application lifespan: prepare the database and run the log watcher.

    Startup ensures the schema exists, loads ``watch.yaml`` from
    ``PATTERN_DIR``, applies it to the watcher when it yields any
    configuration, and starts the watcher. Shutdown stops the watcher.
    """
    ensure_schema(DB_PATH)
    configs = load_watch_config(PATTERN_DIR / "watch.yaml")
    if configs:
        _watcher.configure(configs)
    _watcher.start()
    try:
        yield
    finally:
        # Stop the watcher even when an exception or cancellation is thrown
        # into the generator at the yield point, so it is never left running.
        _watcher.stop()
|
|
|
|
|
|
# Interactive docs live under the /turnstone prefix; ReDoc is disabled.
app = FastAPI(
    title="Turnstone API",
    version="0.1.0",
    docs_url="/turnstone/docs",
    redoc_url=None,
    lifespan=_lifespan,
)

# CORS: any origin and any request header, limited to the listed methods.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE", "PATCH"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# Baseline preferences. _load_prefs() overlays whatever is saved in
# PREFS_PATH on top of these values.
_PREFS_DEFAULTS: dict = {
    # patch_settings() only accepts "topbar" or "fab" here.
    "entry_point_style": "topbar",
    # LLM connection settings, forwarded to the diagnose services.
    "llm_url": "http://localhost:11434",
    "llm_model": "llama3.1:8b",
    "llm_api_key": "",
    # Passed to stats_summary() as severity_overrides.
    "severity_overrides": [
        {
            "name": "PAM auth noise",
            "pattern": r"pam_unix.*auth(?:entication)?\s+fail|auth could not identify",
            "override_severity": "WARN",
            "enabled": True,
        },
    ],
}
|
|
|
|
|
|
def _load_prefs() -> dict:
    """Return the effective preferences: defaults overlaid with saved values.

    Reads ``PREFS_PATH`` and merges its top-level keys over
    ``_PREFS_DEFAULTS``. A missing, unreadable, undecodable, or malformed
    file -- or one whose JSON payload is not an object -- is ignored and a
    copy of the defaults is returned instead.

    Returns:
        A new dict; the top level of ``_PREFS_DEFAULTS`` is never mutated.
    """
    try:
        saved = json.loads(PREFS_PATH.read_text())
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        saved = None
    if not isinstance(saved, dict):
        # Valid JSON that is not an object (e.g. a list) cannot be merged.
        saved = {}
    return {**_PREFS_DEFAULTS, **saved}
|
|
|
|
|
|
def _save_prefs(data: dict[str, str]) -> None:
    """Serialize *data* as JSON and overwrite the file at ``PREFS_PATH``."""
    serialized = json.dumps(data)
    PREFS_PATH.write_text(serialized)
|
|
|
|
|
|
class DiagnoseRequest(BaseModel):
    """Request body for the POST diagnose endpoints."""

    # Free-text query handed to the diagnose service.
    query: str
    # Optional bounds forwarded as since/until.
    since: str | None = None
    until: str | None = None
    # Optional source filter; an empty string is treated as "no filter".
    source: str | None = None
|
|
|
|
|
|
class SeverityOverride(BaseModel):
    """One entry of the ``severity_overrides`` preference list."""

    name: str
    # Stored as given; the default entry uses a regular expression here.
    pattern: str
    override_severity: str
    enabled: bool = True
|
|
|
|
|
|
class SettingsBody(BaseModel):
    """Partial settings update; fields left as ``None`` are not changed."""

    entry_point_style: str | None = None
    llm_url: str | None = None
    llm_model: str | None = None
    llm_api_key: str | None = None
    severity_overrides: list[SeverityOverride] | None = None
|
|
|
|
|
|
class IncidentCreate(BaseModel):
    """Request body for creating an incident; only ``label`` is required."""

    label: str
    issue_type: str = ""
    started_at: str | None = None
    ended_at: str | None = None
    notes: str = ""
    severity: str = "medium"
|
|
|
|
|
|
class FactBody(BaseModel):
    """Request body for adding a context fact."""

    category: str
    key: str
    value: str
    source: str | None = None
|
|
|
|
|
|
class WizardStepBody(BaseModel):
    """Request body for advancing the wizard by one step."""

    # Wizard session state supplied by the client and passed to advance_step().
    session: dict
    step_id: str
    # A single answer, several answers, or none.
    answer: str | list[str] | None = None
|
|
|
|
|
|
class WizardApplyBody(BaseModel):
    """Request body for applying a finished wizard session."""

    session: dict
|
|
|
|
# Serve built Vue assets at the path Vite embeds in index.html. The mount is
# skipped when the assets directory does not exist.
if (DIST_DIR / "assets").exists():
    app.mount(
        "/turnstone/assets",
        StaticFiles(directory=str(DIST_DIR / "assets")),
        name="assets",
    )

# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
router = APIRouter(prefix="/turnstone")
|
|
|
|
|
|
@router.get("/health")
def health() -> dict:
    """Return a fixed ``ok`` status together with the configured database path."""
    payload = {"status": "ok", "db": str(DB_PATH)}
    return payload
|
|
|
|
|
|
@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    """Search log entries and return them as plain dicts.

    An empty query short-circuits to an empty result without touching the
    database.
    """
    if not q:
        return {"count": 0, "results": []}

    filters = {
        "source_filter": source,
        "severity": severity,
        "since": since,
        "until": until,
        "limit": limit,
    }
    hits = _search(DB_PATH, query=q, **filters)
    rows = [dataclasses.asdict(hit) for hit in hits]
    return {"count": len(hits), "results": rows}
|
|
|
|
|
|
def _detect_source_hint(query: str, source_ids: list[str]) -> str | None:
    """Return the first source-id token that appears in *query*, else ``None``.

    Each source id is split on ``:``, ``-``, ``_`` and whitespace. A token
    qualifies when it is longer than three characters and occurs in the
    query, compared case-insensitively. The token is returned exactly as it
    appears in the source id, so it is always a substring of that id.
    """
    q_lower = query.lower()
    for source_id in source_ids:
        normalized = source_id.replace(":", " ").replace("-", " ").replace("_", " ")
        for token in normalized.split():
            # Lower-case the token too: the query side is already lower-cased,
            # so a source id containing capitals would otherwise never match.
            if len(token) > 3 and token.lower() in q_lower:
                return token
    return None


@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Collect evidence for a problem description from several search passes.

    Runs a broad OR-mode search plus CRITICAL- and ERROR-only searches,
    de-duplicates by ``entry_id``, orders the hits by timestamp and sequence
    (entries without a timestamp last), and keeps at most 20.

    Returns:
        ``{"count", "results", "formatted"}``; all empty when *q* is empty.
    """
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: if a query token matches part of a known source_id,
    # use that token as the source_filter so all matching sources (e.g. all
    # rotated plex logs) are included — not just the first matched rotation.
    detected_source = source
    if not detected_source:
        known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
        hint = _detect_source_hint(q, known_sources)
        if hint is not None:
            detected_source = hint  # use matched token, not full source_id

    common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)
    # Broad pass uses OR so any symptom keyword surfaces evidence
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected, also pull its most recent errors via plain SQL —
    # FTS ranking can bury real errors from the named service if their text doesn't
    # match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
    source_errors: list = []
    if detected_source and not source and not errors:
        source_errors = _source_errors(
            DB_PATH, source_filter=detected_source, severity="ERROR",
            limit=10, since=since, until=until,
        )
        if not source_errors:
            # Fall back to CRITICAL entries when the source has no ERROR entries.
            source_errors = _source_errors(
                DB_PATH, source_filter=detected_source, severity="CRITICAL",
                limit=5, since=since, until=until,
            )

    # De-duplicate across passes, keeping the first occurrence of each entry.
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # "\xff" sorts entries that lack a timestamp after ISO-formatted ones.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]
    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }
|
|
|
|
|
|
@router.post("/api/diagnose")
def diagnose_post(body: DiagnoseRequest) -> dict:
    """Run the diagnose service for *body* using the saved LLM preferences.

    A blank query returns an empty summary without calling the service.
    """
    if not body.query.strip():
        empty_summary = {
            "total": 0, "window_start": None, "window_end": None,
            "time_detected": False, "by_severity": {}, "by_source": {},
        }
        return {"summary": empty_summary, "entries": []}

    prefs = _load_prefs()
    # Empty preference strings are passed on as None.
    llm_options = {
        "llm_url": prefs.get("llm_url") or None,
        "llm_model": prefs.get("llm_model") or None,
        "llm_api_key": prefs.get("llm_api_key") or None,
    }
    outcome = _diagnose(
        DB_PATH,
        query=body.query,
        since=body.since,
        until=body.until,
        source_filter=body.source or None,
        **llm_options,
    )
    entries = [dataclasses.asdict(entry) for entry in outcome["entries"]]
    return {
        "summary": outcome["summary"],
        "reasoning": outcome.get("reasoning"),
        "entries": entries,
    }
|
|
|
|
|
|
@router.post("/api/diagnose/stream")
async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
    """Stream diagnose events to the client as server-sent events.

    Each event yielded by the diagnose stream is JSON-encoded and framed as
    one ``data:`` line followed by a blank line.
    """
    prefs = _load_prefs()

    async def sse_gen():
        events = _diagnose_stream(
            DB_PATH,
            query=body.query,
            since=body.since,
            until=body.until,
            source_filter=body.source or None,
            llm_url=prefs.get("llm_url") or None,
            llm_model=prefs.get("llm_model") or None,
            llm_api_key=prefs.get("llm_api_key") or None,
        )
        async for event in events:
            encoded = json.dumps(event)
            yield f"data: {encoded}\n\n"

    # Ask caches and buffering proxies not to hold the stream back.
    response_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        sse_gen(),
        media_type="text/event-stream",
        headers=response_headers,
    )
|
|
|
|
|
|
@router.get("/api/settings")
def get_settings() -> dict:
    """Return the effective preferences (defaults merged with saved values)."""
    current = _load_prefs()
    return current
|
|
|
|
|
|
@router.patch("/api/settings")
def patch_settings(body: SettingsBody) -> dict:
    """Apply the non-``None`` fields of *body* to the preferences and save them.

    Raises:
        HTTPException: 422 when ``entry_point_style`` is not ``topbar`` or ``fab``.

    Returns:
        The full preferences dict after the update.
    """
    prefs = _load_prefs()

    style = body.entry_point_style
    if style is not None:
        if style not in ("topbar", "fab"):
            raise HTTPException(status_code=422, detail="entry_point_style must be 'topbar' or 'fab'")
        prefs["entry_point_style"] = style

    # The LLM fields are plain strings copied through unchanged.
    for field_name in ("llm_url", "llm_model", "llm_api_key"):
        value = getattr(body, field_name)
        if value is not None:
            prefs[field_name] = value

    overrides = body.severity_overrides
    if overrides is not None:
        prefs["severity_overrides"] = [item.model_dump() for item in overrides]

    _save_prefs(prefs)
    return prefs
|
|
|
|
|
|
@router.get("/api/sources")
def list_sources() -> dict:
    """Return the known log sources under the ``sources`` key."""
    sources = _list_sources(DB_PATH)
    return {"sources": sources}
|
|
|
|
|
|
@router.get("/api/watch/status")
def watch_status() -> dict:
    """Report whether the watcher is active, plus its ``status`` attribute."""
    active = _watcher.is_active()
    return {"active": active, "sources": _watcher.status}
|
|
|
|
|
|
@router.post("/api/watch/reload")
def watch_reload() -> dict:
    """Stop all watch sources and restart with current watch.yaml.

    The watcher is reconfigured only when the file yields a non-empty
    configuration; it is started again either way.
    """
    _watcher.stop()
    configs = load_watch_config(PATTERN_DIR / "watch.yaml")
    if configs:
        _watcher.configure(configs)
    _watcher.start()
    source_count = len(configs)
    return {"reloaded": True, "source_count": source_count}
|
|
|
|
|
|
@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    """Return the stats summary for the last *window* hours.

    The saved ``severity_overrides`` preference is passed along to the
    summary.
    """
    overrides = _load_prefs().get("severity_overrides", [])
    return _stats(DB_PATH, window_hours=window, severity_overrides=overrides)
|
|
|
|
|
|
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Create an incident from *body* and return it as a dict."""
    fields = {
        "label": body.label,
        "issue_type": body.issue_type,
        "started_at": body.started_at,
        "ended_at": body.ended_at,
        "notes": body.notes,
        "severity": body.severity,
    }
    created = create_incident(DB_PATH, **fields)
    return dataclasses.asdict(created)
|
|
|
|
|
|
@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """Return every incident as a dict under the ``incidents`` key."""
    incidents = list_incidents(DB_PATH)
    return {"incidents": [dataclasses.asdict(item) for item in incidents]}
|
|
|
|
|
|
@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident together with its log entries.

    Raises:
        HTTPException: 404 when no incident matches *incident_id*.
    """
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    entries = get_incident_entries(DB_PATH, incident)
    payload = dataclasses.asdict(incident)
    payload["entries"] = [dataclasses.asdict(entry) for entry in entries]
    return payload
|
|
|
|
|
|
@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident; respond 404 when the delete call reports nothing removed."""
    removed = delete_incident(DB_PATH, incident_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Incident not found")
    return {"deleted": incident_id}
|
|
|
|
|
|
@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    """Build and return the bundle for an incident, tagged with ``SOURCE_HOST``.

    Raises:
        HTTPException: 404 when no incident matches *incident_id*.
    """
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    return bundle
|
|
|
|
|
|
@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    """POST an incident's bundle as JSON to ``BUNDLE_ENDPOINT``.

    Raises:
        HTTPException: 503 when no endpoint is configured, 404 when the
            incident does not exist, 502 when the receiver answers with an
            HTTP error or the request fails at the OS/network level.
    """
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")

    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    request = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=json.dumps(bundle).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            receiver_status = resp.status
    except urllib.error.HTTPError as exc:
        # Must be caught before OSError: HTTPError is a subclass of it.
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc
    return {"sent": True, "status": receiver_status, "entry_count": len(bundle["log_entries"])}
|
|
|
|
|
|
@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    """Store an incoming bundle and return its id and entry count."""
    stored = store_bundle(DB_PATH, bundle)
    return {"id": stored.id, "entry_count": stored.entry_count}
|
|
|
|
|
|
@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    """Return every stored bundle as a dict under the ``bundles`` key."""
    return {"bundles": [dataclasses.asdict(item) for item in list_bundles(DB_PATH)]}
|
|
|
|
|
|
@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    """Return one stored bundle as a dict, or 404 when it is not found."""
    found = get_bundle(DB_PATH, bundle_id)
    if not found:
        raise HTTPException(status_code=404, detail="Bundle not found")
    return dataclasses.asdict(found)
|
|
|
|
|
|
# Register the main API routes on the application.
app.include_router(router)

# Separate router for the context endpoints (documents, facts, wizard, debug).
_ctx = APIRouter(prefix="/turnstone/api/context")
|
|
|
|
|
|
@_ctx.post("/docs")
async def upload_doc(file: UploadFile):
    """Ingest an uploaded document and return the ingest result.

    The whole upload is read into memory, then ingested on a worker thread
    so the event loop is not blocked.

    Raises:
        HTTPException: 415 for an unsupported document type, 413 when the
            file is too large.
    """
    content = await file.read()
    # Uploads without a filename are ingested under the name "upload".
    filename = file.filename or "upload"
    try:
        result = await asyncio.to_thread(_ingest_upload, DB_PATH, filename, content)
    except UnsupportedDocType as e:
        # Chain the cause, as send_incident_bundle does, so tracebacks keep it.
        raise HTTPException(status_code=415, detail=str(e)) from e
    except FileTooLarge as e:
        raise HTTPException(status_code=413, detail=str(e)) from e
    return result
|
|
|
|
|
|
@_ctx.get("/docs")
async def list_docs():
    """Return the stored documents as a list of metadata dicts."""
    documents = await asyncio.to_thread(_list_documents, DB_PATH)
    exposed = ("id", "filename", "doc_type", "file_size", "uploaded_at")
    return [{name: getattr(doc, name) for name in exposed} for doc in documents]
|
|
|
|
|
|
@_ctx.delete("/docs/{doc_id}")
async def delete_doc(doc_id: str):
    """Delete a document; respond 404 when the delete call reports nothing removed."""
    removed = await asyncio.to_thread(_delete_document, DB_PATH, doc_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Document not found")
    return {"deleted": doc_id}
|
|
|
|
|
|
@_ctx.post("/facts")
async def create_fact(body: FactBody):
    """Store a new fact on a worker thread and return it as a dict."""
    stored = await asyncio.to_thread(
        _add_fact, DB_PATH, body.category, body.key, body.value, body.source
    )
    exposed = ("id", "category", "key", "value", "source", "created_at")
    return {name: getattr(stored, name) for name in exposed}
|
|
|
|
|
|
@_ctx.get("/facts")
async def list_facts_endpoint(category: str | None = None):
    """Return stored facts as dicts; *category* is passed through to the store."""
    stored = await asyncio.to_thread(_list_facts, DB_PATH, category)
    exposed = ("id", "category", "key", "value", "source", "created_at")
    return [{name: getattr(fact, name) for name in exposed} for fact in stored]
|
|
|
|
|
|
@_ctx.delete("/facts/{fact_id}")
async def delete_fact_endpoint(fact_id: str):
    """Delete a fact; respond 404 when the delete call reports nothing removed."""
    removed = await asyncio.to_thread(_delete_fact, DB_PATH, fact_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Fact not found")
    return {"deleted": fact_id}
|
|
|
|
|
|
@_ctx.get("/wizard/schema")
async def wizard_schema():
    """Return the wizard schema as provided by the wizard module."""
    schema = _wizard_schema()
    return schema
|
|
|
|
|
|
@_ctx.post("/wizard/step")
async def wizard_step(body: WizardStepBody):
    """Advance the wizard session by one step.

    Returns the updated session and whether it is now complete.
    """
    next_session = advance_step(body.session, body.step_id, body.answer)
    done = is_complete(next_session)
    return {"session": next_session, "complete": done}
|
|
|
|
|
|
@_ctx.post("/wizard/apply")
async def wizard_apply(body: WizardApplyBody):
    """Apply a completed wizard session on a worker thread.

    Raises:
        HTTPException: 400 when the session is not complete.
    """
    if not is_complete(body.session):
        raise HTTPException(status_code=400, detail="Wizard session is not complete")
    return await asyncio.to_thread(apply_session, DB_PATH, body.session)
|
|
|
|
|
|
@_ctx.get("/debug/search")
async def debug_search(q: str):
    """Retrieve context for *q* and return its facts, chunks and formatted block."""
    retrieved = await asyncio.to_thread(_retrieve_context, DB_PATH, q)
    return {
        "facts": retrieved.facts,
        "chunks": retrieved.chunks,
        "block": format_context_block(retrieved),
    }
|
|
|
|
|
|
# Register the context routes; this must come before the SPA catch-all route.
app.include_router(_ctx)
|
|
|
|
|
|
# Root redirect → /turnstone/
@app.get("/")
def root_redirect() -> RedirectResponse:
    """Redirect the bare root URL to the prefixed application."""
    target = "/turnstone/"
    return RedirectResponse(url=target)
|
|
|
|
|
|
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a file from the SPA build, falling back to ``index.html``.

    *path* comes straight from the URL, so it is resolved and served only
    when the result stays inside ``DIST_DIR``. Without that check, an
    absolute segment (``/turnstone//etc/passwd``) or ``..`` components would
    expose arbitrary files on the host.

    Returns:
        The requested file, ``index.html`` for anything else, or an empty
        503 response when the build directory does not exist.
    """
    if not DIST_DIR.exists():
        return FileResponse("/dev/null", status_code=503)

    dist_root = DIST_DIR.resolve()
    try:
        candidate = (dist_root / path).resolve()
        servable = candidate.is_relative_to(dist_root) and candidate.is_file()
    except (OSError, ValueError):
        # Unresolvable paths (e.g. an embedded NUL byte) get the SPA shell.
        servable = False

    if servable:
        return FileResponse(str(candidate))
    return FileResponse(str(DIST_DIR / "index.html"))
|