Adds SSH-based log collection from remote hosts via Paramiko.
One SSH connection per host, multiple log types per connection.
New files:
- app/glean/ssh.py: SSHTransport context manager + command builders
for journald, syslog, plaintext, and docker log types
- tests/test_glean_ssh.py: 18 tests for transport layer (all mocked)
- tests/test_glean_pipeline_ssh.py: 15 tests for pipeline integration
Pipeline changes (app/glean/pipeline.py):
- glean_sources() now splits sources into local-file and SSH categories
- SSH sources use transport: ssh + glean: list schema in sources.yaml
- _glean_ssh_source(): one SSHTransport per host, N commands per connection
- _stream_and_write(): SSHCommandError caught per-item so one bad
command does not abort the rest of the host's glean items
- SSHConnectionError skips the entire host with a warning log
SSH source schema (sources.yaml):
- id: rack01
transport: ssh
host: 192.168.1.10
user: admin
key_path: ~/.ssh/id_ed25519
glean:
- type: journald
args: [--since, 2 hours ago]
- type: syslog
path: /var/log/syslog
- type: plaintext
path: /var/log/app/error.log
- type: docker
containers: [myapp, nginx]
Key design decisions:
- Key-based auth only (no password prompts in daemon context)
- The exit-status check fires only after all stdout lines have been yielded,
so callers must drain the iterator to trigger it
- Local file sources path unchanged; SSH sources co-exist in same yaml
- Docker multi-container: one exec_stream call per container,
source_id scoped as host_id/type/container_name
Remaining for #22: REST endpoint, SourcesView UI, sources.yaml docs.
285 → 285 tests passing (33 new SSH tests).
444 lines
16 KiB
Python
444 lines
16 KiB
Python
"""Tests for SSH source handling in app/glean/pipeline.py.
|
|
|
|
Verifies that glean_sources() correctly:
- Dispatches SSH sources to SSHTransport (local sources unchanged)
- Routes each glean-type to the right command builder + parser
- Writes parsed entries to SQLite
- Skips the whole host on SSHConnectionError, and only the failing glean item on SSHCommandError
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from app.glean.pipeline import glean_sources, ensure_schema
|
|
from app.glean.ssh import SSHConnectionError, SSHCommandError
|
|
|
|
|
|
# ── Shared fixtures ───────────────────────────────────────────────────────────
|
|
|
|
# One journald record in JSON form, newline-terminated like the other sample
# lines below.
JOURNALD_LINE = json.dumps({
    "__REALTIME_TIMESTAMP": "1747000000000000",
    "PRIORITY": "3",
    "MESSAGE": "SSH brute force detected from 192.168.1.99",
    "SYSLOG_IDENTIFIER": "sshd",
    "_HOSTNAME": "rack01",
}) + "\n"

# Sample line fed to the "syslog" glean type.
SYSLOG_LINE = "May 20 22:00:00 rack01 sshd[1234]: Failed password for invalid user admin\n"

# Sample line fed to the "plaintext" glean type.
PLAINTEXT_LINE = "2026-05-20 22:00:00 ERROR app crashed with exit code 1\n"

# Sample line fed to the "docker" glean type.
DOCKER_LINE = "2026-05-20T22:00:00.000000000Z stderr F container startup failed\n"
|
|
|
|
|
|
def _ssh_sources_yaml(sources: list[dict]) -> str:
    """Serialize *sources* as YAML under a top-level ``sources`` key.

    The returned text is what the tests write to ``sources.yaml`` before
    calling ``glean_sources()``.
    """
    return yaml.dump({"sources": sources})
|
|
|
|
|
|
def _mock_transport(lines: list[str] | None = None):
    """Return a mock SSHTransport whose exec_stream yields the given lines.

    Every ``exec_stream`` call gets its own fresh iterator over *lines*.
    A single shared ``iter(...)`` as ``return_value`` would be exhausted by
    the first call, leaving later commands on the same connection with no
    output.

    Args:
        lines: Lines each ``exec_stream`` call should yield; ``None`` means
            no output.

    Returns:
        A ``MagicMock`` standing in for the object yielded by
        ``SSHTransport.__enter__``.
    """
    # Snapshot the input so later mutation by the caller cannot change what
    # the mock yields.
    payload = list(lines or [])
    mock_t = MagicMock()
    mock_t.exec_stream.side_effect = lambda *args, **kwargs: iter(payload)
    return mock_t
|
|
|
|
|
|
def _patch_transport(mock_t):
    """Patch SSHTransport in pipeline so __enter__ returns mock_t.

    Args:
        mock_t: Object the patched ``SSHTransport(...)`` context manager
            should yield from ``__enter__``.

    Returns:
        ``(patcher, MockClass)``. The patcher is returned already started,
        so the caller must call ``patcher.stop()`` (normally in a
        ``finally``) to restore the real class. ``MockClass`` is the mock
        that replaced ``SSHTransport``.
    """
    p = patch("app.glean.pipeline.SSHTransport")
    MockClass = p.start()
    MockClass.return_value.__enter__.return_value = mock_t
    # A falsy __exit__ result means exceptions raised inside the ``with``
    # body are not suppressed by the mock.
    MockClass.return_value.__exit__.return_value = None
    return p, MockClass
|
|
|
|
|
|
def _entry_count(db_path: Path) -> int:
    """Return the number of rows in the ``log_entries`` table of *db_path*.

    The connection is closed even when the query raises (for example when
    the table does not exist), so a failing assertion path does not leak an
    open handle on the temporary database.

    Raises:
        sqlite3.OperationalError: If ``log_entries`` does not exist.
    """
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM log_entries").fetchone()[0]
    finally:
        conn.close()
|
|
|
|
|
|
# ── journald type ─────────────────────────────────────────────────────────────
|
|
|
|
class TestSSHJournaldGlean:
    """glean_sources() handling of ``type: journald`` items on an SSH source."""

    def test_journald_entries_written_to_db(self, tmp_path):
        """A journald JSON line streamed over SSH produces a log_entries row."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "journald"}],
        }]))

        mock_t = _mock_transport([JOURNALD_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            stats = glean_sources(sources_file, db_path)
        finally:
            p.stop()

        assert _entry_count(db_path) >= 1
        # At least one stats key must mention the source id.
        assert any("rack01" in k for k in stats)

    def test_journald_args_passed_to_command_builder(self, tmp_path):
        """Configured ``args`` appear in the command handed to exec_stream."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "journald", "args": ["--since", "1 hour ago"]}],
        }]))

        mock_t = _mock_transport([JOURNALD_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        # The command passed to exec_stream must contain the args
        call_args = mock_t.exec_stream.call_args[0][0]
        assert "--since" in call_args
        assert "1 hour ago" in call_args

    def test_journald_unit_shorthand(self, tmp_path):
        """The ``unit`` key of a journald item shows up in the command."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "journald", "unit": "sshd"}],
        }]))

        mock_t = _mock_transport([])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        call_args = mock_t.exec_stream.call_args[0][0]
        assert "sshd" in call_args
