Watcher, REST endpoints, services (search, incidents, blocklist),
MCP server, context retriever, embedder, glean_scheduler, and
doc_upload all used the default 5-second SQLite busy timeout.
During collect glean write phases, watcher flush threads were hitting
'database is locked' errors when the glean held the write lock longer
than 5 seconds.
All connections now use timeout=30.0, matching the pipeline fix
from commit ee39ffb. No logic changes.
167 lines
7.3 KiB
Bash
167 lines
7.3 KiB
Bash
#!/usr/bin/env bash
# Collect logs from all CircuitForge cluster nodes into Turnstone.
#
# Local Heimdall sources (journal, live-watched Docker containers, network syslog)
# are handled by the Turnstone live watcher — no collection needed for those.
#
# This script handles:
#   - Remote node SSH journals (navi, sif, cass, strahl, muninn)
#   - Docker container logs from Heimdall, navi and strahl (auto-discovered)
#   - qBittorrent app logs from navi (volume-mounted files, not in docker logs)
#   - Plex logs from Cass (native install, no Docker)
#
# Triggered by: turnstone-cluster-collect.timer (every 15 min)
# Manual run:   bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh
|
|
set -euo pipefail
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
# Collected logs, the database and the glean log all live under DATA_DIR, so
# DB and LOG are derived from it instead of repeating the literal path.
DATA_DIR=/devl/turnstone-cluster/data

# Look-back window handed to `journalctl --since` on the remote nodes.
WINDOW="20 minutes ago"

# Kept as a single whitespace-separated string: callers expand it unquoted on
# purpose so that it word-splits into individual ssh arguments.
# NOTE(review): StrictHostKeyChecking=no disables host-key verification —
# confirm this is acceptable for the cluster network.
SSH_OPTS="-o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=no"

PYTHON=/devl/miniconda3/envs/cf/bin/python
# Also expanded unquoted by callers (interpreter + script path).
INGEST="${PYTHON} /Library/Development/CircuitForge/turnstone/scripts/glean_corpus.py"
DB="${DATA_DIR}/turnstone.db"
LOG="${DATA_DIR}/glean.log"

mkdir -p "${DATA_DIR}"
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────

#######################################
# Collect the last 20 minutes of a container's docker logs into a JSONL file.
# Each non-empty log line becomes one JSON object with the keys MESSAGE,
# SYSLOG_IDENTIFIER (the container name), _TRANSPORT and PRIORITY.
# Usage: _docker_logs <container> <outfile> [docker_cmd_prefix...]
# Arguments:
#   $1   - container name
#   $2   - path of the JSONL file to (over)write
#   $3.. - optional command prefix placed in front of `docker logs`
# Outputs: writes <outfile>; if the pipeline fails the file is left empty.
#######################################
_docker_logs() {
  local cname="$1" outfile="$2"
  shift 2
  # The container name is handed to Python through argv rather than spliced
  # into the program text, so quotes or backslashes in a name cannot break or
  # alter the Python source.
  "$@" docker logs --since 20m "${cname}" 2>&1 \
    | python3 -c '
import json
import sys

src = sys.argv[1]
for line in sys.stdin:
    line = line.rstrip()
    if not line:
        continue
    print(json.dumps({"MESSAGE": line, "SYSLOG_IDENTIFIER": src, "_TRANSPORT": "docker", "PRIORITY": "6"}))
' "${cname}" > "${outfile}" 2>/dev/null \
    || : > "${outfile}"
}
|
|
# ── Remote cluster node journals ──────────────────────────────────────────────
# Map of node name -> local JSONL file that receives that node's journal.
declare -A NODES=(
  [navi]="${DATA_DIR}/navi-journal.jsonl"
  [sif]="${DATA_DIR}/sif-journal.jsonl"
  [cass]="${DATA_DIR}/cass-journal.jsonl"
  [strahl]="${DATA_DIR}/strahl-journal.jsonl"
  [muninn]="${DATA_DIR}/muninn-journal.jsonl"
)

