feat: live watch mode — tail journald/docker/podman continuously (#4) #16
6 changed files with 580 additions and 14 deletions
42
app/rest.py
42
app/rest.py
|
|
@ -11,6 +11,7 @@ import json
|
||||||
import os
|
import os
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
|
|
||||||
|
|
@ -40,14 +41,31 @@ from app.services.search import (
|
||||||
format_results,
|
format_results,
|
||||||
)
|
)
|
||||||
from app.services.diagnose import diagnose as _diagnose
|
from app.services.diagnose import diagnose as _diagnose
|
||||||
|
from app.watch.watcher import Watcher, load_watch_config
|
||||||
|
|
||||||
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
|
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
|
||||||
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
||||||
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
||||||
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
||||||
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
|
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
|
||||||
|
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
|
||||||
|
|
||||||
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None)
|
_watcher = Watcher(DB_PATH, PATTERN_DIR / "default.yaml")
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _lifespan(app: FastAPI):
|
||||||
|
ensure_schema(DB_PATH)
|
||||||
|
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
||||||
|
configs = load_watch_config(watch_cfg_path)
|
||||||
|
if configs:
|
||||||
|
_watcher.configure(configs)
|
||||||
|
_watcher.start()
|
||||||
|
yield
|
||||||
|
_watcher.stop()
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(title="Turnstone API", version="0.1.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
|
||||||
|
|
||||||
app.add_middleware(
|
app.add_middleware(
|
||||||
CORSMiddleware,
|
CORSMiddleware,
|
||||||
|
|
@ -57,11 +75,6 @@ app.add_middleware(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.on_event("startup")
|
|
||||||
def _startup() -> None:
|
|
||||||
ensure_schema(DB_PATH)
|
|
||||||
|
|
||||||
|
|
||||||
_PREFS_DEFAULTS: dict = {
|
_PREFS_DEFAULTS: dict = {
|
||||||
"entry_point_style": "topbar",
|
"entry_point_style": "topbar",
|
||||||
"llm_url": "http://localhost:11434",
|
"llm_url": "http://localhost:11434",
|
||||||
|
|
@ -271,6 +284,23 @@ def list_sources() -> dict:
|
||||||
return {"sources": _list_sources(DB_PATH)}
|
return {"sources": _list_sources(DB_PATH)}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/api/watch/status")
|
||||||
|
def watch_status() -> dict:
|
||||||
|
return {"active": _watcher.is_active(), "sources": _watcher.status}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/api/watch/reload")
|
||||||
|
def watch_reload() -> dict:
|
||||||
|
"""Stop all watch sources and restart with current watch.yaml."""
|
||||||
|
_watcher.stop()
|
||||||
|
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
||||||
|
configs = load_watch_config(watch_cfg_path)
|
||||||
|
if configs:
|
||||||
|
_watcher.configure(configs)
|
||||||
|
_watcher.start()
|
||||||
|
return {"reloaded": True, "source_count": len(configs)}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/api/stats")
|
@router.get("/api/stats")
|
||||||
def get_stats(
|
def get_stats(
|
||||||
window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
|
window: Annotated[int, Query(ge=1, le=168, description="Hours to look back")] = 24,
|
||||||
|
|
|
||||||
0
app/watch/__init__.py
Normal file
0
app/watch/__init__.py
Normal file
290
app/watch/watcher.py
Normal file
290
app/watch/watcher.py
Normal file
|
|
@ -0,0 +1,290 @@
|
||||||
|
"""Live watch: tail active log sources and ingest entries in near-real-time.
|
||||||
|
|
||||||
|
Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f)
|
||||||
|
in a daemon thread and pipes lines through the existing ingestors into SQLite.
|
||||||
|
FTS is synced incrementally after each flush.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from app.ingest import journald as journald_parser, syslog as syslog_parser
|
||||||
|
from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
|
||||||
|
from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser
|
||||||
|
from app.ingest.pipeline import _detect_format
|
||||||
|
from app.ingest.base import _compile, load_patterns, now_iso
|
||||||
|
from app.ingest.pipeline import _write_batch, _SCHEMA
|
||||||
|
from app.services.search import build_fts_index
|
||||||
|
from app.services.models import RetrievedEntry
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
FLUSH_INTERVAL_SEC = 10
|
||||||
|
FLUSH_BATCH_SIZE = 100
|
||||||
|
FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load
|
||||||
|
|
||||||
|
|
||||||
|
# ── Config ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class WatchConfig:
|
||||||
|
source_type: str # "journald" | "docker" | "podman" | "file"
|
||||||
|
source_id: str
|
||||||
|
args: list[str] = field(default_factory=list) # extra CLI args
|
||||||
|
|
||||||
|
|
||||||
|
def load_watch_config(path: Path) -> list[WatchConfig]:
|
||||||
|
"""Load watch.yaml; return empty list if file absent."""
|
||||||
|
if not path.exists():
|
||||||
|
return []
|
||||||
|
raw = yaml.safe_load(path.read_text()) or {}
|
||||||
|
sources = []
|
||||||
|
for src in raw.get("sources", []):
|
||||||
|
sources.append(WatchConfig(
|
||||||
|
source_type=src["type"],
|
||||||
|
source_id=src["id"],
|
||||||
|
args=src.get("args", []),
|
||||||
|
))
|
||||||
|
return sources
|
||||||
|
|
||||||
|
|
||||||
|
# ── Per-source runner ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class WatchSource:
|
||||||
|
"""Tails a single log source in a background daemon thread."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: WatchConfig,
|
||||||
|
db_path: Path,
|
||||||
|
pattern_file: Path,
|
||||||
|
) -> None:
|
||||||
|
self.config = config
|
||||||
|
self.db_path = db_path
|
||||||
|
self.pattern_file = pattern_file
|
||||||
|
self._stop = threading.Event()
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
self._proc: subprocess.Popen | None = None
|
||||||
|
self._last_event: str | None = None
|
||||||
|
self._entry_count: int = 0
|
||||||
|
self._error: str | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self) -> dict:
|
||||||
|
return {
|
||||||
|
"source_id": self.config.source_id,
|
||||||
|
"type": self.config.source_type,
|
||||||
|
"running": self._thread is not None and self._thread.is_alive(),
|
||||||
|
"entries_ingested": self._entry_count,
|
||||||
|
"last_event": self._last_event,
|
||||||
|
"error": self._error,
|
||||||
|
}
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self._stop.clear()
|
||||||
|
self._thread = threading.Thread(target=self._run, daemon=True, name=f"watch:{self.config.source_id}")
|
||||||
|
self._thread.start()
|
||||||
|
logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type)
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
self._stop.set()
|
||||||
|
if self._proc:
|
||||||
|
try:
|
||||||
|
self._proc.terminate()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
if self._thread:
|
||||||
|
self._thread.join(timeout=5)
|
||||||
|
logger.info("Watch source stopped: %s", self.config.source_id)
|
||||||
|
|
||||||
|
def _run(self) -> None:
|
||||||
|
patterns = load_patterns(self.pattern_file)
|
||||||
|
compiled = _compile(patterns)
|
||||||
|
|
||||||
|
conn = sqlite3.connect(str(self.db_path))
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
conn.executescript(_SCHEMA)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
try:
|
||||||
|
cmd = self._build_command()
|
||||||
|
if not cmd:
|
||||||
|
return
|
||||||
|
self._proc = subprocess.Popen(
|
||||||
|
cmd,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
bufsize=1,
|
||||||
|
)
|
||||||
|
self._drain(conn, compiled)
|
||||||
|
except Exception as exc:
|
||||||
|
self._error = str(exc)
|
||||||
|
logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def _build_command(self) -> list[str] | None:
|
||||||
|
t = self.config.source_type
|
||||||
|
extra = self.config.args
|
||||||
|
if t == "journald":
|
||||||
|
return ["journalctl", "-f", "--output=json", "--no-pager"] + extra
|
||||||
|
if t == "docker":
|
||||||
|
if not extra:
|
||||||
|
logger.error("docker source %r requires args: [container_name]", self.config.source_id)
|
||||||
|
return None
|
||||||
|
return ["docker", "logs", "-f", "--timestamps", extra[0]] + extra[1:]
|
||||||
|
if t == "podman":
|
||||||
|
if not extra:
|
||||||
|
logger.error("podman source %r requires args: [container_name]", self.config.source_id)
|
||||||
|
return None
|
||||||
|
return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:]
|
||||||
|
if t == "file":
|
||||||
|
if not extra:
|
||||||
|
logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
|
||||||
|
return None
|
||||||
|
# -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
|
||||||
|
return ["tail", "-F", "-n", "0", extra[0]]
|
||||||
|
logger.error("Unknown watch source type: %r", t)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
|
||||||
|
t = self.config.source_type
|
||||||
|
sid = self.config.source_id
|
||||||
|
|
||||||
|
if t == "journald":
|
||||||
|
return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))
|
||||||
|
|
||||||
|
if t in ("docker", "podman"):
|
||||||
|
# Output: "2024-01-15T12:34:56.789012345Z log line text"
|
||||||
|
stripped = [_strip_docker_ts(ln) for ln in lines]
|
||||||
|
return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))
|
||||||
|
|
||||||
|
if t == "file":
|
||||||
|
# Auto-detect format from the first non-empty line
|
||||||
|
non_empty = [ln for ln in lines if ln.strip()]
|
||||||
|
if not non_empty:
|
||||||
|
return []
|
||||||
|
fmt = _detect_format(non_empty[0])
|
||||||
|
it = iter(non_empty)
|
||||||
|
if fmt == "journald":
|
||||||
|
return list(journald_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "servarr":
|
||||||
|
return list(servarr_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "plex":
|
||||||
|
return list(plex_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "qbittorrent":
|
||||||
|
return list(qbit_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "caddy":
|
||||||
|
return list(caddy_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "syslog":
|
||||||
|
return list(syslog_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
return list(plaintext_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _drain(self, conn: sqlite3.Connection, compiled) -> None:
|
||||||
|
"""Read lines from the subprocess and flush to DB periodically."""
|
||||||
|
assert self._proc is not None
|
||||||
|
buffer: list[str] = []
|
||||||
|
flush_count = 0
|
||||||
|
last_flush = datetime.now(tz=timezone.utc)
|
||||||
|
|
||||||
|
while not self._stop.is_set():
|
||||||
|
assert self._proc.stdout is not None
|
||||||
|
# Non-blocking check with short readline timeout via select
|
||||||
|
import select
|
||||||
|
ready, _, _ = select.select([self._proc.stdout], [], [], 1.0)
|
||||||
|
|
||||||
|
if ready:
|
||||||
|
line = self._proc.stdout.readline()
|
||||||
|
if not line:
|
||||||
|
if not self._stop.is_set():
|
||||||
|
logger.warning("Watch process exited for %r — will retry in 5s", self.config.source_id)
|
||||||
|
self._stop.wait(5)
|
||||||
|
break
|
||||||
|
line = line.rstrip("\n")
|
||||||
|
if line:
|
||||||
|
buffer.append(line)
|
||||||
|
|
||||||
|
elapsed = (datetime.now(tz=timezone.utc) - last_flush).total_seconds()
|
||||||
|
should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC
|
||||||
|
|
||||||
|
if buffer and should_flush:
|
||||||
|
flush_count = self._flush(conn, buffer, compiled, flush_count)
|
||||||
|
buffer.clear()
|
||||||
|
last_flush = datetime.now(tz=timezone.utc)
|
||||||
|
|
||||||
|
# Flush remainder
|
||||||
|
if buffer:
|
||||||
|
self._flush(conn, buffer, compiled, flush_count)
|
||||||
|
|
||||||
|
def _flush(self, conn: sqlite3.Connection, lines: list[str], compiled, flush_count: int) -> int:
|
||||||
|
ingest_time = now_iso()
|
||||||
|
try:
|
||||||
|
entries = self._parse_lines(lines, ingest_time, compiled)
|
||||||
|
if entries:
|
||||||
|
_write_batch(conn, entries)
|
||||||
|
conn.commit()
|
||||||
|
self._entry_count += len(entries)
|
||||||
|
self._last_event = now_iso()
|
||||||
|
if entries:
|
||||||
|
self._last_event = entries[-1].timestamp_iso or self._last_event
|
||||||
|
|
||||||
|
flush_count += 1
|
||||||
|
if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0:
|
||||||
|
build_fts_index(self.db_path)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Flush error for %r: %s", self.config.source_id, exc)
|
||||||
|
return flush_count
|
||||||
|
|
||||||
|
|
||||||
|
def _strip_docker_ts(line: str) -> str:
|
||||||
|
"""Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds."""
|
||||||
|
# Format: "2024-01-15T12:34:56.789012345Z actual log text"
|
||||||
|
parts = line.split(" ", 1)
|
||||||
|
if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"):
|
||||||
|
return parts[1]
|
||||||
|
return line
|
||||||
|
|
||||||
|
|
||||||
|
# ── Orchestrator ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class Watcher:
|
||||||
|
"""Manages all active WatchSource instances."""
|
||||||
|
|
||||||
|
def __init__(self, db_path: Path, pattern_file: Path) -> None:
|
||||||
|
self.db_path = db_path
|
||||||
|
self.pattern_file = pattern_file
|
||||||
|
self._sources: list[WatchSource] = []
|
||||||
|
|
||||||
|
def configure(self, configs: list[WatchConfig]) -> None:
|
||||||
|
self._sources = [
|
||||||
|
WatchSource(c, self.db_path, self.pattern_file)
|
||||||
|
for c in configs
|
||||||
|
]
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
for src in self._sources:
|
||||||
|
src.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
for src in self._sources:
|
||||||
|
src.stop()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self) -> list[dict]:
|
||||||
|
return [src.status for src in self._sources]
|
||||||
|
|
||||||
|
def is_active(self) -> bool:
|
||||||
|
return any(src._thread is not None and src._thread.is_alive() for src in self._sources)
|
||||||
36
patterns/watch.yaml
Normal file
36
patterns/watch.yaml
Normal file
|
|
@ -0,0 +1,36 @@
|
||||||
|
# Turnstone live watch sources — entries here are tailed continuously.
|
||||||
|
# The watcher starts automatically when Turnstone starts.
|
||||||
|
#
|
||||||
|
# Source types:
|
||||||
|
# journald — system journal via `journalctl -f -o json` (requires journalctl in container)
|
||||||
|
# file — tail a log file by path (handles rotation; auto-detects format)
|
||||||
|
# docker — container logs via `docker logs -f --timestamps <container>`
|
||||||
|
# podman — container logs via `podman logs -f --timestamps <container>`
|
||||||
|
#
|
||||||
|
# For journald, optional args filter by unit:
|
||||||
|
# args: ["-u", "nginx", "-u", "sshd"]
|
||||||
|
#
|
||||||
|
# For docker/podman, args[0] is the container name (required).
|
||||||
|
#
|
||||||
|
# Leave this file empty (just the header) to disable live watching.
|
||||||
|
# Missing containers are skipped with a warning — safe to leave entries
|
||||||
|
# for services that are temporarily down.
|
||||||
|
|
||||||
|
sources: []
|
||||||
|
|
||||||
|
# ── Examples ────────────────────────────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# - type: journald
|
||||||
|
# id: system-journal
|
||||||
|
#
|
||||||
|
# - type: journald
|
||||||
|
# id: sshd-journal
|
||||||
|
# args: ["-u", "sshd"]
|
||||||
|
#
|
||||||
|
# - type: podman
|
||||||
|
# id: podman:turnstone
|
||||||
|
# args: ["turnstone"]
|
||||||
|
#
|
||||||
|
# - type: docker
|
||||||
|
# id: docker:nginx
|
||||||
|
# args: ["nginx-proxy"]
|
||||||
174
tests/test_watch_watcher.py
Normal file
174
tests/test_watch_watcher.py
Normal file
|
|
@ -0,0 +1,174 @@
|
||||||
|
"""Tests for app/watch/watcher.py — config loading, command building, output parsing."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.watch.watcher import (
|
||||||
|
WatchConfig,
|
||||||
|
WatchSource,
|
||||||
|
Watcher,
|
||||||
|
_strip_docker_ts,
|
||||||
|
load_watch_config,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Config loading ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_load_watch_config_missing_file(tmp_path: Path):
|
||||||
|
result = load_watch_config(tmp_path / "nonexistent.yaml")
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_watch_config_empty_sources(tmp_path: Path):
|
||||||
|
cfg = tmp_path / "watch.yaml"
|
||||||
|
cfg.write_text("sources: []\n")
|
||||||
|
assert load_watch_config(cfg) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_watch_config_parses_journald(tmp_path: Path):
|
||||||
|
cfg = tmp_path / "watch.yaml"
|
||||||
|
cfg.write_text("""
|
||||||
|
sources:
|
||||||
|
- type: journald
|
||||||
|
id: system-journal
|
||||||
|
args: ["-u", "sshd"]
|
||||||
|
""")
|
||||||
|
configs = load_watch_config(cfg)
|
||||||
|
assert len(configs) == 1
|
||||||
|
assert configs[0].source_type == "journald"
|
||||||
|
assert configs[0].source_id == "system-journal"
|
||||||
|
assert configs[0].args == ["-u", "sshd"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_watch_config_parses_podman(tmp_path: Path):
|
||||||
|
cfg = tmp_path / "watch.yaml"
|
||||||
|
cfg.write_text("""
|
||||||
|
sources:
|
||||||
|
- type: podman
|
||||||
|
id: podman:turnstone
|
||||||
|
args: ["turnstone"]
|
||||||
|
""")
|
||||||
|
configs = load_watch_config(cfg)
|
||||||
|
assert configs[0].source_type == "podman"
|
||||||
|
assert configs[0].args == ["turnstone"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_watch_config_no_args_defaults_to_empty(tmp_path: Path):
|
||||||
|
cfg = tmp_path / "watch.yaml"
|
||||||
|
cfg.write_text("""
|
||||||
|
sources:
|
||||||
|
- type: journald
|
||||||
|
id: system
|
||||||
|
""")
|
||||||
|
configs = load_watch_config(cfg)
|
||||||
|
assert configs[0].args == []
|
||||||
|
|
||||||
|
|
||||||
|
# ── Command building ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_source(source_type: str, source_id: str, args: list = None, db=None, pattern=None):
|
||||||
|
if db is None:
|
||||||
|
db = Path("/tmp/fake.db")
|
||||||
|
if pattern is None:
|
||||||
|
pattern = Path("/tmp/fake.yaml")
|
||||||
|
cfg = WatchConfig(source_type=source_type, source_id=source_id, args=args or [])
|
||||||
|
return WatchSource(cfg, db, pattern)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_journald():
|
||||||
|
src = _make_source("journald", "sys")
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd[:3] == ["journalctl", "-f", "--output=json"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_journald_with_unit_filter():
|
||||||
|
src = _make_source("journald", "sshd", args=["-u", "sshd"])
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert "-u" in cmd
|
||||||
|
assert "sshd" in cmd
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_podman_with_container():
|
||||||
|
src = _make_source("podman", "podman:ts", args=["turnstone"])
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd[0] == "podman"
|
||||||
|
assert "logs" in cmd
|
||||||
|
assert "-f" in cmd
|
||||||
|
assert "turnstone" in cmd
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_docker_no_args_returns_none():
|
||||||
|
src = _make_source("docker", "docker:nginx")
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_unknown_type_returns_none():
|
||||||
|
src = _make_source("kafka", "topic:logs")
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_file_with_path():
|
||||||
|
src = _make_source("file", "sonarr", args=["/opt/sonarr/config/logs/sonarr.0.txt"])
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd[0] == "tail"
|
||||||
|
assert "-F" in cmd
|
||||||
|
assert "-n" in cmd and "0" in cmd
|
||||||
|
assert "/opt/sonarr/config/logs/sonarr.0.txt" in cmd
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_file_no_args_returns_none():
|
||||||
|
src = _make_source("file", "sonarr")
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd is None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Docker timestamp stripping ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_strip_docker_ts_removes_rfc3339_prefix():
|
||||||
|
line = "2024-01-15T12:34:56.789012345Z some log line"
|
||||||
|
assert _strip_docker_ts(line) == "some log line"
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_docker_ts_passes_plain_lines():
|
||||||
|
line = "plain log line without timestamp"
|
||||||
|
assert _strip_docker_ts(line) == line
|
||||||
|
|
||||||
|
|
||||||
|
def test_strip_docker_ts_handles_offset_timezone():
|
||||||
|
# Docker --timestamps always uses Z (UTC), but be safe
|
||||||
|
line = "2024-01-15T12:34:56Z message"
|
||||||
|
assert _strip_docker_ts(line) == "message"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Watcher orchestrator ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_watcher_status_empty_when_no_sources(tmp_path: Path):
|
||||||
|
w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
|
||||||
|
assert w.status == []
|
||||||
|
assert not w.is_active()
|
||||||
|
|
||||||
|
|
||||||
|
def test_watcher_configure_creates_sources(tmp_path: Path):
|
||||||
|
w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
|
||||||
|
configs = [
|
||||||
|
WatchConfig("journald", "sys"),
|
||||||
|
WatchConfig("podman", "ts", ["turnstone"]),
|
||||||
|
]
|
||||||
|
w.configure(configs)
|
||||||
|
assert len(w.status) == 2
|
||||||
|
assert w.status[0]["source_id"] == "sys"
|
||||||
|
assert w.status[1]["source_id"] == "ts"
|
||||||
|
|
||||||
|
|
||||||
|
def test_watcher_status_shows_not_running_before_start(tmp_path: Path):
|
||||||
|
w = Watcher(tmp_path / "fake.db", tmp_path / "fake.yaml")
|
||||||
|
w.configure([WatchConfig("journald", "sys")])
|
||||||
|
assert not w.is_active()
|
||||||
|
assert w.status[0]["running"] is False
|
||||||
|
|
@ -1,13 +1,28 @@
|
||||||
<template>
|
<template>
|
||||||
<div class="p-6 max-w-5xl mx-auto space-y-8">
|
<div class="p-6 max-w-5xl mx-auto space-y-8">
|
||||||
|
|
||||||
<!-- Data freshness banner -->
|
<!-- Watch status + freshness row -->
|
||||||
<div
|
<div v-if="!loading && stats" class="space-y-2">
|
||||||
v-if="!loading && stats && isStale"
|
<!-- Live watch indicator -->
|
||||||
class="flex items-center gap-2 rounded border border-surface-border bg-surface-raised px-4 py-2.5 text-xs text-text-dim"
|
<div class="flex items-center justify-end gap-2">
|
||||||
>
|
<span
|
||||||
<span class="text-sev-warn">⚠</span>
|
:class="watchActive ? 'bg-green-500' : 'bg-surface-border'"
|
||||||
<span>Last ingested: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span> — 24h counts reflect this window, not today.</span>
|
class="w-2 h-2 rounded-full flex-shrink-0"
|
||||||
|
></span>
|
||||||
|
<span :class="watchActive ? 'text-green-400' : 'text-text-dim'" class="text-xs">
|
||||||
|
{{ watchActive ? `Live — ${watchSources.length} source${watchSources.length !== 1 ? 's' : ''} watched` : 'Manual ingest mode' }}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<!-- Stale data banner -->
|
||||||
|
<div
|
||||||
|
v-if="isStale"
|
||||||
|
class="flex items-center gap-2 rounded border border-surface-border bg-surface-raised px-4 py-2.5 text-xs text-text-dim"
|
||||||
|
>
|
||||||
|
<span class="text-sev-warn">⚠</span>
|
||||||
|
<span v-if="watchActive">Live watch active — last event: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span>. Waiting for new entries to arrive.</span>
|
||||||
|
<span v-else>Last ingested: <span class="text-text-muted">{{ shortTs(stats.last_ingested) }}</span> — 24h counts reflect this window, not today.</span>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<!-- Stat cards -->
|
<!-- Stat cards -->
|
||||||
|
|
@ -160,6 +175,15 @@ interface StatsResponse {
|
||||||
}>
|
}>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface WatchSourceStatus {
|
||||||
|
source_id: string
|
||||||
|
type: string
|
||||||
|
running: boolean
|
||||||
|
entries_ingested: number
|
||||||
|
last_event: string | null
|
||||||
|
error: string | null
|
||||||
|
}
|
||||||
|
|
||||||
interface Incident {
|
interface Incident {
|
||||||
id: string
|
id: string
|
||||||
ended_at: string | null
|
ended_at: string | null
|
||||||
|
|
@ -169,11 +193,16 @@ const stats = ref<StatsResponse | null>(null)
|
||||||
const loading = ref(true)
|
const loading = ref(true)
|
||||||
const incidents = ref<Incident[]>([])
|
const incidents = ref<Incident[]>([])
|
||||||
const incidentsLoading = ref(true)
|
const incidentsLoading = ref(true)
|
||||||
|
const watchSources = ref<WatchSourceStatus[]>([])
|
||||||
|
|
||||||
const activeIncidents = computed(() =>
|
const activeIncidents = computed(() =>
|
||||||
incidents.value.filter(i => !i.ended_at).length
|
incidents.value.filter(i => !i.ended_at).length
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const watchActive = computed(() =>
|
||||||
|
watchSources.value.some(s => s.running)
|
||||||
|
)
|
||||||
|
|
||||||
const isStale = computed(() => {
|
const isStale = computed(() => {
|
||||||
if (!stats.value?.last_ingested) return false
|
if (!stats.value?.last_ingested) return false
|
||||||
const age = Date.now() - new Date(stats.value.last_ingested).getTime()
|
const age = Date.now() - new Date(stats.value.last_ingested).getTime()
|
||||||
|
|
@ -181,7 +210,7 @@ const isStale = computed(() => {
|
||||||
})
|
})
|
||||||
|
|
||||||
onMounted(async () => {
|
onMounted(async () => {
|
||||||
await Promise.all([loadStats(), loadIncidents()])
|
await Promise.all([loadStats(), loadIncidents(), loadWatchStatus()])
|
||||||
})
|
})
|
||||||
|
|
||||||
async function loadStats() {
|
async function loadStats() {
|
||||||
|
|
@ -202,6 +231,13 @@ async function loadIncidents() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function loadWatchStatus() {
|
||||||
|
try {
|
||||||
|
const res = await fetch(`${BASE}/api/watch/status`)
|
||||||
|
if (res.ok) watchSources.value = (await res.json()).sources ?? []
|
||||||
|
} catch { /* non-critical */ }
|
||||||
|
}
|
||||||
|
|
||||||
function healthDot(errors: number, total: number): string {
|
function healthDot(errors: number, total: number): string {
|
||||||
if (errors === 0) return 'bg-green-500'
|
if (errors === 0) return 'bg-green-500'
|
||||||
const ratio = errors / Math.max(total, 1)
|
const ratio = errors / Math.max(total, 1)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue