turnstone/app/glean/pipeline.py
pyr0ball 1131816666 feat: bundle PII sanitization, onboarding wizard, NL source addition (#51, #52, #53)
Bundle export (#51):
- _redact_text() with 5 compiled regex patterns (IPv4, email, user=, host=, password=)
- build_bundle(sanitize=False) — per-entry redaction at export time
- sent_bundles table tracks every outgoing export (GET and POST /send)
- GET /api/sent-bundles exposes history; SentBundle model added
- BundlesView: Received/Sent tabs, sanitized badge, 5-entry preview, re-download
- IncidentsView: Sanitize PII checkbox next to Send Bundle

Onboarding wizard (#52):
- app/services/discover.py: journald/Docker/file detection (best-effort, safe in containers)
- GET /api/setup/status, /discover, POST /api/setup/write (additive, appends to existing)
- SetupWizard.vue: 3-step Detect → Select → Confirm
  - Step 1 shows grouped summary (journald/file/docker counts)
  - Step 2: collapsible groups with All/None section toggles
    - journald + file: pre-selected; docker: collapsed, none pre-selected
  - Step 3: YAML preview before write
- SourcesView: shows wizard on first run; Add Source button reuses it

NL source addition (#53):
- app/services/nl_source.py: keyword shortcut (13 well-known apps) + LLM fallback
- POST /api/setup/interpret: keyword → LLM → null (graceful fallback)
- NL field in wizard step 2; manual form shown when interpretation fails
- Added sources appear in grouped list immediately
2026-05-29 14:14:28 -07:00

676 lines
23 KiB
Python

"""Glean pipeline: auto-detect format, parse, write to SQLite."""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from pathlib import Path
from typing import Iterator
import yaml
from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh
from app.glean.base import _compile, load_patterns, now_iso
from app.glean.ssh import (
SSHTransport,
SSHConnectionError,
SSHCommandError,
_build_docker_command,
_build_journald_command,
_build_plaintext_command,
_build_syslog_command,
)
from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index
logger = logging.getLogger(__name__)
# DDL script for the main log database. Every statement is a
# CREATE ... IF NOT EXISTS, so the script can be executed repeatedly against
# an existing database; ensure_schema() and each glean entry point in this
# module run it on the connection they open.
# The context_* tables are duplicated in _CONTEXT_SCHEMA (see the SQL comment
# inside the script for why they are still created here).
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
-- kept here as no-ops so legacy single-file deployments still work
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score REAL,
llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
path TEXT PRIMARY KEY,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL
);
"""
# DDL script for the context knowledge-base tables (facts, documents, chunks).
# Executed by ensure_context_schema() against the database path it is given.
# The table and index definitions match the context_* statements in _SCHEMA.
_CONTEXT_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
"""
def ensure_schema(db_path: Path) -> None:
    """Create all tables and apply additive migrations. Safe to call on every startup.

    Args:
        db_path: Location of the main SQLite database file.

    The connection is closed even when schema creation or a migration fails.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        # Additive column migrations. SQLite has no "ADD COLUMN IF NOT EXISTS":
        # re-running one raises OperationalError ("duplicate column name: ..."),
        # which simply means the migration was already applied.
        for stmt in [
            "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
        ]:
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                # Any other operational error (locked database, I/O failure, ...)
                # is unexpected; keep startup best-effort but make it visible.
                if "duplicate column name" not in str(exc).lower():
                    logger.warning("Schema migration failed (%s): %s", stmt, exc)
        conn.commit()
    finally:
        conn.close()
def ensure_context_schema(db_path: Path) -> None:
    """Create context KB tables in a dedicated database file.

    Using a separate file from the main log DB means context fact writes never
    contend with the high-throughput glean scheduler, which can hold the main
    DB write lock for seconds at a time when flushing large journal batches.

    Args:
        db_path: Location of the context SQLite database file.

    The connection is closed even when schema creation fails.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.executescript(_CONTEXT_SCHEMA)
        conn.commit()
    finally:
        conn.close()
def _fingerprint(path: Path) -> tuple[float, int]:
    """Identify the current state of *path* by its modification time and size.

    Only file metadata is consulted (a single ``stat`` call); the content is
    never read.
    """
    info = path.stat()
    return (info.st_mtime, info.st_size)
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
    """Tell whether *path* still has the fingerprint recorded for it.

    The answer is True only if a ``glean_fingerprints`` row exists for the path
    and both its mtime and size equal the given values. A missing row, a
    smaller size (log rotation) or a larger size (appended lines) all yield
    False, so the caller re-gleans the file.
    """
    cursor = conn.execute(
        "SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
        (str(path),),
    )
    stored = cursor.fetchone()
    if stored is not None:
        stored_mtime, stored_size = stored
        return stored_mtime == mtime and stored_size == size
    return False
def _save_fingerprint(
    conn: sqlite3.Connection,
    path: Path,
    mtime: float,
    size: int,
    gleaned_at: str,
) -> None:
    """Record (or overwrite) the fingerprint row for *path*.

    The statement is executed on *conn* but not committed here; committing is
    left to the caller.
    """
    upsert_sql = """
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
VALUES (?, ?, ?, ?)
"""
    row = (str(path), mtime, size, gleaned_at)
    conn.execute(upsert_sql, row)
def _detect_format(first_line: str) -> str:
    """Guess the log format of a file from its first line.

    JSON-object lines are probed first, in this order: journald, docker,
    wazuh, caddy. Anything that is not recognised that way is passed to the
    text detectors (plex, qbittorrent, servarr, dmesg, syslog) and finally
    falls back to ``"plaintext"``.

    Args:
        first_line: The first line of the log, already stripped by the caller.

    Returns:
        One of ``"journald"``, ``"docker"``, ``"wazuh"``, ``"caddy"``,
        ``"plex"``, ``"qbittorrent"``, ``"servarr"``, ``"dmesg"``,
        ``"syslog"`` or ``"plaintext"``.
    """
    try:
        obj = json.loads(first_line)
        # json.loads also accepts scalars and arrays ("12345", "null", "[...]").
        # Key probes on those either raise TypeError (int/None/bool) or match by
        # substring/element (str/list), so only JSON objects are inspected.
        if isinstance(obj, dict):
            if "__REALTIME_TIMESTAMP" in obj:
                return "journald"
            if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
                return "docker"
            if wazuh.is_wazuh_alert(obj):
                return "wazuh"
            if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
                return "caddy"
    except (json.JSONDecodeError, AttributeError):
        # Not JSON (or an unexpectedly shaped object): try the text formats.
        pass
    if plex.is_plex_log(first_line):
        return "plex"
    if qbittorrent.is_qbit_log(first_line):
        return "qbittorrent"
    if servarr.is_servarr_log(first_line):
        return "servarr"
    if dmesg_log.is_dmesg_log(first_line):
        return "dmesg"
    if syslog.is_syslog(first_line):
        return "syslog"
    return "plaintext"
def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    source_id: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield the entries of one log file using the parser matching its format.

    The format is detected from the first line, which is then replayed in front
    of the remaining lines so the chosen parser sees the whole file. An empty
    file yields nothing. When *source_id* is not given, the file stem is used.
    """
    if not source_id:
        source_id = path.stem
    with path.open("r", errors="replace") as handle:
        remaining = iter(handle)
        head = next(remaining, None)
        if head is None:
            # Empty file: nothing to detect, nothing to parse.
            return
        fmt = _detect_format(head.strip())
        logger.info("Detected format %r for %s", fmt, path.name)

        def replay():
            # Put the line consumed for detection back in front of the rest.
            yield head
            yield from remaining

        parser_modules = {
            "journald": journald,
            "wazuh": wazuh,
            "docker": docker_log,
            "caddy": caddy,
            "plex": plex,
            "qbittorrent": qbittorrent,
            "servarr": servarr,
            "dmesg": dmesg_log,
            "syslog": syslog,
        }
        # Any format without a dedicated parser is treated as plain text.
        module = parser_modules.get(fmt, plaintext)
        yield from module.parse(replay(), source_id, compiled, ingest_time)
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
    """Insert *batch* into ``log_entries``, ignoring rows whose id already exists.

    ``matched_patterns`` is stored as a JSON array and ``out_of_order`` as an
    integer. Nothing is committed here; that is up to the caller.
    """
    insert_sql = """
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
"""
    rows = []
    for entry in batch:
        patterns_json = json.dumps(list(entry.matched_patterns))
        rows.append(
            (
                entry.entry_id,
                entry.source_id,
                entry.sequence,
                entry.timestamp_raw,
                entry.timestamp_iso,
                entry.ingest_time,
                entry.severity,
                entry.repeat_count,
                int(entry.out_of_order),
                patterns_json,
                entry.text,
            )
        )
    conn.executemany(insert_sql, rows)
def _glean_files(
    files: list[Path],
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    source_id_map: dict[Path, str] | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Parse *files* and write their entries to the SQLite database at *db_path*.

    Args:
        files: Log files to glean, processed in the given order.
        db_path: Main SQLite database file; the schema is created if missing.
        pattern_file: Pattern definitions; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written and committed per batch.
        source_id_map: Optional explicit source id per file; files without an
            entry use their stem.
        force: When True, ignore stored fingerprints and re-glean every file.

    Returns:
        ``{source_id: entry_count}``. A file skipped because its fingerprint is
        unchanged contributes 0 to its source id.

    The database connection is closed even when parsing or writing raises.
    The FTS index is rebuilt after the connection has been closed.
    """
    pattern_file = pattern_file or Path("patterns/default.yaml")
    patterns = load_patterns(pattern_file)
    compiled = _compile(patterns)
    ingest_time = now_iso()
    source_id_map = source_id_map or {}
    stats: dict[str, int] = {}
    skipped: list[str] = []
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        for log_file in files:
            source_id = source_id_map.get(log_file, log_file.stem)
            # Fingerprint check — skip files whose mtime+size haven't changed.
            # The fingerprint is taken before parsing so lines appended while
            # the file is being read change it again for the next run.
            mtime, size = _fingerprint(log_file)
            if not force and _fp_unchanged(conn, log_file, mtime, size):
                logger.debug("Skipping unchanged file: %s", log_file.name)
                skipped.append(log_file.name)
                stats[source_id] = stats.get(source_id, 0)
                continue
            count = 0
            batch: list[RetrievedEntry] = []
            for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
                batch.append(entry)
                if len(batch) >= batch_size:
                    _write_batch(conn, batch)
                    conn.commit()
                    count += len(batch)
                    batch.clear()
            if batch:
                _write_batch(conn, batch)
                conn.commit()
                count += len(batch)
            # Only record the fingerprint once every entry has been committed,
            # so a failed glean is retried on the next run.
            _save_fingerprint(conn, log_file, mtime, size, ingest_time)
            conn.commit()
            stats[source_id] = stats.get(source_id, 0) + count
            logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
    finally:
        conn.close()
    if skipped:
        logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")
    return stats
def _stream_and_write(
    transport: SSHTransport,
    cmd: str,
    parser,
    source_id: str,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> int:
    """Run *cmd* over *transport*, parse its output and store the entries.

    Entries are written and committed in groups of *batch_size*. An
    SSHCommandError is logged as a warning instead of being raised, so one
    failing command does not abort the remaining glean items of the host.

    Returns:
        The number of entries written for this command.
    """
    written = 0
    pending: list[RetrievedEntry] = []
    try:
        entries = parser(transport.exec_stream(cmd), source_id, compiled, ingest_time)
        for entry in entries:
            pending.append(entry)
            if len(pending) < batch_size:
                continue
            # Batch is full: persist it and start a new one.
            _write_batch(conn, pending)
            conn.commit()
            written += len(pending)
            pending = []
        if pending:
            _write_batch(conn, pending)
            conn.commit()
            written += len(pending)
    except SSHCommandError as exc:
        logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc)
    logger.info("Gleaned %d entries from SSH source %s", written, source_id)
    return written
def _glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> dict[str, int]:
    """Glean every item of one SSH source over a single shared connection.

    The SSHTransport is opened once and reused for all entries of the source's
    ``glean:`` list. Each item is stored under its own ``id`` or, when it has
    none, under ``<host id>/<type>``; docker items that expand to several
    commands get one sub-id per container.

    Returns:
        ``{source_id: entry_count}`` for the items that were processed. If the
        connection fails (SSHConnectionError) a warning is logged and whatever
        was collected so far is returned.
    """
    host_id = src.get("id", src.get("host", "unknown"))
    host = src["host"]
    user = src["user"]
    key_path = str(Path(src["key_path"]).expanduser())
    port = int(src.get("port", 22))
    glean_items: list[dict] = src.get("glean", [])  # type: ignore[type-arg]
    totals: dict[str, int] = {}

    # Item types that translate to exactly one remote command:
    # (type name, command builder, output parser).
    single_command_kinds = (
        ("journald", _build_journald_command, journald.parse),
        ("syslog", _build_syslog_command, syslog.parse),
        ("plaintext", _build_plaintext_command, plaintext.parse),
    )

    try:
        with SSHTransport(host=host, user=user, key_path=key_path, port=port) as t:
            for item in glean_items:
                item_type = item.get("type", "plaintext")
                # Un-labelled items are identified by host id and type.
                item_id = item.get("id") or f"{host_id}/{item_type}"

                handler = next(
                    (
                        (build, parse)
                        for kind, build, parse in single_command_kinds
                        if item_type == kind
                    ),
                    None,
                )
                if handler is not None:
                    build_command, parse = handler
                    cmd = build_command(item)
                    written = _stream_and_write(
                        t, cmd, parse, item_id, compiled, ingest_time, conn, batch_size
                    )
                    totals[item_id] = totals.get(item_id, 0) + written
                    continue

                if item_type != "docker":
                    logger.warning(
                        "Unknown SSH glean type %r for source %r — skipping item",
                        item_type, host_id,
                    )
                    continue

                # Docker: the builder may return one command or a list of them.
                cmds = _build_docker_command(item)
                if isinstance(cmds, str):
                    cmds = [cmds]
                containers: list[str] = item.get("containers", [])
                for index, cmd in enumerate(cmds):
                    # Prefer the container name as the final path segment;
                    # fall back to the command's position.
                    if index < len(containers):
                        container_name = containers[index]
                    else:
                        container_name = str(index)
                    if len(cmds) > 1:
                        container_id = f"{item_id}/{container_name}"
                    else:
                        container_id = item_id
                    written = _stream_and_write(
                        t, cmd, docker_log.parse, container_id,
                        compiled, ingest_time, conn, batch_size,
                    )
                    totals[container_id] = totals.get(container_id, 0) + written
    except SSHConnectionError as exc:
        logger.warning("SSH connection failed for source %r: %s", host_id, exc)
    return totals
def glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean a single SSH source dict and write results to *db_path*.

    Public wrapper around :func:`_glean_ssh_source` for the REST layer.
    Manages the DB connection, pattern compilation, and FTS rebuild so callers
    don't have to deal with those lifecycle concerns.

    Args:
        src: One SSH source entry (host, user, key_path, optional port/glean).
        db_path: Main SQLite database file; the schema is created if missing.
        pattern_file: Pattern definitions; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written and committed per batch.

    Returns:
        Stats mapping ``{sub_source_id: entry_count}``.
    """
    effective_pattern_file = pattern_file or Path("patterns/default.yaml")
    compiled = _compile(load_patterns(effective_pattern_file))
    ingest_time = now_iso()
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        # Schema setup happens inside the try so the connection is released
        # even when the PRAGMA or the DDL script fails.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
    finally:
        conn.close()
    logger.info("Rebuilding FTS index after SSH source glean...")
    build_fts_index(db_path)
    return stats
def glean_dir(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
) -> dict[str, int]:
    """Glean every ``*.jsonl`` and ``*.log`` file directly inside *corpus_dir*.

    The ``.jsonl`` files are processed first, then the ``.log`` files, each
    group in sorted order. With ``force=True`` fingerprint checks are bypassed
    and every file is re-gleaned whether or not it changed since the last run.
    """
    files: list[Path] = []
    for glob_pattern in ("*.jsonl", "*.log"):
        files.extend(sorted(corpus_dir.glob(glob_pattern)))
    return _glean_files(files, db_path, pattern_file, batch_size, force=force)
def glean_file(
    log_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Glean one log file of any supported format into *db_path*.

    With ``force=True`` the file is re-gleaned even when its fingerprint is
    unchanged.
    """
    single_file = [log_file]
    return _glean_files(single_file, db_path, pattern_file, force=force)
def glean_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
) -> dict[str, int]:
    """Glean all sources listed in a sources.yaml config file.

    Supports two source types:

    Local file sources (default):
        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt

    SSH remote sources (transport: ssh):
        sources:
          - id: rack01
            transport: ssh
            host: 192.168.1.10
            user: admin
            key_path: ~/.ssh/id_ed25519
            glean:
              - type: journald
                args: ["--since", "2 hours ago"]
              - type: syslog
                path: /var/log/syslog
              - type: plaintext
                path: /var/log/app/error.log
              - type: docker
                containers: [myapp, nginx]

    Missing local paths and SSH connection failures are logged as warnings
    so the cron keeps running when a source is temporarily down. An empty
    config file, an empty ``sources:`` key, or a local source without a
    ``path`` are handled the same way instead of raising.

    Args:
        sources_file: YAML file describing the sources (read as UTF-8).
        db_path: Main SQLite database file; the schema is created if missing.
        pattern_file: Pattern definitions; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written and committed per batch.
        force: Bypass fingerprint checks for local file sources.

    Returns:
        ``{source_id: entry_count}`` across local and SSH sources; ``{}`` when
        no usable source is configured.
    """
    with open(sources_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        config = yaml.safe_load(f) or {}
    local_sources: list[dict] = []  # type: ignore[type-arg]
    ssh_sources: list[dict] = []  # type: ignore[type-arg]
    # A bare "sources:" key parses to None, hence the "or []".
    for src in config.get("sources") or []:
        if src.get("transport") == "ssh":
            ssh_sources.append(src)
        else:
            local_sources.append(src)
    # ── Local file sources ─────────────────────────────────────────────────
    files: list[Path] = []
    source_id_map: dict[Path, str] = {}
    for src in local_sources:
        raw_path = src.get("path")
        if not raw_path:
            # A malformed entry must not abort the other sources.
            logger.warning("Source %r has no path, skipping", src.get("id", "?"))
            continue
        path = Path(raw_path)
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            source_id_map[path] = src["id"]
    if not files and not ssh_sources:
        logger.warning("No sources found — check sources.yaml paths")
        return {}
    stats: dict[str, int] = {}
    if files:
        stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
    # ── SSH remote sources ─────────────────────────────────────────────────
    if not ssh_sources:
        return stats
    # Compile patterns once, share across all SSH sources in this run.
    effective_pattern_file = pattern_file or Path("patterns/default.yaml")
    compiled = _compile(load_patterns(effective_pattern_file))
    ingest_time = now_iso()
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        # Schema setup is inside the try so the connection is always closed.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        for src in ssh_sources:
            ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
            for k, v in ssh_stats.items():
                stats[k] = stats.get(k, 0) + v
    finally:
        conn.close()
    # SSH sources ran (the early return above covers the other case), so the
    # FTS index is rebuilt; _glean_files already rebuilt it for local sources
    # and rebuilding again is safe.
    logger.info("Rebuilding FTS index after SSH glean...")
    build_fts_index(db_path)
    return stats