# For every node: probe it over SSH, then pull priority 0..5 journal entries
# from the last WINDOW as JSON. The output file is always (re)written, and is
# left empty when the node cannot be reached or the fetch fails.
for node in "${!NODES[@]}"; do
  outfile="${NODES[$node]}"

  # Guard: skip nodes that do not answer a trivial SSH command.
  if ! ssh ${SSH_OPTS} "${node}" true 2>/dev/null; then
    echo "${node}: unreachable, skipping"
    : > "${outfile}"
    continue
  fi

  if ! ssh ${SSH_OPTS} "${node}" \
    "journalctl --output=json --priority=0..5 --since '${WINDOW}' --no-pager 2>/dev/null || true" \
    > "${outfile}" 2>/dev/null; then
    echo "${node}: ssh failed, skipping"
    : > "${outfile}"
  fi

  echo "${node}: $(wc -l < "${outfile}") journal entries"
done
|
|
# ── Heimdall Docker containers (non-live-watched) ─────────────────────────────
# The live watcher already tails: cf-orch-coordinator, cf-web, cf-directus, caddy-proxy
LIVE_WATCHED="cf-orch-coordinator cf-web cf-directus caddy-proxy"
HEIMDALL_DIR="${DATA_DIR}/docker-heimdall"
mkdir -p "${HEIMDALL_DIR}"

# Walk the locally running containers and dump each one that is not in the
# live-watched list into its own JSONL file.
while IFS= read -r cname; do
  # Padding both sides with spaces makes this a whole-word membership test.
  case " ${LIVE_WATCHED} " in
    *" ${cname} "*) continue ;;
  esac
  _docker_logs "${cname}" "${HEIMDALL_DIR}/${cname}.jsonl"
done < <(docker ps --format '{{.Names}}')

