feat: live watch mode — tail journald/docker/podman continuously (#4) #16

Merged
pyr0ball merged 2 commits from feat/live-watch into main 2026-05-11 15:45:30 -07:00
3 changed files with 57 additions and 7 deletions
Showing only changes of commit c6e22304d6 - Show all commits

View file

@ -19,7 +19,9 @@ from typing import Iterator
import yaml import yaml
from app.ingest import journald as journald_parser, syslog as syslog_parser from app.ingest import journald as journald_parser, syslog as syslog_parser
from app.ingest import plaintext as plaintext_parser from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser
from app.ingest.pipeline import _detect_format
from app.ingest.base import _compile, load_patterns, now_iso from app.ingest.base import _compile, load_patterns, now_iso
from app.ingest.pipeline import _write_batch, _SCHEMA from app.ingest.pipeline import _write_batch, _SCHEMA
from app.services.search import build_fts_index from app.services.search import build_fts_index
@ -147,17 +149,49 @@ class WatchSource:
logger.error("podman source %r requires args: [container_name]", self.config.source_id) logger.error("podman source %r requires args: [container_name]", self.config.source_id)
return None return None
return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:] return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:]
if t == "file":
if not extra:
logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
return None
# -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
return ["tail", "-F", "-n", "0", extra[0]]
logger.error("Unknown watch source type: %r", t) logger.error("Unknown watch source type: %r", t)
return None return None
def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
    """Turn one batch of raw lines from this source into parsed entries.

    The parser is selected from ``self.config.source_type``:

    * ``journald`` -- the lines are handed to the journald parser as-is.
    * ``docker`` / ``podman`` -- every line is passed through
      ``_strip_docker_ts`` and the result is parsed as plaintext.
    * ``file`` -- blank lines are dropped, the format is detected from the
      first remaining line, and the whole batch goes to the matching
      parser (plaintext when no dedicated parser matches).

    Any other source type, and a ``file`` batch that contains only blank
    lines, produces an empty list.
    """
    kind = self.config.source_type
    source_id = self.config.source_id

    if kind == "journald":
        return list(journald_parser.parse(iter(lines), source_id, compiled, ingest_time))

    if kind in ("docker", "podman"):
        # Output: "2024-01-15T12:34:56.789012345Z log line text"
        without_ts = [_strip_docker_ts(raw) for raw in lines]
        return list(plaintext_parser.parse(iter(without_ts), source_id, compiled, ingest_time))

    if kind != "file":
        return []

    # Auto-detect the format from the first non-empty line of the batch.
    payload = [raw for raw in lines if raw.strip()]
    if not payload:
        return []

    parsers_by_format = {
        "journald": journald_parser,
        "servarr": servarr_parser,
        "plex": plex_parser,
        "qbittorrent": qbit_parser,
        "caddy": caddy_parser,
        "syslog": syslog_parser,
    }
    # Anything without a dedicated parser is treated as plaintext.
    parser = parsers_by_format.get(_detect_format(payload[0]), plaintext_parser)
    return list(parser.parse(iter(payload), source_id, compiled, ingest_time))
def _drain(self, conn: sqlite3.Connection, compiled) -> None: def _drain(self, conn: sqlite3.Connection, compiled) -> None:
"""Read lines from the subprocess and flush to DB periodically.""" """Read lines from the subprocess and flush to DB periodically."""

View file

@ -2,7 +2,8 @@
# The watcher starts automatically when Turnstone starts. # The watcher starts automatically when Turnstone starts.
# #
# Source types: # Source types:
# journald — system journal via `journalctl -f -o json` # journald — system journal via `journalctl -f -o json` (requires journalctl in container)
# file — tail a log file by path (handles rotation; auto-detects format)
# docker — container logs via `docker logs -f --timestamps <container>` # docker — container logs via `docker logs -f --timestamps <container>`
# podman — container logs via `podman logs -f --timestamps <container>` # podman — container logs via `podman logs -f --timestamps <container>`
# #

View file

@ -114,6 +114,21 @@ def test_build_command_unknown_type_returns_none():
assert cmd is None assert cmd is None
def test_build_command_file_with_path():
    """A ``file`` source with a path builds a ``tail -F -n 0 <path>`` style command."""
    log_path = "/opt/sonarr/config/logs/sonarr.0.txt"
    src = _make_source("file", "sonarr", args=[log_path])
    cmd = src._build_command()
    # Fail with a clear assertion (not a TypeError on indexing) if no command is built.
    assert cmd is not None
    assert cmd[0] == "tail"
    assert "-F" in cmd
    # "0" must be the argument of "-n"; checking both are merely present
    # would also pass for a command that does not start from the end.
    assert "-n" in cmd
    assert cmd[cmd.index("-n") + 1] == "0"
    assert log_path in cmd
def test_build_command_file_no_args_returns_none():
    """A ``file`` source configured without a path yields no command."""
    source = _make_source("file", "sonarr")
    assert source._build_command() is None
# ── Docker timestamp stripping ─────────────────────────────────────────────── # ── Docker timestamp stripping ───────────────────────────────────────────────
def test_strip_docker_ts_removes_rfc3339_prefix(): def test_strip_docker_ts_removes_rfc3339_prefix():