"""Turnstone REST API — serves REST API and Vue SPA under the /turnstone prefix.

All routes (API + static files) are mounted at /turnstone so the app works
identically whether accessed directly (http://host:8534/turnstone/) or through
Caddy (menagerie.circuitforge.tech/turnstone) without prefix stripping.
"""
from __future__ import annotations

import asyncio
import copy
import dataclasses
import hmac
import json
import os
import sqlite3
import urllib.error
import urllib.request
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated, Any

from fastapi import APIRouter, BackgroundTasks, FastAPI, HTTPException, Query, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from app.ingest.pipeline import ensure_schema
from app.ingest.base import load_compiled_patterns
from app.ingest.tautulli import parse_webhook as _parse_tautulli
from app.services.blocklist import (
    BlocklistCandidate,
    get_candidate,
    list_candidates,
    load_telemetry_rules,
    mark_pushed,
    mark_unblocked,
    run_scan,
    update_candidate_status,
)
from app.services.pihole import PiholeClient
from app.services.incidents import (
    build_bundle,
    create_incident,
    delete_incident,
    get_bundle,
    get_incident,
    get_incident_entries,
    list_bundles,
    list_incidents,
    store_bundle,
)
from app.services.search import (
    search as _search,
    list_sources as _list_sources,
    recent_source_errors as _source_errors,
    stats_summary as _stats,
    format_results,
    build_fts_index,
)
from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream
from app.watch.watcher import Watcher, load_watch_config
from app.context.store import (
    add_fact as _add_fact,
    list_facts as _list_facts,
    delete_fact as _delete_fact,
    list_documents as _list_documents,
    delete_document as _delete_document,
)
from app.context.retriever import retrieve_context as _retrieve_context, format_context_block
from app.ingest.doc_upload import ingest_upload as _ingest_upload
from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session
from app.context.chunker import UnsupportedDocType, FileTooLarge

DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
PREFS_PATH = DB_PATH.parent / "preferences.json"
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
PATTERN_FILE = PATTERN_DIR / "default.yaml"

_watcher = Watcher(DB_PATH, PATTERN_FILE)
# Compiled ingest patterns; populated at startup and on /api/watch/reload.
_compiled_patterns: list = []


@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Prepare the DB schema, load patterns and run the log watchers for the app's lifetime."""
    global _compiled_patterns
    ensure_schema(DB_PATH)
    _compiled_patterns = load_compiled_patterns(PATTERN_FILE)
    watch_cfg_path = PATTERN_DIR / "watch.yaml"
    configs = load_watch_config(watch_cfg_path)
    if configs:
        _watcher.configure(configs)
        _watcher.start()
    try:
        yield
    finally:
        # Stop the watcher even when the app shuts down via an exception
        # thrown into the lifespan, so watch sources are always released.
        _watcher.stop()


app = FastAPI(
    title="Turnstone API",
    version="0.1.0",
    docs_url="/turnstone/docs",
    redoc_url=None,
    lifespan=_lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE", "PATCH"],
    allow_headers=["*"],
)

# Values used for any key absent from preferences.json. Never mutate this in
# place; _load_prefs hands out deep copies.
_PREFS_DEFAULTS: dict[str, Any] = {
    "entry_point_style": "topbar",
    "llm_url": "http://localhost:11434",
    "llm_model": "llama3.1:8b",
    "llm_api_key": "",
    "severity_overrides": [
        {
            "name": "PAM auth noise",
            "pattern": r"pam_unix.*auth(?:entication)?\s+fail|auth could not identify",
            "override_severity": "WARN",
            "enabled": True,
        }
    ],
}


def _load_prefs() -> dict[str, Any]:
    """Return the saved preferences merged over ``_PREFS_DEFAULTS``.

    Keys missing from preferences.json fall back to the defaults. A missing,
    unreadable, undecodable or malformed file, or one whose top-level JSON value
    is not an object, yields the defaults alone.

    The result is always a fresh deep copy, so callers may mutate it (including
    nested lists such as ``severity_overrides``) without altering the defaults.
    """
    prefs = copy.deepcopy(_PREFS_DEFAULTS)
    try:
        saved = json.loads(PREFS_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return prefs
    if isinstance(saved, dict):
        prefs.update(saved)
    return prefs


def _save_prefs(data: dict[str, Any]) -> None:
    """Persist *data* as preferences.json, replacing the file atomically.

    The JSON is first written to a sibling ``.tmp`` file and then moved into
    place with ``os.replace``. A crash mid-write therefore cannot leave a
    truncated file, which ``_load_prefs`` would discard, losing every setting.
    """
    PREFS_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = PREFS_PATH.with_name(PREFS_PATH.name + ".tmp")
    tmp_path.write_text(json.dumps(data), encoding="utf-8")
    os.replace(tmp_path, PREFS_PATH)


class DiagnoseRequest(BaseModel):
    """Body for POST /api/diagnose and /api/diagnose/stream."""

    query: str
    since: str | None = None
    until: str | None = None
    source: str | None = None


class SeverityOverride(BaseModel):
    """One user-defined rule remapping the severity of lines matching ``pattern``."""

    name: str
    pattern: str
    override_severity: str
    enabled: bool = True


class SettingsBody(BaseModel):
    """Partial settings update; a None field means "leave unchanged"."""

    entry_point_style: str | None = None
    llm_url: str | None = None
    llm_model: str | None = None
    llm_api_key: str | None = None
    tautulli_token: str | None = None
    severity_overrides: list[SeverityOverride] | None = None
    pihole_url: str | None = None
    pihole_version: str | None = None
    pihole_api_key: str | None = None
    router_source_ids: str | None = None  # comma-separated source IDs
    device_names: str | None = None  # JSON text, parsed by /api/blocklist/scan


class IncidentCreate(BaseModel):
    """Body for POST /api/incidents."""

    label: str
    issue_type: str = ""
    started_at: str | None = None
    ended_at: str | None = None
    notes: str = ""
    severity: str = "medium"


class FactBody(BaseModel):
    """Body for POST /turnstone/api/context/facts."""

    category: str
    key: str
    value: str
    source: str | None = None


class WizardStepBody(BaseModel):
    """Body for POST /turnstone/api/context/wizard/step."""

    session: dict
    step_id: str
    answer: str | list[str] | None = None


class WizardApplyBody(BaseModel):
    """Body for POST /turnstone/api/context/wizard/apply."""

    session: dict


# Serve built Vue assets at the path Vite embeds in index.html.
if (DIST_DIR / "assets").exists():
    app.mount("/turnstone/assets", StaticFiles(directory=str(DIST_DIR / "assets")), name="assets")

# API router — all routes accessible at /turnstone/api/* and /turnstone/health.
router = APIRouter(prefix="/turnstone")


@router.get("/health")
def health() -> dict:
    """Liveness probe; also reports which SQLite file the server is using."""
    return {"status": "ok", "db": str(DB_PATH)}


@router.get("/api/search")
def search_logs(
    q: Annotated[str, Query(description="Search query")] = "",
    source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
    severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
    limit: Annotated[int, Query(ge=1, le=500)] = 50,
) -> dict:
    """Search stored log entries; an empty query returns no results."""
    if not q:
        return {"count": 0, "results": []}
    results = _search(
        DB_PATH,
        query=q,
        source_filter=source,
        severity=severity,
        since=since,
        until=until,
        limit=limit,
    )
    return {"count": len(results), "results": [dataclasses.asdict(r) for r in results]}


def _detect_source_hint(query: str, source_ids: list[str]) -> str | None:
    """Return the first source-ID token (longer than 3 chars) found in *query*, else None.

    Each source ID is split on ':' and then on '-', '_' and whitespace. Matching is
    case-insensitive (the query is lower-cased, so the token must be too). The token
    is returned with its original casing for use as a partial-match source filter.
    """
    q_lower = query.lower()
    for src in source_ids:
        for seg in src.split(":"):
            for token in seg.replace("-", " ").replace("_", " ").split():
                if len(token) > 3 and token.lower() in q_lower:
                    return token
    return None


@router.get("/api/diagnose")
def diagnose(
    q: Annotated[str, Query(description="Service name or problem description")] = "",
    source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
    since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
    until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
) -> dict:
    """Collect up to 20 de-duplicated log entries relevant to *q*, ordered by timestamp."""
    if not q:
        return {"count": 0, "results": [], "formatted": ""}

    # Auto-detect source hints: if a query token matches part of a known source_id,
    # use that token as the source_filter so all matching sources (e.g. all
    # rotated plex logs) are included — not just the first matched rotation.
    detected_source = source
    if not detected_source:
        known_sources = [s["source_id"] for s in _list_sources(DB_PATH)]
        hint = _detect_source_hint(q, known_sources)
        if hint:
            detected_source = hint

    common: dict = dict(source_filter=detected_source, since=since, until=until, include_repeats=False)
    # Broad pass uses OR so any symptom keyword surfaces evidence
    broad = _search(DB_PATH, query=q, limit=15, or_mode=True, **common)
    critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
    errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)

    # When a source was auto-detected and the text search found no ERROR hits,
    # fall back to that source's most recent errors (then criticals) via plain SQL —
    # FTS ranking can bury real errors from the named service if their text doesn't
    # match the symptom keywords.
    source_errors: list = []
    if detected_source and not source and not errors:
        source_errors = _source_errors(
            DB_PATH,
            source_filter=detected_source,
            severity="ERROR",
            limit=10,
            since=since,
            until=until,
        )
        if not source_errors:
            source_errors = _source_errors(
                DB_PATH,
                source_filter=detected_source,
                severity="CRITICAL",
                limit=5,
                since=since,
                until=until,
            )

    seen: set[str] = set()
    combined = []
    for r in broad + critical + errors + source_errors:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            combined.append(r)
    # "\xff" sorts entries without a timestamp after all ISO timestamps.
    combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
    combined = combined[:20]
    return {
        "count": len(combined),
        "results": [dataclasses.asdict(r) for r in combined],
        "formatted": format_results(combined),
    }