echo "heimdall docker: $(ls "${HEIMDALL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
|
|
# ── Navi Docker containers ────────────────────────────────────────────────────
# Discover the running containers on navi over SSH and convert the last 20
# minutes of each container's log output into one JSONL file per container.
# SYSLOG_IDENTIFIER is set to "navi/<container>".
NAVI_DIR="${DATA_DIR}/docker-navi"
mkdir -p "${NAVI_DIR}"

if ssh ${SSH_OPTS} navi true 2>/dev/null; then
  while IFS= read -r cname; do
    if [[ -z "${cname}" ]]; then
      continue
    fi
    # ssh -n: without it ssh inherits the loop's stdin (the container list)
    # and swallows the remaining names, so only the first container would be
    # processed. The source id is passed to Python via argv instead of being
    # spliced into the program text.
    ssh -n ${SSH_OPTS} navi "docker logs --since 20m '${cname}' 2>&1" \
      | python3 -c '
import json
import sys

src = sys.argv[1]
for line in sys.stdin:
    line = line.rstrip()
    if not line:
        continue
    print(json.dumps({"MESSAGE": line, "SYSLOG_IDENTIFIER": src, "_TRANSPORT": "docker", "PRIORITY": "6"}))
' "navi/${cname}" > "${NAVI_DIR}/${cname}.jsonl" 2>/dev/null \
      || : > "${NAVI_DIR}/${cname}.jsonl"
  done < <(ssh ${SSH_OPTS} navi "docker ps --format '{{.Names}}'" 2>/dev/null)
  echo "navi docker: $(ls "${NAVI_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
else
  echo "navi: unreachable, skipping docker logs"
fi
|
|
# ── Navi qBittorrent app logs (volume-mounted files, not in docker logs) ──────
# qBit writes rich per-torrent events to a file inside the compose volume.
# These are NOT captured by `docker logs` — must be pulled directly.
QBIT_LOG_BASE="/opt/containers/arr"

for instance in qbit-tb0 qbit-tb1 qbit-tb2; do
  remote_log="${QBIT_LOG_BASE}/${instance}/qBittorrent/logs/qbittorrent.log"
  local_out="${NAVI_DIR}/${instance}-app.log"

  # Copy the remote log when it exists; in every other case (file missing,
  # host unreachable, copy failed) leave an empty local file behind.
  if ! ssh ${SSH_OPTS} navi "test -f '${remote_log}'" 2>/dev/null \
    || ! ssh ${SSH_OPTS} navi "cat '${remote_log}'" > "${local_out}" 2>/dev/null; then
    : > "${local_out}"
  fi
done

echo "navi qbit app logs: $(cat "${NAVI_DIR}"/qbit-tb*.log 2>/dev/null | wc -l) lines"
|
|
# ── Strahl Docker containers ──────────────────────────────────────────────────
# Discover the running containers on strahl over SSH and convert the last 20
# minutes of each container's log output into one JSONL file per container.
# SYSLOG_IDENTIFIER is set to "strahl/<container>".
STRAHL_DIR="${DATA_DIR}/docker-strahl"
mkdir -p "${STRAHL_DIR}"

if ssh ${SSH_OPTS} strahl true 2>/dev/null; then
  while IFS= read -r cname; do
    if [[ -z "${cname}" ]]; then
      continue
    fi
    # ssh -n: without it ssh inherits the loop's stdin (the container list)
    # and swallows the remaining names, so only the first container would be
    # processed. The source id is passed to Python via argv instead of being
    # spliced into the program text.
    ssh -n ${SSH_OPTS} strahl "docker logs --since 20m '${cname}' 2>&1" \
      | python3 -c '
import json
import sys

src = sys.argv[1]
for line in sys.stdin:
    line = line.rstrip()
    if not line:
        continue
    print(json.dumps({"MESSAGE": line, "SYSLOG_IDENTIFIER": src, "_TRANSPORT": "docker", "PRIORITY": "6"}))
' "strahl/${cname}" > "${STRAHL_DIR}/${cname}.jsonl" 2>/dev/null \
      || : > "${STRAHL_DIR}/${cname}.jsonl"
  done < <(ssh ${SSH_OPTS} strahl "docker ps --format '{{.Names}}'" 2>/dev/null)
  echo "strahl docker: $(ls "${STRAHL_DIR}"/*.jsonl 2>/dev/null | wc -l) containers"
else
  echo "strahl: unreachable, skipping docker logs"
fi
|
|
# ── Cass — Plex logs (native install, no Docker) ─────────────────────────────
# List the *.log files in the Plex log directory on cass and copy each one
# into PLEX_DIR. Local names are the remote basenames with spaces replaced by
# underscores and letters lower-cased.
PLEX_DIR="${DATA_DIR}/plex-cass"
PLEX_LOG_DIR="/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs"
mkdir -p "${PLEX_DIR}"

if ssh ${SSH_OPTS} cass true 2>/dev/null; then
  while IFS= read -r remote_path; do
    if [[ -z "${remote_path}" ]]; then
      continue
    fi
    local_name="$(basename "${remote_path}" | tr ' ' '_' | tr '[:upper:]' '[:lower:]')"
    # ssh -n: without it ssh inherits the loop's stdin (the file list) and
    # swallows the remaining paths, so only the first log would be copied.
    # A failed copy is tolerated (best effort).
    ssh -n ${SSH_OPTS} cass "cat '${remote_path}'" > "${PLEX_DIR}/${local_name}" 2>/dev/null || true
  done < <(ssh ${SSH_OPTS} cass "ls '${PLEX_LOG_DIR}'/*.log 2>/dev/null" 2>/dev/null)
  echo "cass plex: $(ls "${PLEX_DIR}"/*.log 2>/dev/null | wc -l) log files"
else
  echo "cass: unreachable, skipping plex logs"
fi
|
|
# ── Ingest everything ─────────────────────────────────────────────────────────

#######################################
# Report whether a directory holds at least one *.jsonl or *.log entry.
# Arguments: $1 - directory to inspect
# Returns:   0 if such an entry exists, 1 otherwise (including when $1 is
#            not a directory)
#######################################
_dir_has_logs() {
  local dir="$1" candidate
  [[ -d "${dir}" ]] || return 1
  for candidate in "${dir}"/*.jsonl "${dir}"/*.log; do
    # An unmatched glob is left as the literal pattern, so confirm existence.
    if [[ -e "${candidate}" ]]; then
      return 0
    fi
  done
  return 1
}

{
  # Remote journals (explicit source IDs via YAML)
  ${INGEST} --sources /devl/turnstone-cluster/patterns/sources-cluster.yaml --db "${DB}"

  # Docker and Plex logs (source IDs derived from filenames by directory glean)
  # The previous check, `ls dir/*.jsonl dir/*.log | grep -q .`, failed under
  # `set -o pipefail` whenever either glob had no match (ls exits non-zero),
  # which silently skipped directories containing only one of the two kinds.
  # NOTE(review): this call passes the directory and DB positionally while the
  # call above uses --sources/--db — confirm against glean_corpus.py's CLI.
  for dir in "${HEIMDALL_DIR}" "${NAVI_DIR}" "${STRAHL_DIR}" "${PLEX_DIR}"; do
    if _dir_has_logs "${dir}"; then
      # Best effort: one failing directory must not abort the remaining ones.
      ${INGEST} "${dir}" "${DB}" || true
    fi
  done
} >> "${LOG}" 2>&1

echo "collect_cluster_logs: done"