Merge pull request 'feat: live watch mode — tail journald/docker/podman continuously (#4)' (#16) from feat/live-watch into main

This commit is contained in:
pyr0ball 2026-05-11 15:45:30 -07:00
commit 946e369147
6 changed files with 580 additions and 14 deletions

View file

@ -11,6 +11,7 @@ import json
import os import os
import urllib.error import urllib.error
import urllib.request import urllib.request
from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
from typing import Annotated from typing import Annotated
@ -40,14 +41,31 @@ from app.services.search import (
format_results, format_results,
) )
from app.services.diagnose import diagnose as _diagnose from app.services.diagnose import diagnose as _diagnose
from app.watch.watcher import Watcher, load_watch_config
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
PREFS_PATH = DB_PATH.parent / "preferences.json" PREFS_PATH = DB_PATH.parent / "preferences.json"
DIST_DIR = Path(__file__).parent.parent / "web" / "dist" DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None) _watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml")
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    Startup ensures the database schema exists, then configures the
    module-level watcher from ``watch.yaml`` under ``PATTERN_DIR`` and
    starts it. Shutdown stops the watcher.
    """
    ensure_schema(DB_PATH)
    # An absent or empty watch.yaml yields an empty list, which simply leaves
    # the watcher with no sources to start.
    _watcher.configure(load_watch_config(PATTERN_DIR / "watch.yaml"))
    _watcher.start()
    try:
        yield
    finally:
        # Stop the tail threads even when the lifespan is left through an
        # exception or cancellation, so no follower subprocess is left behind.
        _watcher.stop()
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
@ -57,11 +75,6 @@ app.add_middleware(
) )
@app.on_event("startup")
def _startup() -> None:
ensure_schema(DB_PATH)
_PREFS_DEFAULTS: dict = { _PREFS_DEFAULTS: dict = {
"entry_point_style": "topbar", "entry_point_style": "topbar",
"llm_url": "http://localhost:11434", "llm_url": "http://localhost:11434",
@ -271,6 +284,23 @@ def list_sources() -> dict:
return {"sources": _list_sources(DB_PATH)} return {"sources": _list_sources(DB_PATH)}
@router.get("/api/watch/status")
def watch_status() -> dict:
    """Report whether any watch source is running, plus per-source status."""
    active = _watcher.is_active()
    sources = _watcher.status
    return {"active": active, "sources": sources}
@router.post("/api/watch/reload")
def watch_reload() -> dict:
    """Stop all watch sources and restart with current watch.yaml.

    Returns:
        ``{"reloaded": True, "source_count": N}`` where ``N`` is the number
        of sources found in ``watch.yaml``.
    """
    _watcher.stop()
    configs = load_watch_config(PATTERN_DIR / "watch.yaml")
    # Reconfigure unconditionally. Skipping this for an empty list would keep
    # the previous sources registered, and start() below would relaunch them
    # even though the response reports zero sources.
    _watcher.configure(configs)
    _watcher.start()
    return {"reloaded": True, "source_count": len(configs)}
@router.get("/api/stats") @router.get("/api/stats")
def get_stats( def get_stats(
window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24, window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,

0
app/watch/__init__.py Normal file
View file

290
app/watch/watcher.py Normal file
View file

@ -0,0 +1,290 @@
"""Live watch: tail active log sources and ingest entries in near-real-time.
Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f, or
tail -F for plain files) in a daemon thread and pipes lines through the
existing ingestors into SQLite.
The FTS index is refreshed every FTS_SYNC_EVERY_N_FLUSHES flushes, not after each flush.
"""
from __future__ import annotations
import codecs
import json
import logging
import os
import re
import select
import sqlite3
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import yaml
from app.ingest import journald as journald_parser, syslog as syslog_parser
from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser
from app.ingest.pipeline import _detect_format
from app.ingest.base import _compile, load_patterns, now_iso
from app.ingest.pipeline import _write_batch, _SCHEMA
from app.services.search import build_fts_index
from app.services.models import RetrievedEntry
logger = logging.getLogger(__name__)
FLUSH_INTERVAL_SEC = 10
FLUSH_BATCH_SIZE = 100
FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load
# ── Config ────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class WatchConfig:
    """Immutable description of one live-watch source."""

    # One of "journald", "docker", "podman" or "file".
    source_type: str
    # Identifier handed to the parsers and reported in the status output.
    source_id: str
    # Extra CLI arguments appended to (or consumed by) the follower command.
    args: list[str] = field(default_factory=list)
def load_watch_config(path: Path) -> list[WatchConfig]:
    """Load live-watch source definitions from a YAML file.

    Args:
        path: Location of ``watch.yaml``.

    Returns:
        One ``WatchConfig`` per entry under the top-level ``sources`` key, in
        file order. Empty when the file is absent, empty, or lists no sources.

    Raises:
        KeyError: If a source entry lacks ``type`` or ``id``.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    raw = yaml.safe_load(text) or {}
    # A bare ``sources:`` key parses as None rather than an empty list.
    entries = raw.get("sources") or []
    return [
        WatchConfig(
            source_type=src["type"],
            source_id=src["id"],
            # A bare ``args:`` key is None, and unquoted YAML scalars may be
            # ints; the subprocess argv needs a list of strings.
            args=[str(arg) for arg in (src.get("args") or [])],
        )
        for src in entries
    ]
