- app/rest.py: FastAPI app wrapping search/diagnose/sources with CORS - web/: Vue 3 + Vite + UnoCSS + Pinia frontend at port 8535 - LogSearchView: sidebar filters (source, severity, limit) + FTS search - DiagnoseView: layered symptom investigation matching MCP diagnose tool - SourcesView: corpus table with entry count, error count, time range - LogEntryRow: severity badge, pattern chips, repeat count, timestamp - StatusDot: live API health indicator in nav - scripts/start_dev.sh: launch FastAPI (:8534) + Vite dev server (:8535) - .gitignore: add web/node_modules/ and web/dist/ - Caddy: /turnstone* route added to menagerie.circuitforge.tech block (API → :8534 with /turnstone strip, SPA fallback → :8535)
90 lines
3 KiB
Python
90 lines
3 KiB
Python
"""Turnstone REST API — thin HTTP wrapper around the search and ingest services."""
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
from fastapi import FastAPI, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
from app.services.search import search as _search, list_sources as _list_sources, format_results
|
|
|
|
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
|
|
|
|
app = FastAPI(title="Turnstone API", version="0.1.0")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_methods=["GET", "POST"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.get("/health")
|
|
def health() -> dict:
|
|
return {"status": "ok", "db": str(DB_PATH)}
|
|
|
|
|
|
@app.get("/api/search")
|
|
def search_logs(
|
|
q: Annotated[str, Query(description="Search query")] = "",
|
|
source: Annotated[str | None, Query(description="Filter by log source ID (partial match)")] = None,
|
|
severity: Annotated[str | None, Query(description="Filter by severity (DEBUG/INFO/WARN/ERROR/CRITICAL)")] = None,
|
|
since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
|
|
until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
|
|
limit: Annotated[int, Query(ge=1, le=500)] = 50,
|
|
) -> dict:
|
|
if not q:
|
|
return {"count": 0, "results": []}
|
|
results = _search(
|
|
DB_PATH,
|
|
query=q,
|
|
source_filter=source,
|
|
severity=severity,
|
|
since=since,
|
|
until=until,
|
|
limit=limit,
|
|
)
|
|
return {
|
|
"count": len(results),
|
|
"results": [dataclasses.asdict(r) for r in results],
|
|
}
|
|
|
|
|
|
@app.get("/api/diagnose")
|
|
def diagnose(
|
|
q: Annotated[str, Query(description="Service name or problem description")] = "",
|
|
source: Annotated[str | None, Query(description="Limit to a specific source ID (partial match)")] = None,
|
|
since: Annotated[str | None, Query(description="ISO timestamp lower bound")] = None,
|
|
until: Annotated[str | None, Query(description="ISO timestamp upper bound")] = None,
|
|
) -> dict:
|
|
if not q:
|
|
return {"count": 0, "results": [], "formatted": ""}
|
|
common: dict = dict(source_filter=source, since=since, until=until, include_repeats=False)
|
|
broad = _search(DB_PATH, query=q, limit=15, **common)
|
|
critical = _search(DB_PATH, query=q, severity="CRITICAL", limit=5, **common)
|
|
errors = _search(DB_PATH, query=q, severity="ERROR", limit=10, **common)
|
|
|
|
seen: set[str] = set()
|
|
combined = []
|
|
for r in broad + critical + errors:
|
|
if r.entry_id not in seen:
|
|
seen.add(r.entry_id)
|
|
combined.append(r)
|
|
|
|
combined.sort(key=lambda r: (r.timestamp_iso or "\xff", r.sequence))
|
|
combined = combined[:20]
|
|
return {
|
|
"count": len(combined),
|
|
"results": [dataclasses.asdict(r) for r in combined],
|
|
"formatted": format_results(combined),
|
|
}
|
|
|
|
|
|
@app.get("/api/sources")
|
|
def list_sources() -> dict:
|
|
sources = _list_sources(DB_PATH)
|
|
return {"sources": sources}
|