Closes turnstone#22.
## Transport layer (app/glean/ssh.py)
- SSHTransport context manager: key-only auth, paramiko backend
- SSHConnectionError / SSHCommandError exception hierarchy
- exec_stream() generator: yields stdout lines, raises SSHCommandError on
non-zero exit (exit status checked behind an isinstance(..., int) guard for test-mock safety)
- Command builders: _build_journald_command, _build_syslog_command,
_build_plaintext_command, _build_docker_command
- 18 unit tests in tests/test_glean_ssh.py
## Pipeline integration (app/glean/pipeline.py)
- _stream_and_write(): per-item error isolation — SSHCommandError skips
one glean item without aborting the rest of the host connection
- _glean_ssh_source(): one SSHTransport per host, dispatches all glean
items (journald/syslog/plaintext/docker); SSHConnectionError aborts host
- glean_sources(): splits local vs SSH sources; local → _glean_files();
SSH → _glean_ssh_source(); shared compiled patterns and DB connection
- glean_ssh_source(): public wrapper for REST use — manages DB connection,
pattern compilation, FTS rebuild lifecycle
- 15 integration tests in tests/test_glean_pipeline_ssh.py
- All 285 tests passing
## REST layer (app/rest.py)
- GET /api/sources/configured: reads sources.yaml and enriches with DB
stats; SSH sources appear before first glean (entry_count=0); sub-source
IDs (rack01/journald, rack01/docker/myapp) aggregated per host entry
- POST /api/sources/{id}/glean: detects transport:ssh and dispatches to
glean_ssh_source() wrapper; local sources unchanged
- Import: glean_ssh_source as _glean_ssh_source
## Frontend (web/src/views/SourcesView.vue)
- Fetches /api/sources/configured (primary) + /api/sources (DB-only) in
parallel; merges into unified SourceRow list
- SSH sources show: ssh badge (with user@host tooltip), glean-type pills
(journald/syslog/docker/etc.), host subtitle
- SSH sub-source IDs (rack01/journald) suppressed from the DB-only list
since they are covered by the parent SSH row
- DB-only sources (uploads) appear below configured sources with 'uploaded'
badge; reglean button disabled (not in sources.yaml)
- Delete zeroes out configured-source stats in-place rather than removing
the row (so the source remains visible for re-gleaning)
543 lines
19 KiB
Python
543 lines
19 KiB
Python
"""Glean pipeline: auto-detect format, parse, write to SQLite."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import yaml
|
|
|
|
from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh
|
|
from app.glean.base import _compile, load_patterns, now_iso
|
|
from app.glean.ssh import (
|
|
SSHTransport,
|
|
SSHConnectionError,
|
|
SSHCommandError,
|
|
_build_docker_command,
|
|
_build_journald_command,
|
|
_build_plaintext_command,
|
|
_build_syslog_command,
|
|
)
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
from app.services.search import build_fts_index
|
|
|
|
logger = logging.getLogger(__name__)

# Complete SQLite schema for the glean database. It is run through
# ``executescript`` by ensure_schema() and by every glean entry point in this
# module before any rows are written. Every CREATE statement uses
# IF NOT EXISTS, so executing the script against an existing database only
# creates the objects that are still missing.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS log_entries (
    id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    sequence INTEGER NOT NULL,
    timestamp_raw TEXT,
    timestamp_iso TEXT,
    ingest_time TEXT NOT NULL,
    severity TEXT,
    repeat_count INTEGER DEFAULT 1,
    out_of_order INTEGER DEFAULT 0,
    matched_patterns TEXT DEFAULT '[]',
    text TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);

CREATE TABLE IF NOT EXISTS incidents (
    id TEXT PRIMARY KEY,
    label TEXT NOT NULL,
    issue_type TEXT NOT NULL DEFAULT '',
    started_at TEXT,
    ended_at TEXT,
    notes TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);

CREATE TABLE IF NOT EXISTS received_bundles (
    id TEXT PRIMARY KEY,
    source_host TEXT NOT NULL,
    issue_type TEXT NOT NULL DEFAULT '',
    label TEXT NOT NULL,
    severity TEXT NOT NULL DEFAULT 'medium',
    started_at TEXT,
    bundled_at TEXT NOT NULL,
    entry_count INTEGER NOT NULL DEFAULT 0,
    bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);

CREATE TABLE IF NOT EXISTS context_facts (
    id TEXT PRIMARY KEY,
    category TEXT NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    source TEXT,
    created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);

CREATE TABLE IF NOT EXISTS context_documents (
    id TEXT PRIMARY KEY,
    filename TEXT NOT NULL,
    doc_type TEXT NOT NULL,
    full_text TEXT NOT NULL,
    file_size INTEGER,
    uploaded_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS context_chunks (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    text TEXT NOT NULL,
    embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);

CREATE TABLE IF NOT EXISTS blocklist_candidates (
    id TEXT PRIMARY KEY,
    domain_or_ip TEXT NOT NULL,
    source_device_ip TEXT,
    source_device_name TEXT,
    first_seen TEXT NOT NULL,
    last_seen TEXT NOT NULL,
    hit_count INTEGER DEFAULT 1,
    status TEXT DEFAULT 'pending',
    pushed_at TEXT,
    log_evidence TEXT DEFAULT '[]',
    matched_rule TEXT,
    llm_score REAL,
    llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
"""
|
|
|
|
|
|
def ensure_schema(db_path: Path) -> None:
    """Create all tables and apply additive migrations. Safe to call on every startup.

    Opens *db_path*, switches it to WAL journaling, runs ``_SCHEMA`` and then
    each additive ``ALTER TABLE`` migration. The connection is always closed,
    even when one of those steps raises.

    Args:
        db_path: Location of the SQLite database file.
    """
    # SQLite has no "ADD COLUMN IF NOT EXISTS", so re-running a migration on an
    # already-migrated database raises OperationalError("duplicate column
    # name: ..."). That specific error is the expected no-op case.
    migrations = [
        "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
    ]

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        for stmt in migrations:
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                if "duplicate column name" in str(exc).lower():
                    continue
                # Keep startup best-effort, but do not hide unexpected
                # failures (locked database, malformed statement, ...).
                logger.warning("Schema migration failed (%s): %s", stmt, exc)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def _detect_format(first_line: str) -> str:
|
|
try:
|
|
obj = json.loads(first_line)
|
|
if "__REALTIME_TIMESTAMP" in obj:
|
|
return "journald"
|
|
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
|
|
return "docker"
|
|
if wazuh.is_wazuh_alert(obj):
|
|
return "wazuh"
|
|
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
|
|
return "caddy"
|
|
except (json.JSONDecodeError, AttributeError):
|
|
pass
|
|
if plex.is_plex_log(first_line):
|
|
return "plex"
|
|
if qbittorrent.is_qbit_log(first_line):
|
|
return "qbittorrent"
|
|
if servarr.is_servarr_log(first_line):
|
|
return "servarr"
|
|
if dmesg_log.is_dmesg_log(first_line):
|
|
return "dmesg"
|
|
if syslog.is_syslog(first_line):
|
|
return "syslog"
|
|
return "plaintext"
|
|
|
|
|
|
def _parse_file(
    path: Path,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    source_id: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield parsed entries from *path*, choosing the parser from its first line.

    The format is detected from the stripped first line via ``_detect_format``;
    the first line is then replayed ahead of the remaining lines so the
    parser sees the whole file. An empty file yields nothing.

    Args:
        path: Log file to read (undecodable bytes are replaced).
        compiled: Compiled patterns handed through to the parser.
        ingest_time: Ingest timestamp handed through to the parser.
        source_id: Source identifier; defaults to the file's stem.
    """
    sid = source_id or path.stem

    # Format name -> parser entry point; unknown formats use plaintext.
    parsers = {
        "journald": journald.parse,
        "wazuh": wazuh.parse,
        "docker": docker_log.parse,
        "caddy": caddy.parse,
        "plex": plex.parse,
        "qbittorrent": qbittorrent.parse,
        "servarr": servarr.parse,
        "dmesg": dmesg_log.parse,
        "syslog": syslog.parse,
    }

    with path.open("r", errors="replace") as handle:
        remaining = iter(handle)
        head = next(remaining, None)
        if head is None:
            return

        fmt = _detect_format(head.strip())
        logger.info("Detected format %r for %s", fmt, path.name)

        def replay():
            # Put the consumed first line back in front of the stream.
            yield head
            yield from remaining

        parse = parsers.get(fmt, plaintext.parse)
        yield from parse(replay(), sid, compiled, ingest_time)
|
|
|
|
|
|
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR IGNORE INTO log_entries
|
|
(id, source_id, sequence, timestamp_raw, timestamp_iso,
|
|
ingest_time, severity, repeat_count, out_of_order,
|
|
matched_patterns, text)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
e.entry_id, e.source_id, e.sequence,
|
|
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
|
|
e.severity, e.repeat_count, int(e.out_of_order),
|
|
json.dumps(list(e.matched_patterns)), e.text,
|
|
)
|
|
for e in batch
|
|
],
|
|
)
|
|
|
|
|
|
def _glean_files(
    files: list[Path],
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
    source_id_map: dict[Path, str] | None = None,
) -> dict[str, int]:
    """Parse each file in *files*, write the entries to *db_path*, rebuild FTS.

    Args:
        files: Log files to glean, processed in the given order.
        db_path: SQLite database to write to (schema is created if missing).
        pattern_file: Pattern YAML; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written and committed per batch.
        source_id_map: Optional per-file source id; files not listed use
            their stem.

    Returns:
        ``{source_id: entry_count}``. The count is the number of entries
        handed to the writer; rows ignored as duplicates are still counted.
    """
    pattern_file = pattern_file or Path("patterns/default.yaml")
    patterns = load_patterns(pattern_file)
    compiled = _compile(patterns)
    ingest_time = now_iso()
    source_id_map = source_id_map or {}

    stats: dict[str, int] = {}

    conn = sqlite3.connect(str(db_path))
    # try/finally so a parser or write failure part-way through the run does
    # not leave the connection open.
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()

        for log_file in files:
            source_id = source_id_map.get(log_file, log_file.stem)
            count = 0
            batch: list[RetrievedEntry] = []
            for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
                batch.append(entry)
                if len(batch) >= batch_size:
                    _write_batch(conn, batch)
                    conn.commit()
                    count += len(batch)
                    batch.clear()
            if batch:
                _write_batch(conn, batch)
                conn.commit()
                count += len(batch)
            # Several files may map to the same source id; accumulate.
            stats[source_id] = stats.get(source_id, 0) + count
            logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
    finally:
        conn.close()

    logger.info("Building FTS index...")
    build_fts_index(db_path)
    logger.info("FTS index ready")

    return stats
|
|
|
|
|
|
def _stream_and_write(
    transport: SSHTransport,
    cmd: str,
    parser,
    source_id: str,
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> int:
    """Stream *cmd* output through *parser* and write entries to *conn*.

    Catches SSHCommandError per-item so one bad command doesn't abort the rest
    of the glean items for this host. Entries that were parsed before the
    command failed are still written, so a failure late in the stream is
    handled the same way regardless of where the batch boundary fell.

    Args:
        transport: Open SSH transport; ``exec_stream(cmd)`` supplies the lines.
        cmd: Remote command to run.
        parser: Callable ``(lines, source_id, compiled, ingest_time)`` that
            yields entries.
        source_id: Source id stamped on the entries and used in log messages.
        compiled: Compiled patterns handed through to *parser*.
        ingest_time: Ingest timestamp handed through to *parser*.
        conn: Open SQLite connection; each batch is committed on it.
        batch_size: Number of entries written and committed per batch.

    Returns:
        The number of entries written.
    """
    count = 0
    batch: list[RetrievedEntry] = []

    def flush() -> None:
        # Write and commit whatever is pending, then reset the buffer.
        nonlocal count
        if batch:
            _write_batch(conn, batch)
            conn.commit()
            count += len(batch)
            batch.clear()

    try:
        for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time):
            batch.append(entry)
            if len(batch) >= batch_size:
                flush()
    except SSHCommandError as exc:
        logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc)

    # Runs on both the success and the error path: earlier full batches are
    # already committed, so the trailing partial batch must not be dropped.
    flush()

    logger.info("Gleaned %d entries from SSH source %s", count, source_id)
    return count
|
|
|
|
|
|
def _glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    compiled: list[tuple[LogPattern, object]],
    ingest_time: str,
    conn: sqlite3.Connection,
    batch_size: int,
) -> dict[str, int]:
    """Open one SSHTransport connection for *src* and glean all its glean items.

    One SSH connection is shared across all items in the ``glean:`` list so
    the handshake overhead is paid only once per host per glean run.

    Args:
        src: One SSH source mapping from sources.yaml. ``host``, ``user`` and
            ``key_path`` are required; ``id``, ``port`` (default 22) and
            ``glean`` are optional.
        compiled: Compiled patterns shared by every item.
        ingest_time: Ingest timestamp shared by every item.
        conn: Open SQLite connection to write to.
        batch_size: Number of entries written per batch.

    Returns:
        A stats dict mapping ``{source_id: entry_count}`` for each item.
        Gracefully skips the entire source on SSHConnectionError.

    Raises:
        KeyError: If ``host``, ``user`` or ``key_path`` is missing from *src*.
    """
    host_id = src.get("id", src.get("host", "unknown"))
    host = src["host"]
    user = src["user"]
    key_path = str(Path(src["key_path"]).expanduser())
    port = int(src.get("port", 22))
    # A bare ``glean:`` key parses to None in YAML; treat it as "no items"
    # instead of failing on iteration.
    glean_items: list[dict] = src.get("glean") or []  # type: ignore[type-arg]

    # Item types that map to exactly one remote command:
    # type -> (command builder, parser).
    single_command = {
        "journald": (_build_journald_command, journald.parse),
        "syslog": (_build_syslog_command, syslog.parse),
        "plaintext": (_build_plaintext_command, plaintext.parse),
    }

    stats: dict[str, int] = {}

    try:
        with SSHTransport(host=host, user=user, key_path=key_path, port=port) as t:
            for item in glean_items:
                item_type = item.get("type", "plaintext")
                # Per-item source_id — falls back to host_id/type for un-labelled items
                item_id = item.get("id") or f"{host_id}/{item_type}"

                if item_type in single_command:
                    build_cmd, parser = single_command[item_type]
                    count = _stream_and_write(
                        t, build_cmd(item), parser, item_id,
                        compiled, ingest_time, conn, batch_size,
                    )
                    stats[item_id] = stats.get(item_id, 0) + count

                elif item_type == "docker":
                    cmds = _build_docker_command(item)
                    if isinstance(cmds, str):
                        cmds = [cmds]
                    # ``containers:`` with no value is None as well.
                    containers: list[str] = item.get("containers") or []
                    # Only fan out into per-container ids when there is more
                    # than one command; a single command keeps the item id.
                    per_container = len(cmds) > 1
                    for i, cmd in enumerate(cmds):
                        # Use the container name as the final path segment when available
                        container_name = containers[i] if i < len(containers) else str(i)
                        container_id = f"{item_id}/{container_name}" if per_container else item_id
                        count = _stream_and_write(
                            t, cmd, docker_log.parse, container_id,
                            compiled, ingest_time, conn, batch_size,
                        )
                        stats[container_id] = stats.get(container_id, 0) + count

                else:
                    logger.warning(
                        "Unknown SSH glean type %r for source %r — skipping item",
                        item_type, host_id,
                    )

    except SSHConnectionError as exc:
        logger.warning("SSH connection failed for source %r: %s", host_id, exc)

    return stats
|
|
|
|
|
|
def glean_ssh_source(
    src: dict,  # type: ignore[type-arg]
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean one SSH source mapping into the database at *db_path*.

    Public entry point for the REST layer around :func:`_glean_ssh_source`.
    It compiles the patterns, opens the database and ensures the schema,
    closes the connection when the glean finishes or fails, and rebuilds the
    FTS index afterwards.

    Args:
        src: SSH source mapping as it appears in sources.yaml.
        db_path: SQLite database to write to.
        pattern_file: Pattern YAML; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written per batch.

    Returns:
        Stats mapping ``{sub_source_id: entry_count}``.
    """
    patterns_path = pattern_file if pattern_file else Path("patterns/default.yaml")
    compiled = _compile(load_patterns(patterns_path))
    started_at = now_iso()

    db = sqlite3.connect(str(db_path))
    db.execute("PRAGMA journal_mode=WAL")
    db.executescript(_SCHEMA)
    db.commit()

    try:
        per_source = _glean_ssh_source(src, compiled, started_at, db, batch_size)
    finally:
        db.close()

    logger.info("Rebuilding FTS index after SSH source glean...")
    build_fts_index(db_path)
    return per_source
|
|
|
|
|
|
def glean_dir(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean every ``*.jsonl`` and ``*.log`` file directly inside *corpus_dir*.

    The ``.jsonl`` files are processed first, then the ``.log`` files, each
    group in sorted order. Returns the stats dict from ``_glean_files``.
    """
    candidates: list[Path] = []
    for pattern in ("*.jsonl", "*.log"):
        candidates.extend(sorted(corpus_dir.glob(pattern)))
    return _glean_files(candidates, db_path, pattern_file, batch_size)
|
|
|
|
|
|
def glean_file(
    log_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
) -> dict[str, int]:
    """Glean one log file of any supported format into *db_path*.

    Thin wrapper that hands a one-element file list to ``_glean_files`` and
    returns its stats dict; the default batch size applies.
    """
    single = [log_file]
    return _glean_files(single, db_path, pattern_file)
|
|
|
|
|
|
def glean_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Glean all sources listed in a sources.yaml config file.

    Supports two source types:

    Local file sources (default):
        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt

    SSH remote sources (transport: ssh):
        sources:
          - id: rack01
            transport: ssh
            host: 192.168.1.10
            user: admin
            key_path: ~/.ssh/id_ed25519
            glean:
              - type: journald
                args: ["--since", "2 hours ago"]
              - type: syslog
                path: /var/log/syslog
              - type: plaintext
                path: /var/log/app/error.log
              - type: docker
                containers: [myapp, nginx]

    Missing local paths and SSH connection failures are logged as warnings
    so the cron keeps running when a source is temporarily down. An empty
    config file, an empty ``sources:`` key and a local source without a
    ``path`` are handled the same way.

    Args:
        sources_file: Path to the sources.yaml file.
        db_path: SQLite database to write to.
        pattern_file: Pattern YAML; defaults to ``patterns/default.yaml``.
        batch_size: Number of entries written per batch.

    Returns:
        ``{source_id: entry_count}`` across local and SSH sources; empty when
        nothing could be gleaned.
    """
    with open(sources_file) as f:
        # An empty YAML document loads as None.
        config = yaml.safe_load(f) or {}

    local_sources: list[dict] = []  # type: ignore[type-arg]
    ssh_sources: list[dict] = []  # type: ignore[type-arg]

    # ``sources:`` with no value is None too, hence ``or []``.
    for src in config.get("sources") or []:
        if src.get("transport") == "ssh":
            ssh_sources.append(src)
        else:
            local_sources.append(src)

    # ── Local file sources ─────────────────────────────────────────────────
    files: list[Path] = []
    source_id_map: dict[Path, str] = {}

    for src in local_sources:
        raw_path = src.get("path")
        if not raw_path:
            # One misconfigured entry must not abort every other source.
            logger.warning("Source %r has no path, skipping", src.get("id", "?"))
            continue
        path = Path(raw_path)
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            source_id_map[path] = src["id"]

    if not files and not ssh_sources:
        logger.warning("No sources found — check sources.yaml paths")
        return {}

    stats: dict[str, int] = {}
    if files:
        stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map))

    # ── SSH remote sources ─────────────────────────────────────────────────
    if not ssh_sources:
        return stats

    # Compile patterns once, share across all SSH sources in this run.
    effective_pattern_file = pattern_file or Path("patterns/default.yaml")
    compiled = _compile(load_patterns(effective_pattern_file))
    ingest_time = now_iso()

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript(_SCHEMA)
        conn.commit()

        for src in ssh_sources:
            ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size)
            for k, v in ssh_stats.items():
                stats[k] = stats.get(k, 0) + v
    finally:
        conn.close()

    # SSH sources were processed, so rebuild FTS to cover their entries.
    # _glean_files already rebuilt once if local files ran; rebuilding again
    # is safe.
    logger.info("Rebuilding FTS index after SSH glean...")
    build_fts_index(db_path)

    return stats
|