"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix. All routes (API + static files) are mounted at /turnstone so the app works identically whether accessed directly (http://host:8534/turnstone/) or through Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping. """ from __future__ import annotations import dataclasses import json import os import urllib.error import urllib.request from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated from fastapi import APIRouter, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from app.ingest.pipeline import ensure_schema from app.services.incidents import ( build_bundle, create_incident, delete_incident, get_bundle, get_incident, get_incident_entries, list_bundles, list_incidents, store_bundle, ) from app.services.search import ( search as _search, list_sources as _list_sources, recent_source_errors as _source_errors, stats_summary as _stats, format_results, ) from app.services.diagnose import diagnose as _diagnose from app.watch.watcher import Watcher, load_watch_config DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) PREFS_PATH = DB_PATH.parent / "preferences.json" DIST_DIR = Path(__file__).parent.parent / "web" / "dist" SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) _watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml") @asynccontextmanager async def _lifespan(app: FastAPI): ensure_schema(DB_PATH) watch_cfg_path = PATTERN_DIR / "watch.yaml" configs = load_watch_config(watch_cfg_path) if configs: _watcher.configure(configs) 
_watcher.start() yield _watcher.stop() app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["GET", "POST", "DELETE", "PATCH"], allow_headers=["*"], ) _PREFS_DEFAULTS: dict = { "entry_point_style": "topbar", "llm_url": "http://localhost:11434", "llm_model": "llama3.1:8b", "severity_overrides": [ { "name": "PAM auth noise", "pattern": r"pam_unix.*auth(?:entication)?\s+fail|auth could not identify", "override_severity": "WARN", "enabled": True, } ], } def _load_prefs() -> dict[str, str]: if PREFS_PATH.exists(): try: saved = json.loads(PREFS_PATH.read_text()) return {**_PREFS_DEFAULTS, **saved} except (json.JSONDecodeError, OSError): pass return dict(_PREFS_DEFAULTS) def _save_prefs(data: dict[str, str]) -> None: PREFS_PATH.write_text(json.dumps(data)) class DiagnoseRequest(BaseModel): query: str since: str | None = None until: str | None = None class SeverityOverride(BaseModel): name: str pattern: str override_severity: str enabled: bool = True class SettingsBody(BaseModel): entry_point_style: str | None = None llm_url: str | None = None llm_model: str | None = None severity_overrides: list[SeverityOverride] | None = None class IncidentCreate(BaseModel): label: str issue_type: str = "" started_at: str | None = None ended_at: str | None = None notes: str = "" severity: str = "medium" # Serve built Vue assets at the path Vite embeds in index.html. if (DIST_DIR / "assets").exists(): app.mount("/turnstone/assets", StaticFiles(directory=str(DIST_DIR / "assets")), name="assets") # API router — all routes accessible at /turnstone/api/* and /turnstone/health. 
router = APIRouter(prefix="/turnstone")


@router.get("/health")
def health() -> dict:
    """Report liveness together with the configured database path."""
    return {"status": "ok", "db": str(DB_PATH)}


@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    """Search log entries; an empty query returns an empty result set."""
    if not q:
        return {"count": 0, "results": []}
    results = _search(
        DB_PATH,
        query=q,
        source_filter=source,
        severity=severity,
        since=since,
        until=until,
        limit=limit,
    )
    return {"count": len(results), "results": [dataclasses.asdict(r) for r in results]}


def _detect_source_token(query: str) -> str | None:
    """Return the first token of a known source ID that occurs in *query*.

    Source IDs are split on ``:``, ``-``, ``_`` and whitespace; tokens of
    three characters or fewer are ignored. The matched token (not the full
    source ID) is returned so that it can serve as a partial-match filter.
    """
    known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
    q_lower = query.lower()
    for src in known_sources:
        tokens = [
            tok
            for segment in src.split(":")
            for tok in segment.replace("-", " ").replace("_", " ").split()
        ]
        for tok in tokens:
            if len(tok) > 3 and tok in q_lower:
                return tok
    return None


@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Collect up to 20 log entries relevant to the problem described by *q*.

    Combines a broad OR search with CRITICAL- and ERROR-restricted searches,
    de-duplicates by entry ID, and returns the entries in timestamp order
    together with a formatted rendering.
    """
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: if a query token matches part of a known source_id,
    # use that token as the source_filter so all matching sources (e.g. all
    # rotated plex logs) are included — not just the first matched rotation.
    detected_source = source or _detect_source_token(q)

    common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)

    # Broad pass uses OR so any symptom keyword surfaces evidence
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected, also pull its most recent errors via plain SQL —
    # FTS ranking can bury real errors from the named service if their text doesn't
    # match the symptom keywords. Plain-SQL scan returns actual recent errors regardless.
    source_errors: list = []
    if detected_source and not source and not errors:
        source_errors = _source_errors(
            DB_PATH,
            source_filter=detected_source,
            severity="ERROR",
            limit=10,
            since=since,
            until=until,
        )
        if not source_errors:
            source_errors = _source_errors(
                DB_PATH,
                source_filter=detected_source,
                severity="CRITICAL",
                limit=5,
                since=since,
                until=until,
            )

    # De-duplicate while keeping the first occurrence of each entry.
    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)

    # Entries without a timestamp sort after all timestamped ones.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]

    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }


@router.post("/api/diagnose")
def diagnose_post(body: DiagnoseRequest) -> dict:
    """Run the diagnose service using the LLM settings stored in the preferences."""
    if not body.query.strip():
        return {
            "summary": {
                "total": 0,
                "window_start": None,
                "window_end": None,
                "time_detected": False,
                "by_severity": {},
                "by_source": {},
            },
            # Same key set as the non-empty response below.
            "reasoning": None,
            "entries": [],
        }
    prefs = _load_prefs()
    result = _diagnose(
        DB_PATH,
        query=body.query,
        since=body.since,
        until=body.until,
        llm_url=prefs.get("llm_url") or None,
        llm_model=prefs.get("llm_model") or None,
    )
    return {
        "summary": result["summary"],
        "reasoning": result.get("reasoning"),
        "entries": [dataclasses.asdict(r) for r in result["entries"]],
    }


@router.get("/api/settings")
def get_settings() -> dict:
    """Return the current preferences (saved values over defaults)."""
    return _load_prefs()


@router.patch("/api/settings")
def patch_settings(body: SettingsBody) -> dict:
    """Apply the non-null fields of *body* to the preferences and persist them.

    Raises:
        HTTPException: 422 when ``entry_point_style`` is not 'topbar' or 'fab'.
    """
    prefs = _load_prefs()
    if body.entry_point_style is not None:
        if body.entry_point_style not in ("topbar", "fab"):
            raise HTTPException(status_code=422, detail="entry_point_style must be 'topbar' or 'fab'")
        prefs["entry_point_style"] = body.entry_point_style
    if body.llm_url is not None:
        prefs["llm_url"] = body.llm_url
    if body.llm_model is not None:
        prefs["llm_model"] = body.llm_model
    if body.severity_overrides is not None:
        prefs["severity_overrides"] = [o.model_dump() for o in body.severity_overrides]
    _save_prefs(prefs)
    return prefs


@router.get("/api/sources")
def list_sources() -> dict:
    """List the known log sources."""
    return {"sources": _list_sources(DB_PATH)}


@router.get("/api/watch/status")
def watch_status() -> dict:
    """Report whether the watcher is active, plus its per-source status."""
    return {"active": _watcher.is_active(), "sources": _watcher.status}


@router.post("/api/watch/reload")
def watch_reload() -> dict:
    """Stop all watch sources and restart with current watch.yaml."""
    _watcher.stop()
    watch_cfg_path = PATTERN_DIR / "watch.yaml"
    configs = load_watch_config(watch_cfg_path)
    if configs:
        _watcher.configure(configs)
        # NOTE(review): start() is assumed to belong to the "if configs"
        # branch (an empty watch.yaml leaves the watcher stopped) — confirm.
        _watcher.start()
    return {"reloaded": True, "source_count": len(configs)}


@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    """Return summary statistics for the last *window* hours."""
    prefs = _load_prefs()
    return _stats(DB_PATH, window_hours=window, severity_overrides=prefs.get("severity_overrides", []))


@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Create an incident from *body* and return it."""
    incident = create_incident(
        DB_PATH,
        label=body.label,
        issue_type=body.issue_type,
        started_at=body.started_at,
        ended_at=body.ended_at,
        notes=body.notes,
        severity=body.severity,
    )
    return dataclasses.asdict(incident)


@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """List all incidents."""
    return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]}


@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident with its log entries; 404 when it does not exist."""
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    entries = get_incident_entries(DB_PATH, incident)
    return {
        **dataclasses.asdict(incident),
        "entries": [dataclasses.asdict(e) for e in entries],
    }


@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident; 404 when it does not exist."""
    if not delete_incident(DB_PATH, incident_id):
        raise HTTPException(status_code=404, detail="Incident not found")
    return {"deleted": incident_id}


@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    """Build and return the export bundle for an incident; 404 when it does not exist."""
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)


@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    """POST an incident's bundle as JSON to ``TURNSTONE_BUNDLE_ENDPOINT``.

    Raises:
        HTTPException: 503 when no endpoint is configured, 404 when the
            incident does not exist, 502 when the receiver rejects the
            bundle or cannot be reached.
    """
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    payload = json.dumps(bundle).encode()
    req = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])}
    except urllib.error.HTTPError as exc:
        # HTTPError is an OSError subclass, so it must be caught first.
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc


@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    """Store a received bundle and return its ID and entry count."""
    record = store_bundle(DB_PATH, bundle)
    return {"id": record.id, "entry_count": record.entry_count}


@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    """List all stored bundles."""
    bundles = list_bundles(DB_PATH)
    return {"bundles": [dataclasses.asdict(b) for b in bundles]}


@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    """Return one stored bundle; 404 when it does not exist."""
    bundle = get_bundle(DB_PATH, bundle_id)
    if not bundle:
        raise HTTPException(status_code=404, detail="Bundle not found")
    return dataclasses.asdict(bundle)


app.include_router(router)


# Root redirect → /turnstone/
@app.get("/")
def root_redirect() -> RedirectResponse:
    """Redirect the bare root to the prefixed application."""
    return RedirectResponse(url="/turnstone/")


# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a built file from ``DIST_DIR``, falling back to ``index.html``.

    Raises:
        HTTPException: 503 when the web UI has not been built.
    """
    index_file = DIST_DIR / "index.html"
    if not index_file.is_file():
        # A FileResponse cannot express this: Starlette refuses to send
        # anything that is not a regular file, which surfaces as a 500.
        raise HTTPException(status_code=503, detail="Web UI build not found")

    # *path* is client-controlled. Resolve it and require that it stays inside
    # the dist directory so ".." segments or an absolute path cannot expose
    # arbitrary files.
    dist_root = DIST_DIR.resolve()
    candidate = (dist_root / path).resolve()
    if candidate.is_relative_to(dist_root) and candidate.is_file():
        return FileResponse(str(candidate))
    return FileResponse(str(index_file))