@router.post("/api/diagnose")
def diagnose_post(body: DiagnoseRequest) -> dict:
    """Run the diagnose service with the LLM settings from preferences."""
    if not body.query.strip():
        return {
            "summary": {
                "total": 0,
                "window_start": None,
                "window_end": None,
                "time_detected": False,
                "by_severity": {},
                "by_source": {},
            },
            "entries": [],
        }
    prefs = _load_prefs()
    result = _diagnose(
        DB_PATH,
        query=body.query,
        since=body.since,
        until=body.until,
        source_filter=body.source or None,
        llm_url=prefs.get("llm_url") or None,
        llm_model=prefs.get("llm_model") or None,
        llm_api_key=prefs.get("llm_api_key") or None,
    )
    return {
        "summary": result["summary"],
        "reasoning": result.get("reasoning"),
        "entries": [dataclasses.asdict(r) for r in result["entries"]],
    }


@router.post("/api/diagnose/stream")
async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
    """Stream diagnose events as Server-Sent Events (one JSON ``data:`` frame per event)."""
    prefs = _load_prefs()

    async def sse_gen():
        async for event in _diagnose_stream(
            DB_PATH,
            query=body.query,
            since=body.since,
            until=body.until,
            source_filter=body.source or None,
            llm_url=prefs.get("llm_url") or None,
            llm_model=prefs.get("llm_model") or None,
            llm_api_key=prefs.get("llm_api_key") or None,
        ):
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        sse_gen(),
        media_type="text/event-stream",
        # X-Accel-Buffering: no asks nginx-style proxies not to buffer the stream.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.get("/api/settings")
def get_settings() -> dict:
    """Return the current preferences (defaults merged with saved values)."""
    return _load_prefs()


@router.patch("/api/settings")
def patch_settings(body: SettingsBody) -> dict:
    """Merge the non-null fields of *body* into the saved preferences and persist them.

    Fields omitted from the request (or sent as null) keep their current values;
    ``severity_overrides`` replaces the whole list. Returns the full updated prefs.

    Raises:
        HTTPException: 422 if ``entry_point_style`` is not 'topbar' or 'fab'.
    """
    if body.entry_point_style is not None and body.entry_point_style not in ("topbar", "fab"):
        raise HTTPException(status_code=422, detail="entry_point_style must be 'topbar' or 'fab'")
    prefs = _load_prefs()
    # Every SettingsBody field maps 1:1 onto a prefs key. exclude_none keeps the
    # "None means unchanged" semantics and dumps nested SeverityOverride models
    # to plain dicts.
    prefs.update(body.model_dump(exclude_none=True))
    _save_prefs(prefs)
    return prefs


