"""SSH target registry — persisted in the main SQLite DB. Targets are stored as path references only. The private key is never read into the database, logged, or returned by any API response. """ from __future__ import annotations import os import sqlite3 import stat import time import uuid from dataclasses import dataclass from pathlib import Path from typing import Any @dataclass class SshTarget: id: str label: str host: str port: int user: str key_path: str last_tested: str | None last_ok: bool | None last_error: str | None created_at: str updated_at: str def _row_to_target(row: tuple) -> SshTarget: return SshTarget( id=row[0], label=row[1], host=row[2], port=row[3], user=row[4], key_path=row[5], last_tested=row[6], last_ok=bool(row[7]) if row[7] is not None else None, last_error=row[8], created_at=row[9], updated_at=row[10], ) def _now() -> str: return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) # --------------------------------------------------------------------------- # CRUD # --------------------------------------------------------------------------- def list_targets(db_path: Path) -> list[SshTarget]: conn = sqlite3.connect(str(db_path), timeout=10) rows = conn.execute( "SELECT id, label, host, port, user, key_path, last_tested, last_ok, last_error, created_at, updated_at " "FROM ssh_targets ORDER BY label" ).fetchall() conn.close() return [_row_to_target(r) for r in rows] def get_target(db_path: Path, target_id: str) -> SshTarget | None: conn = sqlite3.connect(str(db_path), timeout=10) row = conn.execute( "SELECT id, label, host, port, user, key_path, last_tested, last_ok, last_error, created_at, updated_at " "FROM ssh_targets WHERE id = ?", (target_id,), ).fetchone() conn.close() return _row_to_target(row) if row else None def create_target( db_path: Path, label: str, host: str, port: int, user: str, key_path: str, ) -> SshTarget: resolved = _validate_key_path(key_path) now = _now() target_id = str(uuid.uuid4()) conn = sqlite3.connect(str(db_path), timeout=10) conn.execute( "INSERT INTO ssh_targets (id, label, host, port, user, key_path, created_at, updated_at) " "VALUES (?,?,?,?,?,?,?,?)", (target_id, label, host, port, user, str(resolved), now, now), ) conn.commit() conn.close() return get_target(db_path, target_id) # type: ignore[return-value] def update_target( db_path: Path, target_id: str, *, label: str | None = None, host: str | None = None, port: int | None = None, user: str | None = None, key_path: str | None = None, ) -> SshTarget | None: existing = get_target(db_path, target_id) if existing is None: return None resolved_key = str(_validate_key_path(key_path)) if key_path else existing.key_path conn = sqlite3.connect(str(db_path), timeout=10) conn.execute( "UPDATE ssh_targets SET label=?, host=?, port=?, user=?, key_path=?, updated_at=? WHERE id=?", ( label if label is not None else existing.label, host if host is not None else existing.host, port if port is not None else existing.port, user if user is not None else existing.user, resolved_key, _now(), target_id, ), ) conn.commit() conn.close() return get_target(db_path, target_id) def delete_target(db_path: Path, target_id: str) -> bool: conn = sqlite3.connect(str(db_path), timeout=10) cur = conn.execute("DELETE FROM ssh_targets WHERE id = ?", (target_id,)) conn.commit() conn.close() return cur.rowcount > 0 # --------------------------------------------------------------------------- # Test connection # --------------------------------------------------------------------------- def test_connection(db_path: Path, target_id: str) -> dict[str, Any]: """Attempt an SSH no-op and record the result. Runs `true` on the remote host — no data is pulled. Returns {ok: bool, error: str|null, tested_at: str}. """ target = get_target(db_path, target_id) if target is None: raise KeyError(f"SSH target {target_id!r} not found") # Lazy import — paramiko is optional try: from paramiko import SSHClient, AutoAddPolicy, AuthenticationException, SSHException except ImportError: _record_test(db_path, target_id, ok=False, error="paramiko not installed") return {"ok": False, "error": "paramiko not installed — run: pip install paramiko", "tested_at": _now()} key_path = str(Path(target.key_path).expanduser()) error: str | None = None ok = False try: client = SSHClient() client.set_missing_host_key_policy(AutoAddPolicy()) client.connect( hostname=target.host, port=target.port, username=target.user, key_filename=key_path, timeout=10, banner_timeout=10, ) stdin, stdout, stderr = client.exec_command("true", timeout=10) exit_code = stdout.channel.recv_exit_status() client.close() ok = exit_code == 0 if not ok: error = f"Remote command exited with code {exit_code}" except AuthenticationException: error = f"Authentication failed — check key path and remote authorized_keys" except SSHException as exc: error = f"SSH error: {exc}" except OSError as exc: error = f"Connection failed: {exc}" except Exception as exc: error = f"Unexpected error: {exc}" tested_at = _now() _record_test(db_path, target_id, ok=ok, error=error, tested_at=tested_at) return {"ok": ok, "error": error, "tested_at": tested_at} def _record_test( db_path: Path, target_id: str, *, ok: bool, error: str | None, tested_at: str | None = None, ) -> None: if tested_at is None: tested_at = _now() conn = sqlite3.connect(str(db_path), timeout=10) conn.execute( "UPDATE ssh_targets SET last_tested=?, last_ok=?, last_error=?, updated_at=? WHERE id=?", (tested_at, 1 if ok else 0, error, _now(), target_id), ) conn.commit() conn.close() # --------------------------------------------------------------------------- # Validation # --------------------------------------------------------------------------- def _validate_key_path(raw: str) -> Path: """Resolve and validate the SSH key path. Returns the resolved Path. Raises ValueError with a user-readable message on any problem (does not raise on world-readable — just returns a warning to the caller so the UI can display it non-blocking). """ p = Path(raw).expanduser() if not p.exists(): raise ValueError(f"Key file not found: {p}") if not p.is_file(): raise ValueError(f"Key path is not a file: {p}") return p def key_path_warning(key_path: str) -> str | None: """Return a warning string if the key file has overly permissive mode, else None.""" try: p = Path(key_path).expanduser() mode = p.stat().st_mode if mode & (stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH): perms = oct(mode & 0o777) return f"Key file permissions are too open ({perms}). SSH may refuse to use it — run: chmod 600 {p}" except OSError: pass return None def target_to_dict(t: SshTarget, include_warning: bool = False) -> dict[str, Any]: """Serialize a target for API responses. Never includes key contents.""" d: dict[str, Any] = { "id": t.id, "label": t.label, "host": t.host, "port": t.port, "user": t.user, "key_path": t.key_path, "last_tested": t.last_tested, "last_ok": t.last_ok, "last_error": t.last_error, "created_at": t.created_at, "updated_at": t.updated_at, } if include_warning: d["key_warning"] = key_path_warning(t.key_path) return d