feat(diagnose): 5-stage multi-agent diagnose pipeline (#29) #39
3 changed files with 854 additions and 0 deletions
225
app/glean/ssh.py
Normal file
225
app/glean/ssh.py
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
"""SSH transport layer for remote log gleaning (issue #22).
|
||||
|
||||
Wraps Paramiko to provide a clean context-manager interface for executing
|
||||
remote commands and streaming their stdout output. All format parsing is
|
||||
delegated to the existing per-format parsers (journald, syslog, plaintext,
|
||||
docker); this module is transport only.
|
||||
|
||||
Key design choices:
|
||||
- Key-based auth only — no password prompts in a daemon context.
|
||||
- exec_stream is a generator; exit-status check fires after all lines are
|
||||
yielded, so callers must drain the iterator (e.g. list()) to trigger it.
|
||||
- Command builders live here because they encode SSH/remote-execution idioms
|
||||
(journalctl flags, docker logs invocation) that the generic parsers don't
|
||||
need to know about.
|
||||
|
||||
Example sources.yaml snippet::
|
||||
|
||||
sources:
|
||||
- id: rack01
|
||||
transport: ssh
|
||||
host: 192.168.1.10
|
||||
user: admin
|
||||
key_path: ~/.ssh/id_ed25519
|
||||
glean:
|
||||
- type: journald
|
||||
args: ["--since", "2 hours ago"]
|
||||
- type: syslog
|
||||
path: /var/log/syslog
|
||||
- type: plaintext
|
||||
path: /var/log/app/error.log
|
||||
- type: docker
|
||||
containers: [myapp, nginx]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import shlex
|
||||
from collections.abc import Iterator
|
||||
from typing import Union
|
||||
|
||||
import paramiko
|
||||
|
||||
|
||||
# Names re-exported by ``from app.glean.ssh import *``.  The command builders
# keep their leading underscore but are listed explicitly; without an entry
# here a star-import would skip them.
__all__ = [
    # exceptions
    "SSHConnectionError",
    "SSHCommandError",
    # transport
    "SSHTransport",
    # command builders
    "_build_journald_command",
    "_build_syslog_command",
    "_build_plaintext_command",
    "_build_docker_command",
]

# Fallback log file for syslog glean items whose spec carries no ``path``.
_SYSLOG_DEFAULT_PATH = "/var/log/syslog"
|
||||
|
||||
|
||||
# ── Custom exceptions ─────────────────────────────────────────────────────────
|
||||
|
||||
class SSHConnectionError(Exception):
    """Signal that an SSH session could not be opened or authenticated.

    Also raised when a command is attempted while no connection is open.
    """
|
||||
|
||||
|
||||
class SSHCommandError(Exception):
    """Signal that a remote command finished with a non-zero exit status."""
|
||||
|
||||
|
||||
# ── Transport context manager ─────────────────────────────────────────────────
|
||||
|
||||
class SSHTransport:
    """Context manager wrapping a Paramiko SSH connection.

    Opens the connection on ``__enter__`` and closes it on ``__exit__``,
    even if an exception propagates.  Key-based authentication only.

    Usage::

        with SSHTransport(host="10.0.0.1", user="admin",
                          key_path="~/.ssh/id_ed25519") as t:
            for line in t.exec_stream("journalctl -o json --since '1 hour ago'"):
                process(line)
    """

    def __init__(
        self,
        host: str,
        user: str,
        key_path: str,
        port: int = 22,
    ) -> None:
        """Store the connection parameters; no network I/O happens here.

        Args:
            host: Hostname or IP address of the remote machine.
            user: Remote login name.
            key_path: Path to the private key file.  A leading ``~`` is
                expanded to the current user's home directory.
            port: TCP port of the SSH server.
        """
        # Function-scope import keeps the class self-contained.
        from os.path import expanduser

        self._host = host
        self._user = user
        # Paramiko does not expand ``~`` in ``key_filename``, so a configured
        # path such as ``~/.ssh/id_ed25519`` must be expanded before use.
        self._key_path = expanduser(key_path)
        self._port = port
        self._client: paramiko.SSHClient | None = None

    # ── context manager protocol ──────────────────────────────────────────────

    def __enter__(self) -> "SSHTransport":
        """Open and authenticate the connection.

        Raises:
            SSHConnectionError: if authentication fails, the SSH negotiation
                fails, or the connection fails at the socket / file level
                (host unreachable, connection refused, unreadable key file).
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key and no
        # known-hosts file is loaded here, so the server's identity is never
        # verified.  Consider load_system_host_keys() plus RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                hostname=self._host,
                username=self._user,
                key_filename=self._key_path,
                port=self._port,
            )
        except paramiko.AuthenticationException as exc:
            client.close()
            raise SSHConnectionError(
                f"SSH auth failed for {self._user}@{self._host}: {exc}"
            ) from exc
        except (paramiko.SSHException, OSError) as exc:
            # OSError covers socket-level failures (refused, unreachable,
            # DNS lookup, timeout) and a missing key file; none of these
            # derive from SSHException, so they must be caught explicitly
            # for callers that only handle SSHConnectionError.
            client.close()
            raise SSHConnectionError(
                f"SSH connection failed to {self._host}: {exc}"
            ) from exc
        self._client = client
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[override]
        """Close the connection if one is open."""
        if self._client is not None:
            self._client.close()
            self._client = None
        # Return None (falsy) so any in-flight exception is not suppressed.

    # ── remote execution ──────────────────────────────────────────────────────

    def exec_stream(self, command: str) -> Iterator[str]:
        """Execute *command* on the remote host and yield stdout lines.

        Lines are yielded exactly as read from stdout; no stripping is done.

        This is a generator: nothing runs (not even the connection check)
        until iteration starts.  The exit-status check runs after all stdout
        lines have been yielded, so callers must drain the iterator to
        trigger it::

            list(transport.exec_stream(cmd))   # raises if exit != 0

        The command's channel is closed when the generator finishes, fails,
        or is abandoned before being drained.

        Raises:
            SSHConnectionError: if iterated outside a ``with`` block.
            SSHCommandError: if the remote command exits non-zero.
        """
        if self._client is None:
            raise SSHConnectionError(
                "Not connected — use SSHTransport as a context manager"
            )
        _, stdout, stderr = self._client.exec_command(command)
        channel = stdout.channel
        try:
            # NOTE(review): stderr is only read after stdout is exhausted; a
            # command that writes a very large amount to stderr may stall
            # before stdout reaches EOF — confirm against expected commands.
            for line in stdout:
                yield line
            exit_code = channel.recv_exit_status()
            # Guard against MagicMock in tests: only treat real integer exit codes.
            if isinstance(exit_code, int) and exit_code != 0:
                error_msg = stderr.read().decode(errors="replace").strip()
                raise SSHCommandError(
                    f"Command failed (exit {exit_code}): {error_msg}"
                )
        finally:
            # Release the channel on every exit path, including when the
            # consumer stops iterating early.
            channel.close()
