@router.post("/api/diagnose/stream")
async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
    """Stream the diagnose pipeline to the client as Server-Sent Events.

    Every event dict produced by ``_diagnose_stream`` is JSON-encoded and
    framed as one ``data: …`` message terminated by a blank line. LLM settings
    come from the saved preferences; empty values are forwarded as ``None``.
    """
    prefs = _load_prefs()

    async def _frames():
        # Arguments are gathered here (not outside) so they are read when the
        # response starts streaming, exactly as the pipeline call needs them.
        pipeline_kwargs = {
            "query": body.query,
            "since": body.since,
            "until": body.until,
            "source_filter": body.source or None,
            "llm_url": prefs.get("llm_url") or None,
            "llm_model": prefs.get("llm_model") or None,
            "llm_api_key": prefs.get("llm_api_key") or None,
        }
        async for payload in _diagnose_stream(DB_PATH, **pipeline_kwargs):
            yield f"data: {json.dumps(payload)}\n\n"

    # Disable caching and ask buffering reverse proxies to pass events through
    # immediately (NOTE(review): X-Accel-Buffering is the nginx convention).
    response_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        _frames(),
        media_type="text/event-stream",
        headers=response_headers,
    )
dataclasses import logging import re +from collections.abc import AsyncGenerator from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any @@ -50,8 +53,18 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: return since, until, keywords or query if _HAS_DATEPARSER and _search_dates is not None: + # Tell dateparser what timezone the user is in so "3:35 am" means local time. + # PREFER_DAY_OF_MONTH is unused here but PREFER_DATES_FROM=past ensures + # "3:35 am" resolves to the most recent past occurrence, not a future one. + local_offset = datetime.now().astimezone().utcoffset() + offset_h = int((local_offset.total_seconds() if local_offset else 0) / 3600) + tz_str = f"UTC{'+' if offset_h >= 0 else ''}{offset_h}" try: - results = _search_dates(query, languages=["en"], settings={"PREFER_DATES_FROM": "past"}) + results = _search_dates( + query, + languages=["en"], + settings={"PREFER_DATES_FROM": "past", "TIMEZONE": tz_str, "RETURN_AS_TIMEZONE_AWARE": True}, + ) except Exception: logger.warning("dateparser failed on query %r — falling back to 60-min window", query) results = None @@ -59,6 +72,8 @@ def parse_time_window(query: str) -> tuple[str | None, str | None, str]: phrase, dt = results[0] if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) # normalise to UTC for SQLite string compare since = (dt - timedelta(minutes=30)).isoformat() until = (dt + timedelta(minutes=30)).isoformat() keywords = re.sub(r"\s{2,}", " ", query.replace(phrase, " ").strip()) @@ -88,7 +103,7 @@ def diagnose( keywords = query keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) - window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50) + window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50, per_source_cap=15) 
async def diagnose_stream(
    db_path: Path,
    query: str,
    since: str | None = None,
    until: str | None = None,
    source_filter: str | None = None,
    llm_url: str | None = None,
    llm_model: str | None = None,
    llm_api_key: str | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    """Async generator yielding SSE event dicts for the diagnose pipeline.

    Blocking work (time-window parsing, DB queries, the LLM call) runs in
    worker threads via ``asyncio.to_thread`` so the event loop is not blocked.

    Yields events in order:
      {"type":"status","message":"…"}   — pipeline progress
      {"type":"summary","data":{…}}     — window + severity counts (fast, from DB)
      {"type":"entries","data":[…]}     — log entries (fast, from DB)
      {"type":"reasoning","text":"…"}   — LLM analysis (slow, optional)
      {"type":"done"}

    Args:
        db_path: Database handed to ``search`` / ``entries_in_window``.
        query: Free-text description. When blank and ``source_filter`` is set,
            the pipeline browses that source over the last 24 hours instead of
            running a keyword search.
        since: Explicit window start; when missing it is filled from
            ``parse_time_window`` (or the 24 h default in browse mode).
        until: Explicit window end; filled the same way as ``since``.
        source_filter: Optional source restriction forwarded to the DB helpers.
        llm_url: LLM endpoint; the LLM step is skipped unless this and
            ``llm_model`` are set and at least one entry was found.
        llm_model: LLM model name.
        llm_api_key: Optional API key forwarded to ``summarize``.

    The LLM step is best-effort: if ``summarize`` raises, the failure is
    logged and the stream still finishes with ``done``, so a client that has
    already received the entries is never left with a truncated stream.
    """
    keywords = query.strip()
    source_browse = not keywords and source_filter is not None

    if source_browse:
        # No keyword — browsing a source directly. Use 24h window; skip FTS entirely.
        yield {"type": "status", "message": f"Loading {source_filter}…"}
        since = since or _last_n_minutes(60 * 24)
        until = until or _now_iso()
        time_detected = False
    else:
        yield {"type": "status", "message": "Parsing time window…"}
        time_detected = since is not None and until is not None
        if not time_detected:
            parsed_since, parsed_until, keywords = await asyncio.to_thread(parse_time_window, query)
            # Explicit bounds from the caller win over parsed ones.
            since = since or parsed_since
            until = until or parsed_until
            # The parser strips the time phrase out of the query, so a changed
            # keyword string signals that a time expression was recognised.
            time_detected = keywords != query

    yield {"type": "status", "message": "Searching logs…"}

    if source_browse:
        keyword_hits: list[SearchResult] = []
        window_hits = await asyncio.to_thread(
            entries_in_window,
            db_path, since, until,
            source_filter=source_filter, limit=200,
        )
    else:
        # Keyword search and the plain window scan are independent; run both
        # concurrently in worker threads.
        keyword_hits, window_hits = await asyncio.gather(
            asyncio.to_thread(
                search,
                db_path, keywords,
                source_filter=source_filter, since=since, until=until,
                limit=150, or_mode=True,
            ),
            asyncio.to_thread(
                entries_in_window,
                db_path, since, until,
                source_filter=source_filter, limit=50, per_source_cap=15,
            ),
        )

    # De-duplicate by entry id, keeping the first occurrence (keyword hits first).
    seen: set[str] = set()
    merged: list[SearchResult] = []
    for r in keyword_hits + window_hits:
        if r.entry_id not in seen:
            seen.add(r.entry_id)
            merged.append(r)

    # Chronological order; entries without a timestamp sort last ("\xff").
    combined = sorted(merged, key=lambda r: (r.timestamp_iso or "\xff", r.sequence))[:200]

    # Only the four known severities are counted; a missing severity counts as INFO.
    by_severity: dict[str, int] = {"CRITICAL": 0, "ERROR": 0, "WARN": 0, "INFO": 0}
    by_source: dict[str, int] = {}
    for r in combined:
        sev = (r.severity or "INFO").upper()
        if sev in by_severity:
            by_severity[sev] += 1
        by_source[r.source_id] = by_source.get(r.source_id, 0) + 1

    yield {
        "type": "summary",
        "data": {
            "total": len(combined),
            "window_start": since,
            "window_end": until,
            "time_detected": time_detected,
            "by_severity": by_severity,
            "by_source": by_source,
        },
    }
    yield {"type": "entries", "data": [dataclasses.asdict(r) for r in combined]}

    if llm_url and llm_model and combined:
        yield {"type": "status", "message": "Analyzing with LLM…"}
        try:
            reasoning = await asyncio.to_thread(
                summarize, query, combined, llm_url, llm_model, llm_api_key
            )
        except Exception:
            # The analysis is an optional extra on top of results that were
            # already streamed; log the failure and finish the stream cleanly.
            logger.warning(
                "LLM analysis failed for query %r — finishing stream without reasoning",
                query,
                exc_info=True,
            )
            reasoning = None
        if reasoning:
            yield {"type": "reasoning", "text": reasoning}

    yield {"type": "done"}
@@ def entries_in_window( severity: str | None = None, source_filter: str | None = None, limit: int = 100, + per_source_cap: int | None = None, ) -> list[SearchResult]: """Return log entries within a time window using a plain SQL scan (no FTS). Used as a fallback when keyword search returns nothing — ensures incident detail always shows the raw log activity in the window even if no keywords match. + + per_source_cap: when set, limits rows per source_id so high-volume sources + (e.g. network-syslog) don't crowd out lower-volume but more interesting ones. + Errors/warnings are ranked first within each source partition. """ conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") @@ -197,19 +202,47 @@ def entries_in_window( params.append(f"%{source_filter}%") where = " AND ".join(conditions) - params.append(limit) - rows = conn.execute( - f""" - SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, - repeat_count, out_of_order, matched_patterns, text, 0.0 as rank - FROM log_entries - WHERE {where} - ORDER BY timestamp_iso ASC - LIMIT ? - """, - params, - ).fetchall() + if per_source_cap is not None: + # Use a window function to cap rows per source, errors/warnings first. + query = f""" + WITH ranked AS ( + SELECT id as entry_id, source_id, sequence, timestamp_iso, severity, + repeat_count, out_of_order, matched_patterns, text, 0.0 as rank, + ROW_NUMBER() OVER ( + PARTITION BY source_id + ORDER BY + CASE UPPER(severity) + WHEN 'CRITICAL' THEN 0 + WHEN 'ERROR' THEN 1 + WHEN 'WARN' THEN 2 + ELSE 3 + END, + timestamp_iso + ) AS rn + FROM log_entries + WHERE {where} + ) + SELECT entry_id, source_id, sequence, timestamp_iso, severity, + repeat_count, out_of_order, matched_patterns, text, rank + FROM ranked + WHERE rn <= ? + ORDER BY timestamp_iso ASC + LIMIT ? 
# Kill any process currently listening on a TCP port (plain `kill`, i.e. SIGTERM).
#   $1 — TCP port number
# Always returns 0: no listener, or a kill that fails, is not treated as an error.
_kill_port() {
    local port="$1"
    local pid pids   # keep the loop variable out of the global scope
    # grep exits non-zero when nothing is listening; `|| true` keeps that from
    # aborting the script should it run under `set -e` / `pipefail`.
    pids=$(ss -tlnp "sport = :${port}" 2>/dev/null | grep -oP '(?<=pid=)\d+' | sort -u || true)
    [[ -z "$pids" ]] && return 0
    for pid in $pids; do
        warn "Killing stray PID ${pid} on port ${port}"
        kill "$pid" 2>/dev/null || true
    done
}
# Wait for a port to stop accepting connections (i.e. fully released).
#   $1 — TCP port number
# Polls every 0.3 s, up to 30 times (~9 s). If the port is still open after
# that, sends SIGKILL to whatever is listening on it, waits one more second and
# warns if the port is still in use. Always returns 0.
_wait_for_port_free() {
    local port="$1"
    local _i pid pids
    for _i in $(seq 1 30); do
        sleep 0.3
        # A failed connect means nothing is accepting on the port any more.
        (echo "" >/dev/tcp/127.0.0.1/"$port") 2>/dev/null || return 0
    done
    warn "Port ${port} still occupied after 9 s — trying SIGKILL"
    # The `stop` command has already sent SIGTERM before calling this function,
    # so escalate to a real SIGKILL here, as the warning announces.
    pids=$(ss -tlnp "sport = :${port}" 2>/dev/null | grep -oP '(?<=pid=)\d+' | sort -u || true)
    for pid in $pids; do
        warn "Killing stray PID ${pid} on port ${port}"
        kill -KILL "$pid" 2>/dev/null || true
    done
    sleep 1
    (echo "" >/dev/tcp/127.0.0.1/"$port") 2>/dev/null && warn "Port ${port} still in use!" || true
}
+ + {{ statusMsg }} +
+
Scoped to: - {{ sourceScope }} + {{ sourceScope }}
@@ -69,14 +89,17 @@
-
- +
+
-

No log evidence found for "{{ lastQuery }}"

-

Check the Sources tab to confirm data is ingested, or try a broader description.

+

No {{ severityFilter }} entries in this result set.

+
/**
 * Run a diagnose request against the streaming endpoint and apply each
 * Server-Sent Event to the component state as it arrives.
 *
 * Does nothing when there is neither a query nor a source scope. All result
 * state is reset up front; `loading` and `statusMsg` are always cleared when
 * the stream ends, whether it finished normally or failed.
 */
async function run() {
  const hasQuery = query.value.trim().length > 0
  if (!hasQuery && !sourceScope.value) return

  // Reset UI state for a fresh run.
  loading.value = true
  statusMsg.value = 'Searching…'
  error.value = null
  ranOnce.value = true
  lastQuery.value = query.value
  saved.value = false
  showDetails.value = false
  severityFilter.value = null
  entries.value = []
  summary.value = null
  reasoning.value = null

  // Apply one decoded event to the reactive state.
  const applyEvent = (evt: any) => {
    switch (evt.type) {
      case 'status':
        statusMsg.value = evt.message
        break
      case 'summary':
        summary.value = evt.data
        capturedSince = evt.data.window_start
        capturedUntil = evt.data.window_end
        break
      case 'entries':
        entries.value = evt.data
        break
      case 'reasoning':
        reasoning.value = evt.text
        break
      case 'done':
        statusMsg.value = null
        break
    }
  }

  try {
    const payload = { query: query.value, source: sourceScope.value }
    const res = await fetch(`${BASE}/api/diagnose/stream`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    })
    if (!res.ok) throw new Error(`API returned ${res.status}`)
    if (!res.body) throw new Error('No response body')

    const reader = res.body.getReader()
    const decoder = new TextDecoder()
    let pending = ''

    for (;;) {
      const chunk = await reader.read()
      if (chunk.done) break
      pending += decoder.decode(chunk.value, { stream: true })
      // SSE events are separated by '\n\n'; the last piece may be incomplete,
      // so it is carried over to the next read.
      const frames = pending.split('\n\n')
      pending = frames.pop() ?? ''
      for (const frame of frames) {
        const line = frame.trim()
        if (!line.startsWith('data: ')) continue
        applyEvent(JSON.parse(line.slice(6)))
      }
    }
  } catch (e) {
    error.value = e instanceof Error ? e.message : String(e)
  } finally {
    loading.value = false
    statusMsg.value = null
  }
}
'bg-surface-raised border-surface-border text-text-dim' +} + function fmtTs(iso: string | null): string { if (!iso) return '—' try {