turnstone/tests/test_glean_ssh.py
pyr0ball 39c13f39ba feat: SSH remote host glean — transport layer and pipeline integration (closes #22, backend)
Adds SSH-based log collection from remote hosts via Paramiko.
One SSH connection per host, multiple log types per connection.

New files:
- app/glean/ssh.py: SSHTransport context manager + command builders
  for journald, syslog, plaintext, and docker log types
- tests/test_glean_ssh.py: 18 tests for transport layer (all mocked)
- tests/test_glean_pipeline_ssh.py: 15 tests for pipeline integration

Pipeline changes (app/glean/pipeline.py):
- glean_sources() now splits sources into local-file and SSH categories
- SSH sources use transport: ssh + glean: list schema in sources.yaml
- _glean_ssh_source(): one SSHTransport per host, N commands per connection
- _stream_and_write(): SSHCommandError caught per-item so one bad
  command does not abort the rest of the host's glean items
- SSHConnectionError skips the entire host with a warning log

SSH source schema (sources.yaml):
  - id: rack01
    transport: ssh
    host: 192.168.1.10
    user: admin
    key_path: ~/.ssh/id_ed25519
    glean:
      - type: journald
        args: [--since, 2 hours ago]
      - type: syslog
        path: /var/log/syslog
      - type: plaintext
        path: /var/log/app/error.log
      - type: docker
        containers: [myapp, nginx]

Key design decisions:
- Key-based auth only (no password prompts in daemon context)
- exit-status check fires only after all stdout lines have been yielded;
  callers must drain the iterator to trigger it
- Local file sources path unchanged; SSH sources co-exist in same yaml
- Docker multi-container: one exec_stream call per container,
  source_id scoped as host_id/type/container_name

Remaining for #22: REST endpoint, SourcesView UI, sources.yaml docs.
285 tests passing (33 new SSH tests).
2026-05-20 23:03:13 -07:00

185 lines
7.7 KiB
Python

"""Tests for SSH transport layer (app/glean/ssh.py).
All SSH network I/O is mocked — no real SSH connection required.
"""
from __future__ import annotations
import io
from unittest.mock import MagicMock, patch, call
import pytest
from app.glean.ssh import (
SSHTransport,
SSHConnectionError,
SSHCommandError,
_build_journald_command,
_build_syslog_command,
_build_plaintext_command,
_build_docker_command,
)
# ── Command builders ──────────────────────────────────────────────────────────
class TestBuildJournaldCommand:
    """Checks on the command produced by ``_build_journald_command``."""

    def test_no_args_returns_base_command(self):
        """An empty item config still mentions journalctl and ``-o json``."""
        command = _build_journald_command({})
        for fragment in ("journalctl", "-o json"):
            assert fragment in command

    def test_args_list_appended(self):
        """Each entry of the ``args`` list is present in the built command."""
        command = _build_journald_command(
            {"args": ["--since", "2 hours ago", "--unit", "sshd"]}
        )
        for token in ("--since", "2 hours ago", "--unit", "sshd"):
            assert token in command

    def test_unit_shorthand(self):
        """The ``unit`` key is rendered as ``--unit`` in either spelling."""
        command = _build_journald_command({"unit": "docker"})
        accepted_forms = ("--unit docker", "--unit=docker")
        assert any(form in command for form in accepted_forms)
class TestBuildSyslogCommand:
    """Checks on the command produced by ``_build_syslog_command``."""

    def test_returns_cat_command(self):
        """An explicit ``path`` yields a ``cat`` of that exact file."""
        syslog_path = "/var/log/syslog"
        command = _build_syslog_command({"path": syslog_path})
        for fragment in ("cat", syslog_path):
            assert fragment in command

    def test_default_path_when_omitted(self):
        """Without ``path`` the builder still cats something under /var/log."""
        command = _build_syslog_command({})
        for fragment in ("cat", "/var/log"):
            assert fragment in command
class TestBuildPlaintextCommand:
    """Checks on the command produced by ``_build_plaintext_command``."""

    def test_cat_with_path(self):
        """The configured ``path`` is read with ``cat``."""
        log_path = "/var/log/app/error.log"
        command = _build_plaintext_command({"path": log_path})
        for fragment in ("cat", log_path):
            assert fragment in command

    def test_raises_without_path(self):
        """A config with no ``path`` is rejected with ValueError or KeyError."""
        rejected_with = (ValueError, KeyError)
        with pytest.raises(rejected_with):
            _build_plaintext_command({})
class TestBuildDockerCommand:
    """Checks on ``_build_docker_command`` output for the ``containers`` key."""

    @staticmethod
    def _as_text(built):
        """Return *built* unchanged if it is a str, else its items space-joined."""
        if isinstance(built, str):
            return built
        return " ".join(built)

    def test_single_container(self):
        """The single configured container name is present in the result."""
        built = _build_docker_command({"containers": ["myapp"]})
        assert "myapp" in built

    def test_multiple_containers_returns_list(self):
        """Both names appear whether the builder returns one string or several."""
        built = _build_docker_command({"containers": ["app", "nginx"]})
        # Multiple containers → must produce a command per container OR joined,
        # so each name is checked against a flattened view of the result.
        assert "app" in self._as_text(built)
        assert "nginx" in self._as_text(built)

    def test_raises_without_containers(self):
        """A config with no ``containers`` is rejected with ValueError or KeyError."""
        rejected_with = (ValueError, KeyError)
        with pytest.raises(rejected_with):
            _build_docker_command({})
# ── SSHTransport context manager ──────────────────────────────────────────────
def _mock_ssh_client(stdout_lines: list[str] | None = None):
"""Return a mock SSHClient whose exec_command yields the given lines."""
client = MagicMock()
stdout = MagicMock()
stdout.__iter__ = MagicMock(return_value=iter(stdout_lines or []))
stderr = MagicMock()
stderr.read.return_value = b""
client.exec_command.return_value = (MagicMock(), stdout, stderr)
return client
class TestSSHTransportConnect:
    """Connection lifecycle of ``SSHTransport`` with paramiko's SSHClient patched."""

    @staticmethod
    def _fake_key(tmp_path) -> str:
        """Write a placeholder key file under *tmp_path* and return its path."""
        key_file = tmp_path / "id_ed25519"
        key_file.write_bytes(b"fake-key")
        return str(key_file)

    def test_connects_with_key_path(self, tmp_path):
        """Entering the context calls ``connect`` once, targeting the given host."""
        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client()
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                pass
            MockClient.return_value.connect.assert_called_once()
            connect_call = MockClient.return_value.connect.call_args
            # Accept the host as keyword or first positional argument. The
            # slice avoids an IndexError when no positional args were passed,
            # so a wrong host fails as a plain assertion.
            as_keyword = connect_call.kwargs.get("hostname") == "10.0.0.1"
            as_positional = connect_call.args[:1] == ("10.0.0.1",)
            assert as_keyword or as_positional

    def test_disconnects_on_exit(self, tmp_path):
        """Leaving the context normally closes the client exactly once."""
        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client()
            MockClient.return_value = mock_client
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                pass
            mock_client.close.assert_called_once()

    def test_disconnects_on_exception(self, tmp_path):
        """An exception in the body propagates and the client is still closed."""
        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client()
            MockClient.return_value = mock_client
            with pytest.raises(RuntimeError):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    raise RuntimeError("boom")
            mock_client.close.assert_called_once()

    def test_raises_ssh_connection_error_on_auth_failure(self, tmp_path):
        """paramiko's AuthenticationException surfaces as SSHConnectionError."""
        import paramiko

        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value.connect.side_effect = (
                paramiko.AuthenticationException("denied")
            )
            with pytest.raises(SSHConnectionError, match="auth"):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    pass

    def test_raises_ssh_connection_error_on_no_route(self, tmp_path):
        """A generic paramiko SSHException on connect surfaces as SSHConnectionError."""
        import paramiko

        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value.connect.side_effect = paramiko.SSHException("no route")
            with pytest.raises(SSHConnectionError):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    pass
class TestSSHTransportExecStream:
    """Behaviour of ``SSHTransport.exec_stream`` against a mocked SSH client."""

    @staticmethod
    def _fake_key(tmp_path) -> str:
        """Write a placeholder key file under *tmp_path* and return its path."""
        key_file = tmp_path / "id_ed25519"
        key_file.write_bytes(b"fake-key")
        return str(key_file)

    def test_yields_stdout_lines(self, tmp_path):
        """Every stdout line is yielded, in order and unmodified."""
        key_path = self._fake_key(tmp_path)
        lines = ["line one\n", "line two\n", "line three\n"]
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client(lines)
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                result = list(t.exec_stream("echo hello"))
        assert result == lines

    def test_raises_ssh_command_error_on_nonzero_exit(self, tmp_path):
        """A non-zero exit status raises SSHCommandError carrying the stderr text."""
        key_path = self._fake_key(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client([])
            # Simulate non-zero exit code
            channel = MagicMock()
            channel.recv_exit_status.return_value = 1
            _, stdout, stderr = mock_client.exec_command.return_value
            stdout.channel = channel
            stderr.read.return_value = b"command not found"
            MockClient.return_value = mock_client
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                with pytest.raises(SSHCommandError, match="command not found"):
                    # The iterator is drained inside the raises block because
                    # this test expects the error to surface during iteration.
                    list(t.exec_stream("notacommand"))

    def test_strips_trailing_newlines(self, tmp_path):
        """Despite the name, pins that lines pass through with whitespace intact."""
        key_path = self._fake_key(tmp_path)
        lines = [" line with spaces \n"]
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client(lines)
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                # exec_stream should yield the raw lines; stripping is parser's job
                result = list(t.exec_stream("echo hello"))
        # Compare content, not just count, so any stripping of the leading or
        # trailing whitespace or the newline is detected.
        assert result == lines