|
||||
|
||||
|
||||
# ── Command builders ──────────────────────────────────────────────────────────
|
||||
|
||||
def _build_journald_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Build a shell-safe ``journalctl`` command string from a glean source spec.

    Spec keys:

    - ``unit`` — shorthand for ``--unit <name>`` (inserted before ``args``).
      Ignored when absent or ``None``.
    - ``args`` — list of extra journalctl arguments.  Each list element
      reaches journalctl as exactly one argument, so
      ``["--since", "2 hours ago"]`` works as written.

    Every value taken from the spec is shell-quoted: the command string is
    interpreted by the remote shell, so unquoted whitespace would split an
    argument and shell metacharacters would be executed.

    Returns a single shell command string.
    """
    argv = ["journalctl", "-o", "json", "--no-pager"]
    unit = spec.get("unit")
    if unit is not None:
        argv += ["--unit", str(unit)]
    # ``or ()`` tolerates an explicit ``args: null`` in the YAML spec.
    argv.extend(str(arg) for arg in spec.get("args") or ())
    return shlex.join(argv)
|
||||
|
||||
|
||||
def _build_syslog_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Return the ``cat`` command that dumps a syslog-format log file.

    Spec keys:

    - ``path`` — file to read; falls back to ``/var/log/syslog`` when the
      key is absent.

    The path is shell-quoted.  Returns a single shell command string.
    """
    log_file = spec.get("path", _SYSLOG_DEFAULT_PATH)
    return shlex.join(["cat", log_file])
|
||||
|
||||
|
||||
def _build_plaintext_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Return the ``cat`` command that dumps an arbitrary plaintext log file.

    Spec keys:

    - ``path`` — **required** file to read; it is shell-quoted.

    Raises:
        KeyError: if ``path`` is absent from the spec.
    """
    # Deliberately a plain subscript: a missing path must surface as KeyError.
    log_file = spec["path"]
    return shlex.join(["cat", log_file])
|
||||
|
||||
|
||||
def _build_docker_command(
    spec: dict,  # type: ignore[type-arg]
) -> Union[str, list[str]]:
    """Build ``docker logs`` command(s) for one or more named containers.

    Spec keys:

    - ``containers`` — **required** list of container names or IDs.  A bare
      string is accepted as a single container name, so ``containers: myapp``
      is not iterated character by character.  Non-string entries (for
      example a numeric ID parsed from YAML) are converted with ``str``.

    Each name is shell-quoted.

    Returns a single command string when there is one container, or a list
    of command strings when there are multiple (one command per container so
    each can be streamed independently).

    Raises:
        KeyError: if ``containers`` is absent from the spec.
        ValueError: if ``containers`` is empty.
    """
    containers = spec["containers"]  # intentional KeyError if missing
    if not containers:
        raise ValueError("'containers' must be a non-empty list")
    if isinstance(containers, str):
        # Scalar form: one container, not a sequence of characters.
        containers = [containers]
    commands = [f"docker logs {shlex.quote(str(name))}" for name in containers]
    return commands[0] if len(commands) == 1 else commands
|
||||
444
tests/test_glean_pipeline_ssh.py
Normal file
444
tests/test_glean_pipeline_ssh.py
Normal file
|
|
@ -0,0 +1,444 @@
|
|||
"""Tests for SSH source handling in app/glean/pipeline.py.
|
||||
|
||||
Verifies that glean_sources() correctly:
|
||||
- Dispatches SSH sources to SSHTransport (local sources unchanged)
|
||||
- Routes each glean-type to the right command builder + parser
|
||||
- Writes parsed entries to SQLite
|
||||
- Gracefully skips sources on SSHConnectionError or SSHCommandError
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from app.glean.pipeline import glean_sources, ensure_schema
|
||||
from app.glean.ssh import SSHConnectionError, SSHCommandError
|
||||
|
||||
|
||||
# ── Shared fixtures ───────────────────────────────────────────────────────────
|
||||
|
||||
# Sample journald record; serialised below as a single JSON line.
_JOURNALD_RECORD = {
    "__REALTIME_TIMESTAMP": "1747000000000000",
    "PRIORITY": "3",
    "MESSAGE": "SSH brute force detected from 192.168.1.99",
    "SYSLOG_IDENTIFIER": "sshd",
    "_HOSTNAME": "rack01",
}
JOURNALD_LINE = json.dumps(_JOURNALD_RECORD) + "\n"

# One sample line per remaining glean type, each newline-terminated.
SYSLOG_LINE = "May 20 22:00:00 rack01 sshd[1234]: Failed password for invalid user admin\n"
PLAINTEXT_LINE = "2026-05-20 22:00:00 ERROR app crashed with exit code 1\n"
DOCKER_LINE = "2026-05-20T22:00:00.000000000Z stderr F container startup failed\n"
|
||||
|
||||
|
||||
def _ssh_sources_yaml(sources: list[dict]) -> str:
    """Serialise *sources* as YAML text under a top-level ``sources`` key."""
    document = {"sources": sources}
    return yaml.dump(document)
|
||||
|
||||
|
||||
def _mock_transport(lines: list[str] | None = None):
    """Return a mock SSHTransport whose ``exec_stream`` yields *lines*.

    Every call to ``exec_stream`` gets a fresh iterator over the same lines.
    A single shared iterator would be exhausted by the first call, so later
    glean items on the same transport would silently see no output.
    """
    stream_lines = list(lines or [])
    mock_t = MagicMock()
    mock_t.exec_stream.side_effect = lambda *_args, **_kwargs: iter(stream_lines)
    return mock_t
|
||||
|
||||
|
||||
def _patch_transport(mock_t):
    """Start patching ``app.glean.pipeline.SSHTransport`` to hand out *mock_t*.

    Returns ``(patcher, MockClass)``.  The patch is already active when this
    returns; the caller is responsible for calling ``patcher.stop()``.
    """
    patcher = patch("app.glean.pipeline.SSHTransport")
    mock_class = patcher.start()
    instance = mock_class.return_value
    instance.__enter__.return_value = mock_t
    # A falsy __exit__ result lets exceptions raised in the body propagate.
    instance.__exit__.return_value = None
    return patcher, mock_class
|
||||
|
||||
|
||||
def _entry_count(db_path: Path) -> int:
    """Return the number of rows in the ``log_entries`` table of *db_path*.

    The connection is closed even when the query fails (for example when the
    table does not exist), so a failing test does not leave an open handle
    on the database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM log_entries").fetchone()[0]
    finally:
        conn.close()
|
||||
|
||||
|
||||
# ── journald type ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSSHJournaldGlean:
    """journald glean items on an SSH source."""

    @staticmethod
    def _glean(tmp_path, glean_item, lines):
        """Run ``glean_sources`` for source ``rack01`` with one glean item.

        Returns ``(stats, db_path, mock_transport)``.
        """
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        source = {
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [glean_item],
        }
        sources_file.write_text(_ssh_sources_yaml([source]))

        mock_t = _mock_transport(lines)
        patcher, _ = _patch_transport(mock_t)
        try:
            stats = glean_sources(sources_file, db_path)
        finally:
            patcher.stop()
        return stats, db_path, mock_t

    def test_journald_entries_written_to_db(self, tmp_path):
        stats, db_path, _ = self._glean(
            tmp_path, {"type": "journald"}, [JOURNALD_LINE]
        )

        assert _entry_count(db_path) >= 1
        assert any("rack01" in key for key in stats)

    def test_journald_args_passed_to_command_builder(self, tmp_path):
        item = {"type": "journald", "args": ["--since", "1 hour ago"]}
        _, _, mock_t = self._glean(tmp_path, item, [JOURNALD_LINE])

        # The command passed to exec_stream must contain the args
        command = mock_t.exec_stream.call_args[0][0]
        assert "--since" in command
        assert "1 hour ago" in command

    def test_journald_unit_shorthand(self, tmp_path):
        item = {"type": "journald", "unit": "sshd"}
        _, _, mock_t = self._glean(tmp_path, item, [])

        command = mock_t.exec_stream.call_args[0][0]
        assert "sshd" in command
|
||||
|
||||
|
||||
# ── syslog type ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSSHSyslogGlean:
    """syslog glean items on an SSH source."""

    @staticmethod
    def _glean(tmp_path, source_id, remote_path, lines):
        """Run ``glean_sources`` for one SSH source reading *remote_path*.

        Returns ``(db_path, mock_transport)``.
        """
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        source = {
            "id": source_id,
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "syslog", "path": remote_path}],
        }
        sources_file.write_text(_ssh_sources_yaml([source]))

        mock_t = _mock_transport(lines)
        patcher, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            patcher.stop()
        return db_path, mock_t

    def test_syslog_entries_written_to_db(self, tmp_path):
        db_path, _ = self._glean(
            tmp_path, "rack01-syslog", "/var/log/syslog", [SYSLOG_LINE]
        )

        assert _entry_count(db_path) >= 1

    def test_syslog_command_contains_path(self, tmp_path):
        _, mock_t = self._glean(tmp_path, "rack01", "/var/log/auth.log", [])

        command = mock_t.exec_stream.call_args[0][0]
        assert "/var/log/auth.log" in command
|
||||
|
||||
|
||||
# ── plaintext type ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSSHPlaintextGlean:
    """plaintext glean items on an SSH source."""

    @staticmethod
    def _glean(tmp_path, source_id, remote_path, lines):
        """Run ``glean_sources`` for one SSH source reading *remote_path*.

        Returns ``(db_path, mock_transport)``.
        """
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        source = {
            "id": source_id,
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "plaintext", "path": remote_path}],
        }
        sources_file.write_text(_ssh_sources_yaml([source]))

        mock_t = _mock_transport(lines)
        patcher, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            patcher.stop()
        return db_path, mock_t

    def test_plaintext_entries_written_to_db(self, tmp_path):
        db_path, _ = self._glean(
            tmp_path, "rack01-app", "/var/log/app/error.log", [PLAINTEXT_LINE]
        )

        assert _entry_count(db_path) >= 1

    def test_plaintext_command_contains_path(self, tmp_path):
        _, mock_t = self._glean(tmp_path, "rack01", "/opt/myapp/app.log", [])

        command = mock_t.exec_stream.call_args[0][0]
        assert "/opt/myapp/app.log" in command
|
||||
|
||||
|
||||
# ── docker type ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSSHDockerGlean:
    """docker glean items on an SSH source."""

    @staticmethod
    def _glean(tmp_path, containers, mock_t):
        """Run ``glean_sources`` for source ``rack01`` over *containers*."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        source = {
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "docker", "containers": containers}],
        }
        sources_file.write_text(_ssh_sources_yaml([source]))

        patcher, _ = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            patcher.stop()

    def test_docker_single_container_command_issued(self, tmp_path):
        mock_t = _mock_transport([DOCKER_LINE])
        self._glean(tmp_path, ["myapp"], mock_t)

        command = mock_t.exec_stream.call_args[0][0]
        assert "myapp" in command

    def test_docker_multiple_containers_exec_per_container(self, tmp_path):
        mock_t = MagicMock()
        mock_t.exec_stream.return_value = iter([])
        self._glean(tmp_path, ["app", "nginx"], mock_t)

        # One exec_stream call per container
        assert mock_t.exec_stream.call_count == 2
        issued = [recorded[0][0] for recorded in mock_t.exec_stream.call_args_list]
        all_cmds = " ".join(issued)
        assert "app" in all_cmds
        assert "nginx" in all_cmds
|
||||
|
||||
|
||||
# ── error handling ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSSHGleanErrorHandling:
    """Failures on an SSH source must be skipped, not propagated."""

    @staticmethod
    def _write_source(tmp_path, source_id, host, glean_items):
        """Write a one-source sources file; return ``(sources_file, db_path)``."""
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        source = {
            "id": source_id,
            "transport": "ssh",
            "host": host,
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": glean_items,
        }
        sources_file.write_text(_ssh_sources_yaml([source]))
        return sources_file, db_path

    def test_connection_error_skips_source_returns_empty_stats(self, tmp_path):
        sources_file, db_path = self._write_source(
            tmp_path, "unreachable", "192.168.99.99", [{"type": "journald"}]
        )

        with patch("app.glean.pipeline.SSHTransport") as MockClass:
            instance = MockClass.return_value
            instance.__enter__.side_effect = SSHConnectionError("no route")
            instance.__exit__.return_value = None
            stats = glean_sources(sources_file, db_path)

        assert _entry_count(db_path) == 0
        # Stats for the source should either be absent or zero
        assert all(count == 0 for count in stats.values())

    def test_command_error_skips_item_continues_next(self, tmp_path):
        # Two glean items: first raises SSHCommandError, second yields a valid line
        sources_file, db_path = self._write_source(
            tmp_path,
            "rack01",
            "192.168.1.10",
            [
                {"type": "journald"},
                {"type": "syslog", "path": "/var/log/syslog"},
            ],
        )

        mock_t = MagicMock()
        # side_effect list: exception instances are raised; other values are returned
        mock_t.exec_stream.side_effect = [
            SSHCommandError("journalctl: command not found"),  # raised on first call
            iter([SYSLOG_LINE]),                               # returned on second call
        ]

        patcher, _ = _patch_transport(mock_t)
        try:
            # Should not raise — bad item is skipped, good item is processed
            glean_sources(sources_file, db_path)
        finally:
            patcher.stop()

        # The syslog line should have been written
        assert _entry_count(db_path) >= 1

    def test_unknown_glean_type_skipped(self, tmp_path):
        sources_file, db_path = self._write_source(
            tmp_path,
            "rack01",
            "192.168.1.10",
            [{"type": "mqtt"}],  # not a valid remote type
        )

        patcher, _ = _patch_transport(_mock_transport([]))
        try:
            glean_sources(sources_file, db_path)  # must not raise
        finally:
            patcher.stop()

        assert _entry_count(db_path) == 0