@router.get("/api/sources")
def list_sources() -> dict:
    """List the known log sources."""
    return {"sources": _list_sources(DB_PATH)}


@router.get("/api/watch/status")
def watch_status() -> dict:
    """Report whether the watcher is active and its per-source status."""
    return {"active": _watcher.is_active(), "sources": _watcher.status}


@router.post("/api/watch/reload")
def watch_reload() -> dict:
    """Stop all watch sources and restart with current watch.yaml."""
    global _compiled_patterns
    _watcher.stop()
    _compiled_patterns = load_compiled_patterns(PATTERN_FILE)
    watch_cfg_path = PATTERN_DIR / "watch.yaml"
    configs = load_watch_config(watch_cfg_path)
    if configs:
        _watcher.configure(configs)
        _watcher.start()
    return {"reloaded": True, "source_count": len(configs)}


@router.get("/api/stats")
def get_stats(
    window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
) -> dict:
    """Summary statistics over the last *window* hours, applying severity overrides."""
    prefs = _load_prefs()
    return _stats(DB_PATH, window_hours=window, severity_overrides=prefs.get("severity_overrides", []))


@router.post("/api/incidents")
def create_incident_endpoint(body: IncidentCreate) -> dict:
    """Create an incident record and return it."""
    incident = create_incident(
        DB_PATH,
        label=body.label,
        issue_type=body.issue_type,
        started_at=body.started_at,
        ended_at=body.ended_at,
        notes=body.notes,
        severity=body.severity,
    )
    return dataclasses.asdict(incident)


@router.get("/api/incidents")
def list_incidents_endpoint() -> dict:
    """List all incidents."""
    return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]}


@router.get("/api/incidents/{incident_id}")
def get_incident_endpoint(incident_id: str) -> dict:
    """Return one incident together with its log entries; 404 if unknown."""
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    entries = get_incident_entries(DB_PATH, incident)
    return {
        **dataclasses.asdict(incident),
        "entries": [dataclasses.asdict(e) for e in entries],
    }


@router.delete("/api/incidents/{incident_id}")
def delete_incident_endpoint(incident_id: str) -> dict:
    """Delete an incident; 404 if unknown."""
    if not delete_incident(DB_PATH, incident_id):
        raise HTTPException(status_code=404, detail="Incident not found")
    return {"deleted": incident_id}


@router.get("/api/incidents/{incident_id}/bundle")
def get_incident_bundle(incident_id: str) -> dict:
    """Build (without sending) the export bundle for an incident; 404 if unknown."""
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    return build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)


