turnstone/app/watch/watcher.py
pyr0ball 8efd7f6745 feat: dual-backend SQLite/Postgres + multi-tenant source namespacing
- Add app/db/ abstraction layer: Backend enum, DbConn wrapper,
  dialect helper (q() for ? vs %s paramstyle), get_conn(), tenant_id()
- Auto-detect backend from DATABASE_URL; SQLite remains default when
  unset — no config change for local deployments
- Add tenant_id column to all three logical DBs (main, context, incidents);
  idempotent ALTER TABLE migration runs before schema scripts on existing DBs
- All INSERTs inject tenant_id; SELECTs use (tenant_id = ? OR tenant_id = '')
  for backward compat with pre-namespacing rows
- Add docker-compose.yml with named volume turnstone_pgdata (survives rebuilds)
  and optional external Postgres support via DATABASE_URL override
- Add scripts/migrate_sqlite_to_postgres.py — one-shot idempotent migration
  for existing SQLite data; ON CONFLICT DO NOTHING for safe re-runs
- Fix SSH glean path in pipeline.py to use ensure_schema + get_conn
  (was still using raw sqlite3.connect + old _SCHEMA without tenant_id)
- Fix FTS5 JOIN ambiguity: qualify repeat_count as f.repeat_count in search
- Update all tests to use ensure_*_schema fixtures; add row_factory where needed
- 394/394 tests passing

Closes: #42
Closes: #50
2026-06-08 08:37:54 -07:00

286 lines
11 KiB
Python

"""Live watch: tail active log sources and glean entries in near-real-time.
Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f)
in a daemon thread and pipes lines through the existing ingestors into the
database (SQLite by default). FTS is synced incrementally every few flushes
(see FTS_SYNC_EVERY_N_FLUSHES), not after every single flush.
"""
from __future__ import annotations

import json
import logging
import os
import re
import select
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator

import yaml

from app.db import get_conn
from app.db.schema import ensure_schema
from app.glean import journald as journald_parser, syslog as syslog_parser
from app.glean import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
from app.glean import qbittorrent as qbit_parser, caddy as caddy_parser
from app.glean.base import _compile, load_patterns, now_iso
from app.glean.pipeline import _detect_format, _write_batch
from app.services.models import RetrievedEntry
from app.services.search import build_fts_index
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Buffered lines are flushed once this many seconds have passed since the last flush.
FLUSH_INTERVAL_SEC = 10
# Buffered lines are flushed as soon as this many are waiting, regardless of elapsed time.
FLUSH_BATCH_SIZE = 100
# The FTS index is rebuilt on every Nth successful flush rather than on each one.
FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load
# ── Config ────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class WatchConfig:
    """Immutable description of one log source to tail."""

    # Kind of source: "journald" | "docker" | "podman" | "file".
    source_type: str
    # Identifier attached to every entry gleaned from this source.
    source_id: str
    # Extra CLI args; each instance gets its own fresh list by default.
    args: list[str] = field(default_factory=list)
def load_watch_config(path: Path) -> list[WatchConfig]:
    """Load watch.yaml; return empty list if file absent.

    Each item under the top-level ``sources`` key must provide ``type`` and
    ``id``; ``args`` is optional.

    Args:
        path: Location of the YAML watch configuration.

    Returns:
        One ``WatchConfig`` per configured source, in file order.

    Raises:
        KeyError: If a source entry lacks ``type`` or ``id``.
    """
    if not path.exists():
        return []
    raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    sources: list[WatchConfig] = []
    # `sources:` with no value parses to None, so fall back to an empty list.
    for src in raw.get("sources") or []:
        args = src.get("args") or []
        # A lone scalar (`args: mycontainer`) is treated as a one-item list
        # instead of being iterated character by character.
        if isinstance(args, (str, int, float)):
            args = [args]
        sources.append(WatchConfig(
            source_type=src["type"],
            source_id=src["id"],
            # YAML may yield ints/floats; command-line arguments must be str.
            args=[str(arg) for arg in args],
        ))
    return sources
# ── Per-source runner ─────────────────────────────────────────────────────────
class WatchSource:
    """Tails a single log source in a background daemon thread.

    The thread spawns a follower subprocess (see ``_build_command``), reads
    its stdout, and periodically parses and writes the buffered lines to the
    database. Progress and failures are exposed through ``status``.
    """

    def __init__(
        self,
        config: WatchConfig,
        db_path: Path,
        pattern_file: Path,
    ) -> None:
        self.config = config
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._proc: subprocess.Popen | None = None
        self._last_event: str | None = None
        self._entry_count: int = 0
        self._error: str | None = None

    @property
    def status(self) -> dict:
        """Return a snapshot of this source's state as a plain dict."""
        thread = self._thread
        return {
            "source_id": self.config.source_id,
            "type": self.config.source_type,
            "running": thread is not None and thread.is_alive(),
            "entries_gleaned": self._entry_count,
            "last_event": self._last_event,
            "error": self._error,
        }

    def start(self) -> None:
        """Start the background thread; a no-op if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            # A second thread would spawn a second follower for the same source.
            logger.info("Watch source already running: %s", self.config.source_id)
            return
        self._stop.clear()
        self._error = None  # do not report a previous run's failure for the new run
        self._thread = threading.Thread(
            target=self._run,
            daemon=True,
            name=f"watch:{self.config.source_id}",
        )
        self._thread.start()
        logger.info("Watch source started: %s (%s)", self.config.source_id, self.config.source_type)

    def stop(self) -> None:
        """Signal the thread to stop, terminate the subprocess and wait up to 5s."""
        self._stop.set()
        proc = self._proc
        if proc is not None:
            try:
                proc.terminate()
            except OSError:
                # The process is already gone; nothing left to terminate.
                pass
        thread = self._thread
        if thread is not None:
            thread.join(timeout=5)
        logger.info("Watch source stopped: %s", self.config.source_id)

    def _run(self) -> None:
        """Thread body: prepare the DB, spawn the follower and drain its output.

        Any failure is recorded in ``status["error"]`` and logged; the
        subprocess is always reaped before the thread ends.
        """
        try:
            compiled = _compile(load_patterns(self.pattern_file))
            ensure_schema(self.db_path)
            with get_conn(self.db_path) as conn:
                cmd = self._build_command()
                if not cmd:
                    # _build_command already logged the specific reason.
                    self._error = "invalid watch source configuration (see log for details)"
                    return
                self._proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    # stderr is never consumed here; leaving it as an unread
                    # PIPE lets a chatty child fill the pipe and block.
                    # NOTE(review): `docker logs` may emit the container's
                    # stderr stream on its own stderr - confirm whether that
                    # output should be captured as well.
                    stderr=subprocess.DEVNULL,
                    # Unbuffered binary pipe: _drain reads the fd directly.
                    bufsize=0,
                )
                self._drain(conn, compiled)
                if not self._stop.is_set():
                    # The follower ended on its own; surface that in status.
                    self._error = f"watch process exited (code {self._reap()})"
        except Exception as exc:
            self._error = str(exc)
            logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
        finally:
            self._reap()

    def _reap(self) -> int | None:
        """Make sure the subprocess has ended and its pipe is closed.

        Safe to call repeatedly. Returns the exit code, or None when no
        process was started or the code could not be collected.
        """
        proc = self._proc
        if proc is None:
            return None
        try:
            try:
                # Usual case: the child already exited or was just terminated.
                proc.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                proc.terminate()
                try:
                    proc.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
        except OSError:
            pass
        finally:
            if proc.stdout is not None:
                proc.stdout.close()
        return proc.returncode

    def _build_command(self) -> list[str] | None:
        """Return the argv that follows this source, or None if misconfigured."""
        t = self.config.source_type
        # Tolerates a missing/None args value and any iterable of arguments.
        extra = list(self.config.args or [])
        if t == "journald":
            return ["journalctl", "-f", "--output=json", "--no-pager", *extra]
        if t in ("docker", "podman"):
            if not extra:
                logger.error("%s source %r requires args: [container_name]", t, self.config.source_id)
                return None
            # extra[0] is the container name; any further items are passed through.
            return [t, "logs", "-f", "--timestamps", *extra]
        if t == "file":
            if not extra:
                logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
                return None
            # -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
            return ["tail", "-F", "-n", "0", extra[0]]
        logger.error("Unknown watch source type: %r", t)
        return None

    def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
        """Parse raw lines with the parser matching this source's type.

        Returns an empty list for an unknown source type or when a "file"
        batch contains only blank lines.
        """
        t = self.config.source_type
        sid = self.config.source_id
        if t == "journald":
            return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))
        if t in ("docker", "podman"):
            # Output: "2024-01-15T12:34:56.789012345Z log line text"
            stripped = [_strip_docker_ts(ln) for ln in lines]
            return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))
        if t != "file":
            return []
        # Auto-detect format from the first non-empty line of this batch.
        non_empty = [ln for ln in lines if ln.strip()]
        if not non_empty:
            return []
        parsers = {
            "journald": journald_parser,
            "servarr": servarr_parser,
            "plex": plex_parser,
            "qbittorrent": qbit_parser,
            "caddy": caddy_parser,
            "syslog": syslog_parser,
        }
        # Anything unrecognised falls back to the plaintext parser.
        parser = parsers.get(_detect_format(non_empty[0]), plaintext_parser)
        return list(parser.parse(iter(non_empty), sid, compiled, ingest_time))

    def _drain(self, conn, compiled) -> None:
        """Read lines from the subprocess and flush to DB periodically.

        Reads the pipe's file descriptor directly. Combining select() with a
        buffered readline() can leave already-received lines sitting in
        Python's buffer until new output arrives, and readline() can block on
        a partial line while a stop request is pending.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("_drain called before the watch process was started")
        fd = proc.stdout.fileno()
        partial = b""  # bytes of a line whose newline has not arrived yet
        buffer: list[str] = []
        flush_count = 0
        # Monotonic clock: wall-clock adjustments must not affect flush timing.
        last_flush = time.monotonic()
        while not self._stop.is_set():
            # Wake at least once a second so stop requests and timed flushes are noticed.
            ready, _, _ = select.select([fd], [], [], 1.0)
            if ready:
                chunk = os.read(fd, 65536)
                if not chunk:
                    # EOF: keep a final line that had no trailing newline.
                    tail = partial.rstrip(b"\r").decode("utf-8", errors="replace")
                    if tail:
                        buffer.append(tail)
                    if not self._stop.is_set():
                        logger.warning("Watch process exited for %r", self.config.source_id)
                    break
                *complete, partial = (partial + chunk).split(b"\n")
                for raw in complete:
                    # Undecodable bytes are replaced rather than crashing the source.
                    text = raw.rstrip(b"\r").decode("utf-8", errors="replace")
                    if text:
                        buffer.append(text)
            elapsed = time.monotonic() - last_flush
            should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC
            if buffer and should_flush:
                flush_count = self._flush(conn, buffer, compiled, flush_count)
                buffer.clear()
                last_flush = time.monotonic()
        # Flush remainder
        if buffer:
            self._flush(conn, buffer, compiled, flush_count)

    def _flush(self, conn, lines: list[str], compiled, flush_count: int) -> int:
        """Parse and persist one batch of lines.

        Returns the updated flush counter: incremented on success, unchanged
        when the batch failed. Failures are logged and the batch is dropped.
        """
        ingest_time = now_iso()
        try:
            entries = self._parse_lines(lines, ingest_time, compiled)
            if entries:
                _write_batch(conn, entries)
                conn.commit()
                self._entry_count += len(entries)
            # Prefer the newest entry's own timestamp; otherwise use the flush time.
            newest = entries[-1].timestamp_iso if entries else None
            self._last_event = newest or now_iso()
            flush_count += 1
            if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0:
                build_fts_index(self.db_path)
        except Exception as exc:
            logger.warning("Flush error for %r: %s", self.config.source_id, exc)
            # Roll back so a failed statement cannot leave the connection in
            # an aborted transaction for later batches.
            # NOTE(review): assumes the connection wrapper exposes rollback()
            # like a DB-API connection - skipped when it does not.
            rollback = getattr(conn, "rollback", None)
            if callable(rollback):
                try:
                    rollback()
                except Exception as rollback_exc:
                    logger.debug("Rollback failed for %r: %s", self.config.source_id, rollback_exc)
        return flush_count
