feat: source-scoped diagnose; multi-node Docker log collection
- Diagnose: add source_filter param threaded through entries_in_window, search, _diagnose, and DiagnoseRequest — clicking diagnose on a dashboard source now scopes both keyword and window hits to that source - QuickCapture: read route.query.source; show scope badge with clear ✕; auto-run when source param is present without a query - DashboardView: pass source= (not q=) when navigating to diagnose - collect_cluster_logs.sh: auto-discover Docker containers on all nodes (Heimdall non-watched, Navi, Strahl via SSH); collect Cass Plex logs via SSH; write to per-node dirs for directory-mode ingest - turnstone-cluster.service: add --reload for hot-reload during dev
This commit is contained in:
parent
53fa350adf
commit
caa85b3d30
7 changed files with 151 additions and 26 deletions
|
|
@ -109,6 +109,7 @@ class DiagnoseRequest(BaseModel):
|
||||||
query: str
|
query: str
|
||||||
since: str | None = None
|
since: str | None = None
|
||||||
until: str | None = None
|
until: str | None = None
|
||||||
|
source: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class SeverityOverride(BaseModel):
|
class SeverityOverride(BaseModel):
|
||||||
|
|
@ -249,6 +250,7 @@ def diagnose_post(body: DiagnoseRequest) -> dict:
|
||||||
query=body.query,
|
query=body.query,
|
||||||
since=body.since,
|
since=body.since,
|
||||||
until=body.until,
|
until=body.until,
|
||||||
|
source_filter=body.source or None,
|
||||||
llm_url=prefs.get("llm_url") or None,
|
llm_url=prefs.get("llm_url") or None,
|
||||||
llm_model=prefs.get("llm_model") or None,
|
llm_model=prefs.get("llm_model") or None,
|
||||||
llm_api_key=prefs.get("llm_api_key") or None,
|
llm_api_key=prefs.get("llm_api_key") or None,
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,7 @@ def diagnose(
|
||||||
query: str,
|
query: str,
|
||||||
since: str | None = None,
|
since: str | None = None,
|
||||||
until: str | None = None,
|
until: str | None = None,
|
||||||
|
source_filter: str | None = None,
|
||||||
llm_url: str | None = None,
|
llm_url: str | None = None,
|
||||||
llm_model: str | None = None,
|
llm_model: str | None = None,
|
||||||
llm_api_key: str | None = None,
|
llm_api_key: str | None = None,
|
||||||
|
|
@ -86,8 +87,8 @@ def diagnose(
|
||||||
else:
|
else:
|
||||||
keywords = query
|
keywords = query
|
||||||
|
|
||||||
keyword_hits = search(db_path, query=keywords, since=since, until=until, limit=150, or_mode=True)
|
keyword_hits = search(db_path, query=keywords, since=since, until=until, source_filter=source_filter, limit=150, or_mode=True)
|
||||||
window_hits = entries_in_window(db_path, since=since, until=until, limit=50)
|
window_hits = entries_in_window(db_path, since=since, until=until, source_filter=source_filter, limit=50)
|
||||||
|
|
||||||
seen: set[str] = set()
|
seen: set[str] = set()
|
||||||
merged: list[SearchResult] = []
|
merged: list[SearchResult] = []
|
||||||
|
|
|
||||||
|
|
@ -168,6 +168,7 @@ def entries_in_window(
|
||||||
since: str | None,
|
since: str | None,
|
||||||
until: str | None,
|
until: str | None,
|
||||||
severity: str | None = None,
|
severity: str | None = None,
|
||||||
|
source_filter: str | None = None,
|
||||||
limit: int = 100,
|
limit: int = 100,
|
||||||
) -> list[SearchResult]:
|
) -> list[SearchResult]:
|
||||||
"""Return log entries within a time window using a plain SQL scan (no FTS).
|
"""Return log entries within a time window using a plain SQL scan (no FTS).
|
||||||
|
|
@ -191,6 +192,9 @@ def entries_in_window(
|
||||||
if severity:
|
if severity:
|
||||||
conditions.append("severity = ?")
|
conditions.append("severity = ?")
|
||||||
params.append(severity.upper())
|
params.append(severity.upper())
|
||||||
|
if source_filter:
|
||||||
|
conditions.append("source_id LIKE ?")
|
||||||
|
params.append(f"%{source_filter}%")
|
||||||
|
|
||||||
where = " AND ".join(conditions)
|
where = " AND ".join(conditions)
|
||||||
params.append(limit)
|
params.append(limit)
|
||||||
|
|
|
||||||
|
|
@ -1,26 +1,48 @@
|
||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# Collect recent journal logs from remote CircuitForge cluster nodes
|
# Collect logs from all CircuitForge cluster nodes into Turnstone.
|
||||||
# into /devl/turnstone-cluster/data/ for Turnstone to ingest.
|
|
||||||
#
|
#
|
||||||
# Local Heimdall sources (journal, Docker containers, network syslog) are
|
# Local Heimdall sources (journal, live-watched Docker containers, network syslog)
|
||||||
# handled by the Turnstone live watcher (watch.yaml) — no collection needed.
|
# are handled by the Turnstone live watcher — no collection needed for those.
|
||||||
#
|
#
|
||||||
# Triggered by systemd timer: turnstone-cluster-collect.timer (every 15 min).
|
# This script handles:
|
||||||
# Install: sudo cp <scripts>/turnstone-cluster-collect.* /etc/systemd/system/
|
# - Remote node SSH journals (navi, sif, cass, strahl)
|
||||||
# sudo systemctl daemon-reload && sudo systemctl enable --now turnstone-cluster-collect.timer
|
# - Docker container logs from all nodes (auto-discovered)
|
||||||
|
# - Plex logs from Cass (native install, no Docker)
|
||||||
#
|
#
|
||||||
# Manual run:
|
# Triggered by: turnstone-cluster-collect.timer (every 15 min)
|
||||||
# bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh
|
# Manual run: bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh
|
||||||
|
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
DATA_DIR=/devl/turnstone-cluster/data
|
DATA_DIR=/devl/turnstone-cluster/data
|
||||||
WINDOW="20 minutes ago"
|
WINDOW="20 minutes ago"
|
||||||
SSH_OPTS="-o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=no"
|
SSH_OPTS="-o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=no"
|
||||||
|
PYTHON=/devl/miniconda3/envs/cf/bin/python
|
||||||
|
INGEST="${PYTHON} /Library/Development/CircuitForge/turnstone/scripts/ingest_corpus.py"
|
||||||
|
DB=/devl/turnstone-cluster/data/turnstone.db
|
||||||
|
LOG=/devl/turnstone-cluster/data/ingest.log
|
||||||
|
|
||||||
mkdir -p "${DATA_DIR}"
|
mkdir -p "${DATA_DIR}"
|
||||||
|
|
||||||
# ── Remote cluster nodes ──────────────────────────────────────────────────────
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Collect docker logs from a container into a JSONL file.
|
||||||
|
# Usage: _docker_logs <container> <outfile> [docker_cmd_prefix...]
|
||||||
|
_docker_logs() {
|
||||||
|
local cname="$1" outfile="$2"
|
||||||
|
shift 2
|
||||||
|
"$@" docker logs --since 20m "${cname}" 2>&1 | \
|
||||||
|
python3 -c "
|
||||||
|
import sys, json
|
||||||
|
src = '${cname}'
|
||||||
|
for line in sys.stdin:
|
||||||
|
line = line.rstrip()
|
||||||
|
if not line: continue
|
||||||
|
print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
|
||||||
|
" > "${outfile}" 2>/dev/null || : > "${outfile}"
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Remote cluster node journals ──────────────────────────────────────────────
|
||||||
declare -A NODES=(
|
declare -A NODES=(
|
||||||
[navi]="${DATA_DIR}/navi-journal.jsonl"
|
[navi]="${DATA_DIR}/navi-journal.jsonl"
|
||||||
[sif]="${DATA_DIR}/sif-journal.jsonl"
|
[sif]="${DATA_DIR}/sif-journal.jsonl"
|
||||||
|
|
@ -30,24 +52,100 @@ declare -A NODES=(
|
||||||
|
|
||||||
for node in "${!NODES[@]}"; do
|
for node in "${!NODES[@]}"; do
|
||||||
outfile="${NODES[$node]}"
|
outfile="${NODES[$node]}"
|
||||||
echo "${node}: collecting journal..."
|
|
||||||
if ssh ${SSH_OPTS} "${node}" true 2>/dev/null; then
|
if ssh ${SSH_OPTS} "${node}" true 2>/dev/null; then
|
||||||
ssh ${SSH_OPTS} "${node}" \
|
ssh ${SSH_OPTS} "${node}" \
|
||||||
"journalctl --output=json --priority=0..5 --since '${WINDOW}' --no-pager 2>/dev/null || true" \
|
"journalctl --output=json --priority=0..5 --since '${WINDOW}' --no-pager 2>/dev/null || true" \
|
||||||
> "${outfile}" 2>/dev/null || { echo "${node}: ssh failed, skipping"; : > "${outfile}"; }
|
> "${outfile}" 2>/dev/null || { echo "${node}: ssh failed, skipping"; : > "${outfile}"; }
|
||||||
echo "${node}: $(wc -l < "${outfile}") entries"
|
echo "${node}: $(wc -l < "${outfile}") journal entries"
|
||||||
else
|
else
|
||||||
echo "${node}: unreachable, skipping"
|
echo "${node}: unreachable, skipping"
|
||||||
: > "${outfile}"
|
: > "${outfile}"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
|
|
||||||
# Ingest remote node journals directly via the cf Python environment.
|
# ── Heimdall Docker containers (non-live-watched) ─────────────────────────────
|
||||||
TURNSTONE_DB=/devl/turnstone-cluster/data/turnstone.db \
|
# The live watcher already tails: cf-orch-coordinator, cf-web, cf-directus, caddy-proxy
|
||||||
/devl/miniconda3/envs/cf/bin/python \
|
LIVE_WATCHED="cf-orch-coordinator cf-web cf-directus caddy-proxy"
|
||||||
/Library/Development/CircuitForge/turnstone/scripts/ingest_corpus.py \
|
HEIMDALL_DIR="${DATA_DIR}/docker-heimdall"
|
||||||
--sources /devl/turnstone-cluster/patterns/sources-cluster.yaml \
|
mkdir -p "${HEIMDALL_DIR}"
|
||||||
--db /devl/turnstone-cluster/data/turnstone.db \
|
|
||||||
>> /devl/turnstone-cluster/data/ingest.log 2>&1
|
while IFS= read -r cname; do
|
||||||
|
[[ " ${LIVE_WATCHED} " == *" ${cname} "* ]] && continue
|
||||||
|
_docker_logs "${cname}" "${HEIMDALL_DIR}/${cname}.jsonl"
|
||||||
|
done < <(docker ps --format '{{.Names}}')
|
||||||
|
|
||||||
|
echo "heimdall docker: $(ls "${HEIMDALL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
|
||||||
|
|
||||||
|
# ── Navi Docker containers ────────────────────────────────────────────────────
|
||||||
|
NAVI_DIR="${DATA_DIR}/docker-navi"
|
||||||
|
mkdir -p "${NAVI_DIR}"
|
||||||
|
|
||||||
|
if ssh ${SSH_OPTS} navi true 2>/dev/null; then
|
||||||
|
while IFS= read -r cname; do
|
||||||
|
[[ -z "${cname}" ]] && continue
|
||||||
|
ssh ${SSH_OPTS} navi "docker logs --since 20m '${cname}' 2>&1" | \
|
||||||
|
python3 -c "
|
||||||
|
import sys, json
|
||||||
|
src = 'navi/${cname}'
|
||||||
|
for line in sys.stdin:
|
||||||
|
line = line.rstrip()
|
||||||
|
if not line: continue
|
||||||
|
print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
|
||||||
|
" > "${NAVI_DIR}/${cname}.jsonl" 2>/dev/null || : > "${NAVI_DIR}/${cname}.jsonl"
|
||||||
|
done < <(ssh ${SSH_OPTS} navi "docker ps --format '{{.Names}}'" 2>/dev/null)
|
||||||
|
echo "navi docker: $(ls "${NAVI_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
|
||||||
|
else
|
||||||
|
echo "navi: unreachable, skipping docker logs"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# ── Strahl Docker containers ──────────────────────────────────────────────────
|
||||||
|
STRAHL_DIR="${DATA_DIR}/docker-strahl"
|
||||||
|
mkdir -p "${STRAHL_DIR}"
|
||||||
|
|
||||||
|
if ssh ${SSH_OPTS} strahl true 2>/dev/null; then
|
||||||
|
while IFS= read -r cname; do
|
||||||
|
[[ -z "${cname}" ]] && continue
|
||||||
|
ssh ${SSH_OPTS} strahl "docker logs --since 20m '${cname}' 2>&1" | \
|
||||||
|
python3 -c "
|
||||||
|
import sys, json
|
||||||
|
src = 'strahl/${cname}'
|
||||||
|
for line in sys.stdin:
|
||||||
|
line = line.rstrip()
|
||||||
|
if not line: continue
|
||||||
|
print(json.dumps({'MESSAGE': line, 'SYSLOG_IDENTIFIER': src, '_TRANSPORT': 'docker', 'PRIORITY': '6'}))
|
||||||
|
" > "${STRAHL_DIR}/${cname}.jsonl" 2>/dev/null || : > "${STRAHL_DIR}/${cname}.jsonl"
|
||||||
|
done < <(ssh ${SSH_OPTS} strahl "docker ps --format '{{.Names}}'" 2>/dev/null)
|
||||||
|
echo "strahl docker: $(ls "${STRAHL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
|
||||||
|
else
|
||||||
|
echo "strahl: unreachable, skipping docker logs"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# ── Cass — Plex logs (native install, no Docker) ─────────────────────────────
|
||||||
|
PLEX_DIR="${DATA_DIR}/plex-cass"
|
||||||
|
PLEX_LOG_DIR="/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs"
|
||||||
|
mkdir -p "${PLEX_DIR}"
|
||||||
|
|
||||||
|
if ssh ${SSH_OPTS} cass true 2>/dev/null; then
|
||||||
|
while IFS= read -r remote_path; do
|
||||||
|
[[ -z "${remote_path}" ]] && continue
|
||||||
|
local_name="$(basename "${remote_path}" | tr ' ' '_' | tr '[:upper:]' '[:lower:]')"
|
||||||
|
ssh ${SSH_OPTS} cass "cat '${remote_path}'" > "${PLEX_DIR}/${local_name}" 2>/dev/null || true
|
||||||
|
done < <(ssh ${SSH_OPTS} cass "ls '${PLEX_LOG_DIR}'/*.log 2>/dev/null" 2>/dev/null)
|
||||||
|
echo "cass plex: $(ls "${PLEX_DIR}"/*.log 2>/dev/null | wc -l) log files"
|
||||||
|
else
|
||||||
|
echo "cass: unreachable, skipping plex logs"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# ── Ingest everything ─────────────────────────────────────────────────────────
|
||||||
|
{
|
||||||
|
# Remote journals (explicit source IDs via YAML)
|
||||||
|
${INGEST} --sources /devl/turnstone-cluster/patterns/sources-cluster.yaml --db "${DB}"
|
||||||
|
|
||||||
|
# Docker and Plex logs (source IDs derived from filenames by directory ingest)
|
||||||
|
for dir in "${HEIMDALL_DIR}" "${NAVI_DIR}" "${STRAHL_DIR}" "${PLEX_DIR}"; do
|
||||||
|
[[ -d "${dir}" ]] && ls "${dir}"/*.jsonl "${dir}"/*.log 2>/dev/null | grep -q . && \
|
||||||
|
${INGEST} "${dir}" "${DB}" || true
|
||||||
|
done
|
||||||
|
} >> "${LOG}" 2>&1
|
||||||
|
|
||||||
echo "collect_cluster_logs: done"
|
echo "collect_cluster_logs: done"
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ Environment=TURNSTONE_DB=/devl/turnstone-cluster/data/turnstone.db
|
||||||
Environment=TURNSTONE_PATTERNS=/devl/turnstone-cluster/patterns
|
Environment=TURNSTONE_PATTERNS=/devl/turnstone-cluster/patterns
|
||||||
Environment=TURNSTONE_SOURCE_HOST=heimdall-cluster
|
Environment=TURNSTONE_SOURCE_HOST=heimdall-cluster
|
||||||
ExecStart=/devl/miniconda3/envs/cf/bin/python -m uvicorn app.rest:app \
|
ExecStart=/devl/miniconda3/envs/cf/bin/python -m uvicorn app.rest:app \
|
||||||
--host 0.0.0.0 --port 8534
|
--host 0.0.0.0 --port 8534 --reload
|
||||||
Restart=on-failure
|
Restart=on-failure
|
||||||
RestartSec=5s
|
RestartSec=5s
|
||||||
StandardOutput=journal
|
StandardOutput=journal
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@
|
||||||
@keydown.enter="run()"
|
@keydown.enter="run()"
|
||||||
/>
|
/>
|
||||||
<button
|
<button
|
||||||
:disabled="loading || !query.trim()"
|
:disabled="loading || (!query.trim() && !sourceScope)"
|
||||||
@click="run()"
|
@click="run()"
|
||||||
class="px-6 py-2.5 rounded bg-accent text-white text-sm font-semibold hover:bg-blue-400 transition-colors disabled:opacity-50"
|
class="px-6 py-2.5 rounded bg-accent text-white text-sm font-semibold hover:bg-blue-400 transition-colors disabled:opacity-50"
|
||||||
>
|
>
|
||||||
|
|
@ -19,6 +19,17 @@
|
||||||
</button>
|
</button>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
<!-- Source scope badge -->
|
||||||
|
<div v-if="sourceScope" class="flex items-center gap-2 mb-4 text-xs">
|
||||||
|
<span class="text-text-dim">Scoped to:</span>
|
||||||
|
<span class="font-mono text-accent bg-accent/10 border border-accent/20 rounded px-2 py-0.5">{{ sourceScope }}</span>
|
||||||
|
<button
|
||||||
|
@click="sourceScope = null"
|
||||||
|
class="text-text-dim hover:text-text-primary ml-1"
|
||||||
|
title="Clear scope"
|
||||||
|
>✕</button>
|
||||||
|
</div>
|
||||||
|
|
||||||
<!-- Error -->
|
<!-- Error -->
|
||||||
<div v-if="error" class="mb-4 p-3 rounded bg-red-900/30 border border-red-700/40 text-sev-error text-sm">
|
<div v-if="error" class="mb-4 p-3 rounded bg-red-900/30 border border-red-700/40 text-sev-error text-sm">
|
||||||
{{ error }}
|
{{ error }}
|
||||||
|
|
@ -150,6 +161,7 @@ interface Summary {
|
||||||
}
|
}
|
||||||
|
|
||||||
const query = ref('')
|
const query = ref('')
|
||||||
|
const sourceScope = ref<string | null>(null)
|
||||||
const entries = ref<LogEntry[]>([])
|
const entries = ref<LogEntry[]>([])
|
||||||
const summary = ref<Summary | null>(null)
|
const summary = ref<Summary | null>(null)
|
||||||
const reasoning = ref<string | null>(null)
|
const reasoning = ref<string | null>(null)
|
||||||
|
|
@ -166,13 +178,21 @@ let capturedSince: string | null = null
|
||||||
let capturedUntil: string | null = null
|
let capturedUntil: string | null = null
|
||||||
|
|
||||||
onMounted(() => {
|
onMounted(() => {
|
||||||
|
const s = route.query.source
|
||||||
|
if (typeof s === 'string' && s.trim()) sourceScope.value = s
|
||||||
const q = route.query.q
|
const q = route.query.q
|
||||||
if (typeof q === 'string' && q.trim()) {
|
if (typeof q === 'string' && q.trim()) {
|
||||||
query.value = q
|
query.value = q
|
||||||
run()
|
run()
|
||||||
|
} else if (sourceScope.value) {
|
||||||
|
run()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
watch(() => route.query.source, (newS) => {
|
||||||
|
sourceScope.value = typeof newS === 'string' && newS.trim() ? newS : null
|
||||||
|
})
|
||||||
|
|
||||||
watch(() => route.query.q, (newQ) => {
|
watch(() => route.query.q, (newQ) => {
|
||||||
if (typeof newQ === 'string' && newQ.trim() && newQ !== lastQuery.value) {
|
if (typeof newQ === 'string' && newQ.trim() && newQ !== lastQuery.value) {
|
||||||
query.value = newQ
|
query.value = newQ
|
||||||
|
|
@ -181,7 +201,7 @@ watch(() => route.query.q, (newQ) => {
|
||||||
})
|
})
|
||||||
|
|
||||||
async function run() {
|
async function run() {
|
||||||
if (!query.value.trim()) return
|
if (!query.value.trim() && !sourceScope.value) return
|
||||||
loading.value = true
|
loading.value = true
|
||||||
error.value = null
|
error.value = null
|
||||||
ranOnce.value = true
|
ranOnce.value = true
|
||||||
|
|
@ -192,7 +212,7 @@ async function run() {
|
||||||
const res = await fetch(`${BASE}/api/diagnose`, {
|
const res = await fetch(`${BASE}/api/diagnose`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: { 'Content-Type': 'application/json' },
|
headers: { 'Content-Type': 'application/json' },
|
||||||
body: JSON.stringify({ query: query.value }),
|
body: JSON.stringify({ query: query.value || sourceScope.value, source: sourceScope.value }),
|
||||||
})
|
})
|
||||||
if (!res.ok) throw new Error(`API returned ${res.status}`)
|
if (!res.ok) throw new Error(`API returned ${res.status}`)
|
||||||
const data = await res.json()
|
const data = await res.json()
|
||||||
|
|
|
||||||
|
|
@ -246,7 +246,7 @@ function healthDot(errors: number, total: number): string {
|
||||||
}
|
}
|
||||||
|
|
||||||
function diagnoseSource(sourceId: string) {
|
function diagnoseSource(sourceId: string) {
|
||||||
router.push({ path: '/diagnose', query: { q: sourceId } })
|
router.push({ path: '/diagnose', query: { source: sourceId } })
|
||||||
}
|
}
|
||||||
|
|
||||||
function shortTs(iso: string | null): string {
|
function shortTs(iso: string | null): string {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue