FTS5 bulk-insert write locks starved the incident API and bundle endpoints during log bursts (sonarr/radarr, high-volume docker sources). The fix mirrors the context_facts split (context -> turnstone-context.db):

- Add INCIDENTS_DB_PATH / TURNSTONE_INCIDENTS_DB env var in rest.py
- Add _INCIDENTS_SCHEMA, ensure_incidents_schema(), and migrate_incidents_to_dedicated_db() in glean/pipeline.py
- Stub out incidents/received_bundles/sent_bundles in _SCHEMA (no-op CREATE TABLE IF NOT EXISTS) so legacy single-file deployments still open
- Thread incidents_db_path through diagnose_stream -> run_pipeline -> FalsePositiveSuppressor.suppress -> _fetch_resolved_incidents
- Migration on startup: copy existing rows from the main DB to the incidents DB via INSERT OR IGNORE (idempotent, safe to re-run on every startup)
- Fix test_blocklist_endpoints fixtures to patch CONTEXT_DB_PATH and INCIDENTS_DB_PATH alongside DB_PATH (the worktree has no data/ dir)

372 tests passing.

Closes: #60
762 lines
26 KiB
Python
762 lines
26 KiB
Python
"""Glean pipeline: auto-detect format, parse, write to SQLite."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import yaml
|
|
|
|
from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh
|
|
from app.glean.base import _compile, load_patterns, now_iso
|
|
from app.glean.ssh import (
|
|
SSHTransport,
|
|
SSHConnectionError,
|
|
SSHCommandError,
|
|
_build_docker_command,
|
|
_build_journald_command,
|
|
_build_plaintext_command,
|
|
_build_syslog_command,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
from app.services.search import build_fts_index
|
|
|
|
# Module-level logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS log_entries (
|
|
id TEXT PRIMARY KEY,
|
|
source_id TEXT NOT NULL,
|
|
sequence INTEGER NOT NULL,
|
|
timestamp_raw TEXT,
|
|
timestamp_iso TEXT,
|
|
ingest_time TEXT NOT NULL,
|
|
severity TEXT,
|
|
repeat_count INTEGER DEFAULT 1,
|
|
out_of_order INTEGER DEFAULT 0,
|
|
matched_patterns TEXT DEFAULT '[]',
|
|
text TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
|
|
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
|
|
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
|
|
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
|
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
|
|
|
-- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH
|
|
-- kept here as no-ops so legacy single-file deployments still work
|
|
CREATE TABLE IF NOT EXISTS incidents (
|
|
id TEXT PRIMARY KEY,
|
|
label TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
started_at TEXT,
|
|
ended_at TEXT,
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium'
|
|
);
|
|
CREATE TABLE IF NOT EXISTS received_bundles (
|
|
id TEXT PRIMARY KEY,
|
|
source_host TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
label TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium',
|
|
started_at TEXT,
|
|
bundled_at TEXT NOT NULL,
|
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
|
bundle_json TEXT NOT NULL
|
|
);
|
|
CREATE TABLE IF NOT EXISTS sent_bundles (
|
|
id TEXT PRIMARY KEY,
|
|
incident_id TEXT NOT NULL,
|
|
exported_at TEXT NOT NULL,
|
|
sanitized INTEGER NOT NULL DEFAULT 0,
|
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
|
bundle_json TEXT NOT NULL
|
|
);
|
|
|
|
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
|
|
-- kept here as no-ops so legacy single-file deployments still work
|
|
CREATE TABLE IF NOT EXISTS context_facts (
|
|
id TEXT PRIMARY KEY,
|
|
category TEXT NOT NULL,
|
|
key TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
source TEXT,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_documents (
|
|
id TEXT PRIMARY KEY,
|
|
filename TEXT NOT NULL,
|
|
doc_type TEXT NOT NULL,
|
|
full_text TEXT NOT NULL,
|
|
file_size INTEGER,
|
|
uploaded_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_chunks (
|
|
id TEXT PRIMARY KEY,
|
|
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
|
|
chunk_index INTEGER NOT NULL,
|
|
text TEXT NOT NULL,
|
|
embedding BLOB
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS blocklist_candidates (
|
|
id TEXT PRIMARY KEY,
|
|
domain_or_ip TEXT NOT NULL,
|
|
source_device_ip TEXT,
|
|
source_device_name TEXT,
|
|
first_seen TEXT NOT NULL,
|
|
last_seen TEXT NOT NULL,
|
|
hit_count INTEGER DEFAULT 1,
|
|
status TEXT DEFAULT 'pending',
|
|
pushed_at TEXT,
|
|
log_evidence TEXT DEFAULT '[]',
|
|
matched_rule TEXT,
|
|
llm_score REAL,
|
|
llm_reason TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
|
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
|
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
|
|
|
CREATE TABLE IF NOT EXISTS glean_fingerprints (
|
|
path TEXT PRIMARY KEY,
|
|
mtime REAL NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
gleaned_at TEXT NOT NULL
|
|
);
|
|
"""
|
|
|
|
|
|
_CONTEXT_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS context_facts (
|
|
id TEXT PRIMARY KEY,
|
|
category TEXT NOT NULL,
|
|
key TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
source TEXT,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_documents (
|
|
id TEXT PRIMARY KEY,
|
|
filename TEXT NOT NULL,
|
|
doc_type TEXT NOT NULL,
|
|
full_text TEXT NOT NULL,
|
|
file_size INTEGER,
|
|
uploaded_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_chunks (
|
|
id TEXT PRIMARY KEY,
|
|
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
|
|
chunk_index INTEGER NOT NULL,
|
|
text TEXT NOT NULL,
|
|
embedding BLOB
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
|
"""
|
|
|
|
|
|
def ensure_schema(db_path: Path) -> None:
    """Create all main-DB tables and apply additive column migrations.

    Safe to call on every startup. All DDL in ``_SCHEMA`` is
    ``CREATE ... IF NOT EXISTS``. An ``ALTER TABLE ... ADD COLUMN`` that fails
    because the column already exists is skipped.

    Args:
        db_path: Path to the main log database. sqlite3 creates the file if
            it does not exist.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        # Additive column migrations. SQLite has no ADD COLUMN IF NOT EXISTS,
        # so "duplicate column name" is the expected result on an up-to-date
        # DB and is ignored. Any other OperationalError (e.g. a locked
        # database) is logged instead of being silently swallowed.
        for stmt in (
            "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
        ):
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                if "duplicate column" not in str(exc).lower():
                    logger.warning("Schema migration failed (%s): %s", stmt, exc)
        conn.commit()
    finally:
        # Release the file handle even when the DDL above raises.
        conn.close()
|
|
|
|
|
|
def ensure_context_schema(db_path: Path) -> None:
    """Create context KB tables in a dedicated database file.

    Using a separate file from the main log DB means context fact writes never
    contend with the high-throughput glean scheduler, which can hold the main
    DB write lock for seconds at a time when flushing large journal batches.
    Safe to call on every startup: all DDL is ``CREATE ... IF NOT EXISTS``.

    Args:
        db_path: Path to the context database. sqlite3 creates the file if it
            does not exist.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # foreign_keys is a per-connection switch: it affects only this
        # setup connection, not connections opened elsewhere.
        conn.execute("PRAGMA foreign_keys=ON")
        conn.executescript(_CONTEXT_SCHEMA)
        conn.commit()
    finally:
        # Release the file handle even when the DDL above raises.
        conn.close()
|
|
|
|
|
|
_INCIDENTS_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS incidents (
|
|
id TEXT PRIMARY KEY,
|
|
label TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
started_at TEXT,
|
|
ended_at TEXT,
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium'
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
|
|
|
CREATE TABLE IF NOT EXISTS received_bundles (
|
|
id TEXT PRIMARY KEY,
|
|
source_host TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
label TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium',
|
|
started_at TEXT,
|
|
bundled_at TEXT NOT NULL,
|
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
|
bundle_json TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
|
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
|
|
|
CREATE TABLE IF NOT EXISTS sent_bundles (
|
|
id TEXT PRIMARY KEY,
|
|
incident_id TEXT NOT NULL,
|
|
exported_at TEXT NOT NULL,
|
|
sanitized INTEGER NOT NULL DEFAULT 0,
|
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
|
bundle_json TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
|
|
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
|
|
"""
|
|
|
|
|
|
def ensure_incidents_schema(db_path: Path) -> None:
    """Create the incidents/bundles tables in a dedicated database file.

    Using a separate file from the main log DB means incident writes never
    contend with the FTS5 bulk-insert write lock held by the glean scheduler.
    Mirrors the context_facts split (CONTEXT_DB_PATH / turnstone-context.db).
    Safe to call on every startup.

    Args:
        db_path: Path to the incidents database. sqlite3 creates the file if
            it does not exist.
    """
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_INCIDENTS_SCHEMA)
        # Additive column migrations, presumably for incidents tables created
        # before the column existed. "duplicate column name" means the column
        # is already present and is ignored. Any other OperationalError is
        # logged rather than silently dropped.
        for stmt in (
            "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
        ):
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                if "duplicate column" not in str(exc).lower():
                    logger.warning("Incidents schema migration failed (%s): %s", stmt, exc)
        conn.commit()
    finally:
        # Release the file handle even when the DDL above raises.
        conn.close()
|
|
|
|
|
|
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
    """Copy incidents/bundles rows from the main DB into the incidents DB.

    Safe to call on every startup. Rows whose primary key already exists in
    *incidents_db* are skipped via ``INSERT OR IGNORE``, and tables missing
    from *main_db* are skipped. The destination tables must already exist
    (see ``ensure_incidents_schema``) whenever the source table has rows.

    Args:
        main_db: Legacy main database that may still hold incident rows. If
            it does not exist, nothing is copied and no file is created.
        incidents_db: Dedicated incidents database to copy into. If it is the
            same file as *main_db*, nothing is copied.

    Returns:
        Number of rows actually inserted into *incidents_db*. A re-run after
        a completed migration therefore returns 0.
    """
    chunk_size = 500  # rows per fetch; bounds memory for large bundle_json
    main_db = Path(main_db)
    incidents_db = Path(incidents_db)

    # sqlite3.connect() would create an empty file at a missing path.
    if not main_db.exists():
        return 0
    # Copying a table onto itself cannot insert anything.
    if main_db.resolve() == incidents_db.resolve():
        return 0

    migrated = 0
    src = sqlite3.connect(str(main_db), timeout=30.0)
    try:
        dst = sqlite3.connect(str(incidents_db), timeout=30.0)
        try:
            for table in ("incidents", "received_bundles", "sent_bundles"):
                try:
                    cur = src.execute(f"SELECT * FROM {table}")  # noqa: S608
                except sqlite3.OperationalError:
                    continue  # table not present in the main DB
                # cursor.description is populated even when there are no rows.
                col_names = [col[0] for col in cur.description]
                cols = ", ".join(col_names)
                placeholders = ", ".join("?" * len(col_names))
                insert_sql = (
                    f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})"  # noqa: S608
                )
                # Count real inserts: ignored (already-present) rows do not
                # change total_changes.
                before = dst.total_changes
                while True:
                    rows = cur.fetchmany(chunk_size)
                    if not rows:
                        break
                    dst.executemany(insert_sql, rows)
                migrated += dst.total_changes - before
            dst.commit()
        finally:
            dst.close()
    finally:
        src.close()
    return migrated
|
|
|
|
|
|
def _fingerprint(path: Path) -> tuple[float, int]:
|
|
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
|
st = path.stat()
|
|
return st.st_mtime, st.st_size
|
|
|
|
|
|
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
|
|
"""Return True only when the stored fingerprint exactly matches (mtime, size).
|
|
|
|
A smaller size (log rotation) or a larger size (new lines appended) both
|
|
return False so the caller re-gleams the file.
|
|
"""
|
|
row = conn.execute(
|
|
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
|
(str(path),),
|
|
).fetchone()
|
|
if row is None:
|
|
return False
|
|
return row[0] == mtime and row[1] == size
|
|
|
|
|
|
def _save_fingerprint(
|
|
conn: sqlite3.Connection,
|
|
path: Path,
|
|
mtime: float,
|
|
size: int,
|
|
gleaned_at: str,
|
|
) -> None:
|
|
"""Upsert the fingerprint for *path* after a successful glean."""
|
|
conn.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(str(path), mtime, size, gleaned_at),
|
|
)
|
|
|
|
|
|
def _detect_format(first_line: str) -> str:
    """Classify a log file by its first line.

    JSON-object formats are tried first: journald, docker, wazuh, caddy. Then
    the line-oriented sniffers run in order: plex, qbittorrent, servarr,
    dmesg, syslog. Anything unrecognised is ``"plaintext"``.

    Args:
        first_line: The first line of the file (the caller strips it).

    Returns:
        The format name ``_parse_file`` uses to choose a parser.
    """
    try:
        obj = json.loads(first_line)
    except ValueError:
        # JSONDecodeError is a ValueError subclass. The broader catch also
        # covers other ValueErrors json.loads can raise (e.g. an over-long
        # integer literal on Pythons with an int-digit limit).
        obj = None

    # Only JSON *objects* can be structured records. Scalars such as "42",
    # "true" or "null" also parse, and ``"key" in obj`` would then raise
    # TypeError and abort the whole glean. Lists and strings would only get
    # meaningless membership tests.
    if isinstance(obj, dict):
        try:
            if "__REALTIME_TIMESTAMP" in obj:
                return "journald"
            if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
                return "docker"
            if wazuh.is_wazuh_alert(obj):
                return "wazuh"
            if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
                return "caddy"
        except AttributeError:
            # Kept from the original: a malformed object falls back to the
            # text sniffers below.
            pass

    if plex.is_plex_log(first_line):
        return "plex"
    if qbittorrent.is_qbit_log(first_line):
        return "qbittorrent"
    if servarr.is_servarr_log(first_line):
        return "servarr"
    if dmesg_log.is_dmesg_log(first_line):
        return "dmesg"
    if syslog.is_syslog(first_line):
        return "syslog"
    return "plaintext"
