- `type: file` sources use tail -F (handles log rotation) with automatic format detection. - _parse_lines dispatches to the journald/servarr/plex/qbit/caddy/syslog/plaintext parsers based on first-line format detection — the same logic as batch ingest. - watch.yaml updated with `file` type docs and an example-node-specific example. - scripts/journal-bridge.sh + .service written directly to example-node. Xander's watch.yaml covers: system-journal-live (via bridge file), sonarr, radarr, lidarr, prowlarr, bazarr, qbittorrent, nzbget, tautulli.
290 lines
11 KiB
Python
290 lines
11 KiB
Python
"""Live watch: tail active log sources and ingest entries in near-real-time.
|
|
|
|
Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f, or
tail -F for plain log files) in a daemon thread and pipes its lines through
the existing ingestors into SQLite. The FTS index is re-synced every
FTS_SYNC_EVERY_N_FLUSHES flushes rather than after each one.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import subprocess
|
|
import threading
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import yaml
|
|
|
|
from app.ingest import journald as journald_parser, syslog as syslog_parser
|
|
from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
|
|
from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser
|
|
from app.ingest.pipeline import _detect_format
|
|
from app.ingest.base import _compile, load_patterns, now_iso
|
|
from app.ingest.pipeline import _write_batch, _SCHEMA
|
|
from app.services.search import build_fts_index
|
|
from app.services.models import RetrievedEntry
|
|
|
|
logger = logging.getLogger(__name__)

# Flush tuning: a buffered batch is written once it holds FLUSH_BATCH_SIZE
# lines or FLUSH_INTERVAL_SEC seconds have passed, whichever comes first.
FLUSH_INTERVAL_SEC: int = 10
FLUSH_BATCH_SIZE: int = 100

# The FTS index is synced on every Nth successful flush: roughly every 30s
# at low volume, more often when batches fill up before the interval ends.
FTS_SYNC_EVERY_N_FLUSHES: int = 3
|
|
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass(frozen=True)
class WatchConfig:
    """One ``sources`` entry from watch.yaml: which log source to tail, and how.

    Fields cannot be reassigned after construction (``frozen=True``). Because
    ``args`` is a list, instances are nevertheless not hashable: the generated
    ``__hash__`` hashes every field and raises ``TypeError`` on the list.
    """

    # Source kind: "journald" | "docker" | "podman" | "file".
    source_type: str
    # Identifier handed to the ingestors for every entry read from this source.
    source_id: str
    # Extra CLI arguments (container name, log file path, journalctl filters...).
    args: list[str] = field(default_factory=list)
|
|
|
|
|
|
def load_watch_config(path: Path) -> list[WatchConfig]:
    """Load watch sources from ``watch.yaml``.

    Expected layout::

        sources:
          - type: journald        # journald | docker | podman | file
            id: system-journal
            args: ["-u", "sshd"]  # optional

    Args:
        path: Location of the YAML file.

    Returns:
        One WatchConfig per entry under ``sources``, in file order; an empty
        list when the file does not exist or lists no sources.

    Raises:
        KeyError: an entry lacks ``type`` or ``id``.
    """
    if not path.exists():
        return []
    # safe_load: plain YAML data only, no arbitrary Python object construction.
    raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # A bare `sources:` key parses as None rather than an empty list.
    entries = raw.get("sources") or []

    configs: list[WatchConfig] = []
    for src in entries:
        # A bare `args:` key parses as None; downstream code concatenates lists.
        args = src.get("args") or []
        if isinstance(args, str):
            # `args: /var/log/app.log` -- a single scalar instead of a list.
            args = [args]
        configs.append(
            WatchConfig(
                source_type=src["type"],
                source_id=src["id"],
                # YAML turns values like 100 into ints; subprocess argv needs str.
                args=[str(arg) for arg in args],
            )
        )
    return configs
|
|
|
|
|
|
# ── Per-source runner ─────────────────────────────────────────────────────────
|
|
|
|
class WatchSource:
    """Tails a single log source in a background daemon thread.

    The thread runs the command built by ``_build_command`` (journalctl,
    docker/podman logs, or tail), buffers its output lines, and writes them
    through the matching ingestor into SQLite once FLUSH_BATCH_SIZE lines are
    buffered or FLUSH_INTERVAL_SEC seconds have passed. The FTS index is synced
    every FTS_SYNC_EVERY_N_FLUSHES successful flushes, and once more when the
    drain loop ends with unsynced entries.

    If the tail process exits on its own (e.g. the container stops), the source
    stops and records the exit in ``status["error"]``; it is not restarted
    automatically.
    """

    def __init__(
        self,
        config: WatchConfig,
        db_path: Path,
        pattern_file: Path,
    ) -> None:
        self.config = config
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._proc: subprocess.Popen | None = None
        self._last_event: str | None = None
        self._entry_count: int = 0
        self._error: str | None = None
        # Parser format for `file` sources, pinned once a structured format is seen.
        self._file_format: str | None = None

    @property
    def status(self) -> dict:
        """Snapshot of this source's runtime state."""
        return {
            "source_id": self.config.source_id,
            "type": self.config.source_type,
            "running": self._thread is not None and self._thread.is_alive(),
            "entries_ingested": self._entry_count,
            "last_event": self._last_event,
            "error": self._error,
        }

    def start(self) -> None:
        """Launch the tail thread; a no-op if this source is already running."""
        if self._thread is not None and self._thread.is_alive():
            # A second thread would tail the same source and double-ingest it.
            logger.debug("Watch source already running: %s", self.config.source_id)
            return
        self._stop.clear()
        self._error = None
        self._thread = threading.Thread(target=self._run, daemon=True, name=f"watch:{self.config.source_id}")
        self._thread.start()
        logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type)

    def stop(self) -> None:
        """Signal the thread to stop, terminate the subprocess, and wait up to 5s."""
        self._stop.set()
        proc = self._proc
        if proc is not None:
            try:
                proc.terminate()
            except OSError:
                pass  # already gone
        if self._thread is not None:
            self._thread.join(timeout=5)
            if self._thread.is_alive():
                logger.warning("Watch source %r did not stop within 5s", self.config.source_id)
        logger.info("Watch source stopped: %s", self.config.source_id)

    def _run(self) -> None:
        """Thread body: spawn the tail process and drain it into SQLite until stopped.

        Any failure (bad config, pattern file, DB, subprocess) is logged and kept
        in ``_error``; the subprocess is always reaped and the DB closed.
        """
        self._proc = None
        conn: sqlite3.Connection | None = None
        try:
            cmd = self._build_command()
            if not cmd:
                self._error = f"cannot build a command for source type {self.config.source_type!r}"
                return
            compiled = _compile(load_patterns(self.pattern_file))

            conn = sqlite3.connect(str(self.db_path))
            conn.execute("PRAGMA journal_mode=WAL")
            conn.executescript(_SCHEMA)
            conn.commit()

            if self._stop.is_set():
                return  # stop() arrived before we got going

            # docker/podman replay the container's stderr on their own stderr:
            # that is log data, so merge it into the stream we ingest.
            merge_stderr = self.config.source_type in ("docker", "podman")
            self._proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",  # one undecodable byte must not kill the source
                bufsize=1,
            )
            if not merge_stderr:
                self._start_stderr_logger(self._proc)
            self._drain(conn, compiled)
        except Exception as exc:
            self._error = str(exc)
            logger.exception("Watch source %r crashed: %s", self.config.source_id, exc)
        finally:
            self._reap_process()
            if conn is not None:
                conn.close()

    def _start_stderr_logger(self, proc: subprocess.Popen) -> None:
        """Forward the subprocess's stderr to our logger from a daemon thread.

        Reading stderr continuously also keeps its pipe from filling up, which
        would otherwise block the child process.
        """
        sid = self.config.source_id

        def forward() -> None:
            if proc.stderr is None:
                return
            try:
                for raw in proc.stderr:
                    message = raw.rstrip("\n")
                    if message:
                        logger.warning("Watch source %r stderr: %s", sid, message)
            except (OSError, ValueError):
                pass  # pipe closed while the process was being torn down

        threading.Thread(target=forward, daemon=True, name=f"watch-stderr:{sid}").start()

    def _reap_process(self) -> None:
        """Terminate the subprocess if it is still running and wait for it to exit."""
        proc = self._proc
        if proc is None:
            return
        try:
            if proc.poll() is None:
                proc.terminate()
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        except OSError:
            pass

    def _build_command(self) -> list[str] | None:
        """Return the argv that tails this source, or None (after logging) if misconfigured."""
        kind = self.config.source_type
        sid = self.config.source_id
        # Tolerate args=None or a tuple from hand-built configs.
        extra = list(self.config.args or [])

        if kind == "journald":
            return ["journalctl", "-f", "--output=json", "--no-pager", *extra]

        if kind in ("docker", "podman"):
            if not extra:
                logger.error("%s source %r requires args: [container_name]", kind, sid)
                return None
            # First arg is the container; anything after it is passed through.
            return [kind, "logs", "-f", "--timestamps", *extra]

        if kind == "file":
            if not extra:
                logger.error("file source %r requires args: [/path/to/log]", sid)
                return None
            if len(extra) > 1:
                logger.warning("file source %r: ignoring extra args %r (only the path is used)", sid, extra[1:])
            # -F: follow by name (handles rotation); -n 0: start from end, don't replay old data;
            # --: a path starting with '-' is not mistaken for an option.
            return ["tail", "-F", "-n", "0", "--", extra[0]]

        logger.error("Unknown watch source type: %r", kind)
        return None

    def _file_parser(self, lines: list[str]):
        """Pick the ingestor module for a batch of lines from a `file` source.

        The format is detected from the batch's first line until a structured
        (non-plaintext) format is recognised; that format is then pinned, so a
        later batch that happens to start with a continuation line (e.g. a stack
        trace) is not misparsed as plaintext.
        """
        parsers = {
            "journald": journald_parser,
            "servarr": servarr_parser,
            "plex": plex_parser,
            "qbittorrent": qbit_parser,
            "caddy": caddy_parser,
            "syslog": syslog_parser,
        }
        if self._file_format is None:
            fmt = _detect_format(lines[0])
            if fmt not in parsers:
                return plaintext_parser
            self._file_format = fmt
        return parsers[self._file_format]

    def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
        """Parse a batch of raw lines with the ingestor matching this source.

        ``lines`` may be any iterable of lines (a list in practice). Returns an
        empty list for unknown source types or a batch of blank lines.
        """
        t = self.config.source_type
        sid = self.config.source_id

        if t == "journald":
            return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))

        if t in ("docker", "podman"):
            # Output: "2024-01-15T12:34:56.789012345Z log line text"
            stripped = [_strip_docker_ts(ln) for ln in lines]
            return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))

        if t == "file":
            non_empty = [ln for ln in lines if ln.strip()]
            if not non_empty:
                return []
            parser = self._file_parser(non_empty)
            return list(parser.parse(iter(non_empty), sid, compiled, ingest_time))

        return []

    def _drain(self, conn: sqlite3.Connection, compiled) -> None:
        """Read lines from the subprocess and flush them to the DB in batches.

        A helper thread does the blocking reads and hands lines over a queue.
        (Calling ``select`` on the pipe and then ``readline`` on the buffered
        text stream stalls, because lines already pulled into Python's buffer
        are invisible to ``select``.) Polling the queue with a 1s timeout lets
        this loop honour stop requests and the time-based flush while idle.
        """
        import queue
        import time

        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("_drain() needs a running process with a stdout pipe")
        sid = self.config.source_id
        end_of_output = object()  # sentinel queued by the reader at EOF
        pending: queue.Queue = queue.Queue()

        def read_stdout(stream) -> None:
            try:
                for raw in stream:
                    pending.put(raw)
            except (OSError, ValueError) as exc:  # pipe closed underneath us
                logger.debug("Reader for %r stopped: %s", sid, exc)
            finally:
                pending.put(end_of_output)

        threading.Thread(
            target=read_stdout, args=(proc.stdout,), daemon=True, name=f"watch-read:{sid}"
        ).start()

        buffer: list[str] = []
        flush_count = 0
        last_flush = time.monotonic()  # monotonic: immune to wall-clock jumps

        while not self._stop.is_set():
            try:
                item = pending.get(timeout=1.0)
            except queue.Empty:
                item = None  # idle tick: still evaluate the time-based flush below

            if item is end_of_output:
                if not self._stop.is_set():
                    try:
                        code = proc.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        code = None
                    self._error = f"watch process exited (code {code})"
                    logger.warning("Watch process exited for %r (code %s); source stopped", sid, code)
                break

            if item:
                line = item.rstrip("\n")
                if line:
                    buffer.append(line)

            elapsed = time.monotonic() - last_flush
            if buffer and (len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC):
                flush_count = self._flush(conn, buffer, compiled, flush_count)
                buffer = []
                last_flush = time.monotonic()

        # Keep lines the reader had already queued when we stopped.
        while True:
            try:
                item = pending.get_nowait()
            except queue.Empty:
                break
            if item is end_of_output:
                break
            line = item.rstrip("\n")
            if line:
                buffer.append(line)

        if buffer:
            flush_count = self._flush(conn, buffer, compiled, flush_count)
        if flush_count % FTS_SYNC_EVERY_N_FLUSHES:
            # The last few flushes missed the periodic sync; make them searchable now.
            self._sync_fts()

    def _sync_fts(self) -> None:
        """Bring the FTS index up to date via build_fts_index; errors are logged, not raised."""
        try:
            build_fts_index(self.db_path)
        except Exception as exc:
            logger.warning("FTS sync error for %r: %s", self.config.source_id, exc)

    def _flush(self, conn: sqlite3.Connection, lines: list[str], compiled, flush_count: int) -> int:
        """Parse *lines*, write the entries, and periodically sync FTS.

        Returns the updated flush counter, which is incremented only when the
        batch was written successfully. Parse/write errors are logged and the
        batch is dropped (rolled back), so one bad batch does not stop the source.
        """
        ingest_time = now_iso()
        try:
            entries = self._parse_lines(lines, ingest_time, compiled)
            if entries:
                _write_batch(conn, entries)
                conn.commit()
        except Exception as exc:
            # Discard a half-written batch so the next commit does not persist it.
            try:
                conn.rollback()
            except sqlite3.Error:
                pass
            logger.warning("Flush error for %r: %s", self.config.source_id, exc)
            return flush_count

        self._entry_count += len(entries)
        # Prefer the newest entry's own timestamp; fall back to "now".
        self._last_event = (entries[-1].timestamp_iso if entries else None) or now_iso()

        flush_count += 1
        if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0:
            self._sync_fts()
        return flush_count