|
|
|
|
|
|
# ── syslog type ───────────────────────────────────────────────────────────────
|
|
|
|
class TestSSHSyslogGlean:
    """glean_sources() handling of ``type: syslog`` items on an SSH source."""

    def test_syslog_entries_written_to_db(self, tmp_path):
        """A syslog line streamed over SSH produces a log_entries row."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01-syslog",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "syslog", "path": "/var/log/syslog"}],
        }]))

        mock_t = _mock_transport([SYSLOG_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        assert _entry_count(db_path) >= 1

    def test_syslog_command_contains_path(self, tmp_path):
        """The configured remote path appears in the exec_stream command."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "syslog", "path": "/var/log/auth.log"}],
        }]))

        mock_t = _mock_transport([])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        call_args = mock_t.exec_stream.call_args[0][0]
        assert "/var/log/auth.log" in call_args
|
|
|
|
|
|
# ── plaintext type ────────────────────────────────────────────────────────────
|
|
|
|
class TestSSHPlaintextGlean:
    """glean_sources() handling of ``type: plaintext`` items on an SSH source."""

    def test_plaintext_entries_written_to_db(self, tmp_path):
        """A plaintext line streamed over SSH produces a log_entries row."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01-app",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "plaintext", "path": "/var/log/app/error.log"}],
        }]))

        mock_t = _mock_transport([PLAINTEXT_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        assert _entry_count(db_path) >= 1

    def test_plaintext_command_contains_path(self, tmp_path):
        """The configured remote path appears in the exec_stream command."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "plaintext", "path": "/opt/myapp/app.log"}],
        }]))

        mock_t = _mock_transport([])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        call_args = mock_t.exec_stream.call_args[0][0]
        assert "/opt/myapp/app.log" in call_args
|
|
|
|
|
|
# ── docker type ───────────────────────────────────────────────────────────────
|
|
|
|
class TestSSHDockerGlean:
    """glean_sources() handling of ``type: docker`` items on an SSH source."""

    def test_docker_single_container_command_issued(self, tmp_path):
        """The container name appears in the command sent to exec_stream."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "docker", "containers": ["myapp"]}],
        }]))

        mock_t = _mock_transport([DOCKER_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        call_args = mock_t.exec_stream.call_args[0][0]
        assert "myapp" in call_args

    def test_docker_multiple_containers_exec_per_container(self, tmp_path):
        """Two listed containers result in two exec_stream calls."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "docker", "containers": ["app", "nginx"]}],
        }]))

        # No output is needed; only the issued commands are inspected.
        mock_t = _mock_transport([])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        # One exec_stream call per container
        assert mock_t.exec_stream.call_count == 2
        all_cmds = " ".join(c[0][0] for c in mock_t.exec_stream.call_args_list)
        assert "app" in all_cmds
        assert "nginx" in all_cmds
