Adds SSH-based log collection from remote hosts via Paramiko.
One SSH connection per host, multiple log types per connection.
New files:
- app/glean/ssh.py: SSHTransport context manager + command builders
for journald, syslog, plaintext, and docker log types
- tests/test_glean_ssh.py: 18 tests for transport layer (all mocked)
- tests/test_glean_pipeline_ssh.py: 15 tests for pipeline integration
Pipeline changes (app/glean/pipeline.py):
- glean_sources() now splits sources into local-file and SSH categories
- SSH sources are declared in sources.yaml with `transport: ssh` and a `glean:` list of log items to collect
- _glean_ssh_source(): one SSHTransport per host, N commands per connection
- _stream_and_write(): SSHCommandError caught per-item so one bad
command does not abort the rest of the host's glean items
- SSHConnectionError skips the entire host with a warning log
SSH source schema (sources.yaml):
  - id: rack01
    transport: ssh
    host: 192.168.1.10
    user: admin
    key_path: ~/.ssh/id_ed25519
    glean:
      - type: journald
        args: [--since, 2 hours ago]
      - type: syslog
        path: /var/log/syslog
      - type: plaintext
        path: /var/log/app/error.log
      - type: docker
        containers: [myapp, nginx]
Key design decisions:
- Key-based auth only (no password prompts in daemon context)
- exit-status check fires after all stdout lines yielded; callers
drain the iterator to trigger it
- Local file sources path unchanged; SSH sources co-exist in same yaml
- Docker multi-container: one exec_stream call per container,
source_id scoped as host_id/type/container_name
Remaining for #22: REST endpoint, SourcesView UI, sources.yaml docs.
252 → 285 tests passing (33 new SSH tests).
225 lines · 7.9 KiB · Python
"""SSH transport layer for remote log gleaning (issue #22).

Wraps Paramiko to provide a context-manager interface for executing remote
commands and streaming their stdout output. All format parsing is delegated
to the existing per-format parsers (journald, syslog, plaintext, docker);
this module is transport only.

Key design choices:

- Key-based auth only — no password prompts in a daemon context.
- ``SSHTransport.exec_stream`` is a generator; its exit-status check fires
  after all lines are yielded, so callers must drain the iterator
  (e.g. ``list()``) to trigger it.
- Command builders live here because they encode SSH/remote-execution idioms
  (journalctl flags, ``docker logs`` invocation) that the generic parsers
  don't need to know about.

Example sources.yaml snippet::

    sources:
      - id: rack01
        transport: ssh
        host: 192.168.1.10
        user: admin
        key_path: ~/.ssh/id_ed25519
        glean:
          - type: journald
            args: ["--since", "2 hours ago"]
          - type: syslog
            path: /var/log/syslog
          - type: plaintext
            path: /var/log/app/error.log
          - type: docker
            containers: [myapp, nginx]
"""
|
|
from __future__ import annotations

import os
import shlex
import threading
from collections.abc import Iterator
from typing import Union

import paramiko
|
|
|
|
|
|
__all__ = [
    # Errors raised by the transport layer.
    "SSHConnectionError",
    "SSHCommandError",
    # Connection wrapper.
    "SSHTransport",
    # Command builders. They are underscore-prefixed yet listed here, so a
    # star-import still exposes them.
    # NOTE(review): presumably consumed by the pipeline/tests — confirm.
    "_build_journald_command",
    "_build_syslog_command",
    "_build_plaintext_command",
    "_build_docker_command",
]
|
|
|
|
# Fallback file read by the syslog command builder when a glean spec
# does not provide its own ``path``.
_SYSLOG_DEFAULT_PATH: str = "/var/log/syslog"
|
|
|
|
|
|
# ── Custom exceptions ─────────────────────────────────────────────────────────
|
|
|
|
class SSHConnectionError(Exception):
    """Signal that an SSH session could not be opened or authenticated.

    Also raised by ``SSHTransport.exec_stream`` when the transport is used
    outside an active ``with`` block.
    """
|
|
|
|
|
|
class SSHCommandError(Exception):
    """Signal that a remote command finished with a non-zero exit status.

    ``SSHTransport.exec_stream`` raises it only after every stdout line has
    been yielded; the message carries the exit code and the command's
    stderr text.
    """
|
|
|
|
|
|
# ── Transport context manager ─────────────────────────────────────────────────
|
|
|
|
class SSHTransport:
    """Context manager wrapping a Paramiko SSH connection.

    Opens the connection on ``__enter__`` and closes it on ``__exit__``,
    even if an exception propagates. Key-based authentication only.

    Usage::

        with SSHTransport(host="10.0.0.1", user="admin",
                          key_path="~/.ssh/id_ed25519") as t:
            for line in t.exec_stream("journalctl -o json --since '1 hour ago'"):
                process(line)

    Args:
        host: Hostname or IP address of the remote machine.
        user: Remote login name.
        key_path: Private-key file used for authentication. A leading ``~``
            is expanded locally, because Paramiko opens the path verbatim.
        port: Remote SSH port.
        timeout: Optional TCP connect timeout in seconds, forwarded to
            ``SSHClient.connect``. ``None`` (the default) does not pass a
            timeout, so Paramiko and the OS connect timeout apply.
    """

    # Upper bound (seconds) on waiting for the stderr drain thread after the
    # remote command has exited. stderr only feeds the error message.
    _STDERR_JOIN_TIMEOUT = 5.0

    def __init__(
        self,
        host: str,
        user: str,
        key_path: str,
        port: int = 22,
        timeout: float | None = None,
    ) -> None:
        self._host = host
        self._user = user
        self._key_path = key_path
        self._port = port
        self._timeout = timeout
        self._client: paramiko.SSHClient | None = None

    # ── context manager protocol ──────────────────────────────────────────────

    def __enter__(self) -> SSHTransport:
        """Open and authenticate the SSH connection.

        Raises:
            SSHConnectionError: if authentication fails, the SSH handshake
                fails, or the host cannot be reached. Unreachability covers
                socket-level errors such as connection refused, DNS failure,
                a timeout, or an unreadable key file.
        """
        client = paramiko.SSHClient()
        # Check against ~/.ssh/known_hosts when it exists, so a *changed* host
        # key is rejected (BadHostKeyException, an SSHException subclass).
        # NOTE(security): AutoAddPolicy still trusts any host on first contact.
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        key_filename = self._key_path
        if isinstance(key_filename, str):
            # Paramiko does not expand "~" in key_filename; do it here so the
            # documented "~/.ssh/id_ed25519" form works.
            key_filename = os.path.expanduser(key_filename)
        # Only forward a timeout when one was configured.
        extra = {} if self._timeout is None else {"timeout": self._timeout}

        try:
            client.connect(
                hostname=self._host,
                username=self._user,
                key_filename=key_filename,
                port=self._port,
                **extra,
            )
        except paramiko.AuthenticationException as exc:
            client.close()
            raise SSHConnectionError(
                f"SSH auth failed for {self._user}@{self._host}: {exc}"
            ) from exc
        except (paramiko.SSHException, OSError) as exc:
            # OSError covers an unreachable/refused host, DNS failure, socket
            # timeout, and a missing key file. Paramiko's
            # NoValidConnectionsError is an OSError subclass. Without this
            # clause these escape as raw exceptions instead of
            # SSHConnectionError.
            client.close()
            raise SSHConnectionError(
                f"SSH connection failed to {self._host}: {exc}"
            ) from exc
        self._client = client
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[override]
        if self._client is not None:
            self._client.close()
            self._client = None
        # Return None (falsy) so any in-flight exception is not suppressed.

    # ── remote execution ──────────────────────────────────────────────────────

    def exec_stream(self, command: str) -> Iterator[str]:
        """Execute *command* on the remote host and yield stdout lines.

        The remote stderr stream is drained concurrently by a daemon thread.
        Paramiko only replenishes the channel's flow-control window as data
        is read. If stderr were left unread, a command that writes a lot to
        stderr (e.g. ``docker logs`` for a container logging to stderr) would
        stall once the window fills, hanging both this loop and
        ``recv_exit_status``.

        The exit-status check runs after all stdout lines have been yielded,
        so callers must drain the iterator to trigger it::

            list(transport.exec_stream(cmd))  # raises if exit != 0

        Raises:
            SSHConnectionError: if called outside a ``with`` block, or if the
                remote command cannot be started (e.g. the connection dropped).
            SSHCommandError: if the remote command exits non-zero.
        """
        if self._client is None:
            raise SSHConnectionError(
                "Not connected — use SSHTransport as a context manager"
            )
        try:
            _, stdout, stderr = self._client.exec_command(command)
        except (paramiko.SSHException, OSError) as exc:
            raise SSHConnectionError(
                f"Failed to start remote command on {self._host}: {exc}"
            ) from exc

        stderr_data: list[bytes] = []

        def _drain_stderr() -> None:
            # Runs in a daemon thread. stderr only feeds the error message, so
            # a read failure leaves stderr_data empty rather than propagating.
            try:
                stderr_data.append(stderr.read())
            except OSError:
                pass

        drainer = threading.Thread(
            target=_drain_stderr,
            name=f"ssh-stderr-{self._host}",
            daemon=True,
        )
        drainer.start()

        for line in stdout:
            yield line

        exit_code = stdout.channel.recv_exit_status()
        # Guard against MagicMock in tests: only treat real integer exit codes.
        if isinstance(exit_code, int) and exit_code != 0:
            drainer.join(timeout=self._STDERR_JOIN_TIMEOUT)
            raw_stderr = stderr_data[0] if stderr_data else b""
            error_msg = raw_stderr.decode(errors="replace").strip()
            raise SSHCommandError(
                f"Command failed (exit {exit_code}): {error_msg}"
            )
|
|
|
|
|
|
# ── Command builders ──────────────────────────────────────────────────────────
|
|
|
|
def _build_journald_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Build a ``journalctl`` command string from a glean source spec.

    Spec keys:

    - ``unit`` — shorthand for ``--unit <name>`` (inserted before ``args``).
    - ``args`` — extra journalctl arguments appended after the defaults. A
      list is used item by item; a single string is split with shell-like
      rules (``shlex.split``). A missing or null value adds nothing.

    Every argument is shell-quoted individually. Values containing spaces
    (e.g. ``["--since", "2 hours ago"]``) therefore reach journalctl as a
    single argument, and shell metacharacters in the spec are not
    interpreted by the remote shell.

    Returns a single shell command string, e.g.
    ``journalctl -o json --no-pager --since '2 hours ago'``.
    """
    argv = ["journalctl", "-o", "json", "--no-pager"]

    unit = spec.get("unit")
    if unit:
        argv += ["--unit", str(unit)]

    extra = spec.get("args") or []
    if isinstance(extra, str):
        # A bare string would otherwise be extended character by character.
        extra = shlex.split(extra)
    argv.extend(str(arg) for arg in extra)

    # The result is executed by the remote user's login shell, so each token
    # must be quoted on its own before joining. Plain tokens like "--no-pager"
    # are left unchanged by shlex.quote.
    return " ".join(shlex.quote(arg) for arg in argv)
|
|
|
|
|
|
def _build_syslog_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Return a shell ``cat`` invocation for a syslog-format file.

    Spec keys:

    - ``path`` — remote file to read. When omitted, falls back to
      ``_SYSLOG_DEFAULT_PATH``. The path is shell-quoted before being
      embedded in the command.

    Returns a single shell command string.
    """
    syslog_file = spec.get("path", _SYSLOG_DEFAULT_PATH)
    return "cat " + shlex.quote(syslog_file)
|
|
|
|
|
|
def _build_plaintext_command(spec: dict) -> str:  # type: ignore[type-arg]
    """Return a shell ``cat`` invocation for an arbitrary plaintext log.

    Spec keys:

    - ``path`` — **required**; the remote file to read. It is shell-quoted
      before being embedded in the command.

    Raises:
        KeyError: if ``path`` is absent from the spec.
    """
    # Subscript rather than .get(): there is no sensible default for an
    # arbitrary log, so a missing path must fail loudly with KeyError.
    log_file = spec["path"]
    return "cat " + shlex.quote(log_file)
|
|
|
|
|
|
def _build_docker_command(
    spec: dict,  # type: ignore[type-arg]
) -> Union[str, list[str]]:
    """Build ``docker logs`` command(s) for one or more named containers.

    Spec keys:

    - ``containers`` — **required** list of container names or IDs. A bare
      string (``containers: myapp`` in YAML) is treated as a single
      container rather than being iterated character by character.

    Returns a single command string when there is one container, or a list
    of command strings when there are multiple (one command per container so
    each can be streamed independently). Container names are shell-quoted.

    NOTE(review): ``docker logs`` replays a container's stderr stream on its
    own stderr, and ``SSHTransport.exec_stream`` yields stdout only. Output
    that a container logs to stderr is therefore not gleaned. Confirm this is
    intended; appending ``2>&1`` would merge the streams.

    Raises:
        KeyError: if ``containers`` is absent from the spec.
        ValueError: if ``containers`` is empty.
    """
    containers = spec["containers"]  # intentional KeyError if missing
    if not containers:
        raise ValueError("'containers' must be a non-empty list")
    if isinstance(containers, str):
        containers = [containers]

    commands = [f"docker logs {shlex.quote(str(name))}" for name in containers]
    return commands[0] if len(commands) == 1 else commands
|