turnstone/app/rest.py
pyr0ball e44c6fd680 feat(blocklist): 6 REST endpoints + Pi-hole settings fields
Add blocklist candidate listing, scan trigger, status update,
push/unblock to Pi-hole, and connection test endpoints.
Add pihole_url/version/api_key and router_source_ids/device_names
fields to SettingsBody and prefs handling in patch_settings.
Add PiholeClient.__post_init__ validation so 503 fires naturally
when url/api_key are unconfigured (mock-safe: bypassed in tests).
2026-05-15 21:15:09 -07:00

764 lines
26 KiB
Python

"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.
All routes (API + static files) are mounted at /turnstone so the app works
identically whether accessed directly (http://host:8534/turnstone/) or through
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
"""
from __future__ import annotations
import asyncio
import dataclasses
import hmac
import json
import os
import sqlite3
import urllib.error
import urllib.request
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema
from app.ingest.base import load_compiled_patterns
from app.ingest.tautulli import parse_webhook as _parse_tautulli
from app.services.blocklist import (
BlocklistCandidate,
list_candidates,
load_telemetry_rules,
mark_pushed,
mark_unblocked,
run_scan,
update_candidate_status,
)
from app.services.pihole import PiholeClient
from app.services.incidents import (
build_bundle,
create_incident,
delete_incident,
get_bundle,
get_incident,
get_incident_entries,
list_bundles,
list_incidents,
store_bundle,
)
from app.services.search import (
search as _search,
list_sources as _list_sources,
recent_source_errors as _source_errors,
stats_summary as _stats,
format_results,
build_fts_index,
)
from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream
from app.watch.watcher import Watcher, load_watch_config
from app.context.store import (
add_fact as _add_fact,
list_facts as _list_facts,
delete_fact as _delete_fact,
list_documents as _list_documents,
delete_document as _delete_document,
)
from app.context.retriever import retrieve_context as _retrieve_context, format_context_block
from app.ingest.doc_upload import ingest_upload as _ingest_upload
from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session
from app.context.chunker import UnsupportedDocType, FileTooLarge
# SQLite database file; override with the TURNSTONE_DB environment variable.
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
# User preferences are kept as a JSON file next to the database.
PREFS_PATH = DB_PATH.parent / "preferences.json"
# Directory holding the built SPA (index.html and assets/).
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
# Passed to build_bundle() as source_host when building incident bundles.
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
# URL incident bundles are POSTed to; an empty value makes the send route return 503.
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
# Directory containing default.yaml, watch.yaml and telemetry.yaml.
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
PATTERN_FILE = PATTERN_DIR / "default.yaml"
# Single module-wide watcher; configured/started in _lifespan and watch_reload.
_watcher = Watcher(DB_PATH, PATTERN_FILE)
# Compiled patterns, (re)loaded in _lifespan and watch_reload, read by ingest_tautulli.
_compiled_patterns: list = []
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Application lifespan: prepare the database and watcher, then tear down.

    Startup ensures the schema exists, loads the compiled patterns into the
    module-level ``_compiled_patterns`` and, when ``watch.yaml`` yields any
    configs, configures and starts the watcher.  Shutdown always stops the
    watcher, even if an exception or cancellation is thrown in at the yield.
    """
    global _compiled_patterns
    ensure_schema(DB_PATH)
    _compiled_patterns = load_compiled_patterns(PATTERN_FILE)
    configs = load_watch_config(PATTERN_DIR / "watch.yaml")
    if configs:
        _watcher.configure(configs)
        _watcher.start()
    try:
        yield
    finally:
        # Guarantee the watcher is stopped on every exit path.
        _watcher.stop()
# Interactive docs live under the /turnstone prefix; ReDoc is disabled.
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
# NOTE(review): every origin is allowed, and the settings routes return stored
# keys — confirm the wildcard is intended for the deployments this runs in.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE", "PATCH"],
    allow_headers=["*"],
)
# Baseline preferences; values saved in preferences.json override these key by key
# (see _load_prefs).  Keys not listed here (e.g. pihole_*, tautulli_token) only
# exist once they have been saved via PATCH /api/settings.
_PREFS_DEFAULTS: dict = {
    "entry_point_style": "topbar",
    "llm_url": "http://localhost:11434",
    "llm_model": "llama3.1:8b",
    "llm_api_key": "",
    # Passed to the stats summary as severity_overrides (see get_stats).
    "severity_overrides": [
        {
            "name": "PAM auth noise",
            "pattern": r"pam_unix.*auth(?:entication)?\s+fail|auth could not identify",
            "override_severity": "WARN",
            "enabled": True,
        }
    ],
}
def _load_prefs() -> dict:
    """Return the effective preferences: defaults overlaid with saved values.

    Falls back to a copy of ``_PREFS_DEFAULTS`` when the preferences file is
    missing, unreadable, not valid UTF-8/JSON, or does not contain a JSON
    object (previously a top-level list or scalar raised ``TypeError``).

    Returns:
        A new dict; top-level keys may be mutated by the caller without
        affecting ``_PREFS_DEFAULTS``.
    """
    try:
        saved = json.loads(PREFS_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # JSONDecodeError and UnicodeDecodeError.
        return dict(_PREFS_DEFAULTS)
    if not isinstance(saved, dict):
        return dict(_PREFS_DEFAULTS)
    return {**_PREFS_DEFAULTS, **saved}
def _save_prefs(data: dict) -> None:
    """Serialize *data* as JSON and overwrite the preferences file.

    Values are not all strings (``severity_overrides`` is a list of dicts),
    so the parameter is annotated as a plain ``dict``.  The encoding is
    pinned to UTF-8 so the file does not depend on the process locale.
    """
    PREFS_PATH.write_text(json.dumps(data), encoding="utf-8")
class DiagnoseRequest(BaseModel):
    # Request body for POST /api/diagnose and POST /api/diagnose/stream.
    # query is required; since/until/source are optional and forwarded to the
    # diagnose service (source is passed as source_filter).
    query: str
    since: str | None = None
    until: str | None = None
    source: str | None = None
class SeverityOverride(BaseModel):
    # One entry of the "severity_overrides" preference list; same shape as the
    # default entry in _PREFS_DEFAULTS.
    name: str
    pattern: str
    override_severity: str
    enabled: bool = True
class SettingsBody(BaseModel):
    # Body of PATCH /api/settings.  Every field is optional; patch_settings only
    # writes the fields that are not None.
    entry_point_style: str | None = None  # patch_settings accepts only "topbar" or "fab"
    llm_url: str | None = None
    llm_model: str | None = None
    llm_api_key: str | None = None
    tautulli_token: str | None = None  # compared against the X-Tautulli-Token header
    severity_overrides: list[SeverityOverride] | None = None
    pihole_url: str | None = None
    pihole_version: str | None = None  # _make_pihole_client defaults this to "v6"
    pihole_api_key: str | None = None
    router_source_ids: str | None = None  # comma-separated list, split in scan_blocklist
    device_names: str | None = None  # JSON text, parsed in scan_blocklist
class IncidentCreate(BaseModel):
    # Body of POST /api/incidents; each field is forwarded unchanged to
    # create_incident().  Only label is required.
    label: str
    issue_type: str = ""
    started_at: str | None = None
    ended_at: str | None = None
    notes: str = ""
    severity: str = "medium"
class FactBody(BaseModel):
    # Body of POST /turnstone/api/context/facts; fields are passed positionally
    # to the context store's add_fact (category, key, value, source).
    category: str
    key: str
    value: str
    source: str | None = None
class WizardStepBody(BaseModel):
    # Body of POST .../wizard/step: the client-held session plus the step being
    # answered; all three values are handed to advance_step().
    session: dict
    step_id: str
    answer: str | list[str] | None = None
class WizardApplyBody(BaseModel):
    # Body of POST .../wizard/apply: the session to persist via apply_session().
    session: dict
# Serve built Vue assets at the path Vite embeds in index.html.
# The mount is skipped when no build is present (DIST_DIR/assets missing).
if (DIST_DIR / "assets").exists():
    app.mount("/turnstone/assets", StaticFiles(directory=str(DIST_DIR / "assets")), name="assets")
# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
# It is attached to the app only after all of its routes have been declared.
router = APIRouter(prefix="/turnstone")
@router.get("/health")
def health() -> dict:
    """Liveness probe: report a fixed "ok" status and the configured DB path."""
    payload = {"status": "ok"}
    payload["db"] = str(DB_PATH)
    return payload
@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    """Search stored log entries.

    An empty query short-circuits to an empty result without touching the
    database; otherwise the filters are forwarded to the search service and
    each hit is returned as a plain dict.
    """
    if q:
        hits = _search(
            DB_PATH,
            query=q,
            source_filter=source,
            severity=severity,
            since=since,
            until=until,
            limit=limit,
        )
        rows = [dataclasses.asdict(hit) for hit in hits]
        return {"count": len(hits), "results": rows}
    return {"count": 0, "results": []}
@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Collect up to 20 log entries relevant to a problem description.

    Runs three searches (broad OR-mode, CRITICAL-only, ERROR-only), optionally
    adds recent errors of an auto-detected source, de-duplicates by entry id,
    orders chronologically and returns the entries plus a formatted rendering.
    """
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Source auto-detection: the first token (longer than 3 chars) of any known
    # source_id that occurs in the query becomes the source filter.  The token,
    # not the full source_id, is used so every matching source (e.g. all
    # rotated logs of one service) is included.
    detected_source = source
    if not detected_source:
        needle = q.lower()
        source_ids = [row["source_id"] for row in _list_sources(DB_PATH)]
        tokens = (
            token
            for source_id in source_ids
            for segment in source_id.split(":")
            for token in segment.replace("-", " ").replace("_", " ").split()
        )
        detected_source = next(
            (token for token in tokens if len(token) > 3 and token in needle),
            source,
        )

    shared = {
        "source_filter": detected_source,
        "since": since,
        "until": until,
        "include_repeats": False,
    }
    # The broad pass is OR-mode so any symptom keyword surfaces evidence.
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **shared)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **shared)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **shared)

    # Only when the source was auto-detected and the ERROR search found nothing:
    # pull that source's recent errors directly, trying ERROR first and falling
    # back to CRITICAL, since ranking may bury errors that lack the keywords.
    fallback: list = []
    if detected_source and not source and not errors:
        for level, cap in (("ERROR", 10), ("CRITICAL", 5)):
            fallback = _source_errors(
                DB_PATH, source_filter=detected_source, severity=level,
                limit=cap, since=since, until=until,
            )
            if fallback:
                break

    # First occurrence of each entry id wins; dict keeps insertion order.
    unique: dict = {}
    for hit in (*broad, *critical, *errors, *fallback):
        unique.setdefault(hit.entry_id, hit)

    # Entries without a timestamp sort after all timestamped ones.
    ordered = sorted(unique.values(), key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    top = ordered[:20]
    return {
        "count": len(top),
        "results": [dataclasses.asdict(hit) for hit in top],
        "formatted": format_results(top),
    }
@router.post("/api/diagnose")
def diagnose_post(body: DiagnoseRequest) -> dict:
    """Run the diagnose service for the request body.

    A blank query returns an empty summary without calling the service.
    Otherwise the LLM connection settings come from the saved preferences
    (empty values are passed as None).
    """
    if body.query.strip():
        prefs = _load_prefs()
        outcome = _diagnose(
            DB_PATH,
            query=body.query,
            since=body.since,
            until=body.until,
            source_filter=body.source or None,
            llm_url=prefs.get("llm_url") or None,
            llm_model=prefs.get("llm_model") or None,
            llm_api_key=prefs.get("llm_api_key") or None,
        )
        return {
            "summary": outcome["summary"],
            "reasoning": outcome.get("reasoning"),
            "entries": [dataclasses.asdict(entry) for entry in outcome["entries"]],
        }

    empty_summary = {
        "total": 0, "window_start": None, "window_end": None,
        "time_detected": False, "by_severity": {}, "by_source": {},
    }
    return {"summary": empty_summary, "entries": []}
@router.post("/api/diagnose/stream")
async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
    """Stream diagnose events to the client as server-sent events.

    Each event from the diagnose stream is JSON-encoded into one
    ``data: ...`` frame.  Response headers disable caching and proxy buffering.
    """
    prefs = _load_prefs()
    stream_args = {
        "query": body.query,
        "since": body.since,
        "until": body.until,
        "source_filter": body.source or None,
        "llm_url": prefs.get("llm_url") or None,
        "llm_model": prefs.get("llm_model") or None,
        "llm_api_key": prefs.get("llm_api_key") or None,
    }

    async def sse_gen():
        async for event in _diagnose_stream(DB_PATH, **stream_args):
            yield f"data: {json.dumps(event)}\n\n"

    no_buffer_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        sse_gen(),
        media_type="text/event-stream",
        headers=no_buffer_headers,
    )
@router.get("/api/settings")
def get_settings() -> dict:
    """Return the effective preferences (defaults merged with saved values).

    NOTE(review): the result includes any stored secrets (llm_api_key,
    pihole_api_key, tautulli_token) verbatim.
    """
    prefs = _load_prefs()
    return prefs
@router.patch("/api/settings")
def patch_settings(body: SettingsBody) -> dict:
    """Apply a partial settings update and persist it.

    Only fields that are not None overwrite the stored preference of the
    same name.  ``entry_point_style`` is validated first, so an invalid value
    is rejected with 422 before anything is saved.

    Returns:
        The full preferences dict after the update.
    """
    prefs = _load_prefs()

    style = body.entry_point_style
    if style is not None and style not in ("topbar", "fab"):
        raise HTTPException(status_code=422, detail="entry_point_style must be 'topbar' or 'fab'")

    # Plain string fields are copied as-is; the two groups bracket
    # severity_overrides so keys are written in the same order as before.
    leading = ("entry_point_style", "llm_url", "llm_model", "llm_api_key", "tautulli_token")
    trailing = ("pihole_url", "pihole_version", "pihole_api_key", "router_source_ids", "device_names")

    for field in leading:
        value = getattr(body, field)
        if value is not None:
            prefs[field] = value

    if body.severity_overrides is not None:
        prefs["severity_overrides"] = [o.model_dump() for o in body.severity_overrides]

    for field in trailing:
        value = getattr(body, field)
        if value is not None:
            prefs[field] = value

    _save_prefs(prefs)
    return prefs
@router.get("/api/sources")
def list_sources() -> dict:
    """Return every known log source under the "sources" key."""
    known = _list_sources(DB_PATH)
    return {"sources": known}
@router.get("/api/watch/status")
def watch_status() -> dict:
    """Report whether the watcher is active together with its status value."""
    active = _watcher.is_active()
    return {"active": active, "sources": _watcher.status}
@router.post("/api/watch/reload")
def watch_reload() -> dict:
    """Stop all watch sources and restart them from the current watch.yaml.

    The compiled patterns are reloaded as well.  The watcher is only
    restarted when the watch config yields at least one source.
    """
    global _compiled_patterns
    _watcher.stop()
    _compiled_patterns = load_compiled_patterns(PATTERN_FILE)
    configs = load_watch_config(PATTERN_DIR / "watch.yaml")
    if configs:
        _watcher.configure(configs)
        _watcher.start()
    source_count = len(configs)
    return {"reloaded": True, "source_count": source_count}
@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    """Return the stats summary for the last *window* hours.

    The saved severity overrides (or an empty list) are passed to the summary.
    """
    overrides = _load_prefs().get("severity_overrides", [])
    return _stats(DB_PATH, window_hours=window, severity_overrides=overrides)
@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Create an incident from the request body and return it as a dict."""
    fields = {
        "label": body.label,
        "issue_type": body.issue_type,
        "started_at": body.started_at,
        "ended_at": body.ended_at,
        "notes": body.notes,
        "severity": body.severity,
    }
    created = create_incident(DB_PATH, **fields)
    return dataclasses.asdict(created)
@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """List all incidents, each converted to a plain dict."""
    rows = []
    for incident in list_incidents(DB_PATH):
        rows.append(dataclasses.asdict(incident))
    return {"incidents": rows}
@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident plus its log entries.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    entry_rows = [dataclasses.asdict(e) for e in get_incident_entries(DB_PATH, incident)]
    response = dataclasses.asdict(incident)
    # Assigned last so "entries" overrides any same-named incident field.
    response["entries"] = entry_rows
    return response
@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident; respond 404 when the delete reports nothing removed."""
    removed = delete_incident(DB_PATH, incident_id)
    if removed:
        return {"deleted": incident_id}
    raise HTTPException(status_code=404, detail="Incident not found")
@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    """Build and return the export bundle for an incident (404 if unknown)."""
    incident = get_incident(DB_PATH, incident_id)
    if incident:
        return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    raise HTTPException(status_code=404, detail="Incident not found")
@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    """POST an incident's bundle as JSON to the configured bundle endpoint.

    Raises:
        HTTPException: 503 when no endpoint is configured, 404 when the
            incident does not exist, 502 when the receiver answers with an
            HTTP error or the request fails at the OS/network level.
    """
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")

    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")

    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    outgoing = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=json.dumps(bundle).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(outgoing, timeout=15) as resp:
            receipt = {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])}
    except urllib.error.HTTPError as exc:
        # HTTPError is an OSError subclass, so it must be handled first.
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc
    return receipt
@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    """Store an incoming bundle and return its id and entry count."""
    stored = store_bundle(DB_PATH, bundle)
    return {"id": stored.id, "entry_count": stored.entry_count}
@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    """List all stored bundles as plain dicts."""
    return {"bundles": [dataclasses.asdict(item) for item in list_bundles(DB_PATH)]}
@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    """Return one stored bundle as a dict, or 404 when it is not found."""
    found = get_bundle(DB_PATH, bundle_id)
    if found:
        return dataclasses.asdict(found)
    raise HTTPException(status_code=404, detail="Bundle not found")
def _tautulli_write_entry(conn: sqlite3.Connection, entry) -> None:
    """Insert *entry* into ``log_entries``, ignoring it if the row conflicts.

    The caller owns the transaction: nothing is committed here.
    ``out_of_order`` is stored as an integer and ``matched_patterns`` as a
    JSON array.
    """
    row = (
        entry.entry_id,
        entry.source_id,
        entry.sequence,
        entry.timestamp_raw,
        entry.timestamp_iso,
        entry.ingest_time,
        entry.severity,
        entry.repeat_count,
        int(entry.out_of_order),
        json.dumps(list(entry.matched_patterns)),
        entry.text,
    )
    statement = """
        INSERT OR IGNORE INTO log_entries
        (id, source_id, sequence, timestamp_raw, timestamp_iso,
        ingest_time, severity, repeat_count, out_of_order,
        matched_patterns, text)
        VALUES (?,?,?,?,?,?,?,?,?,?,?)
        """
    conn.execute(statement, row)
@router.post("/api/ingest/tautulli")
def ingest_tautulli(
    payload: dict,
    request: Request,
    background_tasks: BackgroundTasks,
) -> dict:
    """Accept a Tautulli webhook POST and store the event as a log entry.

    When a ``tautulli_token`` preference is set, the ``X-Tautulli-Token``
    header must match it.  After the entry is written, an FTS index rebuild
    is queued as a background task.

    Raises:
        HTTPException: 403 on a token mismatch, 400 when the payload has no
            ``action`` field.
    """
    prefs = _load_prefs()
    token = prefs.get("tautulli_token", "")
    if token:
        header_token = request.headers.get("X-Tautulli-Token", "")
        # compare_digest() only accepts ASCII-only str; a non-ASCII header or
        # token would raise TypeError (a 500).  Comparing the encoded bytes
        # keeps the constant-time check and yields a clean 403 instead.
        if not hmac.compare_digest(header_token.encode("utf-8"), str(token).encode("utf-8")):
            raise HTTPException(status_code=403, detail="Invalid Tautulli token")
    if "action" not in payload:
        raise HTTPException(status_code=400, detail="Missing required field: action")

    entry = _parse_tautulli(payload, _compiled_patterns)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        # Inside the try so the connection is closed even if the PRAGMA fails.
        conn.execute("PRAGMA journal_mode=WAL")
        _tautulli_write_entry(conn, entry)
        conn.commit()
    finally:
        conn.close()
    background_tasks.add_task(build_fts_index, DB_PATH)
    return {"stored": 1, "entry_id": entry.entry_id, "action": payload.get("action")}
class BlocklistStatusBody(BaseModel):
    # Body of PATCH /api/blocklist/candidates/{candidate_id}; the value is
    # passed unchecked to update_candidate_status().
    status: str
def _make_pihole_client() -> PiholeClient:
    """Construct a PiholeClient from the saved preferences.

    Missing preferences default to an empty url/api_key and version "v6".
    A ValueError from the PiholeClient constructor is translated to a 503;
    per the original author's note this comes from ``__post_init__``
    validation of url/api_key (defined outside this file), and is never
    triggered when PiholeClient is mocked in tests.

    Raises:
        HTTPException: 503 when the client rejects the configuration.
    """
    prefs = _load_prefs()
    settings = {
        "url": prefs.get("pihole_url", ""),
        "api_key": prefs.get("pihole_api_key", ""),
        "version": prefs.get("pihole_version", "v6"),
    }
    try:
        client = PiholeClient(**settings)
    except ValueError as exc:
        raise HTTPException(
            status_code=503,
            detail="Pi-hole not configured — set pihole_url and pihole_api_key in Settings",
        ) from exc
    return client
@router.get("/api/blocklist/candidates")
def list_blocklist_candidates(
    status: Annotated[str | None, Query()] = None,
    device_ip: Annotated[str | None, Query()] = None,
) -> dict:
    """List blocklist candidates, optionally filtered by status and device IP.

    Returns the candidates as dicts together with their count.
    """
    found = list_candidates(DB_PATH, status=status, device_ip=device_ip)
    rows = [dataclasses.asdict(item) for item in found]
    return {"candidates": rows, "total": len(found)}
@router.post("/api/blocklist/scan")
def scan_blocklist(background_tasks: BackgroundTasks) -> dict:
    """Queue a blocklist scan as a background task and return immediately.

    Scan inputs come from preferences: ``router_source_ids`` is a
    comma-separated list and ``device_names`` is JSON text that should decode
    to an object.  Unparseable or non-object ``device_names`` is ignored
    (best effort) so the scan always receives a dict.  Telemetry rules are
    loaded from ``telemetry.yaml`` when that file exists.
    """
    prefs = _load_prefs()

    # ``or ""`` tolerates a null value in a hand-edited preferences file.
    raw_sources = prefs.get("router_source_ids") or ""
    source_ids = [part.strip() for part in raw_sources.split(",") if part.strip()]

    device_map: dict[str, str] = {}
    raw_devices = prefs.get("device_names", "")
    if raw_devices:
        try:
            parsed = json.loads(raw_devices)
        except (ValueError, TypeError):
            parsed = None
        # Valid JSON that is not an object (list, string, number) would break
        # the scan inside the background task, where the client never sees
        # the error — so only a dict is accepted.
        if isinstance(parsed, dict):
            device_map = parsed

    telemetry_path = PATTERN_DIR / "telemetry.yaml"
    telemetry_rules = load_telemetry_rules(telemetry_path) if telemetry_path.exists() else []
    background_tasks.add_task(run_scan, DB_PATH, source_ids, device_map, telemetry_rules)
    return {"started": True}
@router.patch("/api/blocklist/candidates/{candidate_id}")
def update_blocklist_status(candidate_id: str, body: BlocklistStatusBody) -> dict:
    """Set a candidate's status and return the updated candidate as a dict.

    Raises:
        HTTPException: 400 when the status update raises ValueError (its
            message becomes the detail), 404 when it raises KeyError.
    """
    # Only the service call is guarded, so errors from serialising the result
    # are not misreported as 400/404.  Causes are chained like elsewhere in
    # this module.
    try:
        candidate = update_candidate_status(DB_PATH, candidate_id, body.status)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except KeyError as exc:
        raise HTTPException(status_code=404, detail="Candidate not found") from exc
    return dataclasses.asdict(candidate)
@router.post("/api/blocklist/push/{candidate_id}")
def push_to_pihole(candidate_id: str) -> dict:
    """Block an approved candidate on Pi-hole and mark it as pushed.

    Raises:
        HTTPException: 404 when the candidate is unknown, 400 when its status
            is not "approved", 503 (from the client factory) when Pi-hole is
            not configured.
    """
    candidate = None
    for item in list_candidates(DB_PATH):
        if item.id == candidate_id:
            candidate = item
            break
    if candidate is None:
        raise HTTPException(status_code=404, detail="Candidate not found")
    if candidate.status != "approved":
        raise HTTPException(
            status_code=400,
            detail=f"Candidate must be approved before pushing (current status: {candidate.status!r})",
        )

    # The client is created only after validation; the candidate is marked as
    # pushed only after the block call returns.
    client = _make_pihole_client()
    client.block(candidate.domain_or_ip)
    mark_pushed(DB_PATH, candidate_id)
    return {"pushed": True, "domain": candidate.domain_or_ip}
@router.delete("/api/blocklist/push/{candidate_id}")
def unblock_from_pihole(candidate_id: str) -> dict:
    """Remove a pushed candidate from Pi-hole and mark it as unblocked.

    Raises:
        HTTPException: 404 when the candidate is unknown, 400 when its status
            is not "pushed", 503 (from the client factory) when Pi-hole is
            not configured.
    """
    candidate = None
    for item in list_candidates(DB_PATH):
        if item.id == candidate_id:
            candidate = item
            break
    if candidate is None:
        raise HTTPException(status_code=404, detail="Candidate not found")
    if candidate.status != "pushed":
        raise HTTPException(
            status_code=400,
            detail=f"Candidate is not currently pushed (status: {candidate.status!r})",
        )

    client = _make_pihole_client()
    client.unblock(candidate.domain_or_ip)
    mark_unblocked(DB_PATH, candidate_id)
    return {"unblocked": True, "domain": candidate.domain_or_ip}
@router.post("/api/blocklist/test")
def test_pihole_connection() -> dict:
    """Return the Pi-hole client's connection test result (503 if unconfigured)."""
    return _make_pihole_client().test_connection()
# Attach the main API router now that all of its routes are declared.
app.include_router(router)
# Separate router for the context endpoints (documents, facts, wizard, debug search).
_ctx = APIRouter(prefix="/turnstone/api/context")
@_ctx.post("/docs")
async def upload_doc(file: UploadFile):
    """Ingest an uploaded document.

    The upload is read fully into memory, then ingested in a worker thread.
    A missing filename is replaced by "upload".

    Raises:
        HTTPException: 415 for an unsupported document type, 413 when the
            file is too large.
    """
    payload = await file.read()
    name = file.filename or "upload"
    try:
        return await asyncio.to_thread(_ingest_upload, DB_PATH, name, payload)
    except UnsupportedDocType as e:
        raise HTTPException(status_code=415, detail=str(e))
    except FileTooLarge as e:
        raise HTTPException(status_code=413, detail=str(e))
@_ctx.get("/docs")
async def list_docs():
    """List stored documents with their id, filename, type, size and upload time."""
    exposed = ("id", "filename", "doc_type", "file_size", "uploaded_at")
    documents = await asyncio.to_thread(_list_documents, DB_PATH)
    return [{field: getattr(doc, field) for field in exposed} for doc in documents]
@_ctx.delete("/docs/{doc_id}")
async def delete_doc(doc_id: str):
    """Delete a stored document; 404 when the store reports nothing deleted."""
    removed = await asyncio.to_thread(_delete_document, DB_PATH, doc_id)
    if removed:
        return {"deleted": doc_id}
    raise HTTPException(status_code=404, detail="Document not found")
@_ctx.post("/facts")
async def create_fact(body: FactBody):
    """Store a fact (in a worker thread) and return the stored record's fields."""
    stored = await asyncio.to_thread(
        _add_fact, DB_PATH, body.category, body.key, body.value, body.source
    )
    exposed = ("id", "category", "key", "value", "source", "created_at")
    return {field: getattr(stored, field) for field in exposed}
@_ctx.get("/facts")
async def list_facts_endpoint(category: str | None = None):
    """List stored facts, passing the optional category through to the store."""
    exposed = ("id", "category", "key", "value", "source", "created_at")
    records = await asyncio.to_thread(_list_facts, DB_PATH, category)
    return [{field: getattr(record, field) for field in exposed} for record in records]
@_ctx.delete("/facts/{fact_id}")
async def delete_fact_endpoint(fact_id: str):
    """Delete a fact; 404 when the store reports nothing deleted."""
    removed = await asyncio.to_thread(_delete_fact, DB_PATH, fact_id)
    if removed:
        return {"deleted": fact_id}
    raise HTTPException(status_code=404, detail="Fact not found")
@_ctx.get("/wizard/schema")
async def wizard_schema():
    """Return the wizard schema as provided by the context wizard module."""
    schema = _wizard_schema()
    return schema
@_ctx.post("/wizard/step")
async def wizard_step(body: WizardStepBody):
    """Apply one answer to the wizard session.

    Returns the updated session and whether it is now complete.
    """
    session = advance_step(body.session, body.step_id, body.answer)
    done = is_complete(session)
    return {"session": session, "complete": done}
@_ctx.post("/wizard/apply")
async def wizard_apply(body: WizardApplyBody):
    """Persist a completed wizard session (in a worker thread).

    Raises:
        HTTPException: 400 when the session is not complete.
    """
    if is_complete(body.session):
        return await asyncio.to_thread(apply_session, DB_PATH, body.session)
    raise HTTPException(status_code=400, detail="Wizard session is not complete")
@_ctx.get("/debug/search")
async def debug_search(q: str):
    """Debug helper: show the context retrieved for *q*.

    Returns the retrieved facts and chunks plus the formatted context block.
    """
    retrieved = await asyncio.to_thread(_retrieve_context, DB_PATH, q)
    return {
        "facts": retrieved.facts,
        "chunks": retrieved.chunks,
        "block": format_context_block(retrieved),
    }
# Attach the context router; must precede the SPA catch-all route declared below.
app.include_router(_ctx)
# Root redirect → /turnstone/
@app.get("/")
def root_redirect() -> RedirectResponse:
    """Send visitors of the bare root to the prefixed application."""
    target = "/turnstone/"
    return RedirectResponse(url=target)
# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route. Must be registered after include_router.
@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a file from the built SPA, or index.html for client-side routes.

    The requested path is resolved and must stay inside ``DIST_DIR``.  Without
    that check, ``..`` segments or an absolute path (which replaces the base
    when joined) would let a request read arbitrary files, including the
    preferences file and the database.

    Raises:
        HTTPException: 503 when no built frontend is available.  The previous
            ``FileResponse("/dev/null", status_code=503)`` pointed at a
            non-regular file, which FileResponse is expected to reject
            rather than send as a 503.
    """
    if DIST_DIR.exists():
        root = DIST_DIR.resolve()
        try:
            candidate = (root / path).resolve()
        except (OSError, ValueError):
            # Unresolvable input (e.g. embedded NUL) is treated as "no such file".
            candidate = None
        if candidate is not None and candidate.is_relative_to(root) and candidate.is_file():
            return FileResponse(str(candidate))
        index = root / "index.html"
        if index.is_file():
            return FileResponse(str(index))
    raise HTTPException(status_code=503, detail="Web UI build not found")