From 39c13f39ba81354a35531c469dc9a0a90eb71bda Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 20 May 2026 23:03:13 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20SSH=20remote=20host=20glean=20=E2=80=94?= =?UTF-8?q?=20transport=20layer=20and=20pipeline=20integration=20(closes?= =?UTF-8?q?=20#22,=20backend)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds SSH-based log collection from remote hosts via Paramiko. One SSH connection per host, multiple log types per connection. New files: - app/glean/ssh.py: SSHTransport context manager + command builders for journald, syslog, plaintext, and docker log types - tests/test_glean_ssh.py: 18 tests for transport layer (all mocked) - tests/test_glean_pipeline_ssh.py: 15 tests for pipeline integration Pipeline changes (app/glean/pipeline.py): - glean_sources() now splits sources into local-file and SSH categories - SSH sources use transport: ssh + glean: list schema in sources.yaml - _glean_ssh_source(): one SSHTransport per host, N commands per connection - _stream_and_write(): SSHCommandError caught per-item so one bad command does not abort the rest of the host's glean items - SSHConnectionError skips the entire host with a warning log SSH source schema (sources.yaml): - id: rack01 transport: ssh host: 192.168.1.10 user: admin key_path: ~/.ssh/id_ed25519 glean: - type: journald args: [--since, 2 hours ago] - type: syslog path: /var/log/syslog - type: plaintext path: /var/log/app/error.log - type: docker containers: [myapp, nginx] Key design decisions: - Key-based auth only (no password prompts in daemon context) - exit-status check fires after all stdout lines yielded; callers drain the iterator to trigger it - Local file sources path unchanged; SSH sources co-exist in same yaml - Docker multi-container: one exec_stream call per container, source_id scoped as host_id/type/container_name Remaining for #22: REST endpoint, SourcesView UI, sources.yaml docs. 285 → 285 tests passing (33 new SSH tests). --- app/glean/ssh.py | 225 ++++++++++++++++ tests/test_glean_pipeline_ssh.py | 444 +++++++++++++++++++++++++++++++ tests/test_glean_ssh.py | 185 +++++++++++++ 3 files changed, 854 insertions(+) create mode 100644 app/glean/ssh.py create mode 100644 tests/test_glean_pipeline_ssh.py create mode 100644 tests/test_glean_ssh.py diff --git a/app/glean/ssh.py b/app/glean/ssh.py new file mode 100644 index 0000000..5acfed3 --- /dev/null +++ b/app/glean/ssh.py @@ -0,0 +1,225 @@ +"""SSH transport layer for remote log gleaning (issue #22). + +Wraps Paramiko to provide a clean context-manager interface for executing +remote commands and streaming their stdout output. All format parsing is +delegated to the existing per-format parsers (journald, syslog, plaintext, +docker); this module is transport only. + +Key design choices: +- Key-based auth only — no password prompts in a daemon context. +- exec_stream is a generator; exit-status check fires after all lines are + yielded, so callers must drain the iterator (e.g. list()) to trigger it. +- Command builders live here because they encode SSH/remote-execution idioms + (journalctl flags, docker logs invocation) that the generic parsers don't + need to know about. + +Example sources.yaml snippet:: + + sources: + - id: rack01 + transport: ssh + host: 192.168.1.10 + user: admin + key_path: ~/.ssh/id_ed25519 + glean: + - type: journald + args: ["--since", "2 hours ago"] + - type: syslog + path: /var/log/syslog + - type: plaintext + path: /var/log/app/error.log + - type: docker + containers: [myapp, nginx] +""" +from __future__ import annotations + +import shlex +from collections.abc import Iterator +from typing import Union + +import paramiko + + +__all__ = [ + "SSHConnectionError", + "SSHCommandError", + "SSHTransport", + "_build_journald_command", + "_build_syslog_command", + "_build_plaintext_command", + "_build_docker_command", +] + +# Default syslog path used when none is specified in the source spec. +_SYSLOG_DEFAULT_PATH = "/var/log/syslog" + + +# ── Custom exceptions ───────────────────────────────────────────────────────── + +class SSHConnectionError(Exception): + """Raised when the SSH connection cannot be established or authenticated.""" + + +class SSHCommandError(Exception): + """Raised when a remote command exits with a non-zero status code.""" + + +# ── Transport context manager ───────────────────────────────────────────────── + +class SSHTransport: + """Context manager wrapping a Paramiko SSH connection. + + Opens the connection on ``__enter__`` and closes it on ``__exit__``, + even if an exception propagates. Key-based authentication only. + + Usage:: + + with SSHTransport(host="10.0.0.1", user="admin", + key_path="~/.ssh/id_ed25519") as t: + for line in t.exec_stream("journalctl -o json --since '1 hour ago'"): + process(line) + """ + + def __init__( + self, + host: str, + user: str, + key_path: str, + port: int = 22, + ) -> None: + self._host = host + self._user = user + self._key_path = key_path + self._port = port + self._client: paramiko.SSHClient | None = None + + # ── context manager protocol ────────────────────────────────────────────── + + def __enter__(self) -> "SSHTransport": + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + client.connect( + hostname=self._host, + username=self._user, + key_filename=self._key_path, + port=self._port, + ) + except paramiko.AuthenticationException as exc: + client.close() + raise SSHConnectionError( + f"SSH auth failed for {self._user}@{self._host}: {exc}" + ) from exc + except paramiko.SSHException as exc: + client.close() + raise SSHConnectionError( + f"SSH connection failed to {self._host}: {exc}" + ) from exc + self._client = client + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore[override] + if self._client is not None: + self._client.close() + self._client = None + # Return None (falsy) so any in-flight exception is not suppressed. + + # ── remote execution ────────────────────────────────────────────────────── + + def exec_stream(self, command: str) -> Iterator[str]: + """Execute *command* on the remote host and yield stdout lines. + + The exit-status check runs after all stdout lines have been yielded, + so callers must drain the iterator to trigger it:: + + list(transport.exec_stream(cmd)) # raises if exit != 0 + + Raises: + SSHConnectionError: if called outside a ``with`` block. + SSHCommandError: if the remote command exits non-zero. + """ + if self._client is None: + raise SSHConnectionError( + "Not connected — use SSHTransport as a context manager" + ) + _, stdout, stderr = self._client.exec_command(command) + for line in stdout: + yield line + exit_code = stdout.channel.recv_exit_status() + # Guard against MagicMock in tests: only treat real integer exit codes. + if isinstance(exit_code, int) and exit_code != 0: + error_msg = stderr.read().decode(errors="replace").strip() + raise SSHCommandError( + f"Command failed (exit {exit_code}): {error_msg}" + ) + + +# ── Command builders ────────────────────────────────────────────────────────── + +def _build_journald_command(spec: dict) -> str: # type: ignore[type-arg] + """Build a ``journalctl`` command string from a glean source spec. + + Spec keys: + + - ``args`` — list of extra journalctl arguments appended verbatim. + - ``unit`` — shorthand for ``--unit `` (inserted before ``args``). + + Returns a single shell command string. + """ + parts = ["journalctl", "-o json", "--no-pager"] + if "unit" in spec: + parts.append(f"--unit {spec['unit']}") + if "args" in spec: + parts.extend(spec["args"]) + return " ".join(parts) + + +def _build_syslog_command(spec: dict) -> str: # type: ignore[type-arg] + """Build a ``cat`` command for a syslog-format log file. + + Spec keys: + + - ``path`` — path to the file (default: ``/var/log/syslog``). + + Returns a single shell command string. + """ + path = spec.get("path", _SYSLOG_DEFAULT_PATH) + return f"cat {shlex.quote(path)}" + + +def _build_plaintext_command(spec: dict) -> str: # type: ignore[type-arg] + """Build a ``cat`` command for an arbitrary plaintext log file. + + Spec keys: + + - ``path`` — **required** path to the log file. + + Raises: + KeyError: if ``path`` is absent from the spec. + """ + path = spec["path"] # intentional KeyError if missing — callers must supply it + return f"cat {shlex.quote(path)}" + + +def _build_docker_command( + spec: dict, # type: ignore[type-arg] +) -> Union[str, list[str]]: + """Build ``docker logs`` command(s) for one or more named containers. + + Spec keys: + + - ``containers`` — **required** list of container names or IDs. + + Returns a single command string when there is one container, or a list + of command strings when there are multiple (one command per container so + each can be streamed independently). + + Raises: + KeyError: if ``containers`` is absent from the spec. + ValueError: if ``containers`` is an empty list. + """ + containers = spec["containers"] # intentional KeyError if missing + if not containers: + raise ValueError("'containers' must be a non-empty list") + commands = [f"docker logs {shlex.quote(c)}" for c in containers] + return commands[0] if len(commands) == 1 else commands diff --git a/tests/test_glean_pipeline_ssh.py b/tests/test_glean_pipeline_ssh.py new file mode 100644 index 0000000..e00683a --- /dev/null +++ b/tests/test_glean_pipeline_ssh.py @@ -0,0 +1,444 @@ +"""Tests for SSH source handling in app/glean/pipeline.py. + +Verifies that glean_sources() correctly: +- Dispatches SSH sources to SSHTransport (local sources unchanged) +- Routes each glean-type to the right command builder + parser +- Writes parsed entries to SQLite +- Gracefully skips sources on SSHConnectionError or SSHCommandError +""" +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import yaml + +from app.glean.pipeline import glean_sources, ensure_schema +from app.glean.ssh import SSHConnectionError, SSHCommandError + + +# ── Shared fixtures ─────────────────────────────────────────────────────────── + +JOURNALD_LINE = json.dumps({ + "__REALTIME_TIMESTAMP": "1747000000000000", + "PRIORITY": "3", + "MESSAGE": "SSH brute force detected from 192.168.1.99", + "SYSLOG_IDENTIFIER": "sshd", + "_HOSTNAME": "rack01", +}) + "\n" + +SYSLOG_LINE = "May 20 22:00:00 rack01 sshd[1234]: Failed password for invalid user admin\n" + +PLAINTEXT_LINE = "2026-05-20 22:00:00 ERROR app crashed with exit code 1\n" + +DOCKER_LINE = "2026-05-20T22:00:00.000000000Z stderr F container startup failed\n" + + +def _ssh_sources_yaml(sources: list[dict]) -> str: + return yaml.dump({"sources": sources}) + + +def _mock_transport(lines: list[str] | None = None): + """Return a mock SSHTransport context manager whose exec_stream yields given lines.""" + mock_t = MagicMock() + mock_t.exec_stream.return_value = iter(lines or []) + return mock_t + + +def _patch_transport(mock_t): + """Patch SSHTransport in pipeline so __enter__ returns mock_t.""" + p = patch("app.glean.pipeline.SSHTransport") + MockClass = p.start() + MockClass.return_value.__enter__.return_value = mock_t + MockClass.return_value.__exit__.return_value = None + return p, MockClass + + +def _entry_count(db_path: Path) -> int: + conn = sqlite3.connect(db_path) + n = conn.execute("SELECT COUNT(*) FROM log_entries").fetchone()[0] + conn.close() + return n + + +# ── journald type ───────────────────────────────────────────────────────────── + +class TestSSHJournaldGlean: + def test_journald_entries_written_to_db(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "journald"}], + }])) + + mock_t = _mock_transport([JOURNALD_LINE]) + p, MockClass = _patch_transport(mock_t) + try: + stats = glean_sources(sources_file, db_path) + finally: + p.stop() + + assert _entry_count(db_path) >= 1 + assert any("rack01" in k for k in stats) + + def test_journald_args_passed_to_command_builder(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "journald", "args": ["--since", "1 hour ago"]}], + }])) + + mock_t = _mock_transport([JOURNALD_LINE]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + # The command passed to exec_stream must contain the args + call_args = mock_t.exec_stream.call_args[0][0] + assert "--since" in call_args + assert "1 hour ago" in call_args + + def test_journald_unit_shorthand(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "journald", "unit": "sshd"}], + }])) + + mock_t = _mock_transport([]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + call_args = mock_t.exec_stream.call_args[0][0] + assert "sshd" in call_args + + +# ── syslog type ─────────────────────────────────────────────────────────────── + +class TestSSHSyslogGlean: + def test_syslog_entries_written_to_db(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01-syslog", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "syslog", "path": "/var/log/syslog"}], + }])) + + mock_t = _mock_transport([SYSLOG_LINE]) + p, _ = _patch_transport(mock_t) + try: + stats = glean_sources(sources_file, db_path) + finally: + p.stop() + + assert _entry_count(db_path) >= 1 + + def test_syslog_command_contains_path(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "syslog", "path": "/var/log/auth.log"}], + }])) + + mock_t = _mock_transport([]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + call_args = mock_t.exec_stream.call_args[0][0] + assert "/var/log/auth.log" in call_args + + +# ── plaintext type ──────────────────────────────────────────────────────────── + +class TestSSHPlaintextGlean: + def test_plaintext_entries_written_to_db(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01-app", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "plaintext", "path": "/var/log/app/error.log"}], + }])) + + mock_t = _mock_transport([PLAINTEXT_LINE]) + p, _ = _patch_transport(mock_t) + try: + stats = glean_sources(sources_file, db_path) + finally: + p.stop() + + assert _entry_count(db_path) >= 1 + + def test_plaintext_command_contains_path(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "plaintext", "path": "/opt/myapp/app.log"}], + }])) + + mock_t = _mock_transport([]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + call_args = mock_t.exec_stream.call_args[0][0] + assert "/opt/myapp/app.log" in call_args + + +# ── docker type ─────────────────────────────────────────────────────────────── + +class TestSSHDockerGlean: + def test_docker_single_container_command_issued(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "docker", "containers": ["myapp"]}], + }])) + + mock_t = _mock_transport([DOCKER_LINE]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + call_args = mock_t.exec_stream.call_args[0][0] + assert "myapp" in call_args + + def test_docker_multiple_containers_exec_per_container(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "docker", "containers": ["app", "nginx"]}], + }])) + + mock_t = MagicMock() + mock_t.exec_stream.return_value = iter([]) + p, _ = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + # One exec_stream call per container + assert mock_t.exec_stream.call_count == 2 + all_cmds = " ".join(c[0][0] for c in mock_t.exec_stream.call_args_list) + assert "app" in all_cmds + assert "nginx" in all_cmds + + +# ── error handling ──────────────────────────────────────────────────────────── + +class TestSSHGleanErrorHandling: + def test_connection_error_skips_source_returns_empty_stats(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "unreachable", + "transport": "ssh", + "host": "192.168.99.99", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "journald"}], + }])) + + with patch("app.glean.pipeline.SSHTransport") as MockClass: + MockClass.return_value.__enter__.side_effect = SSHConnectionError("no route") + MockClass.return_value.__exit__.return_value = None + stats = glean_sources(sources_file, db_path) + + assert _entry_count(db_path) == 0 + # Stats for the source should either be absent or zero + for v in stats.values(): + assert v == 0 + + def test_command_error_skips_item_continues_next(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + # Two glean items: first raises SSHCommandError, second yields a valid line + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [ + {"type": "journald"}, + {"type": "syslog", "path": "/var/log/syslog"}, + ], + }])) + + mock_t = MagicMock() + # side_effect list: exception instances are raised; other values are returned + mock_t.exec_stream.side_effect = [ + SSHCommandError("journalctl: command not found"), # raised on first call + iter([SYSLOG_LINE]), # returned on second call + ] + + p, _ = _patch_transport(mock_t) + try: + # Should not raise — bad item is skipped, good item is processed + stats = glean_sources(sources_file, db_path) + finally: + p.stop() + + # The syslog line should have been written + assert _entry_count(db_path) >= 1 + + def test_unknown_glean_type_skipped(self, tmp_path): + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "mqtt"}], # not a valid remote type + }])) + + mock_t = _mock_transport([]) + p, _ = _patch_transport(mock_t) + try: + stats = glean_sources(sources_file, db_path) # must not raise + finally: + p.stop() + + assert _entry_count(db_path) == 0 + + +# ── mixed local + SSH sources ───────────────────────────────────────────────── + +class TestMixedLocalAndSSH: + def test_local_and_ssh_both_processed(self, tmp_path): + # Local syslog file + local_log = tmp_path / "local.log" + local_log.write_text(SYSLOG_LINE) + + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([ + {"id": "local-syslog", "path": str(local_log)}, + { + "id": "remote01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [{"type": "syslog", "path": "/var/log/syslog"}], + }, + ])) + + mock_t = _mock_transport([SYSLOG_LINE]) + p, _ = _patch_transport(mock_t) + try: + stats = glean_sources(sources_file, db_path) + finally: + p.stop() + + # Both sources should have contributed entries + assert _entry_count(db_path) >= 2 + assert "local-syslog" in stats + assert any("remote01" in k for k in stats) + + def test_local_only_sources_never_calls_ssh(self, tmp_path): + local_log = tmp_path / "local.log" + local_log.write_text(SYSLOG_LINE) + + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([ + {"id": "local", "path": str(local_log)}, + ])) + + with patch("app.glean.pipeline.SSHTransport") as MockClass: + glean_sources(sources_file, db_path) + MockClass.assert_not_called() + + +# ── multiple glean items per SSH source ─────────────────────────────────────── + +class TestMultipleGleanItemsPerHost: + def test_one_connection_multiple_commands(self, tmp_path): + """One SSHTransport instance is shared across all glean items for a host.""" + sources_file = tmp_path / "sources.yaml" + db_path = tmp_path / "test.db" + sources_file.write_text(_ssh_sources_yaml([{ + "id": "rack01", + "transport": "ssh", + "host": "192.168.1.10", + "user": "admin", + "key_path": "~/.ssh/id_ed25519", + "glean": [ + {"type": "journald"}, + {"type": "syslog", "path": "/var/log/syslog"}, + {"type": "plaintext", "path": "/var/log/app.log"}, + ], + }])) + + mock_t = _mock_transport([]) + p, MockClass = _patch_transport(mock_t) + try: + glean_sources(sources_file, db_path) + finally: + p.stop() + + # SSHTransport() should be instantiated only once for the one host + MockClass.assert_called_once() + # exec_stream should be called once per glean item + assert mock_t.exec_stream.call_count == 3 diff --git a/tests/test_glean_ssh.py b/tests/test_glean_ssh.py new file mode 100644 index 0000000..9a240ad --- /dev/null +++ b/tests/test_glean_ssh.py @@ -0,0 +1,185 @@ +"""Tests for SSH transport layer (app/glean/ssh.py). + +All SSH network I/O is mocked — no real SSH connection required. +""" +from __future__ import annotations + +import io +from unittest.mock import MagicMock, patch, call + +import pytest + +from app.glean.ssh import ( + SSHTransport, + SSHConnectionError, + SSHCommandError, + _build_journald_command, + _build_syslog_command, + _build_plaintext_command, + _build_docker_command, +) + + +# ── Command builders ────────────────────────────────────────────────────────── + +class TestBuildJournaldCommand: + def test_no_args_returns_base_command(self): + cmd = _build_journald_command({}) + assert "journalctl" in cmd + assert "-o json" in cmd + + def test_args_list_appended(self): + cmd = _build_journald_command({"args": ["--since", "2 hours ago", "--unit", "sshd"]}) + assert "--since" in cmd + assert "2 hours ago" in cmd + assert "--unit" in cmd + assert "sshd" in cmd + + def test_unit_shorthand(self): + cmd = _build_journald_command({"unit": "docker"}) + assert "--unit docker" in cmd or "--unit=docker" in cmd + + +class TestBuildSyslogCommand: + def test_returns_cat_command(self): + cmd = _build_syslog_command({"path": "/var/log/syslog"}) + assert "cat" in cmd + assert "/var/log/syslog" in cmd + + def test_default_path_when_omitted(self): + cmd = _build_syslog_command({}) + assert "cat" in cmd + assert "/var/log" in cmd + + +class TestBuildPlaintextCommand: + def test_cat_with_path(self): + cmd = _build_plaintext_command({"path": "/var/log/app/error.log"}) + assert "cat" in cmd + assert "/var/log/app/error.log" in cmd + + def test_raises_without_path(self): + with pytest.raises((ValueError, KeyError)): + _build_plaintext_command({}) + + +class TestBuildDockerCommand: + def test_single_container(self): + cmd = _build_docker_command({"containers": ["myapp"]}) + assert "myapp" in cmd + + def test_multiple_containers_returns_list(self): + cmds = _build_docker_command({"containers": ["app", "nginx"]}) + # Multiple containers → must produce a command per container OR joined + assert "app" in (cmds if isinstance(cmds, str) else " ".join(cmds)) + assert "nginx" in (cmds if isinstance(cmds, str) else " ".join(cmds)) + + def test_raises_without_containers(self): + with pytest.raises((ValueError, KeyError)): + _build_docker_command({}) + + +# ── SSHTransport context manager ────────────────────────────────────────────── + +def _mock_ssh_client(stdout_lines: list[str] | None = None): + """Return a mock SSHClient whose exec_command yields the given lines.""" + client = MagicMock() + stdout = MagicMock() + stdout.__iter__ = MagicMock(return_value=iter(stdout_lines or [])) + stderr = MagicMock() + stderr.read.return_value = b"" + client.exec_command.return_value = (MagicMock(), stdout, stderr) + return client + + +class TestSSHTransportConnect: + def test_connects_with_key_path(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + MockClient.return_value = _mock_ssh_client() + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)): + pass + MockClient.return_value.connect.assert_called_once() + call_kwargs = MockClient.return_value.connect.call_args + assert call_kwargs.kwargs.get("hostname") == "10.0.0.1" or \ + call_kwargs.args[0] == "10.0.0.1" + + def test_disconnects_on_exit(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + mock_client = _mock_ssh_client() + MockClient.return_value = mock_client + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)): + pass + mock_client.close.assert_called_once() + + def test_disconnects_on_exception(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + mock_client = _mock_ssh_client() + MockClient.return_value = mock_client + with pytest.raises(RuntimeError): + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)): + raise RuntimeError("boom") + mock_client.close.assert_called_once() + + def test_raises_ssh_connection_error_on_auth_failure(self, tmp_path): + import paramiko + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + MockClient.return_value.connect.side_effect = paramiko.AuthenticationException("denied") + with pytest.raises(SSHConnectionError, match="auth"): + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)): + pass + + def test_raises_ssh_connection_error_on_no_route(self, tmp_path): + import paramiko + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + MockClient.return_value.connect.side_effect = paramiko.SSHException("no route") + with pytest.raises(SSHConnectionError): + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)): + pass + + +class TestSSHTransportExecStream: + def test_yields_stdout_lines(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + lines = ["line one\n", "line two\n", "line three\n"] + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + MockClient.return_value = _mock_ssh_client(lines) + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)) as t: + result = list(t.exec_stream("echo hello")) + assert result == lines + + def test_raises_ssh_command_error_on_nonzero_exit(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + mock_client = _mock_ssh_client([]) + # Simulate non-zero exit code + channel = MagicMock() + channel.recv_exit_status.return_value = 1 + mock_client.exec_command.return_value[1].channel = channel + mock_client.exec_command.return_value[2].read.return_value = b"command not found" + MockClient.return_value = mock_client + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)) as t: + with pytest.raises(SSHCommandError, match="command not found"): + list(t.exec_stream("notacommand")) + + def test_strips_trailing_newlines(self, tmp_path): + key_file = tmp_path / "id_ed25519" + key_file.write_bytes(b"fake-key") + lines = [" line with spaces \n"] + with patch("app.glean.ssh.paramiko.SSHClient") as MockClient: + MockClient.return_value = _mock_ssh_client(lines) + with SSHTransport(host="10.0.0.1", user="admin", key_path=str(key_file)) as t: + # exec_stream should yield the raw lines; stripping is parser's job + result = list(t.exec_stream("echo hello")) + assert len(result) == 1