@router.post("/api/incidents/{incident_id}/send")
def send_incident_bundle(incident_id: str) -> dict:
    """POST an incident's bundle as JSON to TURNSTONE_BUNDLE_ENDPOINT.

    Raises:
        HTTPException: 503 if no endpoint is configured, 404 for an unknown
            incident, 502 if the receiver errors or is unreachable.
    """
    if not BUNDLE_ENDPOINT:
        raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
    incident = get_incident(DB_PATH, incident_id)
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST)
    payload = json.dumps(bundle).encode()
    req = urllib.request.Request(
        BUNDLE_ENDPOINT,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return {"sent": True, "status": resp.status, "entry_count": len(bundle["log_entries"])}
    # HTTPError must precede OSError: it is a subclass (via URLError).
    except urllib.error.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Receiver returned {exc.code}") from exc
    except OSError as exc:
        raise HTTPException(status_code=502, detail=f"Send failed: {exc}") from exc


@router.post("/api/bundles")
def receive_bundle(bundle: dict) -> dict:
    """Store a bundle received from another Turnstone instance."""
    record = store_bundle(DB_PATH, bundle)
    return {"id": record.id, "entry_count": record.entry_count}


@router.get("/api/bundles")
def list_bundles_endpoint() -> dict:
    """List received bundles."""
    bundles = list_bundles(DB_PATH)
    return {"bundles": [dataclasses.asdict(b) for b in bundles]}


@router.get("/api/bundles/{bundle_id}")
def get_bundle_endpoint(bundle_id: str) -> dict:
    """Return one received bundle; 404 if unknown."""
    bundle = get_bundle(DB_PATH, bundle_id)
    if not bundle:
        raise HTTPException(status_code=404, detail="Bundle not found")
    return dataclasses.asdict(bundle)


def _tautulli_write_entry(conn: sqlite3.Connection, entry) -> None:
    """Insert one parsed Tautulli entry; an existing row with the same id is left untouched."""
    conn.execute(
        """
        INSERT OR IGNORE INTO log_entries
            (id, source_id, sequence, timestamp_raw, timestamp_iso,
             ingest_time, severity, repeat_count, out_of_order,
             matched_patterns, text)
        VALUES (?,?,?,?,?,?,?,?,?,?,?)
        """,
        (
            entry.entry_id,
            entry.source_id,
            entry.sequence,
            entry.timestamp_raw,
            entry.timestamp_iso,
            entry.ingest_time,
            entry.severity,
            entry.repeat_count,
            int(entry.out_of_order),
            json.dumps(list(entry.matched_patterns)),
            entry.text,
        ),
    )


@router.post("/api/ingest/tautulli")
def ingest_tautulli(
    payload: dict,
    request: Request,
    background_tasks: BackgroundTasks,
) -> dict:
    """Accept a Tautulli webhook POST and store the event as a log entry.

    When a ``tautulli_token`` is configured, the request must carry a matching
    ``X-Tautulli-Token`` header. The FTS index is rebuilt in the background.

    Raises:
        HTTPException: 403 on a token mismatch, 400 if ``action`` is missing.
    """
    prefs = _load_prefs()
    token = prefs.get("tautulli_token", "")
    if token:
        header_token = request.headers.get("X-Tautulli-Token", "")
        # Compare as bytes: hmac.compare_digest raises TypeError for str
        # arguments containing non-ASCII characters, which would turn a bad
        # header into a 500 instead of a 403.
        if not hmac.compare_digest(header_token.encode(), str(token).encode()):
            raise HTTPException(status_code=403, detail="Invalid Tautulli token")
    if "action" not in payload:
        raise HTTPException(status_code=400, detail="Missing required field: action")

    entry = _parse_tautulli(payload, _compiled_patterns)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        # Inside the try so the connection is closed even if the PRAGMA fails.
        conn.execute("PRAGMA journal_mode=WAL")
        _tautulli_write_entry(conn, entry)
        conn.commit()
    finally:
        conn.close()
    background_tasks.add_task(build_fts_index, DB_PATH)
    return {"stored": 1, "entry_id": entry.entry_id, "action": payload.get("action")}


class BlocklistStatusBody(BaseModel):
    """Body for PATCH /api/blocklist/candidates/{candidate_id}."""

    status: str


def _make_pihole_client() -> PiholeClient:
    """Build PiholeClient from prefs. Raises 503 if not configured.

    The 503 is raised by catching ValueError from PiholeClient.__post_init__,
    which validates that url and api_key are non-empty. When PiholeClient is
    mocked in tests, __post_init__ is never called and no 503 is raised.
    """
    prefs = _load_prefs()
    url = prefs.get("pihole_url", "")
    key = prefs.get("pihole_api_key", "")
    version = prefs.get("pihole_version", "v6")
    try:
        return PiholeClient(url=url, api_key=key, version=version)
    except ValueError as exc:
        raise HTTPException(
            status_code=503,
            detail="Pi-hole not configured — set pihole_url and pihole_api_key in Settings",
        ) from exc


@router.get("/api/blocklist/candidates")
def list_blocklist_candidates(
    status: Annotated[str | None, Query()] = None,
    device_ip: Annotated[str | None, Query()] = None,
) -> dict:
    """List blocklist candidates, optionally filtered by status and device IP."""
    candidates = list_candidates(DB_PATH, status=status, device_ip=device_ip)
    return {"candidates": [dataclasses.asdict(c) for c in candidates], "total": len(candidates)}


@router.post("/api/blocklist/scan")
def scan_blocklist(background_tasks: BackgroundTasks) -> dict:
    """Start a background scan of the configured router sources for blocklist candidates.

    Raises:
        HTTPException: 400 if the ``device_names`` setting is not a JSON object.
    """
    prefs = _load_prefs()
    source_ids = [s.strip() for s in prefs.get("router_source_ids", "").split(",") if s.strip()]
    device_map: dict[str, str] = {}
    raw_devices = prefs.get("device_names", "")
    if raw_devices:
        try:
            device_map = json.loads(raw_devices)
        except (ValueError, TypeError) as exc:
            raise HTTPException(
                status_code=400, detail="device_names is not valid JSON — update it in Settings"
            ) from exc
        # Valid JSON that is not an object (list, string, number) would only
        # fail later inside the background scan, where nobody sees the error.
        if not isinstance(device_map, dict):
            raise HTTPException(
                status_code=400, detail="device_names must be a JSON object — update it in Settings"
            )
    telemetry_path = PATTERN_DIR / "telemetry.yaml"
    telemetry_rules = load_telemetry_rules(telemetry_path) if telemetry_path.exists() else []
    background_tasks.add_task(run_scan, DB_PATH, source_ids, device_map, telemetry_rules)
    return {"started": True}


@router.patch("/api/blocklist/candidates/{candidate_id}")
def update_blocklist_status(candidate_id: str, body: BlocklistStatusBody) -> dict:
    """Change a candidate's status; 400 for an invalid status, 404 if unknown."""
    try:
        candidate = update_candidate_status(DB_PATH, candidate_id, body.status)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except KeyError:
        raise HTTPException(status_code=404, detail="Candidate not found") from None
    return dataclasses.asdict(candidate)


@router.post("/api/blocklist/push/{candidate_id}")
def push_to_pihole(candidate_id: str) -> dict:
    """Block an approved candidate's domain/IP on Pi-hole and mark it pushed."""
    try:
        candidate = get_candidate(DB_PATH, candidate_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Candidate not found") from None
    if candidate.status != "approved":
        raise HTTPException(
            status_code=400,
            detail=f"Candidate must be approved before pushing (current status: {candidate.status!r})",
        )
    pihole = _make_pihole_client()
    pihole.block(candidate.domain_or_ip)
    mark_pushed(DB_PATH, candidate_id)
    return {"pushed": True, "domain": candidate.domain_or_ip}


@router.delete("/api/blocklist/push/{candidate_id}")
def unblock_from_pihole(candidate_id: str) -> dict:
    """Remove a pushed candidate's block from Pi-hole and mark it unblocked."""
    try:
        candidate = get_candidate(DB_PATH, candidate_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Candidate not found") from None
    if candidate.status != "pushed":
        raise HTTPException(
            status_code=400,
            detail=f"Candidate is not currently pushed (status: {candidate.status!r})",
        )
    pihole = _make_pihole_client()
    pihole.unblock(candidate.domain_or_ip)
    mark_unblocked(DB_PATH, candidate_id)
    return {"unblocked": True, "domain": candidate.domain_or_ip}


@router.post("/api/blocklist/test")
def test_pihole_connection() -> dict:
    """Check connectivity to the configured Pi-hole."""
    pihole = _make_pihole_client()
    return pihole.test_connection()


app.include_router(router)

_ctx = APIRouter(prefix="/turnstone/api/context")


@_ctx.post("/docs")
async def upload_doc(file: UploadFile):
    """Ingest an uploaded document; 415 for unsupported types, 413 if too large."""
    content = await file.read()
    try:
        result = await asyncio.to_thread(
            lambda: _ingest_upload(DB_PATH, file.filename or "upload", content)
        )
    except UnsupportedDocType as e:
        raise HTTPException(status_code=415, detail=str(e)) from e
    except FileTooLarge as e:
        raise HTTPException(status_code=413, detail=str(e)) from e
    return result


@_ctx.get("/docs")
async def list_docs():
    """List uploaded context documents (metadata only)."""
    docs = await asyncio.to_thread(lambda: _list_documents(DB_PATH))
    return [
        {
            "id": d.id,
            "filename": d.filename,
            "doc_type": d.doc_type,
            "file_size": d.file_size,
            "uploaded_at": d.uploaded_at,
        }
        for d in docs
    ]


@_ctx.delete("/docs/{doc_id}")
async def delete_doc(doc_id: str):
    """Delete an uploaded document; 404 if unknown."""
    deleted = await asyncio.to_thread(lambda: _delete_document(DB_PATH, doc_id))
    if not deleted:
        raise HTTPException(status_code=404, detail="Document not found")
    return {"deleted": doc_id}


@_ctx.post("/facts")
async def create_fact(body: FactBody):
    """Store a context fact and return it."""
    fact = await asyncio.to_thread(
        lambda: _add_fact(DB_PATH, body.category, body.key, body.value, body.source)
    )
    return {
        "id": fact.id,
        "category": fact.category,
        "key": fact.key,
        "value": fact.value,
        "source": fact.source,
        "created_at": fact.created_at,
    }


@_ctx.get("/facts")
async def list_facts_endpoint(category: str | None = None):
    """List context facts, optionally restricted to one category."""
    facts = await asyncio.to_thread(lambda: _list_facts(DB_PATH, category))
    return [
        {
            "id": f.id,
            "category": f.category,
            "key": f.key,
            "value": f.value,
            "source": f.source,
            "created_at": f.created_at,
        }
        for f in facts
    ]


@_ctx.delete("/facts/{fact_id}")
async def delete_fact_endpoint(fact_id: str):
    """Delete a context fact; 404 if unknown."""
    deleted = await asyncio.to_thread(lambda: _delete_fact(DB_PATH, fact_id))
    if not deleted:
        raise HTTPException(status_code=404, detail="Fact not found")
    return {"deleted": fact_id}


@_ctx.get("/wizard/schema")
async def wizard_schema():
    """Return the context-wizard schema."""
    return _wizard_schema()


@_ctx.post("/wizard/step")
async def wizard_step(body: WizardStepBody):
    """Advance the wizard session by one answered step."""
    updated = advance_step(body.session, body.step_id, body.answer)
    return {"session": updated, "complete": is_complete(updated)}


@_ctx.post("/wizard/apply")
async def wizard_apply(body: WizardApplyBody):
    """Apply a completed wizard session; 400 if the session is incomplete."""
    if not is_complete(body.session):
        raise HTTPException(status_code=400, detail="Wizard session is not complete")
    result = await asyncio.to_thread(lambda: apply_session(DB_PATH, body.session))
    return result


@_ctx.get("/debug/search")
async def debug_search(q: str):
    """Show the facts, chunks and formatted context block retrieved for *q*."""
    ctx = await asyncio.to_thread(lambda: _retrieve_context(DB_PATH, q))
    return {"facts": ctx.facts, "chunks": ctx.chunks, "block": format_context_block(ctx)}


app.include_router(_ctx)


# Root redirect → /turnstone/
@app.get("/")
def root_redirect() -> RedirectResponse:
    """Redirect the bare host root to the SPA under /turnstone/."""
    return RedirectResponse(url="/turnstone/")


# SPA catch-all — serves index.html for any /turnstone/* path that isn't a
# static asset or API route.
# Must be registered after include_router: Starlette matches routes in
# registration order, so this catch-all would otherwise shadow the API routes.


def _resolve_dist_file(path: str) -> Path | None:
    """Return the existing file that *path* names inside DIST_DIR, or None.

    Returns None when the target is missing, is not a regular file, or resolves
    outside DIST_DIR. Outside targets include ``..`` segments, symlinks leaving
    the directory, and an absolute segment such as ``/etc/passwd``, which
    ``Path.__truediv__`` would otherwise let replace the root entirely.
    """
    root = DIST_DIR.resolve()
    try:
        candidate = (root / path).resolve()
    except (OSError, ValueError):  # e.g. an embedded NUL byte from "%00" in the URL
        return None
    if candidate.is_relative_to(root) and candidate.is_file():
        return candidate
    return None


@app.get("/turnstone/{path:path}")
def spa_fallback(path: str) -> FileResponse:
    """Serve a built SPA file, falling back to index.html for client-side routes.

    Only files that resolve inside DIST_DIR are served directly. Any other path,
    including traversal attempts, receives index.html.

    Raises:
        HTTPException: 503 when the frontend has not been built (DIST_DIR missing).
    """
    if not DIST_DIR.exists():
        raise HTTPException(status_code=503, detail="Frontend not built")
    target = _resolve_dist_file(path)
    if target is not None:
        return FileResponse(str(target))
    return FileResponse(str(DIST_DIR / "index.html"))