|
|
|
|
|
|
def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    source_id: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Stream parsed entries from one local log file.

    The format is detected from the first line (via ``_detect_format``). The
    whole file, first line included, is then fed to that format's parser. The
    file stays open until the caller finishes consuming this generator. An
    empty file yields nothing.

    Args:
        path: Log file to read.
        compiled: Compiled patterns, passed through to the parser.
        ingest_time: Ingest timestamp string, passed through to the parser.
        source_id: Source label. Defaults to the file's stem.

    Yields:
        Entries produced by the format-specific parser.
    """
    source_id = source_id or path.stem

    # Decode explicitly as UTF-8 so the result does not depend on the host's
    # locale encoding. errors="replace" turns undecodable bytes into U+FFFD
    # instead of raising.
    with path.open("r", encoding="utf-8", errors="replace") as f:
        try:
            first = next(f)
        except StopIteration:
            return  # empty file

        fmt = _detect_format(first.strip())
        logger.info("Detected format %r for %s", fmt, path.name)

        parsers = {
            "journald": journald.parse,
            "wazuh": wazuh.parse,
            "docker": docker_log.parse,
            "caddy": caddy.parse,
            "plex": plex.parse,
            "qbittorrent": qbittorrent.parse,
            "servarr": servarr.parse,
            "dmesg": dmesg_log.parse,
            "syslog": syslog.parse,
        }
        parse = parsers.get(fmt, plaintext.parse)

        def all_lines() -> Iterator[str]:
            # Re-attach the peeked first line ahead of the rest of the file.
            yield first
            yield from f

        yield from parse(all_lines(), source_id, compiled, ingest_time)
|
|
|
|
|
|
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR IGNORE INTO log_entries
|
|
(id, source_id, sequence, timestamp_raw, timestamp_iso,
|
|
ingest_time, severity, repeat_count, out_of_order,
|
|
matched_patterns, text)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
e.entry_id, e.source_id, e.sequence,
|
|
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
|
|
e.severity, e.repeat_count, int(e.out_of_order),
|
|
json.dumps(list(e.matched_patterns)), e.text,
|
|
)
|
|
for e in batch
|
|
],
|
|
)
|
|
|
|
|
|
def _glean_files(
    files: list[Path],
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    source_id_map: dict[Path, str] | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Glean local log *files* into *db_path*, then rebuild the FTS index.

    A file whose ``(mtime, size)`` fingerprint matches the stored one is
    skipped unless *force* is set. Its source still gets a stats entry.

    Args:
        files: Log files to glean, processed in order.
        db_path: Main log database.
        pattern_file: Pattern YAML. Defaults to ``patterns/default.yaml``.
        batch_size: Entries per insert/commit batch.
        source_id_map: Optional per-file source id. Defaults to the file stem.
        force: Re-glean even when the fingerprint is unchanged.

    Returns:
        ``{source_id: count}``, where count is the number of parsed entries
        submitted for insert. Rows ignored as duplicates by
        ``INSERT OR IGNORE`` are still counted. Files sharing a source id are
        summed.
    """
    pattern_file = pattern_file or Path("patterns/default.yaml")
    patterns = load_patterns(pattern_file)
    compiled = _compile(patterns)
    ingest_time = now_iso()
    source_id_map = source_id_map or {}

    stats: dict[str, int] = {}
    skipped: list[str] = []

    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()

        for log_file in files:
            source_id = source_id_map.get(log_file, log_file.stem)

            # Fingerprint check — skip files whose mtime+size haven't changed.
            mtime, size = _fingerprint(log_file)
            if not force and _fp_unchanged(conn, log_file, mtime, size):
                logger.debug("Skipping unchanged file: %s", log_file.name)
                skipped.append(log_file.name)
                stats[source_id] = stats.get(source_id, 0)
                continue

            count = 0
            batch: list[RetrievedEntry] = []
            for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
                batch.append(entry)
                if len(batch) >= batch_size:
                    _write_batch(conn, batch)
                    conn.commit()
                    count += len(batch)
                    batch.clear()
            if batch:
                _write_batch(conn, batch)
                conn.commit()
                count += len(batch)

            # Saved only after all of this file's entries are committed, so a
            # glean interrupted mid-file is not skipped on the next run.
            _save_fingerprint(conn, log_file, mtime, size, ingest_time)
            conn.commit()

            stats[source_id] = stats.get(source_id, 0) + count
            logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
    finally:
        # Close even when stat() or a parser raises. Closing without a commit
        # discards any uncommitted batch and releases the DB write lock
        # immediately, instead of leaving that to garbage collection.
        conn.close()

    if skipped:
        logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))

    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")

    return stats