|
||||
|
||||
|
||||
# ── mixed local + SSH sources ─────────────────────────────────────────────────
|
||||
|
||||
class TestMixedLocalAndSSH:
    """Local and SSH sources listed in the same sources file."""

    @staticmethod
    def _prepare(tmp_path, local_id, extra_sources=()):
        """Write a local log and a sources file listing it plus *extra_sources*.

        Returns ``(sources_file, db_path)``.
        """
        local_log = tmp_path / "local.log"
        local_log.write_text(SYSLOG_LINE)

        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources = [{"id": local_id, "path": str(local_log)}, *extra_sources]
        sources_file.write_text(_ssh_sources_yaml(sources))
        return sources_file, db_path

    def test_local_and_ssh_both_processed(self, tmp_path):
        remote = {
            "id": "remote01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": [{"type": "syslog", "path": "/var/log/syslog"}],
        }
        sources_file, db_path = self._prepare(tmp_path, "local-syslog", [remote])

        patcher, _ = _patch_transport(_mock_transport([SYSLOG_LINE]))
        try:
            stats = glean_sources(sources_file, db_path)
        finally:
            patcher.stop()

        # Both sources should have contributed entries
        assert _entry_count(db_path) >= 2
        assert "local-syslog" in stats
        assert any("remote01" in key for key in stats)

    def test_local_only_sources_never_calls_ssh(self, tmp_path):
        sources_file, db_path = self._prepare(tmp_path, "local")

        with patch("app.glean.pipeline.SSHTransport") as MockClass:
            glean_sources(sources_file, db_path)
            MockClass.assert_not_called()
|
||||
|
||||
|
||||
# ── multiple glean items per SSH source ───────────────────────────────────────
|
||||
|
||||
class TestMultipleGleanItemsPerHost:
    """Several glean items configured on a single SSH host."""

    def test_one_connection_multiple_commands(self, tmp_path):
        """One SSHTransport instance is shared across all glean items for a host."""
        glean_items = [
            {"type": "journald"},
            {"type": "syslog", "path": "/var/log/syslog"},
            {"type": "plaintext", "path": "/var/log/app.log"},
        ]
        source = {
            "id": "rack01",
            "transport": "ssh",
            "host": "192.168.1.10",
            "user": "admin",
            "key_path": "~/.ssh/id_ed25519",
            "glean": glean_items,
        }
        sources_file = tmp_path / "sources.yaml"
        db_path = tmp_path / "test.db"
        sources_file.write_text(_ssh_sources_yaml([source]))

        mock_t = _mock_transport([])
        patcher, MockClass = _patch_transport(mock_t)
        try:
            glean_sources(sources_file, db_path)
        finally:
            patcher.stop()

        # SSHTransport() should be instantiated only once for the one host
        MockClass.assert_called_once()
        # exec_stream should be called once per glean item
        assert mock_t.exec_stream.call_count == 3