# ── Per-source runner ─────────────────────────────────────────────────────────
class WatchSource:
    """Tails a single log source in a background daemon thread.

    The thread spawns the follower command for the configured source type,
    reads its output, and periodically parses the buffered lines and writes
    them to the SQLite database at ``db_path``.
    """

    # Upper bound on bytes pulled from the child's pipe per read.
    _READ_CHUNK = 65536

    def __init__(
        self,
        config: WatchConfig,
        db_path: Path,
        pattern_file: Path,
    ) -> None:
        self.config = config
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._proc: subprocess.Popen | None = None
        self._last_event: str | None = None
        self._entry_count: int = 0
        self._error: str | None = None

    @property
    def status(self) -> dict:
        """Snapshot of this source's state, as served by the status API."""
        return {
            "source_id": self.config.source_id,
            "type": self.config.source_type,
            "running": self._thread is not None and self._thread.is_alive(),
            "entries_ingested": self._entry_count,
            "last_event": self._last_event,
            "error": self._error,
        }

    def start(self) -> None:
        """Launch the tail thread; a no-op when it is already running."""
        if self._thread is not None and self._thread.is_alive():
            # A second thread would tail the same source twice.
            return
        self._stop.clear()
        self._error = None  # do not report a failure from a previous run
        self._thread = threading.Thread(target=self._run, daemon=True, name=f"watch:{self.config.source_id}")
        self._thread.start()
        logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type)

    def stop(self) -> None:
        """Signal the tail thread to stop and wait up to 5 seconds for it."""
        self._stop.set()
        proc = self._proc  # local copy: the thread clears the attribute on exit
        if proc is not None:
            try:
                proc.terminate()
            except OSError:
                pass  # process already gone
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("Watch source stopped: %s", self.config.source_id)

    def _run(self) -> None:
        """Thread body: open the DB, spawn the follower, drain it, clean up."""
        conn: sqlite3.Connection | None = None
        try:
            compiled = _compile(load_patterns(self.pattern_file))
            conn = sqlite3.connect(str(self.db_path))
            conn.execute("PRAGMA journal_mode=WAL")
            conn.executescript(_SCHEMA)
            conn.commit()
            cmd = self._build_command()
            if not cmd:
                # _build_command already logged the reason; surface it in status too.
                self._error = "no command could be built for this source (see log)"
                return
            self._proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=self._stderr_target(),
                bufsize=0,  # _drain reads the raw file descriptor itself
            )
            self._drain(conn, compiled)
        except Exception as exc:
            self._error = str(exc)
            logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
        finally:
            self._reap()
            if conn is not None:
                conn.close()

    def _stderr_target(self) -> int:
        """Choose where the child's stderr goes so it can never fill a pipe.

        ``docker``/``podman logs`` replay the container's stderr stream on
        their own stderr, so it is merged into stdout and ingested. For the
        other source types stderr only carries tool diagnostics and is
        discarded.
        """
        if self.config.source_type in ("docker", "podman"):
            return subprocess.STDOUT
        return subprocess.DEVNULL

    def _reap(self) -> None:
        """Terminate the child if it is still alive and collect its exit status."""
        proc = self._proc
        if proc is None:
            return
        if proc.poll() is None:
            try:
                proc.terminate()
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            except OSError:
                pass
        if proc.stdout is not None:
            proc.stdout.close()
        self._proc = None

    def _build_command(self) -> list[str] | None:
        """Return the follower argv for this source, or None if misconfigured."""
        kind = self.config.source_type
        extra = list(self.config.args)
        if kind == "journald":
            return ["journalctl", "-f", "--output=json", "--no-pager"] + extra
        if kind in ("docker", "podman"):
            if not extra:
                logger.error("%s source %r requires args: [container_name]", kind, self.config.source_id)
                return None
            # NOTE(review): unlike the file source (-n 0) this has no tail or
            # since limit, so existing container output may be replayed on
            # every start. Confirm whether the write path de-duplicates.
            return [kind, "logs", "-f", "--timestamps"] + extra
        if kind == "file":
            if not extra:
                logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
                return None
            # -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
            return ["tail", "-F", "-n", "0", extra[0]]
        logger.error("Unknown watch source type: %r", kind)
        return None

    def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
        """Parse one batch of raw lines with the parser matching the source type."""
        kind = self.config.source_type
        sid = self.config.source_id
        if kind == "journald":
            return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))
        if kind in ("docker", "podman"):
            # Output: "2024-01-15T12:34:56.789012345Z log line text"
            stripped = [_strip_docker_ts(ln) for ln in lines]
            return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))
        if kind == "file":
            # Auto-detect format from the first non-empty line
            non_empty = [ln for ln in lines if ln.strip()]
            if not non_empty:
                return []
            parsers = {
                "journald": journald_parser,
                "servarr": servarr_parser,
                "plex": plex_parser,
                "qbittorrent": qbit_parser,
                "caddy": caddy_parser,
                "syslog": syslog_parser,
            }
            parser = parsers.get(_detect_format(non_empty[0]), plaintext_parser)
            return list(parser.parse(iter(non_empty), sid, compiled, ingest_time))
        return []

    @staticmethod
    def _split_lines(text: str) -> tuple[list[str], str]:
        """Split *text* into complete non-empty lines and the trailing partial line."""
        *complete, partial = text.split("\n")
        lines = [ln.rstrip("\r") for ln in complete]
        return [ln for ln in lines if ln], partial

    def _drain(self, conn: sqlite3.Connection, compiled) -> None:
        """Read lines from the subprocess and flush to DB periodically.

        Reads the pipe's file descriptor directly. ``select()`` only sees the
        descriptor, so reading through a buffered stream could leave lines
        waiting in Python's buffer until more output arrived.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("watch subprocess has no stdout pipe")
        fd = proc.stdout.fileno()
        # Decode incrementally: a chunk may end mid-character, and undecodable
        # bytes are replaced instead of crashing the source.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        pending = ""
        buffer: list[str] = []
        flush_count = 0
        last_flush = time.monotonic()
        while not self._stop.is_set():
            # The 1s timeout keeps the loop responsive to stop() and to the flush interval.
            ready, _, _ = select.select([fd], [], [], 1.0)
            if ready:
                chunk = os.read(fd, self._READ_CHUNK)
                if not chunk:
                    # EOF: keep a final line that has no trailing newline.
                    leftover = (pending + decoder.decode(b"", final=True)).rstrip("\r")
                    if leftover:
                        buffer.append(leftover)
                    if not self._stop.is_set():
                        try:
                            code = proc.wait(timeout=1)
                        except subprocess.TimeoutExpired:
                            code = None
                        self._error = f"watch process exited (code {code})"
                        logger.warning(
                            "Watch process exited for %r; source stays stopped until reloaded",
                            self.config.source_id,
                        )
                    break
                complete, pending = self._split_lines(pending + decoder.decode(chunk))
                buffer.extend(complete)
            elapsed = time.monotonic() - last_flush
            if buffer and (len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC):
                flush_count = self._flush(conn, buffer, compiled, flush_count)
                buffer.clear()
                last_flush = time.monotonic()
        # Flush remainder
        if buffer:
            self._flush(conn, buffer, compiled, flush_count)

    def _flush(self, conn: sqlite3.Connection, lines: list[str], compiled, flush_count: int) -> int:
        """Parse and persist *lines*; return the updated count of successful flushes.

        Failures are logged and the batch is dropped, so one bad batch does
        not stop the source.
        """
        ingest_time = now_iso()
        try:
            entries = self._parse_lines(lines, ingest_time, compiled)
            if entries:
                _write_batch(conn, entries)
            conn.commit()
            self._entry_count += len(entries)
            # Prefer the newest entry's own timestamp; otherwise use the flush time.
            newest = entries[-1].timestamp_iso if entries else None
            self._last_event = newest or now_iso()
            flush_count += 1
            if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0:
                build_fts_index(self.db_path)
        except Exception as exc:
            logger.warning("Flush error for %r: %s", self.config.source_id, exc)
        return flush_count
def _strip_docker_ts(line: str) -> str:
"""Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds."""
# Format: "2024-01-15T12:34:56.789012345Z actual log text"
parts = line.split(" ", 1)
if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"):
return parts[1]
return line
# ── Orchestrator ──────────────────────────────────────────────────────────────
class Watcher:
    """Manages all active WatchSource instances."""

    def __init__(self, db_path: Path, pattern_file: Path) -> None:
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._sources: list[WatchSource] = []

    def configure(self, configs: list[WatchConfig]) -> None:
        """Replace the managed sources with one new source per config.

        Sources that are still running are stopped first. Dropping the only
        reference to a live source would leave its thread and subprocess
        running with no way to stop them.
        """
        for src in self._sources:
            if src.status["running"]:
                src.stop()
        self._sources = [
            WatchSource(c, self.db_path, self.pattern_file)
            for c in configs
        ]

    def start(self) -> None:
        """Start every configured source."""
        for src in self._sources:
            src.start()

    def stop(self) -> None:
        """Stop every configured source, one after another."""
        for src in self._sources:
            src.stop()

    @property
    def status(self) -> list[dict]:
        """Per-source status dicts, in configuration order."""
        return [src.status for src in self._sources]

    def is_active(self) -> bool:
        """Return True when at least one source reports itself as running."""
        return any(src.status["running"] for src in self._sources)

36
patterns/watch.yaml Normal file
View file

@ -0,0 +1,36 @@
# Turnstone live watch sources — entries here are tailed continuously.
# The watcher starts automatically when Turnstone starts.
#
# Source types:
# journald — system journal via `journalctl -f -o json` (requires journalctl in container)
# file — tail a log file by path (handles rotation; auto-detects format)
# docker — container logs via `docker logs -f --timestamps <container>`
# podman — container logs via `podman logs -f --timestamps <container>`
#
# For journald, optional args filter by unit:
# args: ["-u", "nginx", "-u", "sshd"]
#
# For docker/podman, args[0] is the container name (required).
# For file, args[0] is the path of the log file to tail (required).
#
# Leave this file empty (just the header) to disable live watching.
# Missing containers are skipped with a warning — safe to leave entries
# for services that are temporarily down.
sources: []
# ── Examples ────────────────────────────────────────────────────────────────
#
# - type: journald
# id: system-journal
#
# - type: journald
# id: sshd-journal
# args: ["-u", "sshd"]
#
# - type: podman
# id: podman:turnstone
# args: ["turnstone"]
#
# - type: docker
# id: docker:nginx
# args: ["nginx-proxy"]

174
tests/test_watch_watcher.py Normal file
View file

@ -0,0 +1,174 @@
"""Tests for app/watch/watcher.py — config loading, command building, output parsing."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from app.watch.watcher import (
WatchConfig,
WatchSource,
Watcher,
_strip_docker_ts,
load_watch_config,
)
# ── Config loading ──────────────────────────────────────────────────────────
def test_load_watch_config_missing_file(tmp_path: Path):
    """A path that does not exist yields an empty source list."""
    missing = tmp_path / "nonexistent.yaml"
    assert load_watch_config(missing) == []
def test_load_watch_config_empty_sources(tmp_path: Path):
    """An explicit empty ``sources`` list yields no configs."""
    cfg = tmp_path / "watch.yaml"
    cfg.write_text("sources: []\n")
    loaded = load_watch_config(cfg)
    assert loaded == []
def test_load_watch_config_parses_journald(tmp_path: Path):
    """A journald entry keeps its type, id and args."""
    cfg = tmp_path / "watch.yaml"
    cfg.write_text("""
sources:
  - type: journald
    id: system-journal
    args: ["-u", "sshd"]
""")
    configs = load_watch_config(cfg)
    assert len(configs) == 1
    entry = configs[0]
    assert entry.source_type == "journald"
    assert entry.source_id == "system-journal"
    assert entry.args == ["-u", "sshd"]
def test_load_watch_config_parses_podman(tmp_path: Path):
    """A podman entry keeps its type and container argument."""
    cfg = tmp_path / "watch.yaml"
    cfg.write_text("""
sources:
  - type: podman
    id: podman:turnstone
    args: ["turnstone"]
""")
    first = load_watch_config(cfg)[0]
    assert first.source_type == "podman"
    assert first.args == ["turnstone"]
def test_load_watch_config_no_args_defaults_to_empty(tmp_path: Path):
    """An entry without ``args`` gets an empty argument list."""
    cfg = tmp_path / "watch.yaml"
    cfg.write_text("""
sources:
  - type: journald
    id: system
""")
    first = load_watch_config(cfg)[0]
    assert first.args == []
# ── Command building ─────────────────────────────────────────────────────────
def _make_source(source_type: str, source_id: str, args: list = None, db=None, pattern=None):
    """Build a WatchSource for the command-building tests.

    ``db`` and ``pattern`` fall back to placeholder paths under ``/tmp``.
    """
    db_path = Path("/tmp/fake.db") if db is None else db
    pattern_path = Path("/tmp/fake.yaml") if pattern is None else pattern
    cfg = WatchConfig(source_type=source_type, source_id=source_id, args=args or [])
    return WatchSource(cfg, db_path, pattern_path)
def test_build_command_journald():
    """The journald command starts with journalctl following in JSON output."""
    cmd = _make_source("journald", "sys")._build_command()
    assert cmd[:3] == ["journalctl", "-f", "--output=json"]
def test_build_command_journald_with_unit_filter():
    """Extra args such as a unit filter are passed through to journalctl."""
    cmd = _make_source("journald", "sshd", args=["-u", "sshd"])._build_command()
    for token in ("-u", "sshd"):
        assert token in cmd
def test_build_command_podman_with_container():
    """The podman command follows the logs of the named container."""
    cmd = _make_source("podman", "podman:ts", args=["turnstone"])._build_command()
    assert cmd[0] == "podman"
    for token in ("logs", "-f", "turnstone"):
        assert token in cmd
def test_build_command_docker_no_args_returns_none():
    """A docker source without a container name builds no command."""
    assert _make_source("docker", "docker:nginx")._build_command() is None
def test_build_command_unknown_type_returns_none():
    """An unrecognised source type builds no command."""
    assert _make_source("kafka", "topic:logs")._build_command() is None
def test_build_command_file_with_path():
    """A file source tails the given path by name, starting from the end."""
    log_path = "/opt/sonarr/config/logs/sonarr.0.txt"
    cmd = _make_source("file", "sonarr", args=[log_path])._build_command()
    assert cmd[0] == "tail"
    assert "-F" in cmd
    assert "-n" in cmd
    assert "0" in cmd
    assert log_path in cmd
def test_build_command_file_no_args_returns_none():
    """A file source without a path builds no command."""
    assert _make_source("file", "sonarr")._build_command() is None
# ── Docker timestamp stripping ───────────────────────────────────────────────
def test_strip_docker_ts_removes_rfc3339_prefix():
    """A nanosecond-precision UTC timestamp prefix is removed."""
    stripped = _strip_docker_ts("2024-01-15T12:34:56.789012345Z some log line")
    assert stripped == "some log line"
def test_strip_docker_ts_passes_plain_lines():
    """A line without a timestamp prefix comes back unchanged."""
    plain = "plain log line without timestamp"
    assert _strip_docker_ts(plain) == plain
def test_strip_docker_ts_handles_offset_timezone():
    """A UTC timestamp without fractional seconds is stripped as well.

    Despite the name, the input here ends in "Z"; no numeric offset is exercised.
    """
    # Docker --timestamps always uses Z (UTC), but be safe
    stripped = _strip_docker_ts("2024-01-15T12:34:56Z message")
    assert stripped == "message"
# ── Watcher orchestrator ─────────────────────────────────────────────────────
def test_watcher_status_empty_when_no_sources(tmp_path: Path):
    """A freshly built Watcher has no sources and is inactive."""
    watcher = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
    assert watcher.status == []
    assert watcher.is_active() is False
def test_watcher_configure_creates_sources(tmp_path: Path):
    """configure() creates one source per config, in the given order."""
    watcher = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
    watcher.configure([
        WatchConfig("journald", "sys"),
        WatchConfig("podman", "ts", ["turnstone"]),
    ])
    statuses = watcher.status
    assert len(statuses) == 2
    assert statuses[0]["source_id"] == "sys"
    assert statuses[1]["source_id"] == "ts"
def test_watcher_status_shows_not_running_before_start(tmp_path: Path):
    """Configured but unstarted sources report as not running."""
    watcher = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
    watcher.configure([WatchConfig("journald", "sys")])
    assert not watcher.is_active()
    first = watcher.status[0]
    assert first["running"] is False

