turnstone/app/glean/pipeline.py
pyr0ball 7a2ab0bb46 feat(orchard): auto-enrollment API for branch node provisioning (#27)
Implements the Orchard branch grafting system for harvest.circuitforge.tech:

- POST /api/orchard/graft: provisions data dir, starts a new
  turnstone-submissions-<slug> Docker container on the next free port
  (ORCHARD_PORT_BASE=8538+), injects a handle_path block into the
  Caddyfile dynamic-branches marker section, restarts caddy-proxy,
  returns {submit_endpoint, api_key}
- GET /api/orchard/branches: list active/inactive branches (admin-only)
- DELETE /api/orchard/branches/<slug>: deactivate branch + stop container
- POST /api/orchard/branches/<slug>/anonymize: HMAC-based IP/username
  pseudonymization worker over a branch DB
- POST /api/glean/batch: optional TURNSTONE_BRANCH_KEY auth guard
- anonymized column added to log_entries schema (migration-safe)
- Updated Caddyfile with /huginn/* route (port 8536), /node2/* (8537),
  and dynamic-branch marker section
- All endpoints admin-gated via TURNSTONE_ORCHARD_ADMIN_KEY

Closes: #27
2026-06-14 14:30:18 -07:00

641 lines
22 KiB
Python

"""Glean pipeline: auto-detect format, parse, write to SQLite or Postgres."""
from __future__ import annotations
import json
import logging
import re
import sqlite3 # still used in migrate_incidents_to_dedicated_db (SQLite-only migration)
from pathlib import Path
from typing import Any, Iterator
from app.db import (
frag,
get_conn,
resolve_tenant_id,
)
from app.db.schema import (
ensure_context_schema,
ensure_incidents_schema,
ensure_schema,
migrate_incidents_to_dedicated_db,
)
import yaml
from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh
from app.glean.base import _compile, load_patterns, now_iso
from app.glean.ssh import (
SSHTransport,
SSHConnectionError,
SSHCommandError,
_build_docker_command,
_build_journald_command,
_build_plaintext_command,
_build_syslog_command,
)
from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index
logger = logging.getLogger(__name__)
# Legacy single-file DDL for the log database (log entries, incidents, bundles,
# context tables, blocklist candidates, glean fingerprints).
# NOTE(review): nothing in this module executes _SCHEMA -- schema setup goes
# through ensure_schema() imported from app.db.schema -- and its log_entries /
# glean_fingerprints tables have no tenant_id column, although _write_batch,
# _fp_unchanged and _save_fingerprint all use one. It looks stale; presumably it
# is kept only for external importers. Confirm before relying on it.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL,
anonymized INTEGER DEFAULT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
-- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH
-- kept here as no-ops so legacy single-file deployments still work
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
-- kept here as no-ops so legacy single-file deployments still work
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score REAL,
llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
path TEXT PRIMARY KEY,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL
);
"""
# Legacy DDL for the context tables only (facts / documents / chunks). The same
# statements are also embedded in _SCHEMA above.
# NOTE(review): not referenced anywhere in this module -- presumably kept for
# callers that import it from here; confirm before removing.
_CONTEXT_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
"""
# ensure_schema, ensure_context_schema, ensure_incidents_schema and
# migrate_incidents_to_dedicated_db are implemented in app/db/schema.py and are
# imported at the top of this module. Keeping them importable from here means
# existing callers that import them from this module (rest.py and tests, per the
# original note) keep working unchanged.
# _INCIDENTS_SCHEMA moved to app/db/schema.py together with those functions.
def _fingerprint(path: Path) -> tuple[float, int]:
    """Return a cheap identity for *path* as ``(mtime, size)``.

    Only ``stat()`` metadata is consulted; the file contents are never read.
    Raises whatever ``Path.stat`` raises (e.g. FileNotFoundError).
    """
    info = path.stat()
    return (info.st_mtime, info.st_size)
def _fp_unchanged(conn: Any, path: Path, mtime: float, size: int) -> bool:
    """Return True only when the stored fingerprint exactly matches (mtime, size).

    Rows stored for the current tenant and legacy rows stored with an empty
    ``tenant_id`` are both eligible. When both exist, the tenant's own row is
    preferred. Without the ORDER BY, ``fetchone()`` could pick a stale legacy
    row and force a needless re-glean on every run.

    NOTE(review): the exact float comparison on mtime assumes the column
    round-trips doubles exactly (true for SQLite REAL); confirm for the
    Postgres schema.

    Args:
        conn: open DB connection whose rows support access by column name.
        path: file being considered for glean.
        mtime: current modification time from ``_fingerprint``.
        size: current size in bytes from ``_fingerprint``.

    Returns:
        True if a stored fingerprint exists and matches both values.
    """
    tid = resolve_tenant_id()
    row = conn.execute(
        "SELECT mtime, size FROM glean_fingerprints "
        "WHERE path = ? AND (tenant_id = ? OR tenant_id = '') "
        # Tenant-specific row first, legacy '' row only as a fallback.
        "ORDER BY CASE WHEN tenant_id = '' THEN 1 ELSE 0 END "
        "LIMIT 1",
        (str(path), tid),
    ).fetchone()
    if row is None:
        return False
    return row["mtime"] == mtime and row["size"] == size
def _save_fingerprint(
    conn: Any,
    path: Path,
    mtime: float,
    size: int,
    gleaned_at: str,
) -> None:
    """Record (mtime, size) for *path* under the current tenant.

    Uses the backend-specific upsert statement from ``frag``, so re-saving an
    existing path replaces its fingerprint. The caller is responsible for
    committing.
    """
    params = (resolve_tenant_id(), str(path), mtime, size, gleaned_at)
    conn.execute(frag.fingerprint_upsert(), params)
def _detect_format(first_line: str) -> str:
    """Guess a log file's format from its first line.

    JSON objects are checked first, in order:
    - journald export: has ``__REALTIME_TIMESTAMP``.
    - Docker: ``SOURCE`` starts with ``docker:``.
    - Wazuh alert.
    - Caddy: ``ts`` plus one of ``msg``/``message``/``request``.

    Anything else goes to the text detectors in order: Plex, qBittorrent,
    Servarr, dmesg, syslog.

    Returns:
        One of ``"journald"``, ``"docker"``, ``"wazuh"``, ``"caddy"``,
        ``"plex"``, ``"qbittorrent"``, ``"servarr"``, ``"dmesg"``,
        ``"syslog"``, or ``"plaintext"`` when nothing else matches.
    """
    try:
        obj = json.loads(first_line)
        # A line can be valid JSON without being an object ("42", "null",
        # "true", "[...]"). Membership tests on int/None/bool raise TypeError,
        # which is not caught below, so only JSON objects are probed.
        if isinstance(obj, dict):
            if "__REALTIME_TIMESTAMP" in obj:
                return "journald"
            if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
                return "docker"
            if wazuh.is_wazuh_alert(obj):
                return "wazuh"
            if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
                return "caddy"
    except (json.JSONDecodeError, AttributeError):
        # Not JSON -> fall through to the text detectors. AttributeError is kept
        # from the original guard; presumably it shields against a detector
        # (e.g. wazuh.is_wazuh_alert) probing an unexpected shape -- confirm.
        pass
    if plex.is_plex_log(first_line):
        return "plex"
    if qbittorrent.is_qbit_log(first_line):
        return "qbittorrent"
    if servarr.is_servarr_log(first_line):
        return "servarr"
    if dmesg_log.is_dmesg_log(first_line):
        return "dmesg"
    if syslog.is_syslog(first_line):
        return "syslog"
    return "plaintext"
def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    source_id: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Detect the format of *path* and lazily yield its parsed entries.

    The first line selects the parser (via ``_detect_format``). That line is
    then fed back ahead of the rest of the file, so no input is lost.

    The file is decoded explicitly as UTF-8, with undecodable bytes replaced.
    Without ``encoding``, ``open()`` falls back to the locale's preferred
    encoding, which mangles UTF-8 logs on non-UTF-8 hosts.

    An empty file yields nothing. The file stays open until the generator is
    exhausted or closed.

    Args:
        path: log file to read.
        compiled: compiled patterns passed through to the parser.
        ingest_time: ingest timestamp passed through to the parser.
        source_id: source id for the entries; falls back to ``path.stem``
            when None or empty.
    """
    source_id = source_id or path.stem
    with path.open("r", encoding="utf-8", errors="replace") as f:
        try:
            first = next(f)
        except StopIteration:
            return
        fmt = _detect_format(first.strip())
        logger.info("Detected format %r for %s", fmt, path.name)

        def all_lines() -> Iterator[str]:
            # Re-attach the line consumed for format detection.
            yield first
            yield from f

        # Built here (not at module level) so parser lookups happen only once a
        # format has actually been detected.
        parsers = {
            "journald": journald.parse,
            "wazuh": wazuh.parse,
            "docker": docker_log.parse,
            "caddy": caddy.parse,
            "plex": plex.parse,
            "qbittorrent": qbittorrent.parse,
            "servarr": servarr.parse,
            "dmesg": dmesg_log.parse,
            "syslog": syslog.parse,
        }
        parse = parsers.get(fmt, plaintext.parse)
        yield from parse(all_lines(), source_id, compiled, ingest_time)
def _write_batch(conn: Any, batch: list[RetrievedEntry]) -> None:
    """Insert *batch* into ``log_entries`` for the current tenant.

    The statement prefix and trailing conflict clause come from ``frag``, so
    the exact duplicate handling is backend-specific. ``matched_patterns`` is
    stored as a JSON list. The caller is responsible for committing.
    """
    tenant = resolve_tenant_id()
    on_conflict = frag.entries_conflict_clause()
    statement = "\n".join(
        [
            "",
            frag.insert_ignore_entries(),
            "(tenant_id, id, source_id, sequence, timestamp_raw, timestamp_iso,",
            "ingest_time, severity, repeat_count, out_of_order,",
            "matched_patterns, text)",
            "VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
            on_conflict,
            "",
        ]
    )
    rows = []
    for entry in batch:
        rows.append(
            (
                tenant,
                entry.entry_id,
                entry.source_id,
                entry.sequence,
                entry.timestamp_raw,
                entry.timestamp_iso,
                entry.ingest_time,
                entry.severity,
                entry.repeat_count,
                int(entry.out_of_order),
                json.dumps(list(entry.matched_patterns)),
                entry.text,
            )
        )
    conn.executemany(statement, rows)
def _glean_files(
    files: list[Path],
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    source_id_map: dict[Path, str] | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Parse *files* and write their entries into the log database at *db_path*.

    A file is skipped when its stored (mtime, size) fingerprint matches the
    current one, unless *force* is true. Entries are written and committed in
    chunks of *batch_size*, and each file's fingerprint is saved after its
    entries are written. The FTS index is rebuilt at the end, even if every
    file was skipped.

    Args:
        files: log files to glean.
        db_path: target database.
        pattern_file: pattern YAML; defaults to ``patterns/default.yaml``.
        batch_size: number of entries per write/commit.
        source_id_map: optional per-file source id overrides; otherwise the
            file stem is used.
        force: ignore stored fingerprints and re-glean everything.

    Returns:
        ``{source_id: entries handed to the writer}``. Skipped files still get
        a key.
    """
    compiled = _compile(load_patterns(pattern_file or Path("patterns/default.yaml")))
    ingest_time = now_iso()
    id_overrides = source_id_map or {}
    ensure_schema(db_path)
    totals: dict[str, int] = {}
    unchanged: list[str] = []
    with get_conn(db_path) as conn:
        for log_file in files:
            sid = id_overrides.get(log_file, log_file.stem)
            mtime, size = _fingerprint(log_file)
            if not force and _fp_unchanged(conn, log_file, mtime, size):
                logger.debug("Skipping unchanged file: %s", log_file.name)
                unchanged.append(log_file.name)
                totals.setdefault(sid, 0)
                continue

            written = 0
            pending: list[RetrievedEntry] = []
            for entry in _parse_file(log_file, compiled, ingest_time, source_id=sid):
                pending.append(entry)
                if len(pending) < batch_size:
                    continue
                _write_batch(conn, pending)
                conn.commit()
                written += len(pending)
                pending.clear()
            if pending:
                _write_batch(conn, pending)
                conn.commit()
                written += len(pending)

            # Fingerprint is recorded only after every entry has been written.
            _save_fingerprint(conn, log_file, mtime, size, ingest_time)
            conn.commit()
            totals[sid] = totals.get(sid, 0) + written
            logger.info("Gleaned %d entries from %s (source: %s)", written, log_file.name, sid)
    if unchanged:
        logger.info("Skipped %d unchanged file(s): %s", len(unchanged), ", ".join(unchanged))
    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")
    return totals
def _stream_and_write(
    transport: SSHTransport,
    cmd: str,
    parser,
    source_id: str,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: Any,
    batch_size: int,
) -> int:
    """Stream *cmd* output through *parser* and write entries to *conn*.

    Entries are written and committed every *batch_size* entries. An
    SSHCommandError raised while streaming is logged as a warning rather than
    propagated, so one bad command does not abort the other glean items for
    the host.

    Entries parsed before the error surfaced are still flushed, so received
    output is not thrown away. This matters if exec_stream reports a failure
    only after the stream has ended (e.g. on a non-zero exit status), which is
    not visible from here.

    Database errors from writing are not caught.

    Returns:
        Number of entries handed to the database writer.
    """
    count = 0
    batch: list[RetrievedEntry] = []

    def flush() -> None:
        # Write and commit whatever is pending, then reset the buffer.
        nonlocal count
        if batch:
            _write_batch(conn, batch)
            conn.commit()
            count += len(batch)
            batch.clear()

    try:
        for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time):
            batch.append(entry)
            if len(batch) >= batch_size:
                flush()
    except SSHCommandError as exc:
        logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc)
    # Outside the try: the final partial batch is kept even after a command error.
    flush()
    logger.info("Gleaned %d entries from SSH source %s", count, source_id)
    return count
def _glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: Any,
    batch_size: int,
) -> dict[str, int]:
    """Glean every item in one SSH source's ``glean:`` list over one connection.

    The SSH connection is opened once and reused for all items, so the
    handshake is paid once per host per glean run.

    Item types:
    - ``journald``, ``syslog``, ``plaintext``: one remote command each.
    - ``docker``: may expand to one command per container.
    - anything else: logged as a warning and skipped.

    Entries are recorded under ``item["id"]``, or ``"<host id>/<type>"`` when
    that is missing or empty. Docker items with several commands append
    ``/<container name or index>``.

    Returns:
        ``{source_id: entry_count}`` for the items processed. If
        SSHConnectionError is raised, a warning is logged and whatever was
        gathered so far is returned.

    Raises:
        KeyError: if ``host``, ``user`` or ``key_path`` is missing from *src*.
    """
    host_id = src.get("id", src.get("host", "unknown"))
    host = src["host"]
    user = src["user"]
    key_path = str(Path(src["key_path"]).expanduser())
    port = int(src.get("port", 22))
    glean_items: list[dict] = src.get("glean", [])  # type: ignore[type-arg]
    totals: dict[str, int] = {}

    def record(sub_id: str, n: int) -> None:
        totals[sub_id] = totals.get(sub_id, 0) + n

    try:
        with SSHTransport(host=host, user=user, key_path=key_path, port=port) as transport:
            for item in glean_items:
                kind = item.get("type", "plaintext")
                item_id = item.get("id") or f"{host_id}/{kind}"

                if kind == "docker":
                    commands = _build_docker_command(item)
                    if isinstance(commands, str):
                        commands = [commands]
                    names: list[str] = item.get("containers", [])
                    for idx, command in enumerate(commands):
                        # Prefer the container name as the last path segment.
                        label = names[idx] if idx < len(names) else str(idx)
                        sub_id = f"{item_id}/{label}" if len(commands) > 1 else item_id
                        written = _stream_and_write(
                            transport, command, docker_log.parse, sub_id,
                            compiled, ingest_time, conn, batch_size,
                        )
                        record(sub_id, written)
                    continue

                if kind == "journald":
                    build, parse = _build_journald_command, journald.parse
                elif kind == "syslog":
                    build, parse = _build_syslog_command, syslog.parse
                elif kind == "plaintext":
                    build, parse = _build_plaintext_command, plaintext.parse
                else:
                    logger.warning(
                        "Unknown SSH glean type %r for source %r — skipping item",
                        kind, host_id,
                    )
                    continue
                written = _stream_and_write(
                    transport, build(item), parse, item_id,
                    compiled, ingest_time, conn, batch_size,
                )
                record(item_id, written)
    except SSHConnectionError as exc:
        logger.warning("SSH connection failed for source %r: %s", host_id, exc)
    return totals
def glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean one SSH source dict into *db_path*.

    Public wrapper around :func:`_glean_ssh_source` for the REST layer. It
    handles the lifecycle around the glean so callers don't have to:
    - loads and compiles patterns (default ``patterns/default.yaml``);
    - ensures the schema exists and opens the connection;
    - rebuilds the FTS index afterwards.

    Returns:
        ``{sub_source_id: entry_count}``.
    """
    patterns = load_patterns(pattern_file or Path("patterns/default.yaml"))
    compiled = _compile(patterns)
    started = now_iso()
    ensure_schema(db_path)
    with get_conn(db_path) as conn:
        result = _glean_ssh_source(src, compiled, started, conn, batch_size)
    logger.info("Rebuilding FTS index after SSH source glean...")
    build_fts_index(db_path)
    return result
def glean_dir(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
    *,
    globs: tuple[str, ...] = ("*.jsonl", "*.log"),
) -> dict[str, int]:
    """Glean every regular file under *corpus_dir* matching *globs*.

    By default this picks up ``.jsonl`` and ``.log`` files, recursively.

    Files are ordered glob by glob, sorted within each glob. Duplicates from
    overlapping globs are dropped.

    Only regular files are kept. ``rglob`` also yields directories (and
    broken symlinks) whose names match, and those would crash the stat/open
    in the glean step.

    Pass ``force=True`` to bypass fingerprint checks and re-glean all files
    regardless of whether they have changed since the last run.

    Args:
        corpus_dir: directory to search recursively.
        db_path: target database.
        pattern_file: pattern YAML; defaults to ``patterns/default.yaml``.
        batch_size: number of entries per write/commit.
        force: ignore stored fingerprints.
        globs: keyword-only; filename patterns to collect, e.g. add
            ``"*.txt"`` for servarr-style logs.

    Returns:
        ``{source_id: entry_count}`` keyed by file stem.
    """
    candidates: list[Path] = []
    for pattern in globs:
        candidates.extend(p for p in sorted(corpus_dir.rglob(pattern)) if p.is_file())
    files = list(dict.fromkeys(candidates))  # de-duplicate, keep first-seen order
    return _glean_files(files, db_path, pattern_file, batch_size, force=force)
def glean_file(
    log_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Glean one log file of any supported format into *db_path*.

    The format is auto-detected from the file's first line. With
    ``force=True`` the file is re-gleaned even when its fingerprint is
    unchanged since the last run.

    Returns:
        ``{source_id: entry_count}``, where the source id is the file stem.
    """
    return _glean_files(
        files=[log_file],
        db_path=db_path,
        pattern_file=pattern_file,
        force=force,
    )
def _partition_sources(
    sources: list[dict],  # type: ignore[type-arg]
) -> tuple[list[dict], list[dict]]:  # type: ignore[type-arg]
    """Split config entries into (local file sources, SSH sources) by ``transport``."""
    local: list[dict] = []  # type: ignore[type-arg]
    ssh: list[dict] = []  # type: ignore[type-arg]
    for src in sources:
        (ssh if src.get("transport") == "ssh" else local).append(src)
    return local, ssh


def _resolve_local_files(
    local_sources: list[dict],  # type: ignore[type-arg]
) -> tuple[list[Path], dict[Path, str]]:
    """Return existing local files plus their configured source ids.

    Entries with no ``path`` or a path that does not exist are logged and skipped.
    """
    files: list[Path] = []
    source_id_map: dict[Path, str] = {}
    for src in local_sources:
        raw_path = src.get("path")
        if not raw_path:
            logger.warning("Source %r has no 'path', skipping", src.get("id", "?"))
            continue
        # Expand "~" the same way SSH key_path values are expanded.
        path = Path(raw_path).expanduser()
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            source_id_map[path] = src["id"]
    return files, source_id_map


def glean_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
) -> dict[str, int]:
    """Glean all sources listed in a sources.yaml config file.

    Supports two source types.

    Local file sources (default)::

        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt

    SSH remote sources (``transport: ssh``)::

        sources:
          - id: rack01
            transport: ssh
            host: 192.168.1.10
            user: admin
            key_path: ~/.ssh/id_ed25519
            glean:
              - type: journald
                args: ["--since", "2 hours ago"]
              - type: syslog
                path: /var/log/syslog
              - type: plaintext
                path: /var/log/app/error.log
              - type: docker
                containers: [myapp, nginx]

    Local ``path`` values have ``~`` expanded.

    The following are logged as warnings rather than raised, so the cron keeps
    running when a source is temporarily down:
    - missing local paths;
    - local entries without a ``path``;
    - SSH connection failures.

    An empty file, or a ``sources:`` key with no value, is treated as
    "no sources".

    Returns:
        Mapping of source id to the number of entries handed to the database
        writer in this run.

    Raises:
        ValueError: if the YAML document is neither empty nor a mapping.
    """
    with open(sources_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        config = yaml.safe_load(f) or {}
    if not isinstance(config, dict):
        raise ValueError(f"{sources_file}: expected a mapping with a 'sources' key")
    local_sources, ssh_sources = _partition_sources(config.get("sources") or [])

    # ── Local file sources ─────────────────────────────────────────────────
    files, source_id_map = _resolve_local_files(local_sources)
    if not files and not ssh_sources:
        logger.warning("No sources found — check sources.yaml paths")
        return {}
    stats: dict[str, int] = {}
    if files:
        stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))

    # ── SSH remote sources ─────────────────────────────────────────────────
    if not ssh_sources:
        return stats
    # Compile patterns once, share across all SSH sources in this run.
    compiled = _compile(load_patterns(pattern_file or Path("patterns/default.yaml")))
    ingest_time = now_iso()
    ensure_schema(db_path)
    with get_conn(db_path) as conn:
        for src in ssh_sources:
            ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
            for k, v in ssh_stats.items():
                stats[k] = stats.get(k, 0) + v
            conn.commit()
    # Reaching here means at least one SSH source is configured, so rebuild
    # unconditionally. _glean_files has already rebuilt once if local files
    # were gleaned; this second rebuild picks up the SSH rows.
    logger.info("Rebuilding FTS index after SSH glean...")
    build_fts_index(db_path)
    return stats