feat: add file tail source type; configure example-node watchers
- type: file uses tail -F (handles rotation) with auto-format detection - _parse_lines dispatches to journald/servarr/qbit/caddy/syslog/plaintext based on first-line format detection — same logic as batch ingest - watch.yaml updated with file type docs and example-node-specific example - scripts/journal-bridge.sh + .service written directly to example-node Xander's watch.yaml covers: system-journal-live (via bridge file), sonarr, radarr, lidarr, prowlarr, bazarr, qbittorrent, nzbget, tautulli
This commit is contained in:
parent
3ebdd4aef0
commit
4151c98f23
3 changed files with 57 additions and 7 deletions
|
|
@ -19,7 +19,9 @@ from typing import Iterator
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from app.ingest import journald as journald_parser, syslog as syslog_parser
|
from app.ingest import journald as journald_parser, syslog as syslog_parser
|
||||||
from app.ingest import plaintext as plaintext_parser
|
from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser
|
||||||
|
from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser
|
||||||
|
from app.ingest.pipeline import _detect_format
|
||||||
from app.ingest.base import _compile, load_patterns, now_iso
|
from app.ingest.base import _compile, load_patterns, now_iso
|
||||||
from app.ingest.pipeline import _write_batch, _SCHEMA
|
from app.ingest.pipeline import _write_batch, _SCHEMA
|
||||||
from app.services.search import build_fts_index
|
from app.services.search import build_fts_index
|
||||||
|
|
@ -147,17 +149,49 @@ class WatchSource:
|
||||||
logger.error("podman source %r requires args: [container_name]", self.config.source_id)
|
logger.error("podman source %r requires args: [container_name]", self.config.source_id)
|
||||||
return None
|
return None
|
||||||
return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:]
|
return ["podman", "logs", "-f", "--timestamps", extra[0]] + extra[1:]
|
||||||
|
if t == "file":
|
||||||
|
if not extra:
|
||||||
|
logger.error("file source %r requires args: [/path/to/log]", self.config.source_id)
|
||||||
|
return None
|
||||||
|
# -F: follow by name (handles rotation); -n 0: start from end, don't replay old data
|
||||||
|
return ["tail", "-F", "-n", "0", extra[0]]
|
||||||
logger.error("Unknown watch source type: %r", t)
|
logger.error("Unknown watch source type: %r", t)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
|
def _parse_lines(self, lines: Iterator[str], ingest_time: str, compiled) -> list[RetrievedEntry]:
|
||||||
t = self.config.source_type
|
t = self.config.source_type
|
||||||
|
sid = self.config.source_id
|
||||||
|
|
||||||
if t == "journald":
|
if t == "journald":
|
||||||
return list(journald_parser.parse(iter(lines), self.config.source_id, compiled, ingest_time))
|
return list(journald_parser.parse(iter(lines), sid, compiled, ingest_time))
|
||||||
# Docker/Podman output: "2024-01-15T12:34:56.789012345Z log line text"
|
|
||||||
# Strip the timestamp prefix, then treat as plaintext with severity detection
|
if t in ("docker", "podman"):
|
||||||
|
# Output: "2024-01-15T12:34:56.789012345Z log line text"
|
||||||
stripped = [_strip_docker_ts(ln) for ln in lines]
|
stripped = [_strip_docker_ts(ln) for ln in lines]
|
||||||
return list(plaintext_parser.parse(iter(stripped), self.config.source_id, compiled, ingest_time))
|
return list(plaintext_parser.parse(iter(stripped), sid, compiled, ingest_time))
|
||||||
|
|
||||||
|
if t == "file":
|
||||||
|
# Auto-detect format from the first non-empty line
|
||||||
|
non_empty = [ln for ln in lines if ln.strip()]
|
||||||
|
if not non_empty:
|
||||||
|
return []
|
||||||
|
fmt = _detect_format(non_empty[0])
|
||||||
|
it = iter(non_empty)
|
||||||
|
if fmt == "journald":
|
||||||
|
return list(journald_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "servarr":
|
||||||
|
return list(servarr_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "plex":
|
||||||
|
return list(plex_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "qbittorrent":
|
||||||
|
return list(qbit_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "caddy":
|
||||||
|
return list(caddy_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
if fmt == "syslog":
|
||||||
|
return list(syslog_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
return list(plaintext_parser.parse(it, sid, compiled, ingest_time))
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
def _drain(self, conn: sqlite3.Connection, compiled) -> None:
|
def _drain(self, conn: sqlite3.Connection, compiled) -> None:
|
||||||
"""Read lines from the subprocess and flush to DB periodically."""
|
"""Read lines from the subprocess and flush to DB periodically."""
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@
|
||||||
# The watcher starts automatically when Turnstone starts.
|
# The watcher starts automatically when Turnstone starts.
|
||||||
#
|
#
|
||||||
# Source types:
|
# Source types:
|
||||||
# journald — system journal via `journalctl -f -o json`
|
# journald — system journal via `journalctl -f -o json` (requires journalctl in container)
|
||||||
|
# file — tail a log file by path (handles rotation; auto-detects format)
|
||||||
# docker — container logs via `docker logs -f --timestamps <container>`
|
# docker — container logs via `docker logs -f --timestamps <container>`
|
||||||
# podman — container logs via `podman logs -f --timestamps <container>`
|
# podman — container logs via `podman logs -f --timestamps <container>`
|
||||||
#
|
#
|
||||||
|
|
|
||||||
|
|
@ -114,6 +114,21 @@ def test_build_command_unknown_type_returns_none():
|
||||||
assert cmd is None
|
assert cmd is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_file_with_path():
|
||||||
|
src = _make_source("file", "sonarr", args=["/opt/sonarr/config/logs/sonarr.0.txt"])
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd[0] == "tail"
|
||||||
|
assert "-F" in cmd
|
||||||
|
assert "-n" in cmd and "0" in cmd
|
||||||
|
assert "/opt/sonarr/config/logs/sonarr.0.txt" in cmd
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_file_no_args_returns_none():
|
||||||
|
src = _make_source("file", "sonarr")
|
||||||
|
cmd = src._build_command()
|
||||||
|
assert cmd is None
|
||||||
|
|
||||||
|
|
||||||
# ── Docker timestamp stripping ───────────────────────────────────────────────
|
# ── Docker timestamp stripping ───────────────────────────────────────────────
|
||||||
|
|
||||||
def test_strip_docker_ts_removes_rfc3339_prefix():
|
def test_strip_docker_ts_removes_rfc3339_prefix():
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue