Turnstone incidents now carry an issue_type tag (free-text with datalist
suggestions) used to categorize patterns for signature building.
Backend:
- Incident model gains issue_type; additive ALTER TABLE migration keeps
existing DBs working without a full schema rebuild
- New received_bundles table stores incoming JSON bundles with indexes on
bundled_at and issue_type
- build_bundle() assembles incident + related log entries into a versioned
bundle dict; store_bundle()/list_bundles()/get_bundle() for the receiver
- POST /api/incidents/{id}/send — pushes bundle to TURNSTONE_BUNDLE_ENDPOINT
- GET /api/incidents/{id}/bundle — export without sending
- POST /api/bundles — receive and store an incoming bundle
- GET /api/bundles — list all received bundles
- TURNSTONE_SOURCE_HOST and TURNSTONE_BUNDLE_ENDPOINT env vars; auto-set
source host from hostname in podman-standalone.sh
Frontend:
- Incidents form: issue_type field with datalist suggestions; Type column
in the table; Send Bundle button + status feedback in the detail drawer
- New BundlesView: collapsible bundle rows, inline JSON parse (no extra
round-trip), Export JSON download button
- Router and nav updated with /bundles route
290 lines
10 KiB
Python
290 lines
10 KiB
Python
"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
|
|
|
|
All routes (API + static files) are mounted at /turnstone so the app works
|
|
identically whether accessed directly (http://host:8534/turnstone/) or through
|
|
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, FastAPI, HTTPException, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse, RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
|
|
from app.ingest.pipeline import ensure_schema
|
|
from app.services.incidents import (
|
|
build_bundle,
|
|
create_incident,
|
|
delete_incident,
|
|
get_bundle,
|
|
get_incident,
|
|
get_incident_entries,
|
|
list_bundles,
|
|
list_incidents,
|
|
store_bundle,
|
|
)
|
|
from app.services.search import (
|
|
search as _search,
|
|
list_sources as _list_sources,
|
|
recent_source_errors as _source_errors,
|
|
stats_summary as _stats,
|
|
format_results,
|
|
)
|
|
|
|
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
|
|
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
|
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
|
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
|
|
|
|
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_methods=["GET", "POST", "DELETE"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.on_event("startup")
|
|
def _startup() -> None:
|
|
ensure_schema(DB_PATH)
|
|
|
|
|
|
class IncidentCreate(BaseModel):
|
|
label: str
|
|
issue_type: str = ""
|
|
started_at: str | None = None
|
|
ended_at: str | None = None
|
|
notes: str = ""
|
|
severity: str = "medium"
|
|
|
|
# Serve built Vue assets at the path Vite embeds in index.html.
|
|
if (DIST_DIR / "assets").exists():
|
|
app.mount("/turnstone/assets", StaticFiles(directory=str(DIST_DIR / "assets")), name="assets")
|
|
|
|
# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
|
|
router = APIRouter(prefix="/turnstone")
|
|
|
|
|
|
@router.get("/health")
|
|
def health() -> dict:
|
|
return {"status": "ok", "db": str(DB_PATH)}
|
|
|
|
|
|
@router.get("/api/search")
|
|
def search_logs(
|
|
q: Annotated[str, Query(description="Search query")] = "",
|
|
source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
|
|
severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
|
|
since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
|
|
until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
|
|
limit: Annotated[int, Query(ge=1, le=500)] = 50,
|
|
) -> dict:
|
|
if not q:
|
|
return {"count": 0, "results": []}
|
|
results = _search(
|
|
DB_PATH,
|
|
query=q,
|
|
source_filter=source,
|
|
severity=severity,
|
|
since=since,
|
|
until=until,
|
|
limit=limit,
|
|
)
|
|
return {"count": len(results), "results": [dataclasses.asdict(r) for r in results]}
|
|
|
|
|
|
@router.get("/api/diagnose")
|
|
def diagnose(
|
|
q: Annotated[str, Query(description="Service name or problem description")] = "",
|
|
source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
|
|
since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
|
|
until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
|
|
) -> dict:
|
|
if not q:
|
|
return {"count": 0, "results": [], "formatted": ""}
|
|
|
|
# Auto-detect source hints: if a query token matches part of a known source_id,
|
|
# use that token as the source_filter so all matching sources (e.g. all
|
|
# rotated plex logs) are included — not just the first matched rotation.
|
|
detected_source = source
|
|
if not detected_source:
|
|
known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
|
|
q_lower = q.lower()
|
|
for src in known_sources:
|
|
parts = [p for seg in src.split(":") for p in seg.replace("-", " ").replace("_", " ").split()]
|
|
for p in parts:
|
|
if len(p) > 3 and p in q_lower:
|
|
detected_source = p # use matched token, not full source_id
|
|
break
|
|
if detected_source:
|
|
break
|
|
|
|
common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)
|
|
# Broad pass uses OR so any symptom keyword surfaces evidence
|
|
broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
|
|
critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
|
|
errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)
|
|
|
|
# When a source was auto-detected, also pull its most recent errors via plain SQL —
|
|
# FTS ranking can bury real errors from the named service if their text doesn't
|
|
# match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
|
|
source_errors: list = []
|
|
if detected_source and not source and not errors:
|
|
source_errors = _source_errors(
|
|
DB_PATH, source_filter=detected_source, severity="ERROR",
|
|
limit=10, since=since, until=until,
|
|
)
|
|
if not source_errors:
|
|
source_errors = _source_errors(
|
|
DB_PATH, source_filter=detected_source, severity="CRITICAL",
|
|
limit=5, since=since, until=until,
|
|
)
|
|
|
|
seen: set[str] = set()
|
|
combined = []
|
|
for r in broad + critical + errors + source_errors:
|
|
if r.entry_id not in seen:
|
|
seen.add(r.entry_id)
|
|
combined.append(r)
|
|
|
|
combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
|
|
combined = combined[:20]
|
|
return {
|
|
"count": len(combined),
|
|
"results": [dataclasses.asdict(r) for r in combined],
|
|
"formatted": format_results(combined),
|
|
}
|
|
|
|
|
|
@router.get("/api/sources")
|
|
def list_sources() -> dict:
|
|
return {"sources": _list_sources(DB_PATH)}
|
|
|
|
|
|
@router.get("/api/stats")
|
|
def get_stats(
|
|
window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
|
|
) -> dict:
|
|
return _stats(DB_PATH, window_hours=window)
|
|
|
|
|
|
@router.post("/api/incidents")
|
|
def create_incident_endpoint(body: IncidentCreate) -> dict:
|
|
incident = create_incident(
|
|
DB_PATH,
|
|
label=body.label,
|
|
issue_type=body.issue_type,
|
|
started_at=body.started_at,
|
|
ended_at=body.ended_at,
|
|
notes=body.notes,
|
|
severity=body.severity,
|
|
)
|
|
return dataclasses.asdict(incident)
|
|
|
|
|
|
@router.get("/api/incidents")
|
|
def list_incidents_endpoint() -> dict:
|
|
return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]}
|
|
|
|
|
|
@router.get("/api/incidents/{incident_id}")
|
|
def get_incident_endpoint(incident_id: str) -> dict:
|
|
incident = get_incident(DB_PATH, incident_id)
|
|
if not incident:
|
|
raise HTTPException(status_code=404, detail="Incident not found")
|
|
entries = get_incident_entries(DB_PATH, incident)
|
|
return {
|
|
**dataclasses.asdict(incident),
|
|
"entries": [dataclasses.asdict(e) for e in entries],
|
|
}
|
|
|
|
|
|
@router.delete("/api/incidents/{incident_id}")
|
|
def delete_incident_endpoint(incident_id: str) -> dict:
|
|
if not delete_incident(DB_PATH, incident_id):
|
|
raise HTTPException(status_code=404, detail="Incident not found")
|
|
return {"deleted": incident_id}
|
|
|
|
|
|
@router.get("/api/incidents/{incident_id}/bundle")
|
|
def get_incident_bundle(incident_id: str) -> dict:
|
|
incident = get_incident(DB_PATH, incident_id)
|
|
if not incident:
|
|
raise HTTPException(status_code=404, detail="Incident not found")
|
|
return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
|
|
|
|
|
|
@router.post("/api/incidents/{incident_id}/send")
|
|
def send_incident_bundle(incident_id: str) -> dict:
|
|
if not BUNDLE_ENDPOINT:
|
|
raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
|
|
incident = get_incident(DB_PATH, incident_id)
|
|
if not incident:
|
|
raise HTTPException(status_code=404, detail="Incident not found")
|
|
bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
|
|
payload = json.dumps(bundle).encode()
|
|
req = urllib.request.Request(
|
|
BUNDLE_ENDPOINT,
|
|
data=payload,
|
|
headers={"Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
return {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])}
|
|
except urllib.error.HTTPError as exc:
|
|
raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
|
|
except OSError as exc:
|
|
raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc
|
|
|
|
|
|
@router.post("/api/bundles")
|
|
def receive_bundle(bundle: dict) -> dict:
|
|
record = store_bundle(DB_PATH, bundle)
|
|
return {"id": record.id, "entry_count": record.entry_count}
|
|
|
|
|
|
@router.get("/api/bundles")
|
|
def list_bundles_endpoint() -> dict:
|
|
bundles = list_bundles(DB_PATH)
|
|
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
|
|
|
|
|
@router.get("/api/bundles/{bundle_id}")
|
|
def get_bundle_endpoint(bundle_id: str) -> dict:
|
|
bundle = get_bundle(DB_PATH, bundle_id)
|
|
if not bundle:
|
|
raise HTTPException(status_code=404, detail="Bundle not found")
|
|
return dataclasses.asdict(bundle)
|
|
|
|
|
|
app.include_router(router)
|
|
|
|
|
|
# Root redirect → /turnstone/
|
|
@app.get("/")
|
|
def root_redirect() -> RedirectResponse:
|
|
return RedirectResponse(url="/turnstone/")
|
|
|
|
|
|
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
|
|
# static asset or API route. Must be registered after include_router.
|
|
@app.get("/turnstone/{path:path}")
|
|
def spa_fallback(path: str) -> FileResponse:
|
|
if DIST_DIR.exists():
|
|
candidate = DIST_DIR / path
|
|
if candidate.is_file():
|
|
return FileResponse(str(candidate))
|
|
return FileResponse(str(DIST_DIR / "index.html"))
|
|
return FileResponse("/dev/null", status_code=503)
|