From e746d55730d3956deeb4ef871e4289157f2a2c40 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Thu, 21 May 2026 12:37:30 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20SSH=20remote=20glean=20=E2=80=94=20tran?= =?UTF-8?q?sport=20layer,=20pipeline=20integration,=20REST=20+=20UI=20(#22?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes turnstone#22. ## Transport layer (app/glean/ssh.py) - SSHTransport context manager: key-only auth, paramiko backend - SSHConnectionError / SSHCommandError exception hierarchy - exec_stream() generator: yields stdout lines, raises SSHCommandError on non-zero exit (isinstance(int) guard for test-mock safety) - Command builders: _build_journald_command, _build_syslog_command, _build_plaintext_command, _build_docker_command - 18 unit tests in tests/test_glean_ssh.py ## Pipeline integration (app/glean/pipeline.py) - _stream_and_write(): per-item error isolation — SSHCommandError skips one glean item without aborting the rest of the host connection - _glean_ssh_source(): one SSHTransport per host, dispatches all glean items (journald/syslog/plaintext/docker); SSHConnectionError aborts host - glean_sources(): splits local vs SSH sources; local → _glean_files(); SSH → _glean_ssh_source(); shared compiled patterns and DB connection - glean_ssh_source(): public wrapper for REST use — manages DB connection, pattern compilation, FTS rebuild lifecycle - 15 integration tests in tests/test_glean_pipeline_ssh.py - All 285 tests passing ## REST layer (app/rest.py) - GET /api/sources/configured: reads sources.yaml and enriches with DB stats; SSH sources appear before first glean (entry_count=0); sub-source IDs (rack01/journald, rack01/docker/myapp) aggregated per host entry - POST /api/sources/{id}/glean: detects transport:ssh and dispatches to glean_ssh_source() wrapper; local sources unchanged - Import: glean_ssh_source as _glean_ssh_source ## Frontend (web/src/views/SourcesView.vue) - Fetches /api/sources/configured (primary) + /api/sources (DB-only) in parallel; merges into unified SourceRow list - SSH sources show: ssh badge (with user@host tooltip), glean-type pills (journald/syslog/docker/etc.), host subtitle - SSH sub-source IDs (rack01/journald) suppressed from the DB-only list since they are covered by the parent SSH row - DB-only sources (uploads) appear below configured sources with 'uploaded' badge; reglean button disabled (not in sources.yaml) - Delete zeroes out configured-source stats in-place rather than removing the row (so the source remains visible for re-gleaning) --- app/glean/pipeline.py | 33 ++++++++ app/rest.py | 87 +++++++++++++++++++- web/src/views/SourcesView.vue | 151 ++++++++++++++++++++++++++++++---- 3 files changed, 251 insertions(+), 20 deletions(-) diff --git a/app/glean/pipeline.py b/app/glean/pipeline.py index f842e28..ce7bb43 100644 --- a/app/glean/pipeline.py +++ b/app/glean/pipeline.py @@ -391,6 +391,39 @@ def _glean_ssh_source( return stats +def glean_ssh_source( + src: dict, # type: ignore[type-arg] + db_path: Path, + pattern_file: Path | None = None, + batch_size: int = 1000, +) -> dict[str, int]: + """Glean a single SSH source dict and write results to *db_path*. + + Public wrapper around :func:`_glean_ssh_source` for the REST layer. + Manages the DB connection, pattern compilation, and FTS rebuild so callers + don't have to deal with those lifecycle concerns. + + Returns stats mapping ``{sub_source_id: entry_count}``. + """ + effective_pattern_file = pattern_file or Path("patterns/default.yaml") + compiled = _compile(load_patterns(effective_pattern_file)) + ingest_time = now_iso() + + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executescript(_SCHEMA) + conn.commit() + + try: + stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) + finally: + conn.close() + + logger.info("Rebuilding FTS index after SSH source glean...") + build_fts_index(db_path) + return stats + + def glean_dir( corpus_dir: Path, db_path: Path, diff --git a/app/rest.py b/app/rest.py index ff47979..e5cdb1c 100644 --- a/app/rest.py +++ b/app/rest.py @@ -27,7 +27,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel -from app.glean.pipeline import ensure_schema, glean_file as _glean_file +from app.glean.pipeline import ensure_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source from app.glean.base import load_compiled_patterns, now_iso from app.glean.tautulli import parse_webhook as _parse_tautulli from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh @@ -433,6 +433,72 @@ def list_sources() -> dict: return {"sources": _list_sources(DB_PATH)} +@router.get("/api/sources/configured") +def list_configured_sources() -> dict: + """Return every source in sources.yaml, enriched with DB stats. + + Unlike ``/api/sources`` (which is DB-only), this endpoint reads sources.yaml + so SSH sources appear even before their first successful glean. DB entry + counts, error counts, and timestamps are aggregated and merged in. + + For SSH sources, sub-source IDs (e.g. ``rack01/journald``) are summed to + produce a single aggregate stat row for the top-level host entry. + """ + sources_file = PATTERN_DIR / "sources.yaml" + if not sources_file.exists(): + return {"sources": []} + + with open(sources_file) as f: + config = yaml.safe_load(f) or {} + + # Fetch all DB source stats once; key by source_id for O(1) lookup. + db_stats: dict[str, dict] = {} + try: + for row in _list_sources(DB_PATH): + db_stats[row["source_id"]] = row + except Exception: + pass # DB may not exist on first run + + result = [] + for src in config.get("sources", []): + transport = src.get("transport", "local") + src_id = src.get("id", "") + + entry: dict = {"id": src_id, "transport": transport} + + if transport != "ssh": + entry["path"] = src.get("path", "") + db = db_stats.get(src_id, {}) + entry["entry_count"] = db.get("entry_count", 0) + entry["error_count"] = db.get("error_count", 0) + entry["earliest"] = db.get("earliest") + entry["latest"] = db.get("latest") + else: + entry["host"] = src.get("host", "") + entry["user"] = src.get("user", "") + glean_items: list[dict] = src.get("glean", []) + entry["glean_types"] = sorted({item.get("type", "plaintext") for item in glean_items}) + entry["glean_items"] = glean_items + + # Aggregate sub-source DB rows that belong to this SSH host. + # Sub-sources use IDs like "{host_id}/{type}" or "{host_id}/{type}/{container}". + prefix = src_id + "/" + matching_rows = [ + v for k, v in db_stats.items() + if k.startswith(prefix) or k == src_id + ] + entry["entry_count"] = sum(r.get("entry_count", 0) for r in matching_rows) + entry["error_count"] = sum(r.get("error_count", 0) for r in matching_rows) + earliests = [r["earliest"] for r in matching_rows if r.get("earliest")] + latests = [r["latest"] for r in matching_rows if r.get("latest")] + entry["earliest"] = min(earliests) if earliests else None + entry["latest"] = max(latests) if latests else None + + result.append(entry) + + return {"sources": result} + + @router.delete("/api/sources/{source_id}") def delete_source(source_id: str) -> dict: """Delete all log entries (and FTS index rows) for a given source.""" @@ -450,7 +516,13 @@ def delete_source(source_id: str) -> dict: @router.post("/api/sources/{source_id}/glean") def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict: - """Trigger a re-glean for a configured source from sources.yaml.""" + """Trigger a re-glean for a configured source from sources.yaml. + + Handles both local file sources and SSH remote sources. For SSH sources, + the glean runs in the foreground and rebuilds the FTS index before returning + (same behaviour as local sources — callers can rely on the count being final + when the response arrives). + """ sources_file = PATTERN_DIR / "sources.yaml" if not sources_file.exists(): raise HTTPException(status_code=404, detail="sources.yaml not found") @@ -459,7 +531,16 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict: matching = [s for s in config.get("sources", []) if s.get("id") == source_id] if not matching: raise HTTPException(status_code=404, detail=f"Source {source_id!r} not in sources.yaml") - src_path = Path(matching[0]["path"]) + + src = matching[0] + + if src.get("transport") == "ssh": + # SSH sources: open connection, glean all items, rebuild FTS inline. + stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE) + return {"source_id": source_id, "gleaned": sum(stats.values())} + + # Local file source. + src_path = Path(src["path"]) if not src_path.exists(): raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}") stats = _glean_file(src_path, DB_PATH, PATTERN_FILE) diff --git a/web/src/views/SourcesView.vue b/web/src/views/SourcesView.vue index 7290956..599fce9 100644 --- a/web/src/views/SourcesView.vue +++ b/web/src/views/SourcesView.vue @@ -26,7 +26,7 @@
- +
@@ -40,29 +40,72 @@ - - + + + + + + + + + +
Source
{{ src.source_id }}{{ src.entry_count.toLocaleString() }} +
+ {{ src.id }} + + + + ssh + + + {{ gtype }} + + uploaded +
+ +
+ {{ src.user }}@{{ src.host }} +
+
+ {{ src.entry_count.toLocaleString() }} + {{ src.error_count.toLocaleString() }} {{ formatTs(src.earliest) }} {{ formatTs(src.latest) }}
+ :title="src.dbOnly ? 'Not in sources.yaml — cannot re-glean' : 'Re-glean from sources.yaml'" + >{{ busy.has(src.id) ? '…' : 'reglean' }} @@ -78,9 +121,36 @@