|
|
|
|
|
|
def _stream_and_write(
    transport: SSHTransport,
    cmd: str,
    parser,
    source_id: str,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> int:
    """Stream *cmd* output through *parser* and write entries to *conn*.

    SSHCommandError is caught here, so one bad command doesn't abort the rest
    of the glean items for this host. Entries parsed before such a failure
    are still written.

    Args:
        transport: Open SSH transport used to run *cmd*.
        cmd: Remote command whose output is parsed.
        parser: Callable ``(lines, source_id, compiled, ingest_time)``
            yielding entries.
        source_id: Source label attached by the parser.
        compiled: Compiled patterns, passed through to the parser.
        ingest_time: Ingest timestamp string, passed through to the parser.
        conn: Open main-DB connection. Each batch is committed.
        batch_size: Entries per insert/commit batch.

    Returns:
        Number of entries passed to ``_write_batch`` for this command.
    """
    count = 0
    batch: list[RetrievedEntry] = []
    try:
        for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time):
            batch.append(entry)
            if len(batch) >= batch_size:
                _write_batch(conn, batch)
                conn.commit()
                count += len(batch)
                batch.clear()
    except SSHCommandError as exc:
        logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc)

    # Flush the tail on both the success and the failure path. A command that
    # errors after streaming output would otherwise silently lose up to
    # batch_size - 1 already-parsed entries.
    if batch:
        _write_batch(conn, batch)
        conn.commit()
        count += len(batch)

    logger.info("Gleaned %d entries from SSH source %s", count, source_id)
    return count
|
|
|
|
|
|
def _glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> dict[str, int]:
    """Glean every item of one SSH source over a single shared connection.

    All entries in the source's ``glean:`` list reuse one SSHTransport, so the
    handshake is paid once per host per run. Each item is labelled with its
    own ``id`` or, failing that, ``"<host id>/<type>"``. Docker items that
    expand to several commands get a further ``/<container>`` suffix.

    Returns:
        ``{source_id: entry_count}`` for the items processed. If the SSH
        connection fails, a warning is logged and whatever was collected
        before the failure is returned.
    """
    host_id = src.get("id", src.get("host", "unknown"))
    connect_kwargs = {
        "host": src["host"],
        "user": src["user"],
        "key_path": str(Path(src["key_path"]).expanduser()),
        "port": int(src.get("port", 22)),
    }
    glean_items: list[dict] = src.get("glean", [])  # type: ignore[type-arg]

    # (type name, command builder, parser) for item types that run one command.
    single_command_types = (
        ("journald", _build_journald_command, journald.parse),
        ("syslog", _build_syslog_command, syslog.parse),
        ("plaintext", _build_plaintext_command, plaintext.parse),
    )

    totals: dict[str, int] = {}

    def record(key: str, n: int) -> None:
        totals[key] = totals.get(key, 0) + n

    try:
        with SSHTransport(**connect_kwargs) as transport:
            for item in glean_items:
                kind = item.get("type", "plaintext")
                item_id = item.get("id") or f"{host_id}/{kind}"

                handler = next(
                    ((build, parse) for name, build, parse in single_command_types if name == kind),
                    None,
                )
                if handler is not None:
                    build, parse = handler
                    record(
                        item_id,
                        _stream_and_write(
                            transport, build(item), parse, item_id,
                            compiled, ingest_time, conn, batch_size,
                        ),
                    )
                    continue

                if kind != "docker":
                    logger.warning(
                        "Unknown SSH glean type %r for source %r — skipping item",
                        kind, host_id,
                    )
                    continue

                docker_cmds = _build_docker_command(item)
                if isinstance(docker_cmds, str):
                    docker_cmds = [docker_cmds]
                containers: list[str] = item.get("containers", [])
                for idx, docker_cmd in enumerate(docker_cmds):
                    # Prefer the container name as the last path segment.
                    label = containers[idx] if idx < len(containers) else str(idx)
                    sub_id = f"{item_id}/{label}" if len(docker_cmds) > 1 else item_id
                    record(
                        sub_id,
                        _stream_and_write(
                            transport, docker_cmd, docker_log.parse, sub_id,
                            compiled, ingest_time, conn, batch_size,
                        ),
                    )
    except SSHConnectionError as exc:
        logger.warning("SSH connection failed for source %r: %s", host_id, exc)

    return totals
|
|
|
|
|
|
def glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean a single SSH source dict and write results to *db_path*.

    Public wrapper around :func:`_glean_ssh_source` for the REST layer. It
    compiles patterns, owns the DB connection, and rebuilds the FTS index
    afterwards, so callers don't have to manage those lifecycles.

    Args:
        src: One SSH source entry in the ``sources.yaml`` shape.
        db_path: Main log database.
        pattern_file: Pattern YAML. Defaults to ``patterns/default.yaml``.
        batch_size: Entries per insert/commit batch.

    Returns:
        ``{sub_source_id: entry_count}``.
    """
    effective_pattern_file = pattern_file or Path("patterns/default.yaml")
    compiled = _compile(load_patterns(effective_pattern_file))
    ingest_time = now_iso()

    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        # Schema setup sits inside the try so a failure here still closes
        # the connection.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
    finally:
        conn.close()

    logger.info("Rebuilding FTS index after SSH source glean...")
    build_fts_index(db_path)
    return stats
|
|
|
|
|
|
def glean_dir(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
) -> dict[str, int]:
    """Glean every ``*.jsonl`` and ``*.log`` file directly inside *corpus_dir*.

    The ``.jsonl`` files go first, then the ``.log`` files, each group sorted
    by path. With ``force=True`` the fingerprint check is bypassed, so
    unchanged files are gleaned again.
    """
    jsonl_files = sorted(corpus_dir.glob("*.jsonl"))
    log_files = sorted(corpus_dir.glob("*.log"))
    return _glean_files([*jsonl_files, *log_files], db_path, pattern_file, batch_size, force=force)
|
|
|
|
|
|
def glean_file(
    log_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    force: bool = False,
) -> dict[str, int]:
    """Glean one log file of any supported format into *db_path*.

    With ``force=True`` the file is re-gleaned even if its fingerprint is
    unchanged.
    """
    single = [log_file]
    return _glean_files(single, db_path, pattern_file=pattern_file, force=force)
|
|
|
|
|
|
def glean_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    force: bool = False,
) -> dict[str, int]:
    """Glean all sources listed in a sources.yaml config file.

    Supports two source types:

    Local file sources (default):
        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt

    SSH remote sources (transport: ssh):
        sources:
          - id: rack01
            transport: ssh
            host: 192.168.1.10
            user: admin
            key_path: ~/.ssh/id_ed25519
            glean:
              - type: journald
                args: ["--since", "2 hours ago"]
              - type: syslog
                path: /var/log/syslog
              - type: plaintext
                path: /var/log/app/error.log
              - type: docker
                containers: [myapp, nginx]

    Missing local paths, local entries without a ``path``, and SSH connection
    failures are logged as warnings. That way the cron keeps running when a
    source is temporarily down or misconfigured. An empty config file, or one
    with no sources, yields ``{}``.

    Returns:
        ``{source_id: entry_count}`` merged across local and SSH sources.
    """
    with open(sources_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        config = yaml.safe_load(f) or {}

    local_sources: list[dict] = []  # type: ignore[type-arg]
    ssh_sources: list[dict] = []  # type: ignore[type-arg]

    # A bare ``sources:`` key loads as None; treat it like an empty list.
    for src in config.get("sources") or []:
        if src.get("transport") == "ssh":
            ssh_sources.append(src)
        else:
            local_sources.append(src)

    # ── Local file sources ─────────────────────────────────────────────────
    files: list[Path] = []
    source_id_map: dict[Path, str] = {}

    for src in local_sources:
        raw_path = src.get("path")
        if not raw_path:
            # Without this guard a missing key raises KeyError, and an empty
            # string becomes Path(".") and fails later when opened as a file.
            logger.warning("Source %r has no path, skipping", src.get("id", "?"))
            continue
        path = Path(raw_path)
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            source_id_map[path] = src["id"]

    if not files and not ssh_sources:
        logger.warning("No sources found — check sources.yaml paths")
        return {}

    stats: dict[str, int] = {}
    if files:
        stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))

    # ── SSH remote sources ─────────────────────────────────────────────────
    if not ssh_sources:
        return stats

    # Compile patterns once, share across all SSH sources in this run.
    effective_pattern_file = pattern_file or Path("patterns/default.yaml")
    compiled = _compile(load_patterns(effective_pattern_file))
    ingest_time = now_iso()

    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        # Schema setup sits inside the try so a failure here still closes
        # the connection.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()
        for src in ssh_sources:
            ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
            for k, v in ssh_stats.items():
                stats[k] = stats.get(k, 0) + v
    finally:
        conn.close()

    # Reaching this point means SSH sources were processed, so the FTS index
    # is rebuilt here. If local files were gleaned too, _glean_files has
    # already rebuilt it once; this second pass picks up the SSH rows.
    # NOTE(review): this rebuild runs even when no SSH entries were written.
    # Whether skipping it is safe depends on build_fts_index semantics; confirm.
    logger.info("Rebuilding FTS index after SSH glean...")
    build_fts_index(db_path)

    return stats
|