From 729b78e40f47cd35d1063d80dd9a7670a5aa0a92 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 13 May 2026 08:10:42 -0700 Subject: [PATCH] feat: source-scoped diagnose; multi-node Docker log collection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Diagnose: add source_filter param threaded through entries_in_window, search, _diagnose, and DiagnoseRequest — clicking diagnose on a dashboard source now scopes both keyword and window hits to that source - QuickCapture: read route.query.source; show scope badge with clear ✕; auto-run when source param is present without a query - DashboardView: pass source= (not q=) when navigating to diagnose - collect_cluster_logs.sh: auto-discover Docker containers on all nodes (Heimdall non-watched, Navi, Strahl via SSH); collect Cass Plex logs via SSH; write to per-node dirs for directory-mode ingest - turnstone-cluster.service: add --reload for hot-reload during dev --- app/rest.py | 2 + app/services/diagnose.py | 5 +- app/services/search.py | 4 + scripts/collect_cluster_logs.sh | 136 ++++++++++++++++++++++++---- scripts/turnstone-cluster.service | 2 +- web/src/components/QuickCapture.vue | 26 +++++- web/src/views/DashboardView.vue | 2 +- 7 files changed, 151 insertions(+), 26 deletions(-) diff --git a/app/rest.py b/app/rest.py index 843e0fd..aff1a4e 100644 --- a/app/rest.py +++ b/app/rest.py @@ -109,6 +109,7 @@ class DiagnoseRequest(BaseModel): query: str since: str | None = None until: str | None = None + source: str | None = None class SeverityOverride(BaseModel): @@ -249,6 +250,7 @@ def diagnose_post(body: DiagnoseRequest) -> dict: query=body.query, since=body.since, until=body.until, + source_filter=body.source or None, llm_url=prefs.get("llm_url") or None, llm_model=prefs.get("llm_model") or None, llm_api_key=prefs.get("llm_api_key") or None, diff --git a/app/services/diagnose.py b/app/services/diagnose.py index eb7e325..665c500 100644 --- a/app/services/diagnose.py +++ 
b/app/services/diagnose.py @@ -72,6 +72,7 @@ def diagnose( query: str, since: str | None = None, until: str | None = None, + source_filter: str | None = None, llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, @@ -86,8 +87,8 @@ def diagnose( else: keywords = query - keyword_hits = search(db_path, query=keywords, since=since, until=until, limit=150, or_mode=True) - window_hits = entries_in_window(db_path, since=since, until=until, limit=50) + keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True) + window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50) seen: set[str] = set() merged: list[SearchResult] = [] diff --git a/app/services/search.py b/app/services/search.py index 1665eb2..317808b 100644 --- a/app/services/search.py +++ b/app/services/search.py @@ -168,6 +168,7 @@ def entries_in_window( since: str | None, until: str | None, severity: str | None = None, + source_filter: str | None = None, limit: int = 100, ) -> list[SearchResult]: """Return log entries within a time window using a plain SQL scan (no FTS). @@ -191,6 +192,9 @@ def entries_in_window( if severity: conditions.append("severity = ?") params.append(severity.upper()) + if source_filter: + conditions.append("source_id LIKE ?") + params.append(f"%{source_filter}%") where = " AND ".join(conditions) params.append(limit) diff --git a/scripts/collect_cluster_logs.sh b/scripts/collect_cluster_logs.sh index 05b6ced..2c8b0f6 100644 --- a/scripts/collect_cluster_logs.sh +++ b/scripts/collect_cluster_logs.sh @@ -1,26 +1,48 @@ #!/usr/bin/env bash -# Collect recent journal logs from remote CircuitForge cluster nodes -# into /devl/turnstone-cluster/data/ for Turnstone to ingest. +# Collect logs from all CircuitForge cluster nodes into Turnstone. 
# -# Local Heimdall sources (journal, Docker containers, network syslog) are -# handled by the Turnstone live watcher (watch.yaml) — no collection needed. +# Local Heimdall sources (journal, live-watched Docker containers, network syslog) +# are handled by the Turnstone live watcher — no collection needed for those. # -# Triggered by systemd timer: turnstone-cluster-collect.timer (every 15 min). -# Install: sudo cp /turnstone-cluster-collect.* /etc/systemd/system/ -# sudo systemctl daemon-reload && sudo systemctl enable --now turnstone-cluster-collect.timer +# This script handles: +# - Remote node SSH journals (navi, sif, cass, strahl) +# - Docker container logs from all nodes (auto-discovered) +# - Plex logs from Cass (native install, no Docker) # -# Manual run: -# bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh +# Triggered by: turnstone-cluster-collect.timer (every 15 min) +# Manual run: bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh set -euo pipefail DATA_DIR=/devl/turnstone-cluster/data WINDOW="20 minutes ago" SSH_OPTS="-o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=no" +PYTHON=/devl/miniconda3/envs/cf/bin/python +INGEST="${PYTHON} /Library/Development/CircuitForge/turnstone/scripts/ingest_corpus.py" +DB=/devl/turnstone-cluster/data/turnstone.db +LOG=/devl/turnstone-cluster/data/ingest.log mkdir -p "${DATA_DIR}" -# ── Remote cluster nodes ────────────────────────────────────────────────────── +# ── Helpers ─────────────────────────────────────────────────────────────────── + +# Collect docker logs from a container into a JSONL file. +# Usage: _docker_logs CONTAINER OUTFILE [docker_cmd_prefix...]
+_docker_logs() {
+    local cname="$1" outfile="$2"
+    shift 2  # "$@" is now the optional command prefix (e.g. ssh host); stdin is closed below so it cannot drain a caller's read loop
+    "$@" docker logs --since 20m "${cname}" 2>&1 </dev/null | \
+        python3 -c "
+import sys, json
+src = '${cname}'
+for line in sys.stdin:
+    line = line.rstrip()
+    if not line: continue
+    print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
+" > "${outfile}" 2>/dev/null || : > "${outfile}"
+}
+
+# ── Remote cluster node journals ──────────────────────────────────────────────
 declare -A NODES=(
     [navi]="${DATA_DIR}/navi-journal.jsonl"
     [sif]="${DATA_DIR}/sif-journal.jsonl"
@@ -30,24 +52,100 @@ declare -A NODES=(
 
 for node in "${!NODES[@]}"; do
     outfile="${NODES[$node]}"
-    echo "${node}: collecting journal..."
     if ssh ${SSH_OPTS} "${node}" true 2>/dev/null; then
         ssh ${SSH_OPTS} "${node}" \
             "journalctl --output=json --priority=0..5 --since '${WINDOW}' --no-pager 2>/dev/null || true" \
             > "${outfile}" 2>/dev/null || { echo "${node}: ssh failed, skipping"; : > "${outfile}"; }
-        echo "${node}: $(wc -l < "${outfile}") entries"
+        echo "${node}: $(wc -l < "${outfile}") journal entries"
     else
         echo "${node}: unreachable, skipping"
         : > "${outfile}"
     fi
 done
 
-# Ingest remote node journals directly via the cf Python environment.
-TURNSTONE_DB=/devl/turnstone-cluster/data/turnstone.db \
-    /devl/miniconda3/envs/cf/bin/python \
-    /Library/Development/CircuitForge/turnstone/scripts/ingest_corpus.py \
-    --sources /devl/turnstone-cluster/patterns/sources-cluster.yaml \
-    --db /devl/turnstone-cluster/data/turnstone.db \
-    >> /devl/turnstone-cluster/data/ingest.log 2>&1
+# ── Heimdall Docker containers (non-live-watched) ─────────────────────────────
+# The live watcher already tails: cf-orch-coordinator, cf-web, cf-directus, caddy-proxy
+LIVE_WATCHED="cf-orch-coordinator cf-web cf-directus caddy-proxy"
+HEIMDALL_DIR="${DATA_DIR}/docker-heimdall"
+mkdir -p "${HEIMDALL_DIR}"
+
+while IFS= read -r cname; do
+    [[ " ${LIVE_WATCHED} " == *" ${cname} "* ]] && continue
+    _docker_logs "${cname}" "${HEIMDALL_DIR}/${cname}.jsonl"
+done < <(docker ps --format '{{.Names}}')
+
+echo "heimdall docker: $(ls "${HEIMDALL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
+
+# ── Navi Docker containers ────────────────────────────────────────────────────
+NAVI_DIR="${DATA_DIR}/docker-navi"
+mkdir -p "${NAVI_DIR}"
+
+if ssh ${SSH_OPTS} navi true 2>/dev/null; then
+    while IFS= read -r cname; do
+        [[ -z "${cname}" ]] && continue
+        ssh -n ${SSH_OPTS} navi "docker logs --since 20m '${cname}' 2>&1" | \
+            python3 -c "
+import sys, json
+src = 'navi/${cname}'
+for line in sys.stdin:
+    line = line.rstrip()
+    if not line: continue
+    print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
+" > "${NAVI_DIR}/${cname}.jsonl" 2>/dev/null || : > "${NAVI_DIR}/${cname}.jsonl"
+    done < <(ssh ${SSH_OPTS} navi "docker ps --format '{{.Names}}'" 2>/dev/null)  # names are the loop's stdin, hence ssh -n in the body
+    echo "navi docker: $(ls "${NAVI_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
+else
+    echo "navi: unreachable, skipping docker logs"
+fi
+
+# ── Strahl Docker containers ──────────────────────────────────────────────────
+STRAHL_DIR="${DATA_DIR}/docker-strahl"
+mkdir -p "${STRAHL_DIR}"
+
+if ssh ${SSH_OPTS} strahl true 2>/dev/null; then
+    while IFS= read -r cname; do
+        [[ -z "${cname}" ]] && continue
+        ssh -n ${SSH_OPTS} strahl "docker logs --since 20m '${cname}' 2>&1" | \
+            python3 -c "
+import sys, json
+src = 'strahl/${cname}'
+for line in sys.stdin:
+    line = line.rstrip()
+    if not line: continue
+    print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
+" > "${STRAHL_DIR}/${cname}.jsonl" 2>/dev/null || : > "${STRAHL_DIR}/${cname}.jsonl"
+    done < <(ssh ${SSH_OPTS} strahl "docker ps --format '{{.Names}}'" 2>/dev/null)  # names are the loop's stdin, hence ssh -n in the body
+    echo "strahl docker: $(ls "${STRAHL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
+else
+    echo "strahl: unreachable, skipping docker logs"
+fi
+
+# ── Cass — Plex logs (native install, no Docker) ─────────────────────────────
+PLEX_DIR="${DATA_DIR}/plex-cass"
+PLEX_LOG_DIR="/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs"
+mkdir -p "${PLEX_DIR}"
+
+if ssh ${SSH_OPTS} cass true 2>/dev/null; then
+    while IFS= read -r remote_path; do
+        [[ -z "${remote_path}" ]] && continue
+        local_name="$(basename "${remote_path}" | tr ' ' '_' | tr '[:upper:]' '[:lower:]')"
+        ssh -n ${SSH_OPTS} cass "cat '${remote_path}'" > "${PLEX_DIR}/${local_name}" 2>/dev/null || true
+    done < <(ssh ${SSH_OPTS} cass "ls '${PLEX_LOG_DIR}'/*.log 2>/dev/null" 2>/dev/null)  # paths are the loop's stdin, hence ssh -n in the body
+    echo "cass plex: $(ls "${PLEX_DIR}"/*.log 2>/dev/null | wc -l) log files"
+else
+    echo "cass: unreachable, skipping plex logs"
+fi
+
+# ── Ingest everything ─────────────────────────────────────────────────────────
+{
+    # Remote journals (explicit source IDs via YAML)
+    ${INGEST} --sources /devl/turnstone-cluster/patterns/sources-cluster.yaml --db "${DB}"
+
+    # Docker and Plex logs (source IDs derived from filenames by directory ingest); test ls output, not its status — an unmatched glob fails ls and pipefail would skip the ingest
+    for dir in "${HEIMDALL_DIR}" "${NAVI_DIR}" "${STRAHL_DIR}" "${PLEX_DIR}"; do
+        [[ -n "$(ls "${dir}"/*.jsonl "${dir}"/*.log 2>/dev/null)" ]] 
&& \ + ${INGEST} "${dir}" "${DB}" || true + done +} >> "${LOG}" 2>&1 echo "collect_cluster_logs: done" diff --git a/scripts/turnstone-cluster.service b/scripts/turnstone-cluster.service index a2891ab..06f4de7 100644 --- a/scripts/turnstone-cluster.service +++ b/scripts/turnstone-cluster.service @@ -10,7 +10,7 @@ Environment=TURNSTONE_DB=/devl/turnstone-cluster/data/turnstone.db Environment=TURNSTONE_PATTERNS=/devl/turnstone-cluster/patterns Environment=TURNSTONE_SOURCE_HOST=heimdall-cluster ExecStart=/devl/miniconda3/envs/cf/bin/python -m uvicorn app.rest:app \ - --host 0.0.0.0 --port 8534 + --host 0.0.0.0 --port 8534 --reload Restart=on-failure RestartSec=5s StandardOutput=journal diff --git a/web/src/components/QuickCapture.vue b/web/src/components/QuickCapture.vue index 5061d17..2877590 100644 --- a/web/src/components/QuickCapture.vue +++ b/web/src/components/QuickCapture.vue @@ -10,7 +10,7 @@ @keydown.enter="run()" /> + +
+ Scoped to: + {{ sourceScope }} + +
+
{{ error }} @@ -150,6 +161,7 @@ interface Summary { } const query = ref('') +const sourceScope = ref(null) const entries = ref([]) const summary = ref(null) const reasoning = ref(null) @@ -166,13 +178,21 @@ let capturedSince: string | null = null let capturedUntil: string | null = null onMounted(() => { + const s = route.query.source + if (typeof s === 'string' && s.trim()) sourceScope.value = s const q = route.query.q if (typeof q === 'string' && q.trim()) { query.value = q run() + } else if (sourceScope.value) { + run() } }) +watch(() => route.query.source, (newS) => { + sourceScope.value = typeof newS === 'string' && newS.trim() ? newS : null +}) + watch(() => route.query.q, (newQ) => { if (typeof newQ === 'string' && newQ.trim() && newQ !== lastQuery.value) { query.value = newQ @@ -181,7 +201,7 @@ watch(() => route.query.q, (newQ) => { }) async function run() { - if (!query.value.trim()) return + if (!query.value.trim() && !sourceScope.value) return loading.value = true error.value = null ranOnce.value = true @@ -192,7 +212,7 @@ async function run() { const res = await fetch(`${BASE}/api/diagnose`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ query: query.value }), + body: JSON.stringify({ query: query.value || sourceScope.value, source: sourceScope.value }), }) if (!res.ok) throw new Error(`API returned ${res.status}`) const data = await res.json() diff --git a/web/src/views/DashboardView.vue b/web/src/views/DashboardView.vue index c211b7f..672d500 100644 --- a/web/src/views/DashboardView.vue +++ b/web/src/views/DashboardView.vue @@ -246,7 +246,7 @@ function healthDot(errors: number, total: number): string { } function diagnoseSource(sourceId: string) { - router.push({ path: '/diagnose', query: { q: sourceId } }) + router.push({ path: '/diagnose', query: { source: sourceId } }) } function shortTs(iso: string | null): string {