|
|
|
|
|
|
def _strip_docker_ts(line: str) -> str:
|
|
"""Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds."""
|
|
# Format: "2024-01-15T12:34:56.789012345Z actual log text"
|
|
parts = line.split(" ", 1)
|
|
if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"):
|
|
return parts[1]
|
|
return line
|
|
|
|
|
|
# ── Orchestrator ──────────────────────────────────────────────────────────────
|
|
|
|
class Watcher:
    """Manages all active WatchSource instances as one unit."""

    def __init__(self, db_path: Path, pattern_file: Path) -> None:
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._sources: list[WatchSource] = []

    def configure(self, configs: list[WatchConfig]) -> None:
        """Replace the managed sources with one WatchSource per config.

        Sources that are still running are stopped first. Otherwise their threads
        would keep ingesting after being dropped from ``_sources`` and could no
        longer be stopped. The new sources are not started; call start() for that.
        """
        for src in self._sources:
            if src.status["running"]:
                src.stop()
        self._sources = [
            WatchSource(c, self.db_path, self.pattern_file)
            for c in configs
        ]

    def start(self) -> None:
        """Start every configured source."""
        for src in self._sources:
            src.start()

    def stop(self) -> None:
        """Stop every configured source; each stop waits up to 5s for its thread."""
        for src in self._sources:
            src.stop()

    @property
    def status(self) -> list[dict]:
        """Per-source status dicts, in configuration order."""
        return [src.status for src in self._sources]

    def is_active(self) -> bool:
        """True if at least one source's thread is alive."""
        return any(src.status["running"] for src in self._sources)
|