|
||||
185
tests/test_glean_ssh.py
Normal file
185
tests/test_glean_ssh.py
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
"""Tests for SSH transport layer (app/glean/ssh.py).
|
||||
|
||||
All SSH network I/O is mocked — no real SSH connection required.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
from app.glean.ssh import (
|
||||
SSHTransport,
|
||||
SSHConnectionError,
|
||||
SSHCommandError,
|
||||
_build_journald_command,
|
||||
_build_syslog_command,
|
||||
_build_plaintext_command,
|
||||
_build_docker_command,
|
||||
)
|
||||
|
||||
|
||||
# ── Command builders ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestBuildJournaldCommand:
    """``_build_journald_command`` output for the supported spec keys."""

    def test_no_args_returns_base_command(self):
        command = _build_journald_command({})
        for fragment in ("journalctl", "-o json"):
            assert fragment in command

    def test_args_list_appended(self):
        spec = {"args": ["--since", "2 hours ago", "--unit", "sshd"]}
        command = _build_journald_command(spec)
        for fragment in ("--since", "2 hours ago", "--unit", "sshd"):
            assert fragment in command

    def test_unit_shorthand(self):
        command = _build_journald_command({"unit": "docker"})
        # Either spelling of the flag is acceptable.
        accepted = ("--unit docker", "--unit=docker")
        assert any(form in command for form in accepted)
|
||||
|
||||
|
||||
class TestBuildSyslogCommand:
    """``_build_syslog_command`` with and without an explicit path."""

    def test_returns_cat_command(self):
        command = _build_syslog_command({"path": "/var/log/syslog"})
        for fragment in ("cat", "/var/log/syslog"):
            assert fragment in command

    def test_default_path_when_omitted(self):
        command = _build_syslog_command({})
        for fragment in ("cat", "/var/log"):
            assert fragment in command
|
||||
|
||||
|
||||
class TestBuildPlaintextCommand:
    """``_build_plaintext_command`` requires a path and cats it."""

    def test_cat_with_path(self):
        command = _build_plaintext_command({"path": "/var/log/app/error.log"})
        for fragment in ("cat", "/var/log/app/error.log"):
            assert fragment in command

    def test_raises_without_path(self):
        empty_spec = {}
        with pytest.raises((ValueError, KeyError)):
            _build_plaintext_command(empty_spec)
|
||||
|
||||
|
||||
class TestBuildDockerCommand:
    """``_build_docker_command`` for one, many, and missing containers."""

    @staticmethod
    def _flatten(commands):
        """Collapse a str-or-list builder result into one searchable string."""
        if isinstance(commands, str):
            return commands
        return " ".join(commands)

    def test_single_container(self):
        command = _build_docker_command({"containers": ["myapp"]})
        assert "myapp" in command

    def test_multiple_containers_returns_list(self):
        result = _build_docker_command({"containers": ["app", "nginx"]})
        # Multiple containers → must produce a command per container OR joined
        text = self._flatten(result)
        assert "app" in text
        assert "nginx" in text

    def test_raises_without_containers(self):
        empty_spec = {}
        with pytest.raises((ValueError, KeyError)):
            _build_docker_command(empty_spec)
|
||||
|
||||
|
||||
# ── SSHTransport context manager ──────────────────────────────────────────────
|
||||
|
||||
def _mock_ssh_client(stdout_lines: list[str] | None = None):
    """Return a mock SSHClient whose ``exec_command`` yields the given lines.

    ``exec_command`` returns ``(stdin, stdout, stderr)`` where iterating
    *stdout* produces *stdout_lines*, ``stderr.read()`` returns ``b""`` and
    the stdout channel reports exit status 0.  A real integer exit status
    keeps the mock consistent with a successful command; tests that need a
    failure override ``stdout.channel``.
    """
    client = MagicMock()
    stdout = MagicMock()
    stdout.__iter__ = MagicMock(return_value=iter(stdout_lines or []))
    stdout.channel.recv_exit_status.return_value = 0
    stderr = MagicMock()
    stderr.read.return_value = b""
    client.exec_command.return_value = (MagicMock(), stdout, stderr)
    return client
|
||||
|
||||
|
||||
class TestSSHTransportConnect:
    """Connection lifecycle of ``SSHTransport`` against a mocked SSHClient."""

    @staticmethod
    def _key_file(tmp_path) -> str:
        """Create a placeholder key file and return its path as a string."""
        key_file = tmp_path / "id_ed25519"
        key_file.write_bytes(b"fake-key")
        return str(key_file)

    def test_connects_with_key_path(self, tmp_path):
        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client()
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                pass
            connect = MockClient.return_value.connect
            connect.assert_called_once()
            call_kwargs = connect.call_args
            assert call_kwargs.kwargs.get("hostname") == "10.0.0.1" or \
                call_kwargs.args[0] == "10.0.0.1"
            # The test name promises the key path is used: verify it reaches
            # connect() rather than only checking the hostname.
            assert call_kwargs.kwargs.get("key_filename") == key_path

    def test_disconnects_on_exit(self, tmp_path):
        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client()
            MockClient.return_value = mock_client
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                pass
            mock_client.close.assert_called_once()

    def test_disconnects_on_exception(self, tmp_path):
        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client()
            MockClient.return_value = mock_client
            with pytest.raises(RuntimeError):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    raise RuntimeError("boom")
            mock_client.close.assert_called_once()

    def test_raises_ssh_connection_error_on_auth_failure(self, tmp_path):
        import paramiko

        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value.connect.side_effect = (
                paramiko.AuthenticationException("denied")
            )
            with pytest.raises(SSHConnectionError, match="auth"):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    pass

    def test_raises_ssh_connection_error_on_no_route(self, tmp_path):
        import paramiko

        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value.connect.side_effect = (
                paramiko.SSHException("no route")
            )
            with pytest.raises(SSHConnectionError):
                with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path):
                    pass
|
||||
|
||||
|
||||
class TestSSHTransportExecStream:
    """``SSHTransport.exec_stream`` against a mocked SSHClient."""

    @staticmethod
    def _key_file(tmp_path) -> str:
        """Create a placeholder key file and return its path as a string."""
        key_file = tmp_path / "id_ed25519"
        key_file.write_bytes(b"fake-key")
        return str(key_file)

    def test_yields_stdout_lines(self, tmp_path):
        key_path = self._key_file(tmp_path)
        lines = ["line one\n", "line two\n", "line three\n"]
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client(lines)
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                result = list(t.exec_stream("echo hello"))
        assert result == lines

    def test_raises_ssh_command_error_on_nonzero_exit(self, tmp_path):
        key_path = self._key_file(tmp_path)
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            mock_client = _mock_ssh_client([])
            # Simulate non-zero exit code
            channel = MagicMock()
            channel.recv_exit_status.return_value = 1
            _, stdout, stderr = mock_client.exec_command.return_value
            stdout.channel = channel
            stderr.read.return_value = b"command not found"
            MockClient.return_value = mock_client
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                with pytest.raises(SSHCommandError, match="command not found"):
                    list(t.exec_stream("notacommand"))

    def test_strips_trailing_newlines(self, tmp_path):
        """Despite the name, lines must pass through unstripped.

        exec_stream yields raw lines; stripping is the parser's job.
        """
        key_path = self._key_file(tmp_path)
        lines = ["  line with spaces  \n"]
        with patch("app.glean.ssh.paramiko.SSHClient") as MockClient:
            MockClient.return_value = _mock_ssh_client(lines)
            with SSHTransport(host="10.0.0.1", user="admin", key_path=key_path) as t:
                result = list(t.exec_stream("echo hello"))
        # Checking only the count would also pass if the transport stripped
        # whitespace; compare the content so the raw pass-through is pinned.
        assert result == lines
|
||||
Loading…
Reference in a new issue