|
|
|
|
|
|
# ── error handling ────────────────────────────────────────────────────────────
|
|
|
|
class TestSSHGleanErrorHandling:
    """glean_sources() must survive SSH failures and unknown glean types."""

    def test_connection_error_skips_source_returns_empty_stats(self, tmp_path):
        """SSHConnectionError on connect writes nothing and reports no entries."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "unreachable",
            "transport": "ssh",
            "host": "192.168.99.99",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "journald"}],
        }]))

        with patch("app.glean.pipeline.SSHTransport") as MockClass:
            # Entering the transport context fails, as if the host were down.
            MockClass.return_value.__enter__.side_effect = SSHConnectionError("no route")
            MockClass.return_value.__exit__.return_value = None
            stats = glean_sources(sources_file, db_path)

        assert _entry_count(db_path) == 0
        # Stats for the source should either be absent or zero
        for v in stats.values():
            assert v == 0

    def test_command_error_skips_item_continues_next(self, tmp_path):
        """A failing glean item does not stop the next item on the same host."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        # Two glean items: first raises SSHCommandError, second yields a valid line
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [
                {"type": "journald"},
                {"type": "syslog", "path": "/var/log/syslog"},
            ],
        }]))

        mock_t = MagicMock()
        # side_effect list: exception instances are raised; other values are returned
        mock_t.exec_stream.side_effect = [
            SSHCommandError("journalctl: command not found"),  # raised on first call
            iter([SYSLOG_LINE]),                               # returned on second call
        ]

        p, _ = _patch_transport(mock_t)
        try:
            # Should not raise — bad item is skipped, good item is processed
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        # The syslog line should have been written
        assert _entry_count(db_path) >= 1

    def test_unknown_glean_type_skipped(self, tmp_path):
        """An unrecognised glean type neither raises nor writes entries."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "mqtt"}],  # not a valid remote type
        }]))

        mock_t = _mock_transport([])
        p, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)  # must not raise
        finally:
            p.stop()

        assert _entry_count(db_path) == 0
|
|
|
|
|
|
# ── mixed local + SSH sources ─────────────────────────────────────────────────
|
|
|
|
class TestMixedLocalAndSSH:
    """Local-file and SSH sources living side by side in one sources.yaml."""

    def test_local_and_ssh_both_processed(self, tmp_path):
        """Both the local file and the SSH source contribute entries and stats."""
        # Local syslog file
        local_log = tmp_path / "local.log"
        local_log.write_text(SYSLOG_LINE)

        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([
            {"id": "local-syslog", "path": str(local_log)},
            {
                "id": "remote01",
                "transport": "ssh",
                "host": "192.168.1.10",
                "user": "admin",
                "key_path": "~/.ssh/id_ed25519",
                "glean": [{"type": "syslog", "path": "/var/log/syslog"}],
            },
        ]))

        mock_t = _mock_transport([SYSLOG_LINE])
        p, _ = _patch_transport(mock_t)
        try:
            stats = glean_sources(sources_file, db_path)
        finally:
            p.stop()

        # Both sources should have contributed entries
        assert _entry_count(db_path) >= 2
        assert "local-syslog" in stats
        assert any("remote01" in k for k in stats)

    def test_local_only_sources_never_calls_ssh(self, tmp_path):
        """With no SSH sources configured, SSHTransport is never instantiated."""
        local_log = tmp_path / "local.log"
        local_log.write_text(SYSLOG_LINE)

        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([
            {"id": "local", "path": str(local_log)},
        ]))

        with patch("app.glean.pipeline.SSHTransport") as MockClass:
            glean_sources(sources_file, db_path)
            MockClass.assert_not_called()
|
|
|
|
|
|
# ── multiple glean items per SSH source ───────────────────────────────────────
|
|
|
|
class TestMultipleGleanItemsPerHost:
    """Several glean items configured on a single SSH host."""

    def test_one_connection_multiple_commands(self, tmp_path):
        """One SSHTransport instance is shared across all glean items for a host."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([{
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [
                {"type": "journald"},
                {"type": "syslog", "path": "/var/log/syslog"},
                {"type": "plaintext", "path": "/var/log/app.log"},
            ],
        }]))

        mock_t = _mock_transport([])
        p, MockClass = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            p.stop()

        # SSHTransport() should be instantiated only once for the one host
        MockClass.assert_called_once()
        # exec_stream should be called once per glean item
        assert mock_t.exec_stream.call_count == 3
|