From ec931598cafc095e1d26dd2c232cf1362a1837a4 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 11 May 2026 06:26:32 -0700 Subject: [PATCH] feat: multi-source ingest via sources.yaml + servarr parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add servarr.py parser for all *arr services (sonarr/radarr/lidarr/ prowlarr/readarr/whisparr/bazarr) — pipe-delimited format with component prefix prepended for searchability - Add ingest_sources() to pipeline.py; reads sources.yaml, skips missing paths with a warning so cron keeps running if a service is down - Add --sources mode to ingest_corpus.py CLI; legacy positional args unchanged for backward compat - Add patterns/sources.yaml with all of Xander's discovered service log paths (qbit, 7 servarr services, nzbget, tautulli, jellyseerr) - Replace per-service volume mounts in podman-standalone.sh with /opt:/opt:ro + /var/log:/var/log:ro; adding a new source now requires only editing sources.yaml — no container restart --- app/ingest/pipeline.py | 60 ++++++++++++++++++++++-- app/ingest/servarr.py | 99 ++++++++++++++++++++++++++++++++++++++++ patterns/sources.yaml | 46 +++++++++++++++++++ podman-standalone.sh | 40 +++++++++------- scripts/ingest_corpus.py | 72 ++++++++++++++++++++--------- 5 files changed, 274 insertions(+), 43 deletions(-) create mode 100644 app/ingest/servarr.py create mode 100644 patterns/sources.yaml diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py index cbd9fee..ea58e65 100644 --- a/app/ingest/pipeline.py +++ b/app/ingest/pipeline.py @@ -8,7 +8,9 @@ import sqlite3 from pathlib import Path from typing import Iterator -from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent +import yaml + +from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent, servarr from app.ingest.base import _compile, load_patterns, now_iso from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index @@ -95,6 +97,8 @@ def _detect_format(first_line: str) -> str: return "plex" if qbittorrent.is_qbit_log(first_line): return "qbittorrent" + if servarr.is_servarr_log(first_line): + return "servarr" return "plaintext" @@ -102,8 +106,9 @@ def _parse_file( path: Path, compiled: list[tuple[LogPattern, object]], ingest_time: str, + source_id: str | None = None, ) -> Iterator[RetrievedEntry]: - source_id = path.stem + source_id = source_id or path.stem with path.open("r", errors="replace") as f: lines = iter(f) @@ -129,6 +134,8 @@ def _parse_file( yield from plex.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "qbittorrent": yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) + elif fmt == "servarr": + yield from servarr.parse(all_lines(), source_id, compiled, ingest_time) else: yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) @@ -159,11 +166,13 @@ def _ingest_files( db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, + source_id_map: dict[Path, str] | None = None, ) -> dict[str, int]: pattern_file = pattern_file or Path("patterns/default.yaml") patterns = load_patterns(pattern_file) compiled = _compile(patterns) ingest_time = now_iso() + source_id_map = source_id_map or {} conn = sqlite3.connect(str(db_path)) conn.execute("PRAGMA journal_mode=WAL") @@ -173,9 +182,10 @@ def _ingest_files( stats: dict[str, int] = {} for log_file in files: + source_id = source_id_map.get(log_file, log_file.stem) count = 0 batch: list[RetrievedEntry] = [] - for entry in _parse_file(log_file, compiled, ingest_time): + for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id): batch.append(entry) if len(batch) >= batch_size: _write_batch(conn, batch) @@ -186,8 +196,8 @@ def _ingest_files( _write_batch(conn, batch) conn.commit() count += len(batch) - stats[log_file.name] = count - logger.info("Ingested %d entries from %s", count, log_file.name) + stats[source_id] = stats.get(source_id, 0) + count + logger.info("Ingested %d entries from %s (source: %s)", count, log_file.name, source_id) conn.close() @@ -216,3 +226,43 @@ def ingest_file( ) -> dict[str, int]: """Ingest a single log file (any supported format).""" return _ingest_files([log_file], db_path, pattern_file) + + +def ingest_sources( + sources_file: Path, + db_path: Path, + pattern_file: Path | None = None, + batch_size: int = 1000, +) -> dict[str, int]: + """Ingest all sources listed in a sources.yaml config file. + + sources.yaml format: + sources: + - id: sonarr + path: /opt/sonarr/config/logs/sonarr.0.txt + - id: qbittorrent + path: /opt/qbittorrent/config/data/logs/qbittorrent.log + + Missing paths are skipped with a warning so the cron keeps running + when a service is temporarily down. + """ + with open(sources_file) as f: + config = yaml.safe_load(f) + + files: list[Path] = [] + source_id_map: dict[Path, str] = {} + + for src in config.get("sources", []): + path = Path(src["path"]) + if not path.exists(): + logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path) + continue + files.append(path) + if "id" in src: + source_id_map[path] = src["id"] + + if not files: + logger.warning("No source files found — check sources.yaml paths") + return {} + + return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map) diff --git a/app/ingest/servarr.py b/app/ingest/servarr.py new file mode 100644 index 0000000..357d5bd --- /dev/null +++ b/app/ingest/servarr.py @@ -0,0 +1,99 @@ +"""Servarr (*arr) log parser. + +Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr, +Prowlarr, Readarr, Whisparr, and Bazarr: + + 2026-05-11 02:31:51.5|Info|ComponentName|Message text + 2024-05-09 00:02:25|INFO |root |Message text +""" +from __future__ import annotations + +import re +from datetime import datetime, timezone +from typing import Iterator + +from app.ingest.base import ( + SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, +) +from app.services.models import LogPattern, RetrievedEntry + +_LINE_RE = re.compile( + r"^(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)" + r"\|(?P[^|]+)" + r"\|(?P[^|]*)" + r"\|(?P.*)$" +) + +_LEVEL_MAP: dict[str, str | None] = { + "trace": None, + "debug": "DEBUG", + "info": "INFO", + "warn": "WARN", + "warning": "WARN", + "error": "ERROR", + "fatal": "CRITICAL", +} + + +def _parse_ts(ts_str: str) -> tuple[str, str]: + base = ts_str.split(".")[0] + try: + dt = datetime.strptime(base, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) + return ts_str, dt.isoformat() + except ValueError: + return ts_str, "" + + +def is_servarr_log(first_line: str) -> bool: + return bool(_LINE_RE.match(first_line.strip())) + + +def parse( + lines: Iterator[str], + source_id: str, + compiled_patterns: list[tuple[LogPattern, object]], + ingest_time: str | None = None, +) -> Iterator[RetrievedEntry]: + ingest_time = ingest_time or now_iso() + state = SourceState() + pending_text: str | None = None + pending_meta: dict = {} + + def _emit(text: str, meta: dict) -> RetrievedEntry: + repeat, out_of_order = state.observe(text, meta.get("ts_iso")) + matched = apply_patterns(text, compiled_patterns) + return RetrievedEntry( + entry_id=make_entry_id(source_id, state.sequence, text), + source_id=source_id, + sequence=state.sequence, + timestamp_raw=meta.get("ts_raw", ""), + timestamp_iso=meta.get("ts_iso", ""), + ingest_time=ingest_time, + severity=meta.get("severity"), + repeat_count=repeat, + out_of_order=out_of_order, + matched_patterns=matched, + text=text, + ) + + for raw_line in lines: + line = raw_line.rstrip("\n") + m = _LINE_RE.match(line) + if m: + if pending_text is not None: + yield _emit(pending_text, pending_meta) + + ts_raw, ts_iso = _parse_ts(m.group("ts")) + level_key = m.group("level").strip().lower() + severity = _LEVEL_MAP.get(level_key, detect_severity(m.group("msg"))) + component = m.group("component").strip() + msg = m.group("msg") + # Prepend component so it's searchable without needing a separate column + text = f"[{component}] {msg}" if component else msg + pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity} + pending_text = text + elif pending_text is not None: + pending_text += "\n" + line.strip() + + if pending_text is not None: + yield _emit(pending_text, pending_meta) diff --git a/patterns/sources.yaml b/patterns/sources.yaml new file mode 100644 index 0000000..1421c7d --- /dev/null +++ b/patterns/sources.yaml @@ -0,0 +1,46 @@ +# Turnstone log sources — edit this file to add or remove services. +# Run ingest manually: +# sudo podman exec turnstone python scripts/ingest_corpus.py \ +# --sources /patterns/sources.yaml --db /data/turnstone.db +# +# Paths here are container-side paths under the /opt bind mount. +# Missing paths are skipped with a warning — safe to leave entries for +# services that are temporarily down. + +sources: + # ── Download ───────────────────────────────────────────────────────────── + - id: qbittorrent + path: /opt/qbittorrent/config/data/logs/qbittorrent.log + + # ── Servarr stack ───────────────────────────────────────────────────────── + - id: sonarr + path: /opt/sonarr/config/logs/sonarr.0.txt + + - id: radarr + path: /opt/radarr/config/logs/radarr.0.txt + + - id: lidarr + path: /opt/lidarr/config/logs/Lidarr.0.txt + + - id: readarr + path: /opt/readarr/config/logs/readarr.0.txt + + - id: whisparr + path: /opt/whisparr/config/logs/whisparr.0.txt + + - id: prowlarr + path: /opt/prowlarr/config/logs/prowlarr.0.txt + + - id: bazarr + path: /opt/bazarr/config/log/bazarr.log + + # ── Usenet ──────────────────────────────────────────────────────────────── + - id: nzbget + path: /opt/nzbget/config/nzbget.log + + # ── Media / Requests ───────────────────────────────────────────────────── + - id: tautulli + path: /opt/tautulli/config/logs/tautulli.log + + - id: jellyseerr + path: /opt/jellyseerr/config/logs/jellyseerr.log diff --git a/podman-standalone.sh b/podman-standalone.sh index fc5f08c..8be540b 100755 --- a/podman-standalone.sh +++ b/podman-standalone.sh @@ -13,10 +13,11 @@ # 2. Build the image (requires Docker or Podman with BuildKit/multi-stage support): # cd /opt/turnstone && podman build -t localhost/turnstone:latest . # -# 3. Create data and custom patterns directories: +# 3. Create data and patterns directories, then copy config files: # mkdir -p /opt/turnstone/{data,patterns} -# # Optionally copy default patterns as a starting point: # cp /opt/turnstone/patterns/default.yaml /opt/turnstone/patterns/ +# cp /opt/turnstone/patterns/sources.yaml /opt/turnstone/patterns/ +# # Edit sources.yaml if any paths differ on this host. # # 4. Run this script: # bash /opt/turnstone/podman-standalone.sh @@ -28,15 +29,19 @@ # sudo systemctl enable --now turnstone # # ── Ingesting logs ──────────────────────────────────────────────────────────── -# Log files on the host are bind-mounted read-only under /logs/ in the -# container. To ingest (run manually or via cron): +# All service logs under /opt are accessible inside the container. +# Sources are configured in patterns/sources.yaml (bind-mounted at /patterns/). # -# podman exec turnstone python scripts/ingest_corpus.py \ -# /logs/qbittorrent/qbittorrent.log /data/turnstone.db +# To ingest all sources (run manually or via cron): # -# Example cron (every 15 minutes): +# sudo podman exec turnstone python scripts/ingest_corpus.py \ +# --sources /patterns/sources.yaml --db /data/turnstone.db +# +# Example cron (every 15 minutes, add to root's crontab with: sudo crontab -e): # */15 * * * * podman exec turnstone python scripts/ingest_corpus.py \ -# /logs/qbittorrent/qbittorrent.log /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1 +# --sources /patterns/sources.yaml --db /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1 +# +# To add a new log source: edit /opt/turnstone/patterns/sources.yaml — no restart needed. # # ── Adding Caddy reverse proxy ──────────────────────────────────────────────── # Add to /etc/caddy/Caddyfile: @@ -68,17 +73,15 @@ TZ=America/Los_Angeles # # TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed. -# ── Log source bind mounts ──────────────────────────────────────────────────── -# Add or remove mount flags below for each service whose logs you want to ingest. -# Inside the container, paths appear under /logs// -# -QBIT_LOGS=/opt/qbittorrent/config/data/logs - # ── Turnstone container ─────────────────────────────────────────────────────── # Image is built locally — no registry auto-update label. # To update: sudo podman build -t localhost/turnstone:latest /opt/turnstone # sudo podman restart turnstone # +# /opt is mounted read-only so all service logs under /opt/*/config/logs/ are +# accessible without per-service mounts. Add new sources to patterns/sources.yaml +# — no container restart needed. +# # Must be run as root (sudo bash podman-standalone.sh) — rootful Podman only. # # Remove existing container if present (safe re-run) @@ -90,7 +93,8 @@ podman run -d \ --net=host \ -v "${DATA_DIR}:/data:Z" \ -v "${PATTERNS_DIR}:/patterns:Z" \ - -v "${QBIT_LOGS}:/logs/qbittorrent:ro" \ + -v /opt:/opt:ro \ + -v /var/log:/var/log:ro \ -e TURNSTONE_DB=/data/turnstone.db \ -e TURNSTONE_SOURCE_HOST="$(hostname)" \ -e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \ @@ -117,6 +121,8 @@ echo " | sudo tee /etc/systemd/system/turnstone.service" echo " sudo systemctl daemon-reload" echo " sudo systemctl enable --now turnstone" echo "" -echo "To ingest qBittorrent logs now:" +echo "To ingest all sources now:" echo " sudo podman exec turnstone python scripts/ingest_corpus.py \\" -echo " /logs/qbittorrent/qbittorrent.log /data/turnstone.db" +echo " --sources /patterns/sources.yaml --db /data/turnstone.db" +echo "" +echo "To add a new source: edit /opt/turnstone/patterns/sources.yaml — no restart needed." diff --git a/scripts/ingest_corpus.py b/scripts/ingest_corpus.py index 73a80f4..ca12ae6 100644 --- a/scripts/ingest_corpus.py +++ b/scripts/ingest_corpus.py @@ -1,4 +1,12 @@ -"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database.""" +"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database. + +Usage: + # Single file or directory (legacy) + python scripts/ingest_corpus.py [db_path] + + # Sources config (multi-service) + python scripts/ingest_corpus.py --sources [--db ] +""" from __future__ import annotations import logging @@ -9,28 +17,50 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") sys.path.insert(0, str(Path(__file__).parent.parent)) -from app.ingest.pipeline import ingest, ingest_file +from app.ingest.pipeline import ingest, ingest_file, ingest_sources + + +def _print_stats(stats: dict[str, int]) -> None: + total = sum(stats.values()) + for source, count in sorted(stats.items()): + print(f" {source}: {count:,}") + print(f" TOTAL: {total:,} entries") + if __name__ == "__main__": - if len(sys.argv) < 2: - print("Usage: ingest_corpus.py [db_path]", file=sys.stderr) + args = sys.argv[1:] + + if not args: + print( + "Usage:\n" + " ingest_corpus.py [db_path]\n" + " ingest_corpus.py --sources [--db ]", + file=sys.stderr, + ) sys.exit(1) - target = Path(sys.argv[1]) - db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db") - db_path.parent.mkdir(parents=True, exist_ok=True) - - print(f"Ingesting {target} → {db_path}") - - if target.is_file(): - stats = ingest_file(target, db_path) - elif target.is_dir(): - stats = ingest(target, db_path) + if args[0] == "--sources": + if len(args) < 2: + print("Usage: ingest_corpus.py --sources [--db ]", file=sys.stderr) + sys.exit(1) + sources_file = Path(args[1]) + db_path = Path("data/turnstone.db") + if "--db" in args: + db_path = Path(args[args.index("--db") + 1]) + db_path.parent.mkdir(parents=True, exist_ok=True) + print(f"Ingesting sources from {sources_file} → {db_path}") + stats = ingest_sources(sources_file, db_path) + _print_stats(stats) else: - print(f"Error: {target} is not a file or directory", file=sys.stderr) - sys.exit(1) - - total = sum(stats.values()) - for fname, count in sorted(stats.items()): - print(f" {fname}: {count:,}") - print(f" TOTAL: {total:,} entries") + target = Path(args[0]) + db_path = Path(args[1]) if len(args) > 1 else Path("data/turnstone.db") + db_path.parent.mkdir(parents=True, exist_ok=True) + print(f"Ingesting {target} → {db_path}") + if target.is_file(): + stats = ingest_file(target, db_path) + elif target.is_dir(): + stats = ingest(target, db_path) + else: + print(f"Error: {target} is not a file or directory", file=sys.stderr) + sys.exit(1) + _print_stats(stats)