From 3ebdd4aef00fed20c937864f1cd8883d355fa055 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 11 May 2026 15:34:13 -0700 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20live=20watch=20mode=20=E2=80=94=20t?= =?UTF-8?q?ail=20journald/docker/podman=20sources=20continuously=20(#4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds background watcher that tails active log sources and ingests entries in near-real-time, keeping the DB fresh without manual ingest runs. - app/watch/watcher.py: Watcher + WatchSource using subprocess + select loop; flushes every 10s or 100 lines; syncs FTS index every 3 flushes - patterns/watch.yaml: declarative source config (journald/docker/podman) - app/rest.py: lifespan context manager starts/stops watcher on app startup/shutdown; GET /api/watch/status + POST /api/watch/reload - web/src/views/DashboardView.vue: live/manual indicator chip + stale banner copy adapts to whether live watching is active - tests/test_watch_watcher.py: 16 tests covering config load, command building, docker timestamp stripping, orchestrator lifecycle Closes #4 --- app/rest.py | 42 +++++- app/watch/__init__.py | 0 app/watch/watcher.py | 256 ++++++++++++++++++++++++++++++++ patterns/watch.yaml | 35 +++++ tests/test_watch_watcher.py | 159 ++++++++++++++++++++ web/src/views/DashboardView.vue | 52 ++++++- 6 files changed, 530 insertions(+), 14 deletions(-) create mode 100644 app/watch/__init__.py create mode 100644 app/watch/watcher.py create mode 100644 patterns/watch.yaml create mode 100644 tests/test_watch_watcher.py diff --git a/app/rest.py b/app/rest.py index 09c3903..8f6ea2e 100644 --- a/app/rest.py +++ b/app/rest.py @@ -11,6 +11,7 @@ import json import os import urllib.error import urllib.request +from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated @@ -40,14 +41,31 @@ from app.services.search import ( format_results, ) from app.services.diagnose import diagnose as _diagnose 
+from app.watch.watcher import Watcher, load_watch_config DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) PREFS_PATH = DB_PATH.parent / "preferences.json" DIST_DIR = Path(__file__).parent.parent / "web" / "dist" SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") +PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) -app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None) +_watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml") + + +@asynccontextmanager +async def _lifespan(app: FastAPI): + ensure_schema(DB_PATH) + watch_cfg_path = PATTERN_DIR / "watch.yaml" + configs = load_watch_config(watch_cfg_path) + if configs: + _watcher.configure(configs) + _watcher.start() + yield + _watcher.stop() + + +app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) app.add_middleware( CORSMiddleware, @@ -57,11 +75,6 @@ app.add_middleware( ) -@app.on_event("startup") -def _startup() -> None: - ensure_schema(DB_PATH) - - _PREFS_DEFAULTS: dict = { "entry_point_style": "topbar", "llm_url": "http://localhost:11434", @@ -271,6 +284,23 @@ def list_sources() -> dict: return {"sources": _list_sources(DB_PATH)} +@router.get("/api/watch/status") +def watch_status() -> dict: + return {"active": _watcher.is_active(), "sources": _watcher.status} + + +@router.post("/api/watch/reload") +def watch_reload() -> dict: + """Stop all watch sources and restart with current watch.yaml.""" + _watcher.stop() + watch_cfg_path = PATTERN_DIR / "watch.yaml" + configs = load_watch_config(watch_cfg_path) + if configs: + _watcher.configure(configs) + _watcher.start() + return {"reloaded": True, "source_count": len(configs)} + + @router.get("/api/stats") def get_stats( window: Annotated[int, Query(ge=1, le=168, description="Hours 
to look back")] = 24, diff --git a/app/watch/__init__.py b/app/watch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/watch/watcher.py b/app/watch/watcher.py new file mode 100644 index 0000000..9c772e5 --- /dev/null +++ b/app/watch/watcher.py @@ -0,0 +1,256 @@ +"""Live watch: tail active log sources and ingest entries in near-real-time. + +Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f) +in a daemon thread and pipes lines through the existing ingestors into SQLite. +FTS is synced incrementally after each flush. +""" +from __future__ import annotations + +import json +import logging +import sqlite3 +import subprocess +import threading +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator + +import yaml + +from app.ingest import journald as journald_parser, syslog as syslog_parser +from app.ingest import plaintext as plaintext_parser +from app.ingest.base import _compile, load_patterns, now_iso +from app.ingest.pipeline import _write_batch, _SCHEMA +from app.services.search import build_fts_index +from app.services.models import RetrievedEntry + +logger = logging.getLogger(__name__) + +FLUSH_INTERVAL_SEC = 10 +FLUSH_BATCH_SIZE = 100 +FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load + + +# ── Config ──────────────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class WatchConfig: + source_type: str # "journald" | "docker" | "podman" | "file" + source_id: str + args: list[str] = field(default_factory=list) # extra CLI args + + +def load_watch_config(path: Path) -> list[WatchConfig]: + """Load watch.yaml; return empty list if file absent.""" + if not path.exists(): + return [] + raw = yaml.safe_load(path.read_text()) or {} + sources = [] + for src in raw.get("sources", []): + sources.append(WatchConfig( + source_type=src["type"], + source_id=src["id"], + args=src.get("args", []), + )) 
+ return sources + + +# ── Per-source runner ───────────────────────────────────────────────────────── + +class WatchSource: + """Tails a single log source in a background daemon thread.""" + + def __init__( + self, + config: WatchConfig, + db_path: Path, + pattern_file: Path, + ) -> None: + self.config = config + self.db_path = db_path + self.pattern_file = pattern_file + self._stop = threading.Event() + self._thread: threading.Thread | None = None + self._proc: subprocess.Popen | None = None + self._last_event: str | None = None + self._entry_count: int = 0 + self._error: str | None = None + + @property + def status(self) -> dict: + return { + "source_id": self.config.source_id, + "type": self.config.source_type, + "running": self._thread is not None and self._thread.is_alive(), + "entries_ingested": self._entry_count, + "last_event": self._last_event, + "error": self._error, + } + + def start(self) -> None: + self._stop.clear() + self._thread = threading.Thread(target=self._run, daemon=True, name=f"watch:{self.config.source_id}") + self._thread.start() + logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type) + + def stop(self) -> None: + self._stop.set() + if self._proc: + try: + self._proc.terminate() + except OSError: + pass + if self._thread: + self._thread.join(timeout=5) + logger.info("Watch source stopped: %s", self.config.source_id) + + def _run(self) -> None: + patterns = load_patterns(self.pattern_file) + compiled = _compile(patterns) + + conn = sqlite3.connect(str(self.db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executescript(_SCHEMA) + conn.commit() + + try: + cmd = self._build_command() + if not cmd: + return + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + self._drain(conn, compiled) + except Exception as exc: + self._error = str(exc) + logger.error("Watch source %r crashed: %s", self.config.source_id, exc) + finally: + 
conn.close() + + def _build_command(self) -> list[str] | None: + t = self.config.source_type + extra = self.config.args + if t == "journald": + return ["journalctl", "-f", "--output=json", "--no-pager"] + extra + if t == "docker": + if not extra: + logger.error("docker source %r requires args: [container_name]", self.config.source_id) + return None + return ["docker", "logs", "-f", "--timestamps", extra[0]] + extra[1:] + if t == "podman": + if not extra: + logger.error("podman source %r requires args: [container_name]", self.config.source_id) + return None + return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:] + logger.error("Unknown watch source type: %r", t) + return None + + def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]: + t = self.config.source_type + if t == "journald": + return list(journald_parser.parse(iter(lines), self.config.source_id, compiled, ingest_time)) + # Docker/Podman output: "2024-01-15T12:34:56.789012345Z log line text" + # Strip the timestamp prefix, then treat as plaintext with severity detection + stripped = [_strip_docker_ts(ln) for ln in lines] + return list(plaintext_parser.parse(iter(stripped), self.config.source_id, compiled, ingest_time)) + + def _drain(self, conn: sqlite3.Connection, compiled) -> None: + """Read lines from the subprocess and flush to DB periodically.""" + assert self._proc is not None + buffer: list[str] = [] + flush_count = 0 + last_flush = datetime.now(tz=timezone.utc) + + while not self._stop.is_set(): + assert self._proc.stdout is not None + # Non-blocking check with short readline timeout via select + import select + ready, _, _ = select.select([self._proc.stdout], [], [], 1.0) + + if ready: + line = self._proc.stdout.readline() + if not line: + if not self._stop.is_set(): + logger.warning("Watch process exited for %r — will retry in 5s", self.config.source_id) + self._stop.wait(5) + break + line = line.rstrip("\n") + if line: + 
buffer.append(line) + + elapsed = (datetime.now(tz=timezone.utc) - last_flush).total_seconds() + should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC + + if buffer and should_flush: + flush_count = self._flush(conn, buffer, compiled, flush_count) + buffer.clear() + last_flush = datetime.now(tz=timezone.utc) + + # Flush remainder + if buffer: + self._flush(conn, buffer, compiled, flush_count) + + def _flush(self, conn: sqlite3.Connection, lines: list[str], compiled, flush_count: int) -> int: + ingest_time = now_iso() + try: + entries = self._parse_lines(lines, ingest_time, compiled) + if entries: + _write_batch(conn, entries) + conn.commit() + self._entry_count += len(entries) + self._last_event = now_iso() + if entries: + self._last_event = entries[-1].timestamp_iso or self._last_event + + flush_count += 1 + if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0: + build_fts_index(self.db_path) + except Exception as exc: + logger.warning("Flush error for %r: %s", self.config.source_id, exc) + return flush_count + + +def _strip_docker_ts(line: str) -> str: + """Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds.""" + # Format: "2024-01-15T12:34:56.789012345Z actual log text" + parts = line.split(" ", 1) + if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"): + return parts[1] + return line + + +# ── Orchestrator ────────────────────────────────────────────────────────────── + +class Watcher: + """Manages all active WatchSource instances.""" + + def __init__(self, db_path: Path, pattern_file: Path) -> None: + self.db_path = db_path + self.pattern_file = pattern_file + self._sources: list[WatchSource] = [] + + def configure(self, configs: list[WatchConfig]) -> None: + self._sources = [ + WatchSource(c, self.db_path, self.pattern_file) + for c in configs + ] + + def start(self) -> None: + for src in self._sources: + src.start() + + def stop(self) -> None: + for src in self._sources: + src.stop() + + @property + 
def status(self) -> list[dict]: + return [src.status for src in self._sources] + + def is_active(self) -> bool: + return any(src._thread is not None and src._thread.is_alive() for src in self._sources) diff --git a/patterns/watch.yaml b/patterns/watch.yaml new file mode 100644 index 0000000..680eed2 --- /dev/null +++ b/patterns/watch.yaml @@ -0,0 +1,35 @@ +# Turnstone live watch sources — entries here are tailed continuously. +# The watcher starts automatically when Turnstone starts. +# +# Source types: +# journald — system journal via `journalctl -f -o json` +# docker — container logs via `docker logs -f --timestamps <container>` +# podman — container logs via `podman logs -f --timestamps <container>` +# +# For journald, optional args filter by unit: +# args: ["-u", "nginx", "-u", "sshd"] +# +# For docker/podman, args[0] is the container name (required). +# +# Leave this file empty (just the header) to disable live watching. +# Missing containers are skipped with a warning — safe to leave entries +# for services that are temporarily down. 
+ +sources: [] + +# ── Examples ──────────────────────────────────────────────────────────────── +# +# - type: journald +# id: system-journal +# +# - type: journald +# id: sshd-journal +# args: ["-u", "sshd"] +# +# - type: podman +# id: podman:turnstone +# args: ["turnstone"] +# +# - type: docker +# id: docker:nginx +# args: ["nginx-proxy"] diff --git a/tests/test_watch_watcher.py b/tests/test_watch_watcher.py new file mode 100644 index 0000000..4924d38 --- /dev/null +++ b/tests/test_watch_watcher.py @@ -0,0 +1,159 @@ +"""Tests for app/watch/watcher.py — config loading, command building, output parsing.""" +from __future__ import annotations + +import json +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from app.watch.watcher import ( + WatchConfig, + WatchSource, + Watcher, + _strip_docker_ts, + load_watch_config, +) + + +# ── Config loading ────────────────────────────────────────────────────────── + +def test_load_watch_config_missing_file(tmp_path: Path): + result = load_watch_config(tmp_path / "nonexistent.yaml") + assert result == [] + + +def test_load_watch_config_empty_sources(tmp_path: Path): + cfg = tmp_path / "watch.yaml" + cfg.write_text("sources: []\n") + assert load_watch_config(cfg) == [] + + +def test_load_watch_config_parses_journald(tmp_path: Path): + cfg = tmp_path / "watch.yaml" + cfg.write_text(""" +sources: + - type: journald + id: system-journal + args: ["-u", "sshd"] +""") + configs = load_watch_config(cfg) + assert len(configs) == 1 + assert configs[0].source_type == "journald" + assert configs[0].source_id == "system-journal" + assert configs[0].args == ["-u", "sshd"] + + +def test_load_watch_config_parses_podman(tmp_path: Path): + cfg = tmp_path / "watch.yaml" + cfg.write_text(""" +sources: + - type: podman + id: podman:turnstone + args: ["turnstone"] +""") + configs = load_watch_config(cfg) + assert configs[0].source_type == "podman" + assert configs[0].args == ["turnstone"] + + 
+def test_load_watch_config_no_args_defaults_to_empty(tmp_path: Path): + cfg = tmp_path / "watch.yaml" + cfg.write_text(""" +sources: + - type: journald + id: system +""") + configs = load_watch_config(cfg) + assert configs[0].args == [] + + +# ── Command building ───────────────────────────────────────────────────────── + +def _make_source(source_type: str, source_id: str, args: list = None, db=None, pattern=None): + if db is None: + db = Path("/tmp/fake.db") + if pattern is None: + pattern = Path("/tmp/fake.yaml") + cfg = WatchConfig(source_type=source_type, source_id=source_id, args=args or []) + return WatchSource(cfg, db, pattern) + + +def test_build_command_journald(): + src = _make_source("journald", "sys") + cmd = src._build_command() + assert cmd[:3] == ["journalctl", "-f", "--output=json"] + + +def test_build_command_journald_with_unit_filter(): + src = _make_source("journald", "sshd", args=["-u", "sshd"]) + cmd = src._build_command() + assert "-u" in cmd + assert "sshd" in cmd + + +def test_build_command_podman_with_container(): + src = _make_source("podman", "podman:ts", args=["turnstone"]) + cmd = src._build_command() + assert cmd[0] == "podman" + assert "logs" in cmd + assert "-f" in cmd + assert "turnstone" in cmd + + +def test_build_command_docker_no_args_returns_none(): + src = _make_source("docker", "docker:nginx") + cmd = src._build_command() + assert cmd is None + + +def test_build_command_unknown_type_returns_none(): + src = _make_source("kafka", "topic:logs") + cmd = src._build_command() + assert cmd is None + + +# ── Docker timestamp stripping ─────────────────────────────────────────────── + +def test_strip_docker_ts_removes_rfc3339_prefix(): + line = "2024-01-15T12:34:56.789012345Z some log line" + assert _strip_docker_ts(line) == "some log line" + + +def test_strip_docker_ts_passes_plain_lines(): + line = "plain log line without timestamp" + assert _strip_docker_ts(line) == line + + +def test_strip_docker_ts_handles_offset_timezone(): + # 
Docker --timestamps always uses Z (UTC), but be safe + line = "2024-01-15T12:34:56Z message" + assert _strip_docker_ts(line) == "message" + + +# ── Watcher orchestrator ───────────────────────────────────────────────────── + +def test_watcher_status_empty_when_no_sources(tmp_path: Path): + w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml") + assert w.status == [] + assert not w.is_active() + + +def test_watcher_configure_creates_sources(tmp_path: Path): + w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml") + configs = [ + WatchConfig("journald", "sys"), + WatchConfig("podman", "ts", ["turnstone"]), + ] + w.configure(configs) + assert len(w.status) == 2 + assert w.status[0]["source_id"] == "sys" + assert w.status[1]["source_id"] == "ts" + + +def test_watcher_status_shows_not_running_before_start(tmp_path: Path): + w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml") + w.configure([WatchConfig("journald", "sys")]) + assert not w.is_active() + assert w.status[0]["running"] is False diff --git a/web/src/views/DashboardView.vue b/web/src/views/DashboardView.vue index 21805bc..c211b7f 100644 --- a/web/src/views/DashboardView.vue +++ b/web/src/views/DashboardView.vue @@ -1,13 +1,28 @@