def _strip_docker_ts(line: str) -> str:
"""Remove leading RFC3339 timestamp that docker/podman logs -f --timestamps adds."""
# Format: "2024-01-15T12:34:56.789012345Z actual log text"
parts = line.split(" ", 1)
if len(parts) == 2 and "T" in parts[0] and parts[0].endswith("Z"):
return parts[1]
return line
# ── Orchestrator ──────────────────────────────────────────────────────────────
class Watcher:
    """Manages all active WatchSource instances."""

    def __init__(self, db_path: Path, pattern_file: Path) -> None:
        self.db_path = db_path
        self.pattern_file = pattern_file
        self._sources: list[WatchSource] = []

    def configure(self, configs: list[WatchConfig]) -> None:
        """Replace the managed sources with new ones built from ``configs``.

        Sources that are still running are stopped first; dropping them
        without stopping would leave their threads and subprocesses running
        with nothing left to stop them. New sources are not started here.
        """
        for src in self._sources:
            if src.status["running"]:
                src.stop()
        self._sources = [
            WatchSource(cfg, self.db_path, self.pattern_file)
            for cfg in configs
        ]

    def start(self) -> None:
        """Start every configured source."""
        for src in self._sources:
            src.start()

    def stop(self) -> None:
        """Stop every configured source, one after another."""
        for src in self._sources:
            src.stop()

    @property
    def status(self) -> list[dict]:
        """Return the status dict of every configured source, in order."""
        return [src.status for src in self._sources]

    def is_active(self) -> bool:
        """Return True if at least one source's thread is alive."""
        return any(src.status["running"] for src in self._sources)