feat: SSE streaming diagnose, severity filter pills, per-source-cap search

- diagnose_stream() async generator: status/summary/entries/reasoning/done events
- POST /api/diagnose/stream SSE endpoint wired in rest.py
- entries_in_window() gains per_source_cap to prevent high-volume sources crowding results
- QuickCapture: severity filter pills, filtered entries view, pipeline status spinner
- llm.py: remove overly broad HTTPStatusError re-raise
This commit is contained in:
pyr0ball 2026-05-13 15:45:35 -07:00
parent 812c934822
commit dbdba4080f
6 changed files with 333 additions and 46 deletions

View file

@ -17,7 +17,7 @@ from typing import Annotated
from fastapi import APIRouter, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
@ -40,7 +40,7 @@ from app.services.search import (
stats_summary as _stats,
format_results,
)
from app.services.diagnose import diagnose as _diagnose
from app.services.diagnose import diagnose as _diagnose, diagnose_stream as _diagnose_stream
from app.watch.watcher import Watcher, load_watch_config
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
@ -262,6 +262,30 @@ def diagnose_post(body: DiagnoseRequest) -> dict:
}
@router.post("/api/diagnose/stream")
async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
    """Stream the diagnose pipeline to the client as server-sent events.

    Every event dict produced by the diagnose stream is JSON-encoded and sent
    as one ``data:`` frame terminated by a blank line.
    """
    prefs = _load_prefs()

    # Falsy values (e.g. empty strings) are normalised to None before being
    # handed to the pipeline.
    pipeline_kwargs = {
        "query": body.query,
        "since": body.since,
        "until": body.until,
        "source_filter": body.source or None,
        "llm_url": prefs.get("llm_url") or None,
        "llm_model": prefs.get("llm_model") or None,
        "llm_api_key": prefs.get("llm_api_key") or None,
    }

    async def _event_frames():
        async for evt in _diagnose_stream(DB_PATH, **pipeline_kwargs):
            yield "data: " + json.dumps(evt) + "\n\n"

    response_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        _event_frames(),
        media_type="text/event-stream",
        headers=response_headers,
    )
@router.get("/api/settings")
def get_settings() -> dict:
    """Return whatever ``_load_prefs()`` yields for the settings endpoint."""
    current_prefs = _load_prefs()
    return current_prefs

View file

@ -1,8 +1,11 @@
"""Frictionless diagnose service — NL time extraction + layered log search."""
from __future__ import annotations
import asyncio
import dataclasses
import logging
import re
from collections.abc import AsyncGenerator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
@ -50,8 +53,18 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
return since, until, keywords or query
if _HAS_DATEPARSER and _search_dates is not None:
# Tell dateparser what timezone the user is in so "3:35 am" means local time.
# PREFER_DAY_OF_MONTH is unused here but PREFER_DATES_FROM=past ensures
# "3:35 am" resolves to the most recent past occurrence, not a future one.
local_offset = datetime.now().astimezone().utcoffset()
offset_h = int((local_offset.total_seconds() if local_offset else 0) / 3600)
tz_str = f"UTC{'+' if offset_h >= 0 else ''}{offset_h}"
try:
results = _search_dates(query, languages=["en"], settings={"PREFER_DATES_FROM": "past"})
results = _search_dates(
query,
languages=["en"],
settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True},
)
except Exception:
logger.warning("dateparser failed on query %r — falling back to 60-min window", query)
results = None
@ -59,6 +72,8 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]:
phrase, dt = results[0]
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare
since = (dt - timedelta(minutes=30)).isoformat()
until = (dt + timedelta(minutes=30)).isoformat()
keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip())
@ -88,7 +103,7 @@ def diagnose(
keywords = query
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True)
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50)
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15)
seen: set[str] = set()
merged: list[SearchResult] = []
@ -125,6 +140,111 @@ def diagnose(
}
async def diagnose_stream(
    db_path: Path,
    query: str,
    since: str | None = None,
    until: str | None = None,
    source_filter: str | None = None,
    llm_url: str | None = None,
    llm_model: str | None = None,
    llm_api_key: str | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    """Async generator yielding SSE event dicts for the diagnose pipeline.

    Yields events in order:
      {"type":"status","message":""}   pipeline progress
      {"type":"summary","data":{}}     window + severity counts (fast, from DB)
      {"type":"entries","data":[]}     log entries (fast, from DB)
      {"type":"reasoning","text":""}   LLM analysis (slow, optional)
      {"type":"done"}

    Args:
        db_path: Database path handed to ``search`` / ``entries_in_window``.
        query: Free-text description; may be blank when ``source_filter`` is set.
        since, until: Optional explicit window bounds. When either is missing
            the window is derived from ``query`` via ``parse_time_window``.
        source_filter: Optional source restriction passed through to the queries.
        llm_url, llm_model, llm_api_key: LLM settings. The analysis step only
            runs when both ``llm_url`` and ``llm_model`` are set and there is
            at least one entry to analyse.

    The LLM step is best-effort: if ``summarize`` raises, the failure is logged
    and the stream still finishes with ``done``, because the summary and
    entries have already been delivered to the client at that point.
    """
    keywords = query.strip()
    source_browse = not keywords and source_filter is not None

    if source_browse:
        # No keyword — browsing a source directly. Use 24h window; skip FTS entirely.
        yield {"type": "status", "message": f"Loading {source_filter}"}
        since = since or _last_n_minutes(60 * 24)
        until = until or _now_iso()
        time_detected = False
    else:
        yield {"type": "status", "message": "Parsing time window…"}
        time_detected = since is not None and until is not None
        if not time_detected:
            # parse_time_window is synchronous, so keep it off the event loop.
            parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query)
            since = since or parsed_since
            until = until or parsed_until
            time_detected = keywords != query

    yield {"type": "status", "message": "Searching logs…"}

    if source_browse:
        keyword_hits: list[SearchResult] = []
        window_hits = await asyncio.to_thread(
            entries_in_window,
            db_path, since, until,
            source_filter=source_filter, limit=200,
        )
    else:
        # The keyword search and the plain window scan are independent, so run
        # them concurrently in worker threads.
        keyword_hits, window_hits = await asyncio.gather(
            asyncio.to_thread(
                search,
                db_path, keywords,
                source_filter=source_filter, since=since, until=until,
                limit=150, or_mode=True,
            ),
            asyncio.to_thread(
                entries_in_window,
                db_path, since, until,
                source_filter=source_filter, limit=50, per_source_cap=15,
            ),
        )

    # De-duplicate by entry id, keeping the first occurrence (keyword hits win).
    seen: set[str] = set()
    merged: list[SearchResult] = []
    for r in keyword_hits + window_hits:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            merged.append(r)

    # Chronological order; entries without a timestamp sort after dated ones.
    combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]

    by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
    by_source: dict[str, int] = {}
    for r in combined:
        sev = (r.severity or "INFO").upper()
        if sev in by_severity:
            by_severity[sev] += 1
        by_source[r.source_id] = by_source.get(r.source_id, 0) + 1

    yield {
        "type": "summary",
        "data": {
            "total": len(combined),
            "window_start": since,
            "window_end": until,
            "time_detected": time_detected,
            "by_severity": by_severity,
            "by_source": by_source,
        },
    }
    yield {"type": "entries", "data": [dataclasses.asdict(r) for r in combined]}

    if llm_url and llm_model and combined:
        yield {"type": "status", "message": "Analyzing with LLM…"}
        try:
            reasoning = await asyncio.to_thread(
                summarize, query, combined, llm_url, llm_model, llm_api_key
            )
        except Exception:
            # Optional step: never let an LLM failure tear down a stream whose
            # primary results have already been sent.
            logger.warning(
                "LLM analysis failed for query %r — finishing stream without reasoning",
                query,
                exc_info=True,
            )
            reasoning = None
        if reasoning:
            yield {"type": "reasoning", "text": reasoning}

    yield {"type": "done"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()

View file

@ -75,8 +75,6 @@ def summarize(
resp.raise_for_status()
# 404 means no assignment configured — fall through to direct model call
logger.debug("No task assignment for turnstone.log_analysis — falling back to direct model")
except httpx.HTTPStatusError:
raise
except Exception as exc:
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)

View file

@ -170,11 +170,16 @@ def entries_in_window(
severity: str | None = None,
source_filter: str | None = None,
limit: int = 100,
per_source_cap: int | None = None,
) -> list[SearchResult]:
"""Return log entries within a time window using a plain SQL scan (no FTS).
Used as a fallback when keyword search returns nothing ensures incident
detail always shows the raw log activity in the window even if no keywords match.
per_source_cap: when set, limits rows per source_id so high-volume sources
(e.g. network-syslog) don't crowd out lower-volume but more interesting ones.
Errors/warnings are ranked first within each source partition.
"""
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
@ -197,19 +202,47 @@ def entries_in_window(
params.append(f"%{source_filter}%")
where = " AND ".join(conditions)
params.append(limit)
rows = conn.execute(
f"""
if per_source_cap is not None:
# Use a window function to cap rows per source, errors/warnings first.
query = f"""
WITH ranked AS (
SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
repeat_count, out_of_order, matched_patterns, text, 0.0 as rank,
ROW_NUMBER() OVER (
PARTITION BY source_id
ORDER BY
CASE UPPER(severity)
WHEN 'CRITICAL' THEN 0
WHEN 'ERROR' THEN 1
WHEN 'WARN' THEN 2
ELSE 3
END,
timestamp_iso
) AS rn
FROM log_entries
WHERE {where}
)
SELECT entry_id, source_id, sequence, timestamp_iso, severity,
repeat_count, out_of_order, matched_patterns, text, rank
FROM ranked
WHERE rn <= ?
ORDER BY timestamp_iso ASC
LIMIT ?
"""
params.extend([per_source_cap, limit])
else:
query = f"""
SELECT id as entry_id, source_id, sequence, timestamp_iso, severity,
repeat_count, out_of_order, matched_patterns, text, 0.0 as rank
FROM log_entries
WHERE {where}
ORDER BY timestamp_iso ASC
LIMIT ?
""",
params,
).fetchall()
"""
params.append(limit)
rows = conn.execute(query, params).fetchall()
conn.close()
return [

View file

@ -23,7 +23,17 @@ VITE_PORT=5174 # Vite HMR port in dev mode (proxies /api → 8534)
LOG_DIR="log"
API_PID_FILE=".turnstone-api.pid"
DB="${TURNSTONE_DB:-${SCRIPT_DIR}/data/turnstone.db}"
# Database selection, in order of precedence:
#   1. an explicit TURNSTONE_DB from the environment
#   2. the cluster DB path, when the cluster checkout directory exists
#   3. the dev DB next to this script
_CLUSTER_DB="/devl/turnstone-cluster/data/turnstone.db"
_DEV_DB="${SCRIPT_DIR}/data/turnstone.db"
if [[ -n "${TURNSTONE_DB:-}" ]]; then
    DB="${TURNSTONE_DB}"
elif [[ -d /devl/turnstone-cluster ]]; then
    DB="${_CLUSTER_DB}"
else
    DB="${_DEV_DB}"
fi

# Pattern directory (watch.yaml, default.yaml), same precedence:
# TURNSTONE_PATTERNS, then the cluster patterns dir, then the local one.
if [[ -n "${TURNSTONE_PATTERNS:-}" ]]; then
    PATTERN_DIR="${TURNSTONE_PATTERNS}"
elif [[ -d /devl/turnstone-cluster/patterns ]]; then
    PATTERN_DIR="/devl/turnstone-cluster/patterns"
else
    PATTERN_DIR="${SCRIPT_DIR}/patterns"
fi
CONDA_BASE="${CONDA_BASE:-/devl/miniconda3}"
PYTHON="${CONDA_BASE}/envs/cf/bin/python"
@ -35,6 +45,31 @@ _is_alive() {
[[ -f "$pid_file" ]] && kill -0 "$(<"$pid_file")" 2>/dev/null
}
# Signal every process currently listening on a TCP port.
#   $1  port number
#   $2  signal passed to kill(1) via -s (default: TERM)
_kill_port() {
    local port="$1" sig="${2:-TERM}"
    local pids pid
    # `|| true`: grep exits 1 when nothing is listening on the port; that must
    # not abort the caller if the script runs with errexit/pipefail enabled.
    pids=$(ss -tlnp "sport = :${port}" 2>/dev/null | grep -oP '(?<=pid=)\d+' | sort -u || true)
    [[ -z "$pids" ]] && return 0
    for pid in $pids; do
        warn "Killing stray PID ${pid} on port ${port}"
        kill -s "$sig" "$pid" 2>/dev/null || true
    done
}

# Wait for a port to stop accepting connections (i.e. fully released).
# Polls 30 times at 0.3 s intervals; if the port is still accepting
# connections after that, escalates to SIGKILL and checks once more.
#   $1  port number
_wait_for_port_free() {
    local port="$1" _i
    for _i in $(seq 1 30); do
        sleep 0.3
        # A failed connect through bash's /dev/tcp means nothing is listening.
        (echo "" >/dev/tcp/127.0.0.1/"$port") 2>/dev/null || return 0
    done
    warn "Port ${port} still occupied after 9 s — trying SIGKILL"
    # Actually send SIGKILL here; the default TERM has already been tried.
    _kill_port "$port" KILL
    sleep 1
    (echo "" >/dev/tcp/127.0.0.1/"$port") 2>/dev/null && warn "Port ${port} still in use!" || true
}
_kill_pid_file() {
local pid_file="$1" label="$2"
if [[ -f "$pid_file" ]]; then
@ -48,7 +83,7 @@ _kill_pid_file() {
rm -f "$pid_file"
fi
else
warn "$label not running."
warn "No PID file for $label."
fi
}
@ -123,7 +158,9 @@ case "$CMD" in
success "SPA built → web/dist/"
info "Starting on port ${API_PORT}"
TURNSTONE_DB="$DB" nohup "$PYTHON" -m uvicorn app.rest:app \
info " DB: ${DB}"
info " Patterns: ${PATTERN_DIR}"
TURNSTONE_DB="$DB" TURNSTONE_PATTERNS="$PATTERN_DIR" nohup "$PYTHON" -m uvicorn app.rest:app \
--host 0.0.0.0 --port "$API_PORT" \
>> "${LOG_DIR}/api.log" 2>&1 &
echo $! > "$API_PID_FILE"
@ -133,6 +170,8 @@ case "$CMD" in
stop)
_kill_pid_file "$API_PID_FILE" "Turnstone"
_kill_port "$API_PORT"
_wait_for_port_free "$API_PORT"
;;
restart)

View file

@ -14,15 +14,24 @@
@click="run()"
class="px-6 py-2.5 rounded bg-accent text-white text-sm font-semibold hover:bg-blue-400 transition-colors disabled:opacity-50"
>
<span v-if="loading">Searching</span>
<span v-if="loading">Go</span>
<span v-else>Go</span>
</button>
</div>
<!-- Pipeline status -->
<div
v-if="loading && statusMsg"
class="flex items-center gap-2 mb-3 text-xs text-text-dim"
>
<span class="inline-block w-3 h-3 rounded-full border-2 border-accent border-t-transparent animate-spin" />
<span>{{ statusMsg }}</span>
</div>
<!-- Source scope badge -->
<div v-if="sourceScope" class="flex items-center gap-2 mb-4 text-xs">
<span class="text-text-dim">Scoped to:</span>
<span class="font-mono text-accent bg-accent/10 border border-accent/20 rounded px-2 py-0.5">{{ sourceScope }}</span>
<span class="font-mono text-surface bg-accent rounded px-2 py-0.5">{{ sourceScope }}</span>
<button
@click="sourceScope = null"
class="text-text-dim hover:text-text-primary ml-1"
@ -38,16 +47,27 @@
<!-- Summary header -->
<div v-if="summary" class="mb-4 rounded border border-surface-border bg-surface-raised p-4">
<div class="flex flex-wrap gap-x-6 gap-y-1 text-xs text-text-dim">
<span class="text-text-muted font-medium">{{ summary.total }} entr{{ summary.total !== 1 ? 'ies' : 'y' }}</span>
<span class="text-text-muted font-medium">
{{ severityFilter ? filteredEntries.length + ' of ' : '' }}{{ summary.total }} entr{{ summary.total !== 1 ? 'ies' : 'y' }}
</span>
<span v-if="summary.window_start">
{{ fmtTs(summary.window_start) }} {{ fmtTs(summary.window_end) }}
</span>
<span v-if="!summary.time_detected" class="italic">(last 60 min no time detected)</span>
<span
</div>
<!-- Severity filter pills -->
<div class="flex flex-wrap gap-2 mt-2">
<button
v-for="(count, sev) in nonZeroSeverity"
:key="String(sev)"
:class="sevClass(String(sev))"
>{{ count }} {{ sev }}</span>
@click="severityFilter = severityFilter === String(sev) ? null : String(sev)"
:class="[
'px-2 py-0.5 rounded text-xs font-medium border transition-colors',
severityFilter === String(sev)
? sevActivePillClass(String(sev))
: 'border-surface-border text-text-dim hover:border-current ' + sevClass(String(sev))
]"
>{{ count }} {{ sev }}</button>
</div>
<div class="flex flex-wrap gap-x-4 gap-y-1 text-xs text-text-dim mt-2">
<span v-for="(count, src) in summary.by_source" :key="String(src)">
@ -69,14 +89,17 @@
</div>
<!-- Log stream -->
<div v-if="entries.length" class="rounded border border-surface-border overflow-hidden mb-4">
<LogEntryRow v-for="entry in entries" :key="entry.entry_id" :entry="entry" />
<div v-if="filteredEntries.length" class="rounded border border-surface-border overflow-hidden mb-4">
<LogEntryRow v-for="entry in filteredEntries" :key="entry.entry_id" :entry="entry" />
</div>
<!-- Zero state -->
<div v-else-if="ranOnce && !loading" class="text-center text-text-dim py-12">
<p v-if="severityFilter" class="mb-1">No {{ severityFilter }} entries in this result set.</p>
<template v-else>
<p class="mb-1">No log evidence found for "{{ lastQuery }}"</p>
<p class="text-sm">Check the Sources tab to confirm data is ingested, or try a broader description.</p>
</template>
</div>
<!-- Save CTAs -->
@ -166,6 +189,7 @@ const entries = ref<LogEntry[]>([])
const summary = ref<Summary | null>(null)
const reasoning = ref<string | null>(null)
const loading = ref(false)
const statusMsg = ref<string | null>(null)
const error = ref<string | null>(null)
const ranOnce = ref(false)
const lastQuery = ref('')
@ -174,6 +198,7 @@ const saving = ref(false)
const showDetails = ref(false)
const detailSeverity = ref('medium')
const detailNotes = ref('')
const severityFilter = ref<string | null>(null)
let capturedSince: string | null = null
let capturedUntil: string | null = null
@ -203,28 +228,61 @@ watch(() => route.query.q, (newQ) => {
/**
 * Run the diagnose pipeline for the current query / source scope.
 *
 * POSTs to the streaming endpoint and applies each server-sent event to the
 * component state as it arrives: `status` updates the spinner text, `summary`
 * and `entries` populate the result view, `reasoning` fills in the LLM
 * analysis, and `done` marks the stream as complete.
 *
 * Any failure (HTTP error, malformed event, or a stream that closes before
 * `done`) is reported through `error`; results received up to that point are
 * left in place.
 */
async function run() {
  if (!query.value.trim() && !sourceScope.value) return

  // Reset per-run state before the request goes out.
  loading.value = true
  statusMsg.value = 'Searching…'
  error.value = null
  ranOnce.value = true
  lastQuery.value = query.value
  saved.value = false
  showDetails.value = false
  severityFilter.value = null
  entries.value = []
  summary.value = null
  reasoning.value = null

  // Applies one SSE frame to component state.
  // Returns true only for the terminal `done` event.
  const applyFrame = (frame: string): boolean => {
    const line = frame.trim()
    if (!line.startsWith('data: ')) return false
    const evt = JSON.parse(line.slice(6))
    switch (evt.type) {
      case 'status':
        statusMsg.value = evt.message
        return false
      case 'summary':
        summary.value = evt.data
        capturedSince = evt.data.window_start
        capturedUntil = evt.data.window_end
        return false
      case 'entries':
        entries.value = evt.data
        return false
      case 'reasoning':
        reasoning.value = evt.text
        return false
      case 'done':
        statusMsg.value = null
        return true
      default:
        return false
    }
  }

  try {
    const res = await fetch(`${BASE}/api/diagnose/stream`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ query: query.value, source: sourceScope.value }),
    })
    if (!res.ok) throw new Error(`API returned ${res.status}`)
    if (!res.body) throw new Error('No response body')

    const reader = res.body.getReader()
    const decoder = new TextDecoder()
    let buf = ''
    let finished = false

    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      buf += decoder.decode(value, { stream: true })

      // SSE events are separated by '\n\n'; the last piece may be a partial
      // frame, so it stays in the buffer until more data arrives.
      const parts = buf.split('\n\n')
      buf = parts.pop() ?? ''
      for (const part of parts) {
        if (applyFrame(part)) finished = true
      }
    }

    // Flush bytes still held by the decoder and handle a final frame that
    // arrived without its trailing blank line.
    buf += decoder.decode()
    if (buf.trim() && applyFrame(buf)) finished = true

    // The server always ends a successful run with `done`; its absence means
    // the stream was cut short, which must not look like a clean result.
    if (!finished) throw new Error('Diagnose stream ended before completion')
  } catch (e) {
    error.value = e instanceof Error ? e.message : String(e)
  } finally {
    loading.value = false
    statusMsg.value = null
  }
}
@ -266,6 +324,12 @@ const nonZeroSeverity = computed(() => {
)
})
/**
 * Entries shown in the log stream: all of them when no severity pill is
 * active, otherwise only those whose severity matches the selected pill.
 */
const filteredEntries = computed(() => {
  const active = severityFilter.value
  if (!active) return entries.value
  // `||` rather than `??` on purpose: the server-side summary counts a missing
  // OR empty severity as INFO, so an empty string must match the INFO pill too.
  return entries.value.filter(e => (e.severity || 'INFO').toUpperCase() === active)
})
function sevClass(sev: string): string {
return ({
CRITICAL: 'text-sev-critical',
@ -275,6 +339,15 @@ function sevClass(sev: string): string {
} as Record<string, string>)[sev] ?? 'text-text-dim'
}
// Classes for a severity pill in its selected state, keyed by severity name.
// A Map is used instead of an object literal so that names which happen to be
// Object.prototype members ("constructor", "toString", …) fall through to the
// default instead of returning an inherited function.
const SEV_ACTIVE_PILL_CLASSES: ReadonlyMap<string, string> = new Map([
  ['CRITICAL', 'bg-sev-critical/20 border-sev-critical text-sev-critical'],
  ['ERROR', 'bg-sev-error/20 border-sev-error text-sev-error'],
  ['WARN', 'bg-sev-warn/20 border-sev-warn text-sev-warn'],
  ['INFO', 'bg-sev-info/20 border-sev-info text-sev-info'],
])

/**
 * Class list for a severity filter pill that is currently selected.
 *
 * @param sev - Severity name; only the exact upper-case names CRITICAL, ERROR,
 *   WARN and INFO are recognised.
 * @returns The matching class string, or a neutral fallback for any other value.
 */
function sevActivePillClass(sev: string): string {
  return SEV_ACTIVE_PILL_CLASSES.get(sev) ?? 'bg-surface-raised border-surface-border text-text-dim'
}
function fmtTs(iso: string | null): string {
if (!iso) return '—'
try {