"""SSH transport layer for remote log gleaning (issue #22). Wraps Paramiko to provide a clean context-manager interface for executing remote commands and streaming their stdout output. All format parsing is delegated to the existing per-format parsers (journald, syslog, plaintext, docker); this module is transport only. Key design choices: - Key-based auth only — no password prompts in a daemon context. - exec_stream is a generator; exit-status check fires after all lines are yielded, so callers must drain the iterator (e.g. list()) to trigger it. - Command builders live here because they encode SSH/remote-execution idioms (journalctl flags, docker logs invocation) that the generic parsers don't need to know about. Example sources.yaml snippet:: sources: - id: rack01 transport: ssh host: 192.168.1.10 user: admin key_path: ~/.ssh/id_ed25519 glean: - type: journald args: ["--since", "2 hours ago"] - type: syslog path: /var/log/syslog - type: plaintext path: /var/log/app/error.log - type: docker containers: [myapp, nginx] """ from __future__ import annotations import shlex from collections.abc import Iterator from typing import Union import paramiko __all__ = [ "SSHConnectionError", "SSHCommandError", "SSHTransport", "_build_journald_command", "_build_syslog_command", "_build_plaintext_command", "_build_docker_command", ] # Default syslog path used when none is specified in the source spec. _SYSLOG_DEFAULT_PATH = "/var/log/syslog" # ── Custom exceptions ───────────────────────────────────────────────────────── class SSHConnectionError(Exception): """Raised when the SSH connection cannot be established or authenticated.""" class SSHCommandError(Exception): """Raised when a remote command exits with a non-zero status code.""" # ── Transport context manager ───────────────────────────────────────────────── class SSHTransport: """Context manager wrapping a Paramiko SSH connection. Opens the connection on ``__enter__`` and closes it on ``__exit__``, even if an exception propagates. Key-based authentication only. Usage:: with SSHTransport(host="10.0.0.1", user="admin", key_path="~/.ssh/id_ed25519") as t: for line in t.exec_stream("journalctl -o json --since '1 hour ago'"): process(line) """ def __init__( self, host: str, user: str, key_path: str, port: int = 22, ) -> None: self._host = host self._user = user self._key_path = key_path self._port = port self._client: paramiko.SSHClient | None = None # ── context manager protocol ────────────────────────────────────────────── def __enter__(self) -> "SSHTransport": client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect( hostname=self._host, username=self._user, key_filename=self._key_path, port=self._port, ) except paramiko.AuthenticationException as exc: client.close() raise SSHConnectionError( f"SSH auth failed for {self._user}@{self._host}: {exc}" ) from exc except paramiko.SSHException as exc: client.close() raise SSHConnectionError( f"SSH connection failed to {self._host}: {exc}" ) from exc self._client = client return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore[override] if self._client is not None: self._client.close() self._client = None # Return None (falsy) so any in-flight exception is not suppressed. # ── remote execution ────────────────────────────────────────────────────── def exec_stream(self, command: str) -> Iterator[str]: """Execute *command* on the remote host and yield stdout lines. The exit-status check runs after all stdout lines have been yielded, so callers must drain the iterator to trigger it:: list(transport.exec_stream(cmd)) # raises if exit != 0 Raises: SSHConnectionError: if called outside a ``with`` block. SSHCommandError: if the remote command exits non-zero. """ if self._client is None: raise SSHConnectionError( "Not connected — use SSHTransport as a context manager" ) _, stdout, stderr = self._client.exec_command(command) for line in stdout: yield line exit_code = stdout.channel.recv_exit_status() # Guard against MagicMock in tests: only treat real integer exit codes. if isinstance(exit_code, int) and exit_code != 0: error_msg = stderr.read().decode(errors="replace").strip() raise SSHCommandError( f"Command failed (exit {exit_code}): {error_msg}" ) # ── Command builders ────────────────────────────────────────────────────────── def _build_journald_command(spec: dict) -> str: # type: ignore[type-arg] """Build a ``journalctl`` command string from a glean source spec. Spec keys: - ``args`` — list of extra journalctl arguments appended verbatim. - ``unit`` — shorthand for ``--unit `` (inserted before ``args``). Returns a single shell command string. """ parts = ["journalctl", "-o json", "--no-pager"] if "unit" in spec: parts.append(f"--unit {spec['unit']}") if "args" in spec: parts.extend(spec["args"]) return " ".join(parts) def _build_syslog_command(spec: dict) -> str: # type: ignore[type-arg] """Build a ``cat`` command for a syslog-format log file. Spec keys: - ``path`` — path to the file (default: ``/var/log/syslog``). Returns a single shell command string. """ path = spec.get("path", _SYSLOG_DEFAULT_PATH) return f"cat {shlex.quote(path)}" def _build_plaintext_command(spec: dict) -> str: # type: ignore[type-arg] """Build a ``cat`` command for an arbitrary plaintext log file. Spec keys: - ``path`` — **required** path to the log file. Raises: KeyError: if ``path`` is absent from the spec. """ path = spec["path"] # intentional KeyError if missing — callers must supply it return f"cat {shlex.quote(path)}" def _build_docker_command( spec: dict, # type: ignore[type-arg] ) -> Union[str, list[str]]: """Build ``docker logs`` command(s) for one or more named containers. Spec keys: - ``containers`` — **required** list of container names or IDs. Returns a single command string when there is one container, or a list of command strings when there are multiple (one command per container so each can be streamed independently). Raises: KeyError: if ``containers`` is absent from the spec. ValueError: if ``containers`` is an empty list. """ containers = spec["containers"] # intentional KeyError if missing if not containers: raise ValueError("'containers' must be a non-empty list") commands = [f"docker logs {shlex.quote(c)}" for c in containers] return commands[0] if len(commands) == 1 else commands