Each WatchSource was calling build_fts_index() every 3 flushes (~30s). With 70+ active sources, this produced a near-continuous stream of FTS INSERT operations, each holding the SQLite write lock for several seconds while scanning the 5.4GB log_entries table. Every other writer (other watcher flushes, cybersec scorer) timed out with 'database is locked'. FTS index is now only updated by the glean scheduler (every 900s) and the manual `build-fts` command — both already call build_fts_index() through glean_dir(). Real-time freshness of watcher-ingested entries in FTS was ~30s before; it's now up to ~15min, which is acceptable. This is the root cause of the persistent 'database is locked' errors blocking the cybersec scorer (issue #9). Closes: #9
282 lines
10 KiB
Python
282 lines
10 KiB
Python
"""Live watch: tail active log sources and glean entries in near-real-time.
|
|
|
|
Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f)
|
|
in a daemon thread and pipes lines through the existing ingestors into SQLite.
|
|
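FTS is not updated from here: the index is rebuilt by the glean scheduler and the manual `build-fts` command, so watcher-ingested entries may take up to ~15 minutes to become searchable.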
|
|
"""
|
|
from __future__ import annotations

import json
import logging
import re
import select
import subprocess
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator

import yaml

from app.glean import journald as journald_parser, syslog as syslog_parser
from app.glean import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
from app.glean import qbittorrent as qbit_parser, caddy as caddy_parser
from app.db import get_conn
from app.db.schema import ensure_schema
from app.glean.pipeline import _detect_format, _write_batch
from app.glean.base import _compile, load_patterns, now_iso
from app.services.models import RetrievedEntry
|
|
|
|
logger = logging.getLogger(__name__)

# A non-empty buffer is flushed once this many seconds have passed since the
# previous flush.
FLUSH_INTERVAL_SEC = 10
# A buffer is flushed as soon as it holds this many lines, regardless of how
# recently the previous flush happened.
FLUSH_BATCH_SIZE = 100
|
|
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass(frozen=True)
class WatchConfig:
    """Immutable description of one log source to tail.

    Built from one item of the ``sources`` list in ``watch.yaml``.
    """

    # Selects the tailing command: "journald" | "docker" | "podman" | "file"
    source_type: str
    # Identifier passed to the parsers and reported in logs and status
    source_id: str
    # Extra CLI args; for docker/podman/file the first item is the
    # container name or the path to follow
    args: list[str] = field(default_factory=list)
|
|
|
|
|
|
def load_watch_config(path: Path) -> list[WatchConfig]:
    """Load watch.yaml; return empty list if file absent.

    Each item under the top-level ``sources`` key must provide ``type`` and
    ``id``; ``args`` is optional.

    Args:
        path: Location of the YAML file.

    Returns:
        One ``WatchConfig`` per configured source, in file order.

    Raises:
        KeyError: If a source entry lacks ``type`` or ``id``.
    """
    if not path.exists():
        return []

    raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {}

    configs: list[WatchConfig] = []
    # `sources:` with no value parses as None, so fall back to an empty list.
    for src in raw.get("sources") or []:
        extra = src.get("args") or []
        if isinstance(extra, str):
            # A bare scalar (`args: mycontainer`) is treated as a single argument.
            extra = [extra]
        configs.append(WatchConfig(
            source_type=src["type"],
            source_id=src["id"],
            # YAML may yield ints (e.g. an unquoted numeric container name);
            # the args end up in a subprocess argv, which must be strings.
            args=[str(arg) for arg in extra],
        ))
    return configs
|
|
|
|
|
|
# ── Per-source runner ─────────────────────────────────────────────────────────
|
|
|
|
class WatchSource:
    """Tails a single log source in a background daemon thread.

    ``start()`` launches a thread that spawns the tailing subprocess chosen by
    ``config.source_type`` and feeds its stdout, in batches, through the
    matching parser into the SQLite database at ``db_path``.  ``status``
    exposes a snapshot of progress and the last recorded error.
    """

    def __init__(
        self,
        config: WatchConfig,
        db_path: Path,
        pattern_file: Path,
    ) -> None:
        self.config = config
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._proc: subprocess.Popen | None = None
        self._last_event: str | None = None
        self._entry_count: int = 0
        self._error: str | None = None

    @property
    def status(self) -> dict:
        """Return a snapshot dict describing this source's current state."""
        return {
            "source_id": self.config.source_id,
            "type": self.config.source_type,
            "running": self._thread is not None and self._thread.is_alive(),
            "entries_gleaned": self._entry_count,
            "last_event": self._last_event,
            "error": self._error,
        }

    def start(self) -> None:
        """Start the background thread that tails this source."""
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True, name=f"watch:{self.config.source_id}")
        self._thread.start()
        logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type)

    def stop(self) -> None:
        """Signal the thread to stop, terminate the subprocess, and wait up to 5s."""
        self._stop.set()
        proc = self._proc
        if proc:
            try:
                proc.terminate()
            except OSError:
                # Best-effort: the process may already be gone.
                pass
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("Watch source stopped: %s", self.config.source_id)

    def _run(self) -> None:
        """Thread body: prepare patterns and schema, spawn the tailer, drain it."""
        try:
            # Inside the try so a bad pattern file or schema failure is
            # recorded in `status["error"]` instead of silently killing the thread.
            compiled = _compile(load_patterns(self.pattern_file))
            ensure_schema(self.db_path)

            cmd = self._build_command()
            if not cmd:
                # _build_command has already logged the specific reason.
                self._error = f"cannot build watch command for source type {self.config.source_type!r}"
                return

            self._proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                # stderr is never read here; an undrained PIPE would let a
                # chatty child fill the pipe buffer and block forever.
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1,
            )
            self._drain(compiled)
        except Exception as exc:
            self._error = str(exc)
            logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
        finally:
            self._reap()

    def _reap(self) -> None:
        """Terminate the subprocess if still alive, wait for it, and close its pipe."""
        proc = self._proc
        if proc is None:
            return
        try:
            if proc.poll() is None:
                proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        except OSError:
            # Best-effort cleanup; nothing useful to do if signalling fails.
            pass
        finally:
            if proc.stdout is not None:
                proc.stdout.close()

    def _build_command(self) -> list[str] | None:
        """Return the argv that tails this source, or None if it is misconfigured.

        The reason for a None result is logged at error level.
        """
        t = self.config.source_type
        extra = list(self.config.args)

        if t == "journald":
            return ["journalctl", "-f", "--output=json", "--no-pager"] + extra

        if t == "docker":
            if not extra:
                logger.error("docker source %r requires args: [container_name]", self.config.source_id)
                return None
            return ["docker", "logs", "-f", "--timestamps", *extra]

        if t == "podman":
            if not extra:
                logger.error("podman source %r requires args: [container_name]", self.config.source_id)
                return None
            return ["podman", "logs", "-f", "--timestamps", *extra]

        if t == "file":
            if not extra:
                logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
                return None
            # -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
            return ["tail", "-F", "-n", "0", extra[0]]

        logger.error("Unknown watch source type: %r", t)
        return None

    def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
        """Parse a batch of raw lines with the parser matching this source type.

        Args:
            lines: Raw lines read from the subprocess (trailing newline removed).
            ingest_time: Timestamp string handed to the parser for this batch.
            compiled: Compiled patterns as returned by ``_compile``.

        Returns:
            The parsed entries; empty for an unknown source type or an
            all-blank file batch.
        """
        t = self.config.source_type
        sid = self.config.source_id

        if t == "journald":
            return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))

        if t in ("docker", "podman"):
            # Output: "2024-01-15T12:34:56.789012345Z log line text"
            stripped = [_strip_docker_ts(ln) for ln in lines]
            return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))

        if t == "file":
            # Auto-detect format from the first non-empty line
            non_empty = [ln for ln in lines if ln.strip()]
            if not non_empty:
                return []
            parsers = {
                "journald": journald_parser,
                "servarr": servarr_parser,
                "plex": plex_parser,
                "qbittorrent": qbit_parser,
                "caddy": caddy_parser,
                "syslog": syslog_parser,
            }
            # Anything unrecognised is handled as plain text.
            parser = parsers.get(_detect_format(non_empty[0]), plaintext_parser)
            return list(parser.parse(iter(non_empty), sid, compiled, ingest_time))

        return []

    def _drain(self, compiled) -> None:
        """Read lines from the subprocess and flush to DB periodically.

        Runs until the stop event is set or the subprocess closes stdout, then
        flushes whatever is still buffered.

        Raises:
            RuntimeError: If the subprocess has not been started with a stdout pipe.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("watch subprocess has no stdout pipe to drain")
        stdout = proc.stdout

        buffer: list[str] = []
        flush_count = 0
        last_flush = datetime.now(tz=timezone.utc)

        while not self._stop.is_set():
            # Wait at most 1s so the stop event and the flush timer are
            # re-checked even when the source is quiet.
            # NOTE(review): select() watches the file descriptor, while
            # readline() goes through Python's read-ahead buffer, so lines
            # already buffered may wait until more output arrives — confirm
            # whether this latency matters for bursty sources.
            ready, _, _ = select.select([stdout], [], [], 1.0)

            if ready:
                line = stdout.readline()
                if not line:
                    # Empty read means EOF: the subprocess closed stdout.
                    if not self._stop.is_set():
                        # NOTE(review): despite the message, nothing in this
                        # class restarts the process; the thread ends after
                        # this wait.
                        logger.warning("Watch process exited for %r — will retry in 5s", self.config.source_id)
                        self._stop.wait(5)
                    break
                line = line.rstrip("\n")
                if line:
                    buffer.append(line)

            elapsed = (datetime.now(tz=timezone.utc) - last_flush).total_seconds()
            should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC

            if buffer and should_flush:
                flush_count = self._flush(buffer, compiled, flush_count)
                # The batch is discarded whether or not _flush succeeded
                # (it logs and swallows errors).
                buffer.clear()
                last_flush = datetime.now(tz=timezone.utc)

        # Flush remainder
        if buffer:
            self._flush(buffer, compiled, flush_count)

    def _flush(self, lines: list[str], compiled, flush_count: int) -> int:
        """Parse *lines* and write the resulting entries to the database.

        Errors are logged as warnings and swallowed so one bad batch does not
        stop the watcher.

        Args:
            lines: Buffered raw lines to parse.
            compiled: Compiled patterns as returned by ``_compile``.
            flush_count: Number of successful flushes so far.

        Returns:
            ``flush_count + 1`` on success, ``flush_count`` unchanged on failure.
        """
        ingest_time = now_iso()
        try:
            entries = self._parse_lines(lines, ingest_time, compiled)
            if entries:
                with get_conn(self.db_path) as conn:
                    _write_batch(conn, entries)
                    conn.commit()
                self._entry_count += len(entries)

            # Prefer the newest entry's own timestamp; fall back to "now" when
            # the batch produced no entries or the entry has no timestamp.
            newest = entries[-1].timestamp_iso if entries else None
            self._last_event = newest or now_iso()

            flush_count += 1
        except Exception as exc:
            logger.warning("Flush error for %r: %s", self.config.source_id, exc)
        return flush_count
|
|
|
|
|
|
def _strip_docker_ts(line: str) -> str:
|
|
"""Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds."""
|
|
# Format: "2024-01-15T12:34:56.789012345Z actual log text"
|
|
parts = line.split(" ", 1)
|
|
if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"):
|
|
return parts[1]
|
|
return line
|
|
|
|
|
|
# ── Orchestrator ──────────────────────────────────────────────────────────────
|
|
|
|
class Watcher:
    """Manages all active WatchSource instances.

    Every source shares the same database path and pattern file.
    """

    def __init__(self, db_path: Path, pattern_file: Path) -> None:
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._sources: list[WatchSource] = []

    def configure(self, configs: list[WatchConfig]) -> None:
        """Replace the managed sources with one new WatchSource per config.

        Sources that are still running are stopped first; otherwise their
        threads and subprocesses would keep running with nothing left to
        stop them.  The new sources are not started here.
        """
        for src in self._sources:
            if src.status["running"]:
                src.stop()
        self._sources = [
            WatchSource(c, self.db_path, self.pattern_file)
            for c in configs
        ]

    def start(self) -> None:
        """Start every configured source."""
        for src in self._sources:
            src.start()

    def stop(self) -> None:
        """Stop every configured source."""
        for src in self._sources:
            src.stop()

    @property
    def status(self) -> list[dict]:
        """Return the status dict of each configured source, in order."""
        return [src.status for src in self._sources]

    def is_active(self) -> bool:
        """Return True if at least one source's thread is alive."""
        return any(src.status["running"] for src in self._sources)
|