feat: multi-source ingest via sources.yaml + servarr parser

- Add servarr.py parser for all *arr services (sonarr/radarr/lidarr/
  prowlarr/readarr/whisparr/bazarr) — pipe-delimited format with
  component prefix prepended for searchability
- Add ingest_sources() to pipeline.py; reads sources.yaml, skips
  missing paths with a warning so cron keeps running if a service
  is down
- Add --sources mode to ingest_corpus.py CLI; legacy positional args
  unchanged for backward compat
- Add patterns/sources.yaml with all of Xander's discovered service
  log paths (qbit, 7 servarr services, nzbget, tautulli, jellyseerr)
- Replace per-service volume mounts in podman-standalone.sh with
  /opt:/opt:ro + /var/log:/var/log:ro; adding a new source now
  requires only editing sources.yaml — no container restart
This commit is contained in:
pyr0ball 2026-05-11 06:26:32 -07:00
parent afe3513e09
commit f9691277d8
5 changed files with 274 additions and 43 deletions

View file

@ -8,7 +8,9 @@ import sqlite3
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Iterator
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent import yaml
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent, servarr
from app.ingest.base import _compile, load_patterns, now_iso from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index from app.services.search import build_fts_index
@ -95,6 +97,8 @@ def _detect_format(first_line: str) -> str:
return "plex" return "plex"
if qbittorrent.is_qbit_log(first_line): if qbittorrent.is_qbit_log(first_line):
return "qbittorrent" return "qbittorrent"
if servarr.is_servarr_log(first_line):
return "servarr"
return "plaintext" return "plaintext"
@ -102,8 +106,9 @@ def _parse_file(
path: Path, path: Path,
compiled: list[tuple[LogPattern, object]], compiled: list[tuple[LogPattern, object]],
ingest_time: str, ingest_time: str,
source_id: str | None = None,
) -> Iterator[RetrievedEntry]: ) -> Iterator[RetrievedEntry]:
source_id = path.stem source_id = source_id or path.stem
with path.open("r", errors="replace") as f: with path.open("r", errors="replace") as f:
lines = iter(f) lines = iter(f)
@ -129,6 +134,8 @@ def _parse_file(
yield from plex.parse(all_lines(), source_id, compiled, ingest_time) yield from plex.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "qbittorrent": elif fmt == "qbittorrent":
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time) yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "servarr":
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
else: else:
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time) yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
@ -159,11 +166,13 @@ def _ingest_files(
db_path: Path, db_path: Path,
pattern_file: Path | None = None, pattern_file: Path | None = None,
batch_size: int = 1000, batch_size: int = 1000,
source_id_map: dict[Path, str] | None = None,
) -> dict[str, int]: ) -> dict[str, int]:
pattern_file = pattern_file or Path("patterns/default.yaml") pattern_file = pattern_file or Path("patterns/default.yaml")
patterns = load_patterns(pattern_file) patterns = load_patterns(pattern_file)
compiled = _compile(patterns) compiled = _compile(patterns)
ingest_time = now_iso() ingest_time = now_iso()
source_id_map = source_id_map or {}
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
@ -173,9 +182,10 @@ def _ingest_files(
stats: dict[str, int] = {} stats: dict[str, int] = {}
for log_file in files: for log_file in files:
source_id = source_id_map.get(log_file, log_file.stem)
count = 0 count = 0
batch: list[RetrievedEntry] = [] batch: list[RetrievedEntry] = []
for entry in _parse_file(log_file, compiled, ingest_time): for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
batch.append(entry) batch.append(entry)
if len(batch) >= batch_size: if len(batch) >= batch_size:
_write_batch(conn, batch) _write_batch(conn, batch)
@ -186,8 +196,8 @@ def _ingest_files(
_write_batch(conn, batch) _write_batch(conn, batch)
conn.commit() conn.commit()
count += len(batch) count += len(batch)
stats[log_file.name] = count stats[source_id] = stats.get(source_id, 0) + count
logger.info("Ingested %d entries from %s", count, log_file.name) logger.info("Ingested %d entries from %s (source: %s)", count, log_file.name, source_id)
conn.close() conn.close()
@ -216,3 +226,43 @@ def ingest_file(
) -> dict[str, int]: ) -> dict[str, int]:
"""Ingest a single log file (any supported format).""" """Ingest a single log file (any supported format)."""
return _ingest_files([log_file], db_path, pattern_file) return _ingest_files([log_file], db_path, pattern_file)
def ingest_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest all sources listed in a sources.yaml config file.

    sources.yaml format:

        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt
          - id: qbittorrent
            path: /opt/qbittorrent/config/data/logs/qbittorrent.log

    Missing paths are skipped with a warning so the cron keeps running
    when a service is temporarily down. Entries that are malformed (not a
    mapping, or lacking ``path``) and paths that are not regular files are
    skipped with a warning for the same reason.

    Args:
        sources_file: Path to the YAML config described above.
        db_path: SQLite database file passed through to ``_ingest_files``.
        pattern_file: Optional pattern file passed through to ``_ingest_files``.
        batch_size: Batch size passed through to ``_ingest_files``.

    Returns:
        The stats mapping produced by ``_ingest_files``, or ``{}`` when no
        listed source could be used.
    """
    # Explicit encoding: the config may contain non-ASCII comments and must
    # not depend on the container's locale.
    with open(sources_file, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load() returns None for an empty document, and a bare `sources:`
    # key also loads as None; treat both as "no sources" instead of crashing.
    entries = (config.get("sources") if isinstance(config, dict) else None) or []

    files: list[Path] = []
    source_id_map: dict[Path, str] = {}
    for src in entries:
        if not isinstance(src, dict) or not src.get("path"):
            logger.warning("Ignoring malformed source entry (no path): %r", src)
            continue
        path = Path(src["path"])
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue
        if not path.is_file():
            # A directory here would only fail later when opened for reading.
            logger.warning("Source %r is not a regular file, skipping: %s", src.get("id", "?"), path)
            continue
        files.append(path)
        if "id" in src:
            # YAML may load an unquoted numeric id as int; source ids are strings.
            source_id_map[path] = str(src["id"])

    if not files:
        logger.warning("No source files found — check sources.yaml paths")
        return {}
    return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map)

99
app/ingest/servarr.py Normal file
View file

@ -0,0 +1,99 @@
"""Servarr (*arr) log parser.
Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr,
Prowlarr, Readarr, Whisparr, and Bazarr:
2026-05-11 02:31:51.5|Info|ComponentName|Message text
2024-05-09 00:02:25|INFO |root |Message text
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Header line of a log entry: "<timestamp>|<level>|<component>|<message>".
# The timestamp is "YYYY-MM-DD HH:MM:SS" with optional fractional seconds,
# the level must be non-empty, the component may be empty, and the message
# is everything after the third pipe.
_LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)"
    r"\|(?P<level>[^|]+)"
    r"\|(?P<component>[^|]*)"
    r"\|(?P<msg>.*)$"
)
# Maps the stripped, lower-cased level field to a severity label.
# "trace" is present with value None so that trace lines get no severity
# instead of falling through to the lookup's default in parse().
_LEVEL_MAP: dict[str, str | None] = {
    "trace": None,
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARN",
    "warning": "WARN",
    "error": "ERROR",
    "fatal": "CRITICAL",
}
def _parse_ts(ts_str: str) -> tuple[str, str]:
    """Return ``(raw, iso)`` for a log timestamp string.

    Everything from the first ``.`` onward (fractional seconds) is ignored
    when parsing. The remainder is read as ``YYYY-MM-DD HH:MM:SS`` and tagged
    as UTC. When it cannot be parsed, the ISO part is an empty string.
    """
    whole_seconds, _, _ = ts_str.partition(".")
    try:
        parsed = datetime.strptime(whole_seconds, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return ts_str, ""
    return ts_str, parsed.replace(tzinfo=timezone.utc).isoformat()
def is_servarr_log(first_line: str) -> bool:
    """Report whether *first_line*, with surrounding whitespace removed, matches ``_LINE_RE``."""
    candidate = first_line.strip()
    return _LINE_RE.match(candidate) is not None
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Parse pipe-delimited Servarr log lines into ``RetrievedEntry`` objects.

    A line matching ``_LINE_RE`` starts a new entry. Following lines that do
    not match (e.g. stack traces) are stripped and appended to that entry,
    separated by newlines. Non-matching lines seen before the first header
    line are dropped.

    Args:
        lines: Raw log lines; a trailing newline on each is removed.
        source_id: Identifier stored on every emitted entry.
        compiled_patterns: Passed to ``apply_patterns`` for each entry text.
        ingest_time: Ingest timestamp for every entry; defaults to ``now_iso()``.

    Yields:
        One ``RetrievedEntry`` per header line, in input order.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    # Text fragments of the entry being assembled; None until a header is seen.
    pending_parts: list[str] | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() runs first: state.sequence is read afterwards for the id.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        line = raw_line.rstrip("\n")
        m = _LINE_RE.match(line)
        if m is None:
            # Continuation line: attach to the open entry, if there is one.
            if pending_parts is not None:
                pending_parts.append(line.strip())
            continue

        # A new header closes the previous entry.
        if pending_parts is not None:
            yield _emit("\n".join(pending_parts), pending_meta)

        ts_raw, ts_iso = _parse_ts(m.group("ts"))
        msg = m.group("msg")
        level_key = m.group("level").strip().lower()
        if level_key in _LEVEL_MAP:
            # Includes "trace", which deliberately maps to None.
            severity = _LEVEL_MAP[level_key]
        else:
            # Only scan the message when the level field is unrecognised.
            severity = detect_severity(msg)
        component = m.group("component").strip()
        # Prepend component so it's searchable without needing a separate column
        text = f"[{component}] {msg}" if component else msg
        pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
        pending_parts = [text]

    if pending_parts is not None:
        yield _emit("\n".join(pending_parts), pending_meta)

46
patterns/sources.yaml Normal file
View file

@ -0,0 +1,46 @@
# Turnstone log sources — edit this file to add or remove services.
# Run ingest manually:
# sudo podman exec turnstone python scripts/ingest_corpus.py \
# --sources /patterns/sources.yaml --db /data/turnstone.db
#
# Paths here are container-side paths under the /opt bind mount.
# Missing paths are skipped with a warning — safe to leave entries for
# services that are temporarily down.
sources:
# ── Download ─────────────────────────────────────────────────────────────
- id: qbittorrent
path: /opt/qbittorrent/config/data/logs/qbittorrent.log
# ── Servarr stack ─────────────────────────────────────────────────────────
- id: sonarr
path: /opt/sonarr/config/logs/sonarr.0.txt
- id: radarr
path: /opt/radarr/config/logs/radarr.0.txt
- id: lidarr
path: /opt/lidarr/config/logs/Lidarr.0.txt
- id: readarr
path: /opt/readarr/config/logs/readarr.0.txt
- id: whisparr
path: /opt/whisparr/config/logs/whisparr.0.txt
- id: prowlarr
path: /opt/prowlarr/config/logs/prowlarr.0.txt
- id: bazarr
path: /opt/bazarr/config/log/bazarr.log
# ── Usenet ────────────────────────────────────────────────────────────────
- id: nzbget
path: /opt/nzbget/config/nzbget.log
# ── Media / Requests ─────────────────────────────────────────────────────
- id: tautulli
path: /opt/tautulli/config/logs/tautulli.log
- id: jellyseerr
path: /opt/jellyseerr/config/logs/jellyseerr.log

View file

@ -13,10 +13,11 @@
# 2. Build the image (requires Docker or Podman with BuildKit/multi-stage support): # 2. Build the image (requires Docker or Podman with BuildKit/multi-stage support):
# cd /opt/turnstone && podman build -t localhost/turnstone:latest . # cd /opt/turnstone && podman build -t localhost/turnstone:latest .
# #
# 3. Create data and custom patterns directories: # 3. Create data and patterns directories, then copy config files:
# mkdir -p /opt/turnstone/{data,patterns} # mkdir -p /opt/turnstone/{data,patterns}
# # Optionally copy default patterns as a starting point:
# cp /opt/turnstone/patterns/default.yaml /opt/turnstone/patterns/ # cp /opt/turnstone/patterns/default.yaml /opt/turnstone/patterns/
# cp /opt/turnstone/patterns/sources.yaml /opt/turnstone/patterns/
# # Edit sources.yaml if any paths differ on this host.
# #
# 4. Run this script: # 4. Run this script:
# bash /opt/turnstone/podman-standalone.sh # bash /opt/turnstone/podman-standalone.sh
@ -28,15 +29,19 @@
# sudo systemctl enable --now turnstone # sudo systemctl enable --now turnstone
# #
# ── Ingesting logs ──────────────────────────────────────────────────────────── # ── Ingesting logs ────────────────────────────────────────────────────────────
# Log files on the host are bind-mounted read-only under /logs/ in the # All service logs under /opt are accessible inside the container.
# container. To ingest (run manually or via cron): # Sources are configured in patterns/sources.yaml (bind-mounted at /patterns/).
# #
# podman exec turnstone python scripts/ingest_corpus.py \ # To ingest all sources (run manually or via cron):
# /logs/qbittorrent/qbittorrent.log /data/turnstone.db
# #
# Example cron (every 15 minutes): # sudo podman exec turnstone python scripts/ingest_corpus.py \
# --sources /patterns/sources.yaml --db /data/turnstone.db
#
# Example cron (every 15 minutes, add to root's crontab with: sudo crontab -e):
# */15 * * * * podman exec turnstone python scripts/ingest_corpus.py \ # */15 * * * * podman exec turnstone python scripts/ingest_corpus.py \
# /logs/qbittorrent/qbittorrent.log /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1 # --sources /patterns/sources.yaml --db /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1
#
# To add a new log source: edit /opt/turnstone/patterns/sources.yaml — no restart needed.
# #
# ── Adding Caddy reverse proxy ──────────────────────────────────────────────── # ── Adding Caddy reverse proxy ────────────────────────────────────────────────
# Add to /etc/caddy/Caddyfile: # Add to /etc/caddy/Caddyfile:
@ -68,17 +73,15 @@ TZ=America/Los_Angeles
# #
# TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed. # TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed.
# ── Log source bind mounts ────────────────────────────────────────────────────
# Add or remove mount flags below for each service whose logs you want to ingest.
# Inside the container, paths appear under /logs/<service>/
#
QBIT_LOGS=/opt/qbittorrent/config/data/logs
# ── Turnstone container ─────────────────────────────────────────────────────── # ── Turnstone container ───────────────────────────────────────────────────────
# Image is built locally — no registry auto-update label. # Image is built locally — no registry auto-update label.
# To update: sudo podman build -t localhost/turnstone:latest /opt/turnstone # To update: sudo podman build -t localhost/turnstone:latest /opt/turnstone
# sudo podman restart turnstone # sudo podman restart turnstone
# #
# /opt is mounted read-only so all service logs under /opt/*/config/logs/ are
# accessible without per-service mounts. Add new sources to patterns/sources.yaml
# — no container restart needed.
#
# Must be run as root (sudo bash podman-standalone.sh) — rootful Podman only. # Must be run as root (sudo bash podman-standalone.sh) — rootful Podman only.
# #
# Remove existing container if present (safe re-run) # Remove existing container if present (safe re-run)
@ -90,7 +93,8 @@ podman run -d \
--net=host \ --net=host \
-v "${DATA_DIR}:/data:Z" \ -v "${DATA_DIR}:/data:Z" \
-v "${PATTERNS_DIR}:/patterns:Z" \ -v "${PATTERNS_DIR}:/patterns:Z" \
-v "${QBIT_LOGS}:/logs/qbittorrent:ro" \ -v /opt:/opt:ro \
-v /var/log:/var/log:ro \
-e TURNSTONE_DB=/data/turnstone.db \ -e TURNSTONE_DB=/data/turnstone.db \
-e TURNSTONE_SOURCE_HOST="$(hostname)" \ -e TURNSTONE_SOURCE_HOST="$(hostname)" \
-e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \ -e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \
@ -117,6 +121,8 @@ echo " | sudo tee /etc/systemd/system/turnstone.service"
echo " sudo systemctl daemon-reload" echo " sudo systemctl daemon-reload"
echo " sudo systemctl enable --now turnstone" echo " sudo systemctl enable --now turnstone"
echo "" echo ""
echo "To ingest qBittorrent logs now:" echo "To ingest all sources now:"
echo " sudo podman exec turnstone python scripts/ingest_corpus.py \\" echo " sudo podman exec turnstone python scripts/ingest_corpus.py \\"
echo " /logs/qbittorrent/qbittorrent.log /data/turnstone.db" echo " --sources /patterns/sources.yaml --db /data/turnstone.db"
echo ""
echo "To add a new source: edit /opt/turnstone/patterns/sources.yaml — no restart needed."

View file

@ -1,4 +1,12 @@
"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database.""" """CLI: ingest a log file or corpus directory into the Turnstone SQLite database.
Usage:
# Single file or directory (legacy)
python scripts/ingest_corpus.py <file_or_dir> [db_path]
# Sources config (multi-service)
python scripts/ingest_corpus.py --sources <sources.yaml> [--db <db_path>]
"""
from __future__ import annotations from __future__ import annotations
import logging import logging
@ -9,19 +17,45 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from app.ingest.pipeline import ingest, ingest_file from app.ingest.pipeline import ingest, ingest_file, ingest_sources
def _print_stats(stats: dict[str, int]) -> None:
    """Print one count line per key of *stats* in sorted key order, then a grand total."""
    grand_total = sum(stats.values())
    for name in sorted(stats):
        print(f" {name}: {stats[name]:,}")
    print(f" TOTAL: {grand_total:,} entries")
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: args = sys.argv[1:]
print("Usage: ingest_corpus.py <file_or_dir> [db_path]", file=sys.stderr)
if not args:
print(
"Usage:\n"
" ingest_corpus.py <file_or_dir> [db_path]\n"
" ingest_corpus.py --sources <sources.yaml> [--db <db_path>]",
file=sys.stderr,
)
sys.exit(1) sys.exit(1)
target = Path(sys.argv[1]) if args[0] == "--sources":
db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db") if len(args) < 2:
print("Usage: ingest_corpus.py --sources <sources.yaml> [--db <db_path>]", file=sys.stderr)
sys.exit(1)
sources_file = Path(args[1])
db_path = Path("data/turnstone.db")
if "--db" in args:
db_path = Path(args[args.index("--db") + 1])
db_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Ingesting sources from {sources_file}{db_path}")
stats = ingest_sources(sources_file, db_path)
_print_stats(stats)
else:
target = Path(args[0])
db_path = Path(args[1]) if len(args) > 1 else Path("data/turnstone.db")
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Ingesting {target}{db_path}") print(f"Ingesting {target}{db_path}")
if target.is_file(): if target.is_file():
stats = ingest_file(target, db_path) stats = ingest_file(target, db_path)
elif target.is_dir(): elif target.is_dir():
@ -29,8 +63,4 @@ if __name__ == "__main__":
else: else:
print(f"Error: {target} is not a file or directory", file=sys.stderr) print(f"Error: {target} is not a file or directory", file=sys.stderr)
sys.exit(1) sys.exit(1)
_print_stats(stats)
total = sum(stats.values())
for fname, count in sorted(stats.items()):
print(f" {fname}: {count:,}")
print(f" TOTAL: {total:,} entries")