View file

@ -1,13 +1,28 @@
<template> <template>
<div class="p-6 max-w-5xl mx-auto space-y-8"> <div class="p-6 max-w-5xl mx-auto space-y-8">
<!-- Data freshness banner --> <!-- Watch status + freshness row -->
<div v-if="!loading && stats" class="space-y-2">
<!-- Live watch indicator -->
<div class="flex items-center justify-end gap-2">
<span
:class="watchActive ? 'bg-green-500' : 'bg-surface-border'"
class="w-2 h-2 rounded-full flex-shrink-0"
></span>
<span :class="watchActive ? 'text-green-400' : 'text-text-dim'" class="text-xs">
{{ watchActive ? `Live — ${watchSources.length} source${watchSources.length !== 1 ? 's' : ''} watched` : 'Manual ingest mode' }}
</span>
</div>
<!-- Stale data banner -->
<div <div
v-if="!loading && stats && isStale" v-if="isStale"
class="flex items-center gap-2 rounded border border-surface-border bg-surface-raised px-4 py-2.5 text-xs text-text-dim" class="flex items-center gap-2 rounded border border-surface-border bg-surface-raised px-4 py-2.5 text-xs text-text-dim"
> >
<span class="text-sev-warn"></span> <span class="text-sev-warn"></span>
<span>Last ingested: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span> 24h counts reflect this window, not today.</span> <span v-if="watchActive">Live watch active — last event: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span>. Waiting for new entries to arrive.</span>
<span v-else>Last ingested: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span> 24h counts reflect this window, not today.</span>
</div>
</div> </div>
<!-- Stat cards --> <!-- Stat cards -->
@ -160,6 +175,15 @@ interface StatsResponse {
}> }>
} }
/** One entry of the `sources` array returned by `/api/watch/status`. */
interface WatchSourceStatus {
  // Identifier of the source as configured on the backend.
  source_id: string
  // Source kind, e.g. "journald", "docker", "podman" or "file".
  type: string
  // Whether the backend reports this source as currently running.
  running: boolean
  // Number of entries this source has ingested so far.
  entries_ingested: number
  // Timestamp string of the most recent event, or null if none yet.
  last_event: string | null
  // Error message reported by the backend for this source, if any.
  error: string | null
}
interface Incident { interface Incident {
id: string id: string
ended_at: string | null ended_at: string | null
@ -169,11 +193,16 @@ const stats = ref<StatsResponse | null>(null)
const loading = ref(true) const loading = ref(true)
const incidents = ref<Incident[]>([]) const incidents = ref<Incident[]>([])
const incidentsLoading = ref(true) const incidentsLoading = ref(true)
const watchSources = ref<WatchSourceStatus[]>([])
const activeIncidents = computed(() => const activeIncidents = computed(() =>
incidents.value.filter(i => !i.ended_at).length incidents.value.filter(i => !i.ended_at).length
) )
// True while at least one watch source reports itself as running.
const watchActive = computed(() => watchSources.value.some(({ running }) => running))
const isStale = computed(() => { const isStale = computed(() => {
if (!stats.value?.last_ingested) return false if (!stats.value?.last_ingested) return false
const age = Date.now() - new Date(stats.value.last_ingested).getTime() const age = Date.now() - new Date(stats.value.last_ingested).getTime()
@ -181,7 +210,7 @@ const isStale = computed(() => {
}) })
onMounted(async () => { onMounted(async () => {
await Promise.all([loadStats(), loadIncidents()]) await Promise.all([loadStats(), loadIncidents(), loadWatchStatus()])
}) })
async function loadStats() { async function loadStats() {
@ -202,6 +231,13 @@ async function loadIncidents() {
} }
} }
/**
 * Fetch per-source watch status from the API into `watchSources`.
 * A failed request or non-OK response leaves the current value untouched.
 */
async function loadWatchStatus() {
  try {
    const res = await fetch(`${BASE}/api/watch/status`)
    if (!res.ok) return
    const payload = await res.json()
    watchSources.value = payload.sources ?? []
  } catch {
    /* non-critical */
  }
}
function healthDot(errors: number, total: number): string { function healthDot(errors: number, total: number): string {
if (errors === 0) return 'bg-green-500' if (errors === 0) return 'bg-green-500'
const ratio = errors / Math.max(total, 1) const ratio = errors / Math.max(total, 1)