feat: multi-source ingest via sources.yaml + servarr parser
- Add servarr.py parser for all *arr services (sonarr/radarr/lidarr/prowlarr/readarr/whisparr/bazarr) — pipe-delimited format with component prefix prepended for searchability
- Add ingest_sources() to pipeline.py; reads sources.yaml, skips missing paths with a warning so cron keeps running if a service is down
- Add --sources mode to ingest_corpus.py CLI; legacy positional args unchanged for backward compat
- Add patterns/sources.yaml with all of Contributor2's discovered service log paths (qbit, 7 servarr services, nzbget, tautulli, jellyseerr)
- Replace per-service volume mounts in podman-standalone.sh with /opt:/opt:ro + /var/log:/var/log:ro; adding a new source now requires only editing sources.yaml — no container restart
This commit is contained in:
parent
30729826a4
commit
f46aba8165
5 changed files with 274 additions and 43 deletions
|
|
@ -8,7 +8,9 @@ import sqlite3
|
|||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent
|
||||
import yaml
|
||||
|
||||
from app.ingest import caddy, docker_log, journald, plaintext, plex, qbittorrent, servarr
|
||||
from app.ingest.base import _compile, load_patterns, now_iso
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
from app.services.search import build_fts_index
|
||||
|
|
@ -95,6 +97,8 @@ def _detect_format(first_line: str) -> str:
|
|||
return "plex"
|
||||
if qbittorrent.is_qbit_log(first_line):
|
||||
return "qbittorrent"
|
||||
if servarr.is_servarr_log(first_line):
|
||||
return "servarr"
|
||||
return "plaintext"
|
||||
|
||||
|
||||
|
|
@ -102,8 +106,9 @@ def _parse_file(
|
|||
path: Path,
|
||||
compiled: list[tuple[LogPattern, object]],
|
||||
ingest_time: str,
|
||||
source_id: str | None = None,
|
||||
) -> Iterator[RetrievedEntry]:
|
||||
source_id = path.stem
|
||||
source_id = source_id or path.stem
|
||||
|
||||
with path.open("r", errors="replace") as f:
|
||||
lines = iter(f)
|
||||
|
|
@ -129,6 +134,8 @@ def _parse_file(
|
|||
yield from plex.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
elif fmt == "qbittorrent":
|
||||
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
elif fmt == "servarr":
|
||||
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
else:
|
||||
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
|
||||
|
||||
|
|
@ -159,11 +166,13 @@ def _ingest_files(
|
|||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
source_id_map: dict[Path, str] | None = None,
|
||||
) -> dict[str, int]:
|
||||
pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||
patterns = load_patterns(pattern_file)
|
||||
compiled = _compile(patterns)
|
||||
ingest_time = now_iso()
|
||||
source_id_map = source_id_map or {}
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
|
@ -173,9 +182,10 @@ def _ingest_files(
|
|||
stats: dict[str, int] = {}
|
||||
|
||||
for log_file in files:
|
||||
source_id = source_id_map.get(log_file, log_file.stem)
|
||||
count = 0
|
||||
batch: list[RetrievedEntry] = []
|
||||
for entry in _parse_file(log_file, compiled, ingest_time):
|
||||
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
||||
batch.append(entry)
|
||||
if len(batch) >= batch_size:
|
||||
_write_batch(conn, batch)
|
||||
|
|
@ -186,8 +196,8 @@ def _ingest_files(
|
|||
_write_batch(conn, batch)
|
||||
conn.commit()
|
||||
count += len(batch)
|
||||
stats[log_file.name] = count
|
||||
logger.info("Ingested %d entries from %s", count, log_file.name)
|
||||
stats[source_id] = stats.get(source_id, 0) + count
|
||||
logger.info("Ingested %d entries from %s (source: %s)", count, log_file.name, source_id)
|
||||
|
||||
conn.close()
|
||||
|
||||
|
|
@ -216,3 +226,43 @@ def ingest_file(
|
|||
) -> dict[str, int]:
|
||||
"""Ingest a single log file (any supported format)."""
|
||||
return _ingest_files([log_file], db_path, pattern_file)
|
||||
|
||||
|
||||
def ingest_sources(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest all sources listed in a sources.yaml config file.

    sources.yaml format:
        sources:
          - id: sonarr
            path: /opt/sonarr/config/logs/sonarr.0.txt
          - id: qbittorrent
            path: /opt/qbittorrent/config/data/logs/qbittorrent.log

    Missing paths are skipped with a warning so the cron keeps running
    when a service is temporarily down. An empty config file, an empty
    ``sources`` key, and entries without a ``path`` are tolerated the same
    way instead of aborting the whole run.

    Args:
        sources_file: YAML file holding the ``sources`` list.
        db_path: SQLite database passed through to ``_ingest_files``.
        pattern_file: Optional pattern file passed through to ``_ingest_files``.
        batch_size: Batch size passed through to ``_ingest_files``.

    Returns:
        The stats mapping produced by ``_ingest_files``, or an empty dict
        when none of the configured paths exist.
    """
    # Read as UTF-8 explicitly so non-ASCII characters in the config (for
    # example in comments) decode the same way regardless of the locale.
    with open(sources_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        config = yaml.safe_load(f) or {}

    files: list[Path] = []
    source_id_map: dict[Path, str] = {}

    # `sources:` with no value parses to None, hence the `or []`.
    for src in config.get("sources") or []:
        raw_path = src.get("path") if isinstance(src, dict) else None
        if not raw_path:
            logger.warning("Source entry has no path, skipping: %r", src)
            continue

        path = Path(raw_path)
        if not path.exists():
            logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
            continue

        files.append(path)
        # Without an explicit id the ingest step falls back to its own default.
        if "id" in src:
            source_id_map[path] = src["id"]

    if not files:
        logger.warning("No source files found — check sources.yaml paths")
        return {}

    return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map)
|
||||
|
|
|
|||
99
app/ingest/servarr.py
Normal file
99
app/ingest/servarr.py
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
"""Servarr (*arr) log parser.
|
||||
|
||||
Handles the pipe-delimited format used by Sonarr, Radarr, Lidarr,
|
||||
Prowlarr, Readarr, Whisparr, and Bazarr:
|
||||
|
||||
2026-05-11 02:31:51.5|Info|ComponentName|Message text
|
||||
2024-05-09 00:02:25|INFO |root |Message text
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from typing import Iterator
|
||||
|
||||
from app.ingest.base import (
|
||||
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
||||
)
|
||||
from app.services.models import LogPattern, RetrievedEntry
|
||||
|
||||
# Header of one log record: "<timestamp>|<level>|<component>|<message>".
# Fractional seconds on the timestamp are optional, the level must be
# non-empty, the component may be empty, and the message is the rest of
# the line (it may itself contain further "|" characters).
_LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?)"
    r"\|(?P<level>[^|]+)"
    r"\|(?P<component>[^|]*)"
    r"\|(?P<msg>.*)$"
)

# Maps the stripped, lower-cased level field to the stored severity.
# "trace" is mapped to None on purpose: it is a known level that carries
# no severity, so no fallback detection is wanted for it.
_LEVEL_MAP: dict[str, str | None] = {
    "trace": None,
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARN",
    "warning": "WARN",
    "error": "ERROR",
    "fatal": "CRITICAL",
    # Python's logging module names its highest level "CRITICAL"; the
    # "INFO |root |" sample in the module docstring looks like output from
    # it, so map that spelling to the same severity as "fatal".
    "critical": "CRITICAL",
}
|
||||
|
||||
|
||||
def _parse_ts(ts_str: str) -> tuple[str, str]:
    """Return ``(raw, iso)`` for a log timestamp.

    The raw string is passed back untouched. The ISO form is built from the
    part before the first "." (fractional seconds are dropped) and is
    labelled as UTC. If that part does not match ``YYYY-MM-DD HH:MM:SS`` the
    ISO form is the empty string.
    """
    # NOTE(review): the log line carries no zone information; UTC is assumed.
    whole_seconds, _, _ = ts_str.partition(".")
    try:
        parsed = datetime.strptime(whole_seconds, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return ts_str, ""
    return ts_str, parsed.replace(tzinfo=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def is_servarr_log(first_line: str) -> bool:
    """Tell whether *first_line* looks like a Servarr record header.

    Surrounding whitespace is ignored before the line is tested against
    ``_LINE_RE``.
    """
    candidate = first_line.strip()
    return _LINE_RE.match(candidate) is not None
|
||||
|
||||
|
||||
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per log record found in *lines*.

    A line matching ``_LINE_RE`` starts a new record. Every following line
    that does not match is treated as a continuation and appended, stripped,
    to the current record on its own line. Non-matching lines seen before
    the first header are dropped.

    Args:
        lines: Raw log lines; a trailing line ending (LF or CRLF) is removed.
        source_id: Identifier stored on every entry and used for entry ids.
        compiled_patterns: Patterns handed to ``apply_patterns`` per record.
        ingest_time: Timestamp stored on every entry; defaults to ``now_iso()``.

    Yields:
        Entries in input order. A record is emitted once the next header
        (or the end of input) shows that it is complete.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()
    pending_text: str | None = None
    pending_meta: dict = {}

    def _emit(text: str, meta: dict) -> RetrievedEntry:
        # observe() must run first: state.sequence is read right after it.
        repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
        matched = apply_patterns(text, compiled_patterns)
        return RetrievedEntry(
            entry_id=make_entry_id(source_id, state.sequence, text),
            source_id=source_id,
            sequence=state.sequence,
            timestamp_raw=meta.get("ts_raw", ""),
            timestamp_iso=meta.get("ts_iso", ""),
            ingest_time=ingest_time,
            severity=meta.get("severity"),
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

    for raw_line in lines:
        # Drop CR as well as LF so CRLF input does not leave a stray "\r"
        # at the end of the message (continuation lines are stripped anyway).
        line = raw_line.rstrip("\r\n")
        m = _LINE_RE.match(line)
        if m:
            # A new header closes whatever record was being accumulated.
            if pending_text is not None:
                yield _emit(pending_text, pending_meta)

            ts_raw, ts_iso = _parse_ts(m.group("ts"))
            level_key = m.group("level").strip().lower()
            component = m.group("component").strip()
            msg = m.group("msg")
            # Only guess the severity from the message when the level field
            # is unknown; a known level (including "trace" -> None) wins.
            if level_key in _LEVEL_MAP:
                severity = _LEVEL_MAP[level_key]
            else:
                severity = detect_severity(msg)
            # Prepend component so it's searchable without needing a separate column
            text = f"[{component}] {msg}" if component else msg
            pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
            pending_text = text
        elif pending_text is not None:
            pending_text += "\n" + line.strip()

    # Flush the last record; nothing follows it to trigger the emit above.
    if pending_text is not None:
        yield _emit(pending_text, pending_meta)
|
||||
46
patterns/sources.yaml
Normal file
46
patterns/sources.yaml
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
# Turnstone log sources — edit this file to add or remove services.
|
||||
# Run ingest manually:
|
||||
# sudo podman exec turnstone python scripts/ingest_corpus.py \
|
||||
# --sources /patterns/sources.yaml --db /data/turnstone.db
|
||||
#
|
||||
# Paths here are container-side paths under the /opt bind mount.
|
||||
# Missing paths are skipped with a warning — safe to leave entries for
|
||||
# services that are temporarily down.
|
||||
|
||||
sources:
|
||||
# ── Download ─────────────────────────────────────────────────────────────
|
||||
- id: qbittorrent
|
||||
path: /opt/qbittorrent/config/data/logs/qbittorrent.log
|
||||
|
||||
# ── Servarr stack ─────────────────────────────────────────────────────────
|
||||
- id: sonarr
|
||||
path: /opt/sonarr/config/logs/sonarr.0.txt
|
||||
|
||||
- id: radarr
|
||||
path: /opt/radarr/config/logs/radarr.0.txt
|
||||
|
||||
- id: lidarr
|
||||
path: /opt/lidarr/config/logs/Lidarr.0.txt
|
||||
|
||||
- id: readarr
|
||||
path: /opt/readarr/config/logs/readarr.0.txt
|
||||
|
||||
- id: whisparr
|
||||
path: /opt/whisparr/config/logs/whisparr.0.txt
|
||||
|
||||
- id: prowlarr
|
||||
path: /opt/prowlarr/config/logs/prowlarr.0.txt
|
||||
|
||||
- id: bazarr
|
||||
path: /opt/bazarr/config/log/bazarr.log
|
||||
|
||||
# ── Usenet ────────────────────────────────────────────────────────────────
|
||||
- id: nzbget
|
||||
path: /opt/nzbget/config/nzbget.log
|
||||
|
||||
# ── Media / Requests ─────────────────────────────────────────────────────
|
||||
- id: tautulli
|
||||
path: /opt/tautulli/config/logs/tautulli.log
|
||||
|
||||
- id: jellyseerr
|
||||
path: /opt/jellyseerr/config/logs/jellyseerr.log
|
||||
|
|
@ -13,10 +13,11 @@
|
|||
# 2. Build the image (requires Docker or Podman with BuildKit/multi-stage support):
|
||||
# cd /opt/turnstone && podman build -t localhost/turnstone:latest .
|
||||
#
|
||||
# 3. Create data and custom patterns directories:
|
||||
# 3. Create data and patterns directories, then copy config files:
|
||||
# mkdir -p /opt/turnstone/{data,patterns}
|
||||
# # Optionally copy default patterns as a starting point:
|
||||
# cp /opt/turnstone/patterns/default.yaml /opt/turnstone/patterns/
|
||||
# cp /opt/turnstone/patterns/sources.yaml /opt/turnstone/patterns/
|
||||
# # Edit sources.yaml if any paths differ on this host.
|
||||
#
|
||||
# 4. Run this script:
|
||||
# bash /opt/turnstone/podman-standalone.sh
|
||||
|
|
@ -28,15 +29,19 @@
|
|||
# sudo systemctl enable --now turnstone
|
||||
#
|
||||
# ── Ingesting logs ────────────────────────────────────────────────────────────
|
||||
# Log files on the host are bind-mounted read-only under /logs/ in the
|
||||
# container. To ingest (run manually or via cron):
|
||||
# All service logs under /opt are accessible inside the container.
|
||||
# Sources are configured in patterns/sources.yaml (bind-mounted at /patterns/).
|
||||
#
|
||||
# podman exec turnstone python scripts/ingest_corpus.py \
|
||||
# /logs/qbittorrent/qbittorrent.log /data/turnstone.db
|
||||
# To ingest all sources (run manually or via cron):
|
||||
#
|
||||
# Example cron (every 15 minutes):
|
||||
# sudo podman exec turnstone python scripts/ingest_corpus.py \
|
||||
# --sources /patterns/sources.yaml --db /data/turnstone.db
|
||||
#
|
||||
# Example cron (every 15 minutes, add to root's crontab with: sudo crontab -e):
|
||||
# */15 * * * * podman exec turnstone python scripts/ingest_corpus.py \
|
||||
# /logs/qbittorrent/qbittorrent.log /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1
|
||||
# --sources /patterns/sources.yaml --db /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1
|
||||
#
|
||||
# To add a new log source: edit /opt/turnstone/patterns/sources.yaml — no restart needed.
|
||||
#
|
||||
# ── Adding Caddy reverse proxy ────────────────────────────────────────────────
|
||||
# Add to /etc/caddy/Caddyfile:
|
||||
|
|
@ -68,17 +73,15 @@ TZ=America/Los_Angeles
|
|||
#
|
||||
# TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed.
|
||||
|
||||
# ── Log source bind mounts ────────────────────────────────────────────────────
|
||||
# Add or remove mount flags below for each service whose logs you want to ingest.
|
||||
# Inside the container, paths appear under /logs/<service>/
|
||||
#
|
||||
QBIT_LOGS=/opt/qbittorrent/config/data/logs
|
||||
|
||||
# ── Turnstone container ───────────────────────────────────────────────────────
|
||||
# Image is built locally — no registry auto-update label.
|
||||
# To update: sudo podman build -t localhost/turnstone:latest /opt/turnstone
|
||||
# sudo podman restart turnstone
|
||||
#
|
||||
# /opt is mounted read-only so all service logs under /opt/*/config/logs/ are
|
||||
# accessible without per-service mounts. Add new sources to patterns/sources.yaml
|
||||
# — no container restart needed.
|
||||
#
|
||||
# Must be run as root (sudo bash podman-standalone.sh) — rootful Podman only.
|
||||
#
|
||||
# Remove existing container if present (safe re-run)
|
||||
|
|
@ -90,7 +93,8 @@ podman run -d \
|
|||
--net=host \
|
||||
-v "${DATA_DIR}:/data:Z" \
|
||||
-v "${PATTERNS_DIR}:/patterns:Z" \
|
||||
-v "${QBIT_LOGS}:/logs/qbittorrent:ro" \
|
||||
-v /opt:/opt:ro \
|
||||
-v /var/log:/var/log:ro \
|
||||
-e TURNSTONE_DB=/data/turnstone.db \
|
||||
-e TURNSTONE_SOURCE_HOST="$(hostname)" \
|
||||
-e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \
|
||||
|
|
@ -117,6 +121,8 @@ echo " | sudo tee /etc/systemd/system/turnstone.service"
|
|||
echo " sudo systemctl daemon-reload"
|
||||
echo " sudo systemctl enable --now turnstone"
|
||||
echo ""
|
||||
echo "To ingest qBittorrent logs now:"
|
||||
echo "To ingest all sources now:"
|
||||
echo " sudo podman exec turnstone python scripts/ingest_corpus.py \\"
|
||||
echo " /logs/qbittorrent/qbittorrent.log /data/turnstone.db"
|
||||
echo " --sources /patterns/sources.yaml --db /data/turnstone.db"
|
||||
echo ""
|
||||
echo "To add a new source: edit /opt/turnstone/patterns/sources.yaml — no restart needed."
|
||||
|
|
|
|||
|
|
@ -1,4 +1,12 @@
|
|||
"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database."""
|
||||
"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database.
|
||||
|
||||
Usage:
|
||||
# Single file or directory (legacy)
|
||||
python scripts/ingest_corpus.py <file_or_dir> [db_path]
|
||||
|
||||
# Sources config (multi-service)
|
||||
python scripts/ingest_corpus.py --sources <sources.yaml> [--db <db_path>]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
|
@ -9,19 +17,45 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
|||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from app.ingest.pipeline import ingest, ingest_file
|
||||
from app.ingest.pipeline import ingest, ingest_file, ingest_sources
|
||||
|
||||
|
||||
def _print_stats(stats: dict[str, int]) -> None:
    """Print one line per key in sorted key order, then the grand total."""
    for name in sorted(stats):
        print(f" {name}: {stats[name]:,}")
    grand_total = sum(stats.values())
    print(f" TOTAL: {grand_total:,} entries")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: ingest_corpus.py <file_or_dir> [db_path]", file=sys.stderr)
|
||||
args = sys.argv[1:]
|
||||
|
||||
if not args:
|
||||
print(
|
||||
"Usage:\n"
|
||||
" ingest_corpus.py <file_or_dir> [db_path]\n"
|
||||
" ingest_corpus.py --sources <sources.yaml> [--db <db_path>]",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
target = Path(sys.argv[1])
|
||||
db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
|
||||
if args[0] == "--sources":
|
||||
if len(args) < 2:
|
||||
print("Usage: ingest_corpus.py --sources <sources.yaml> [--db <db_path>]", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
sources_file = Path(args[1])
|
||||
db_path = Path("data/turnstone.db")
|
||||
if "--db" in args:
|
||||
db_path = Path(args[args.index("--db") + 1])
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
print(f"Ingesting sources from {sources_file} → {db_path}")
|
||||
stats = ingest_sources(sources_file, db_path)
|
||||
_print_stats(stats)
|
||||
else:
|
||||
target = Path(args[0])
|
||||
db_path = Path(args[1]) if len(args) > 1 else Path("data/turnstone.db")
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"Ingesting {target} → {db_path}")
|
||||
|
||||
if target.is_file():
|
||||
stats = ingest_file(target, db_path)
|
||||
elif target.is_dir():
|
||||
|
|
@ -29,8 +63,4 @@ if __name__ == "__main__":
|
|||
else:
|
||||
print(f"Error: {target} is not a file or directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
total = sum(stats.values())
|
||||
for fname, count in sorted(stats.items()):
|
||||
print(f" {fname}: {count:,}")
|
||||
print(f" TOTAL: {total:,} entries")
|
||||
_print_stats(stats)
|
||||
|
|
|
|||
Loading…
Reference in a new issue