diff --git a/.env.example b/.env.example index 483790b..816b47d 100644 --- a/.env.example +++ b/.env.example @@ -23,6 +23,6 @@ # Remote endpoint to push diagnostic bundles for escalation. # TURNSTONE_BUNDLE_ENDPOINT=https://example.com/api/bundles -# --- Periodic batch ingest --- -# Seconds between automatic ingest runs from sources.yaml. Set to 0 to disable. -# TURNSTONE_INGEST_INTERVAL=900 +# --- Periodic batch glean --- +# Seconds between automatic glean runs from sources.yaml. Set to 0 to disable. +# TURNSTONE_GLEAN_INTERVAL=900 diff --git a/README.md b/README.md index a36ae60..9c8086d 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,8 @@ Service logs (journald, Docker, syslog, Caddy, Plex, arr stack, qBittorrent, dme ## Features -- **Multi-source ingest** — journald, Docker, syslog, Caddy, dmesg, Plex, Servarr (arr stack), qBittorrent, plaintext; paths configured in `patterns/sources.yaml` -- **Pattern tagging** — named regex patterns applied at ingest time (`service_restart`, `auth_failure`, `oom`, `segfault`, `disk_full`, `timeout`, …); extend in `patterns/default.yaml` +- **Multi-source glean** — journald, Docker, syslog, Caddy, dmesg, Plex, Servarr (arr stack), qBittorrent, plaintext; paths configured in `patterns/sources.yaml` +- **Pattern tagging** — named regex patterns applied at glean time (`service_restart`, `auth_failure`, `oom`, `segfault`, `disk_full`, `timeout`, …); extend in `patterns/default.yaml` - **Full-text search** — SQLite FTS5 index across all ingested entries; filter by source, severity, time window - **Natural-language time queries** — "what happened yesterday morning", "show me errors from the last 3 hours"; powered by dateparser - **Incident management** — create, label, and track incidents; attach supporting log entries @@ -101,13 +101,13 @@ sources: path: /var/log/caddy/access.log ``` -For `journald` sources, run `scripts/export_journal.sh` on the host before each ingest (e.g. via cron). Missing paths are skipped with a warning — safe to leave entries for services that are temporarily down. +For `journald` sources, run `scripts/export_journal.sh` on the host before each glean (e.g. via cron). Missing paths are skipped with a warning — safe to leave entries for services that are temporarily down. --- ## Pattern library -Named patterns in `patterns/default.yaml` are matched against every log entry at ingest time. Matched pattern names are stored and used to boost search relevance for diagnostic queries. +Named patterns in `patterns/default.yaml` are matched against every log entry at glean time. Matched pattern names are stored and used to boost search relevance for diagnostic queries. ```yaml patterns: @@ -157,7 +157,7 @@ Copy `.env.example` to `.env` (or pass as `-e` flags to Docker/Podman). All vari | `TURNSTONE_PATTERNS` | `./patterns` | Pattern directory (default.yaml, sources.yaml, watch.yaml). | | `TURNSTONE_SOURCE_HOST` | `unknown` | Host identifier stamped on ingested entries. | | `TURNSTONE_BUNDLE_ENDPOINT` | — | Remote URL to push diagnostic bundles for escalation. | -| `TURNSTONE_INGEST_INTERVAL` | `900` | Seconds between automatic batch ingest runs. Set to `0` to disable. | +| `TURNSTONE_GLEAN_INTERVAL` | `900` | Seconds between automatic batch glean runs. Set to `0` to disable. | --- diff --git a/app/ingest/__init__.py b/app/glean/__init__.py similarity index 100% rename from app/ingest/__init__.py rename to app/glean/__init__.py diff --git a/app/ingest/base.py b/app/glean/base.py similarity index 100% rename from app/ingest/base.py rename to app/glean/base.py diff --git a/app/ingest/caddy.py b/app/glean/caddy.py similarity index 98% rename from app/ingest/caddy.py rename to app/glean/caddy.py index 0cf2319..a7fb494 100644 --- a/app/ingest/caddy.py +++ b/app/glean/caddy.py @@ -4,7 +4,7 @@ from __future__ import annotations import json from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, epoch_float_to_iso, make_entry_id, now_iso, ) diff --git a/app/ingest/dmesg_log.py b/app/glean/dmesg_log.py similarity index 99% rename from app/ingest/dmesg_log.py rename to app/glean/dmesg_log.py index 84058aa..f5e2e11 100644 --- a/app/ingest/dmesg_log.py +++ b/app/glean/dmesg_log.py @@ -18,7 +18,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/doc_upload.py b/app/glean/doc_upload.py similarity index 93% rename from app/ingest/doc_upload.py rename to app/glean/doc_upload.py index 98bb8d7..bc4a3b7 100644 --- a/app/ingest/doc_upload.py +++ b/app/glean/doc_upload.py @@ -10,7 +10,7 @@ from app.context.chunker import process_upload from app.context.store import add_document, add_fact -def ingest_upload(db_path: Path, filename: str, content: bytes) -> dict[str, Any]: +def glean_upload(db_path: Path, filename: str, content: bytes) -> dict[str, Any]: """Process an uploaded file and write to context store. Returns result summary.""" doc_type, facts, chunks = process_upload(filename, content) diff --git a/app/ingest/docker_log.py b/app/glean/docker_log.py similarity index 98% rename from app/ingest/docker_log.py rename to app/glean/docker_log.py index c383571..18f6966 100644 --- a/app/ingest/docker_log.py +++ b/app/glean/docker_log.py @@ -4,7 +4,7 @@ from __future__ import annotations import json from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) diff --git a/app/ingest/journald.py b/app/glean/journald.py similarity index 98% rename from app/ingest/journald.py rename to app/glean/journald.py index 220e9c8..cc03d22 100644 --- a/app/ingest/journald.py +++ b/app/glean/journald.py @@ -4,7 +4,7 @@ from __future__ import annotations import json from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, epoch_micros_to_iso, make_entry_id, now_iso, SYSLOG_PRIORITY, ) diff --git a/app/ingest/mqtt_subscriber.py b/app/glean/mqtt_subscriber.py similarity index 97% rename from app/ingest/mqtt_subscriber.py rename to app/glean/mqtt_subscriber.py index 1f00da4..efa514d 100644 --- a/app/ingest/mqtt_subscriber.py +++ b/app/glean/mqtt_subscriber.py @@ -1,10 +1,10 @@ -"""Live MQTT ingest subscriber for Turnstone. +"""Live MQTT glean subscriber for Turnstone. Reads ``type: mqtt`` entries from sources.yaml and subscribes to each broker in the background. Incoming messages are normalized to RetrievedEntry and written to the Turnstone SQLite database as they arrive. -This runs as an asyncio task alongside the batch ingest scheduler. It is +This runs as an asyncio task alongside the batch glean scheduler. It is started from the FastAPI lifespan in rest.py. MQTT source config format in sources.yaml:: diff --git a/app/ingest/pipeline.py b/app/glean/pipeline.py similarity index 54% rename from app/ingest/pipeline.py rename to app/glean/pipeline.py index f912be9..f842e28 100644 --- a/app/ingest/pipeline.py +++ b/app/glean/pipeline.py @@ -1,4 +1,4 @@ -"""Ingest pipeline: auto-detect format, parse, write to SQLite.""" +"""Glean pipeline: auto-detect format, parse, write to SQLite.""" from __future__ import annotations import json @@ -10,8 +10,17 @@ from typing import Iterator import yaml -from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh -from app.ingest.base import _compile, load_patterns, now_iso +from app.glean import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh +from app.glean.base import _compile, load_patterns, now_iso +from app.glean.ssh import ( + SSHTransport, + SSHConnectionError, + SSHCommandError, + _build_docker_command, + _build_journald_command, + _build_plaintext_command, + _build_syslog_command, +) from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index @@ -221,7 +230,7 @@ def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None: ) -def _ingest_files( +def _glean_files( files: list[Path], db_path: Path, pattern_file: Path | None = None, @@ -257,7 +266,7 @@ def _ingest_files( conn.commit() count += len(batch) stats[source_id] = stats.get(source_id, 0) + count - logger.info("Ingested %d entries from %s (source: %s)", count, log_file.name, source_id) + logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id) conn.close() @@ -268,51 +277,192 @@ def _ingest_files( return stats -def ingest( +def _stream_and_write( + transport: SSHTransport, + cmd: str, + parser, + source_id: str, + compiled: list[tuple[LogPattern, object]], + ingest_time: str, + conn: sqlite3.Connection, + batch_size: int, +) -> int: + """Stream *cmd* output through *parser* and write entries to *conn*. + + Catches SSHCommandError per-item so one bad command doesn't abort the rest + of the glean items for this host. Returns the number of entries written. + """ + count = 0 + batch: list[RetrievedEntry] = [] + try: + for entry in parser(transport.exec_stream(cmd), source_id, compiled, ingest_time): + batch.append(entry) + if len(batch) >= batch_size: + _write_batch(conn, batch) + conn.commit() + count += len(batch) + batch.clear() + if batch: + _write_batch(conn, batch) + conn.commit() + count += len(batch) + except SSHCommandError as exc: + logger.warning("SSH command failed for source %r (cmd: %s): %s", source_id, cmd, exc) + logger.info("Gleaned %d entries from SSH source %s", count, source_id) + return count + + +def _glean_ssh_source( + src: dict, # type: ignore[type-arg] + compiled: list[tuple[LogPattern, object]], + ingest_time: str, + conn: sqlite3.Connection, + batch_size: int, +) -> dict[str, int]: + """Open one SSHTransport connection for *src* and glean all its glean items. + + One SSH connection is shared across all items in the ``glean:`` list so + the handshake overhead is paid only once per host per glean run. + + Returns a stats dict mapping ``{source_id: entry_count}`` for each item. + Gracefully skips the entire source on SSHConnectionError. + """ + host_id = src.get("id", src.get("host", "unknown")) + host = src["host"] + user = src["user"] + key_path = str(Path(src["key_path"]).expanduser()) + port = int(src.get("port", 22)) + glean_items: list[dict] = src.get("glean", []) # type: ignore[type-arg] + + stats: dict[str, int] = {} + + try: + with SSHTransport(host=host, user=user, key_path=key_path, port=port) as t: + for item in glean_items: + item_type = item.get("type", "plaintext") + # Per-item source_id — falls back to host_id/type for un-labelled items + item_id = item.get("id") or f"{host_id}/{item_type}" + + if item_type == "journald": + cmd = _build_journald_command(item) + count = _stream_and_write( + t, cmd, journald.parse, item_id, compiled, ingest_time, conn, batch_size + ) + stats[item_id] = stats.get(item_id, 0) + count + + elif item_type == "syslog": + cmd = _build_syslog_command(item) + count = _stream_and_write( + t, cmd, syslog.parse, item_id, compiled, ingest_time, conn, batch_size + ) + stats[item_id] = stats.get(item_id, 0) + count + + elif item_type == "plaintext": + cmd = _build_plaintext_command(item) + count = _stream_and_write( + t, cmd, plaintext.parse, item_id, compiled, ingest_time, conn, batch_size + ) + stats[item_id] = stats.get(item_id, 0) + count + + elif item_type == "docker": + cmds = _build_docker_command(item) + if isinstance(cmds, str): + cmds = [cmds] + containers: list[str] = item.get("containers", []) + for i, cmd in enumerate(cmds): + # Use the container name as the final path segment when available + container_name = containers[i] if i < len(containers) else str(i) + container_id = f"{item_id}/{container_name}" if len(cmds) > 1 else item_id + count = _stream_and_write( + t, cmd, docker_log.parse, container_id, + compiled, ingest_time, conn, batch_size, + ) + stats[container_id] = stats.get(container_id, 0) + count + + else: + logger.warning( + "Unknown SSH glean type %r for source %r — skipping item", + item_type, host_id, + ) + + except SSHConnectionError as exc: + logger.warning("SSH connection failed for source %r: %s", host_id, exc) + + return stats + + +def glean_dir( corpus_dir: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: - """Ingest all .jsonl and .log files from a corpus directory.""" + """Glean all .jsonl and .log files from a corpus directory.""" files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log")) - return _ingest_files(files, db_path, pattern_file, batch_size) + return _glean_files(files, db_path, pattern_file, batch_size) -def ingest_file( +def glean_file( log_file: Path, db_path: Path, pattern_file: Path | None = None, ) -> dict[str, int]: - """Ingest a single log file (any supported format).""" - return _ingest_files([log_file], db_path, pattern_file) + """Glean a single log file (any supported format).""" + return _glean_files([log_file], db_path, pattern_file) -def ingest_sources( +def glean_sources( sources_file: Path, db_path: Path, pattern_file: Path | None = None, batch_size: int = 1000, ) -> dict[str, int]: - """Ingest all sources listed in a sources.yaml config file. + """Glean all sources listed in a sources.yaml config file. - sources.yaml format: + Supports two source types: + + Local file sources (default): sources: - id: sonarr path: /opt/sonarr/config/logs/sonarr.0.txt - - id: qbittorrent - path: /opt/qbittorrent/config/data/logs/qbittorrent.log - Missing paths are skipped with a warning so the cron keeps running - when a service is temporarily down. + SSH remote sources (transport: ssh): + sources: + - id: rack01 + transport: ssh + host: 192.168.1.10 + user: admin + key_path: ~/.ssh/id_ed25519 + glean: + - type: journald + args: ["--since", "2 hours ago"] + - type: syslog + path: /var/log/syslog + - type: plaintext + path: /var/log/app/error.log + - type: docker + containers: [myapp, nginx] + + Missing local paths and SSH connection failures are logged as warnings + so the cron keeps running when a source is temporarily down. """ with open(sources_file) as f: config = yaml.safe_load(f) + local_sources: list[dict] = [] # type: ignore[type-arg] + ssh_sources: list[dict] = [] # type: ignore[type-arg] + + for src in config.get("sources", []): + if src.get("transport") == "ssh": + ssh_sources.append(src) + else: + local_sources.append(src) + + # ── Local file sources ───────────────────────────────────────────────── files: list[Path] = [] source_id_map: dict[Path, str] = {} - for src in config.get("sources", []): + for src in local_sources: path = Path(src["path"]) if not path.exists(): logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path) @@ -321,8 +471,40 @@ def ingest_sources( if "id" in src: source_id_map[path] = src["id"] - if not files: - logger.warning("No source files found — check sources.yaml paths") + if not files and not ssh_sources: + logger.warning("No sources found — check sources.yaml paths") return {} - return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map) + stats: dict[str, int] = {} + if files: + stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map)) + + # ── SSH remote sources ───────────────────────────────────────────────── + if not ssh_sources: + return stats + + # Compile patterns once, share across all SSH sources in this run. + effective_pattern_file = pattern_file or Path("patterns/default.yaml") + compiled = _compile(load_patterns(effective_pattern_file)) + ingest_time = now_iso() + + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executescript(_SCHEMA) + conn.commit() + + try: + for src in ssh_sources: + ssh_stats = _glean_ssh_source(src, compiled, ingest_time, conn, batch_size) + for k, v in ssh_stats.items(): + stats[k] = stats.get(k, 0) + v + finally: + conn.close() + + # Rebuild FTS only when SSH sources added entries (_glean_files already + # rebuilds when local sources are present; safe to call again if both ran). + if ssh_sources: + logger.info("Rebuilding FTS index after SSH glean...") + build_fts_index(db_path) + + return stats diff --git a/app/ingest/plaintext.py b/app/glean/plaintext.py similarity index 98% rename from app/ingest/plaintext.py rename to app/glean/plaintext.py index 1bb83d7..a205fc0 100644 --- a/app/ingest/plaintext.py +++ b/app/glean/plaintext.py @@ -10,7 +10,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/plex.py b/app/glean/plex.py similarity index 99% rename from app/ingest/plex.py rename to app/glean/plex.py index 89d7232..5a9ec45 100644 --- a/app/ingest/plex.py +++ b/app/glean/plex.py @@ -12,7 +12,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/qbittorrent.py b/app/glean/qbittorrent.py similarity index 99% rename from app/ingest/qbittorrent.py rename to app/glean/qbittorrent.py index 404c84c..642c419 100644 --- a/app/ingest/qbittorrent.py +++ b/app/glean/qbittorrent.py @@ -18,7 +18,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/servarr.py b/app/glean/servarr.py similarity index 99% rename from app/ingest/servarr.py rename to app/glean/servarr.py index b59471e..bd494c2 100644 --- a/app/ingest/servarr.py +++ b/app/glean/servarr.py @@ -12,7 +12,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/syslog.py b/app/glean/syslog.py similarity index 99% rename from app/ingest/syslog.py rename to app/glean/syslog.py index 81f38b1..341fdf4 100644 --- a/app/ingest/syslog.py +++ b/app/glean/syslog.py @@ -14,7 +14,7 @@ import re from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, detect_severity, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/ingest/tautulli.py b/app/glean/tautulli.py similarity index 99% rename from app/ingest/tautulli.py rename to app/glean/tautulli.py index dba0cf0..ed7578b 100644 --- a/app/ingest/tautulli.py +++ b/app/glean/tautulli.py @@ -5,7 +5,7 @@ Tautulli sends all template values as strings, so all fields are treated as str. """ from __future__ import annotations -from app.ingest.base import ( +from app.glean.base import ( apply_patterns, epoch_float_to_iso, make_entry_id, diff --git a/app/ingest/wazuh.py b/app/glean/wazuh.py similarity index 99% rename from app/ingest/wazuh.py rename to app/glean/wazuh.py index 49e808d..70b69a7 100644 --- a/app/ingest/wazuh.py +++ b/app/glean/wazuh.py @@ -22,7 +22,7 @@ import json from datetime import datetime, timezone from typing import Iterator -from app.ingest.base import ( +from app.glean.base import ( SourceState, apply_patterns, make_entry_id, now_iso, ) from app.services.models import LogPattern, RetrievedEntry diff --git a/app/mcp_server.py b/app/mcp_server.py index 4047f98..4c30cdf 100644 --- a/app/mcp_server.py +++ b/app/mcp_server.py @@ -94,7 +94,7 @@ def search_logs( severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG. source: Partial match on source_id. Format is 'corpus:host:service'. Example: 'xanderland:caddy' matches all Caddy entries from xanderland. - pattern: Filter by named pattern tag applied at ingest time. + pattern: Filter by named pattern tag applied at glean time. Known tags: auth_failure, connection_lost, oom, segfault, disk_full, timeout, caddy_tls_error, caddy_config_error, caddy_auth_error, caddy_upstream_error, service_restart, service_update, @@ -176,7 +176,7 @@ def list_log_sources() -> str: """ sources = list_sources(DB_PATH) if not sources: - return "No log sources found. Has the corpus been ingested? Run: python scripts/ingest_corpus.py" + return "No log sources found. Has the corpus been gleaned? Run: python scripts/glean_corpus.py" lines = [f"Corpus: {DB_PATH}", f"Sources ({len(sources)} total):\n"] for s in sources: @@ -192,7 +192,7 @@ def list_log_sources() -> str: if __name__ == "__main__": if not DB_PATH.exists(): logger.error("Database not found: %s", DB_PATH) - logger.error("Run: python scripts/ingest_corpus.py ") + logger.error("Run: python scripts/glean_corpus.py ") sys.exit(1) logger.info("Starting Turnstone MCP server (DB: %s)", DB_PATH) mcp.run() diff --git a/app/rest.py b/app/rest.py index 81601f2..ff47979 100644 --- a/app/rest.py +++ b/app/rest.py @@ -27,10 +27,10 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel -from app.ingest.pipeline import ensure_schema, ingest_file as _ingest_file -from app.ingest.base import load_compiled_patterns, now_iso -from app.ingest.tautulli import parse_webhook as _parse_tautulli -from app.ingest.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh +from app.glean.pipeline import ensure_schema, glean_file as _glean_file +from app.glean.base import load_compiled_patterns, now_iso +from app.glean.tautulli import parse_webhook as _parse_tautulli +from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh from app.services.blocklist import ( BlocklistCandidate, get_candidate, @@ -71,11 +71,11 @@ from app.context.store import ( delete_document as _delete_document, ) from app.context.retriever import retrieve_context as _retrieve_context, format_context_block -from app.ingest.doc_upload import ingest_upload as _ingest_upload +from app.glean.doc_upload import glean_upload as _glean_upload from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session from app.context.chunker import UnsupportedDocType, FileTooLarge -from app.tasks.ingest_scheduler import get_state as _ingest_state, run_once as _run_ingest, scheduler_loop as _scheduler_loop, submit_matched as _submit_matched -from app.ingest.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers +from app.tasks.glean_scheduler import get_state as _glean_state, run_once as _run_glean, scheduler_loop as _scheduler_loop, submit_matched as _submit_matched +from app.glean.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) PREFS_PATH = DB_PATH.parent / "preferences.json" @@ -84,7 +84,7 @@ SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) PATTERN_FILE = PATTERN_DIR / "default.yaml" -INGEST_INTERVAL = int(os.environ.get("TURNSTONE_INGEST_INTERVAL", "900")) +GLEAN_INTERVAL = int(os.environ.get("TURNSTONE_GLEAN_INTERVAL", "900")) SUBMIT_ENDPOINT = os.environ.get("TURNSTONE_SUBMIT_ENDPOINT", "").rstrip("/") # GPU inference server URL. @@ -119,14 +119,14 @@ async def _lifespan(app: FastAPI): sources_file = PATTERN_DIR / "sources.yaml" _scheduler_task: asyncio.Task | None = None - if INGEST_INTERVAL > 0 and sources_file.exists(): + if GLEAN_INTERVAL > 0 and sources_file.exists(): _scheduler_task = asyncio.create_task( _scheduler_loop( - sources_file, DB_PATH, PATTERN_FILE, INGEST_INTERVAL, + sources_file, DB_PATH, PATTERN_FILE, GLEAN_INTERVAL, submit_endpoint=SUBMIT_ENDPOINT or None, source_host=SOURCE_HOST, ), - name="ingest-scheduler", + name="glean-scheduler", ) _mqtt_task: asyncio.Task | None = None @@ -448,9 +448,9 @@ def delete_source(source_id: str) -> dict: return {"deleted": deleted, "source_id": source_id} -@router.post("/api/sources/{source_id}/ingest") -def reingest_source(source_id: str, background_tasks: BackgroundTasks) -> dict: - """Trigger a re-ingest for a configured source from sources.yaml.""" +@router.post("/api/sources/{source_id}/glean") +def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict: + """Trigger a re-glean for a configured source from sources.yaml.""" sources_file = PATTERN_DIR / "sources.yaml" if not sources_file.exists(): raise HTTPException(status_code=404, detail="sources.yaml not found") @@ -462,18 +462,18 @@ def reingest_source(source_id: str, background_tasks: BackgroundTasks) -> dict: src_path = Path(matching[0]["path"]) if not src_path.exists(): raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}") - stats = _ingest_file(src_path, DB_PATH, PATTERN_FILE) + stats = _glean_file(src_path, DB_PATH, PATTERN_FILE) background_tasks.add_task(build_fts_index, DB_PATH) - return {"source_id": source_id, "ingested": stats.get(source_id, sum(stats.values()))} + return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))} -@router.post("/api/ingest/upload") -async def ingest_upload( +@router.post("/api/glean/upload") +async def glean_upload( file: UploadFile, source_id: Annotated[str | None, Query(description="Override source ID (defaults to filename)")] = None, background_tasks: BackgroundTasks = None, ) -> dict: - """Accept a multipart log file, auto-detect format, ingest into DB.""" + """Accept a multipart log file, auto-detect format, glean into DB.""" sid = source_id or Path(file.filename or "upload").stem content = await file.read() with tempfile.NamedTemporaryFile( @@ -483,13 +483,13 @@ async def ingest_upload( tmp.write(content) tmp_path = Path(tmp.name) try: - stats = _ingest_file(tmp_path, DB_PATH, PATTERN_FILE) + stats = _glean_file(tmp_path, DB_PATH, PATTERN_FILE) finally: tmp_path.unlink(missing_ok=True) if background_tasks is not None: background_tasks.add_task(build_fts_index, DB_PATH) total = sum(stats.values()) - return {"source_id": sid, "ingested": total, "stats": stats} + return {"source_id": sid, "gleaned": total, "stats": stats} class BatchEntry(BaseModel): @@ -506,20 +506,20 @@ class BatchEntry(BaseModel): text: str -class BatchIngestRequest(BaseModel): +class BatchGleanRequest(BaseModel): source_host: str = "unknown" entries: list[BatchEntry] -@router.post("/api/ingest/batch") -def ingest_batch(payload: BatchIngestRequest, background_tasks: BackgroundTasks) -> dict: +@router.post("/api/glean/batch") +def glean_batch(payload: BatchGleanRequest, background_tasks: BackgroundTasks) -> dict: """Accept pre-parsed log entries from a remote Turnstone instance (submission protocol). Used by nodes with TURNSTONE_SUBMIT_ENDPOINT configured to push their pattern-matched entries to a central receiving instance. """ if not payload.entries: - return {"ingested": 0} + return {"gleaned": 0} conn = sqlite3.connect(str(DB_PATH)) conn.execute("PRAGMA journal_mode=WAL") conn.executemany( @@ -550,13 +550,13 @@ def ingest_batch(payload: BatchIngestRequest, background_tasks: BackgroundTasks) conn.commit() conn.close() background_tasks.add_task(build_fts_index, DB_PATH) - return {"ingested": len(payload.entries), "source_host": payload.source_host} + return {"gleaned": len(payload.entries), "source_host": payload.source_host} -@router.get("/api/tasks/ingest/status") -def ingest_task_status() -> dict: - """Return the current state of the periodic batch ingest scheduler.""" - s = _ingest_state() +@router.get("/api/tasks/glean/status") +def glean_task_status() -> dict: + """Return the current state of the periodic glean scheduler.""" + s = _glean_state() return { "running": s.running, "run_count": s.run_count, @@ -565,8 +565,8 @@ def ingest_task_status() -> dict: "last_stats": s.last_stats, "last_error": s.last_error, "next_run_at": s.next_run_at, - "interval_s": INGEST_INTERVAL, - "scheduler_active": INGEST_INTERVAL > 0 and (PATTERN_DIR / "sources.yaml").exists(), + "interval_s": GLEAN_INTERVAL, + "scheduler_active": GLEAN_INTERVAL > 0 and (PATTERN_DIR / "sources.yaml").exists(), "submit_endpoint": SUBMIT_ENDPOINT or None, "last_submitted_at": s.last_submitted_at, "last_submit_count": s.last_submit_count, @@ -574,21 +574,21 @@ def ingest_task_status() -> dict: } -@router.post("/api/tasks/ingest") -async def trigger_ingest() -> dict: - """Manually trigger a batch ingest of all configured sources. No-ops if already running.""" +@router.post("/api/tasks/glean") +async def trigger_glean() -> dict: + """Manually trigger a glean of all configured sources. No-ops if already running.""" sources_file = PATTERN_DIR / "sources.yaml" if not sources_file.exists(): raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first") - return await _run_ingest( + return await _run_glean( sources_file, DB_PATH, PATTERN_FILE, submit_endpoint=SUBMIT_ENDPOINT or None, source_host=SOURCE_HOST, ) -@router.post("/api/ingest/wazuh/alert") -async def ingest_wazuh_alert( +@router.post("/api/glean/wazuh/alert") +async def glean_wazuh_alert( alert: dict, source_id: Annotated[str | None, Query(description="Source label (defaults to 'wazuh')")] = None, background_tasks: BackgroundTasks = None, @@ -769,8 +769,8 @@ def _tautulli_write_entry(conn: sqlite3.Connection, entry) -> None: ) -@router.post("/api/ingest/tautulli") -def ingest_tautulli( +@router.post("/api/glean/tautulli") +def glean_tautulli( payload: dict, request: Request, background_tasks: BackgroundTasks, diff --git a/app/services/incidents.py b/app/services/incidents.py index 9699ba0..dd758c1 100644 --- a/app/services/incidents.py +++ b/app/services/incidents.py @@ -6,7 +6,7 @@ import sqlite3 import uuid from pathlib import Path -from app.ingest.base import now_iso +from app.glean.base import now_iso from app.services.models import Incident, ReceivedBundle from app.services.search import SearchResult, entries_in_window, search diff --git a/app/services/models.py b/app/services/models.py index e551135..a0d5df5 100644 --- a/app/services/models.py +++ b/app/services/models.py @@ -10,7 +10,7 @@ class RetrievedEntry: entry_id: str source_id: str # log file path or service name - sequence: int # original line number — ingest order, not wall-clock order + sequence: int # original line number — glean order, not wall-clock order timestamp_raw: str | None # timestamp as it appeared in the log timestamp_iso: str | None # parsed to ISO 8601 for sorting; None if unparseable ingest_time: str # when Turnstone indexed this entry (wall clock) @@ -25,7 +25,7 @@ class RetrievedEntry: @dataclass(frozen=True) class LogPattern: - """A named regex pattern for tagging entries at ingest time.""" + """A named regex pattern for tagging entries at glean time.""" name: str # e.g. "device_disconnect", "auth_failure" pattern: str # regex string diff --git a/app/services/search.py b/app/services/search.py index d7bf12c..7252272 100644 --- a/app/services/search.py +++ b/app/services/search.py @@ -451,9 +451,8 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis else: suppressed += 1 - # When did we last ingest anything? last_row = conn.execute("SELECT MAX(ingest_time) AS t FROM log_entries").fetchone() - last_ingested: str | None = last_row["t"] if last_row else None + last_gleaned: str | None = last_row["t"] if last_row else None conn.close() @@ -465,7 +464,7 @@ def stats_summary(db_path: Path, window_hours: int = 24, severity_overrides: lis "source_health": source_health, "recent_criticals": recent_criticals, "suppressed_criticals": suppressed, - "last_ingested": last_ingested, + "last_gleaned": last_gleaned, } diff --git a/app/tasks/ingest_scheduler.py b/app/tasks/glean_scheduler.py similarity index 88% rename from app/tasks/ingest_scheduler.py rename to app/tasks/glean_scheduler.py index b55b152..6d95137 100644 --- a/app/tasks/ingest_scheduler.py +++ b/app/tasks/glean_scheduler.py @@ -1,10 +1,10 @@ -"""Periodic batch ingest scheduler with optional CF submission. +"""Periodic batch glean scheduler with optional CF submission. -Runs ingest_sources on a configurable interval (TURNSTONE_INGEST_INTERVAL env var, +Runs glean_sources on a configurable interval (TURNSTONE_GLEAN_INTERVAL env var, default 900s / 15 min). Set to 0 to disable. When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote -Turnstone instance (the CF receiving store) after each ingest run. +Turnstone instance (the CF receiving store) after each glean run. """ from __future__ import annotations @@ -19,7 +19,7 @@ from typing import Any import httpx -from app.ingest.pipeline import ingest_sources +from app.glean.pipeline import glean_sources logger = logging.getLogger(__name__) @@ -96,14 +96,14 @@ async def submit_matched( if not entries: return {"ok": True, "submitted": 0, "skipped": True} - url = f"{submit_endpoint.rstrip('/')}/turnstone/api/ingest/batch" + url = f"{submit_endpoint.rstrip('/')}/turnstone/api/glean/batch" payload = {"source_host": source_host, "entries": entries} try: async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post(url, json=payload) resp.raise_for_status() result = resp.json() - submitted = result.get("ingested", len(entries)) + submitted = result.get("gleaned", len(entries)) _state.last_submitted_at = datetime.now(tz=timezone.utc).isoformat() _state.last_submit_count = submitted _state.last_submit_error = None @@ -124,7 +124,7 @@ async def run_once( ) -> dict[str, Any]: """Ingest all sources once, then submit matched entries if configured.""" if _lock.locked(): - return {"ok": False, "error": "ingest already running", "skipped": True} + return {"ok": False, "error": "glean already running", "skipped": True} async with _lock: _state.running = True @@ -133,7 +133,7 @@ async def run_once( loop = asyncio.get_running_loop() stats: dict[str, int] = await loop.run_in_executor( None, - lambda: ingest_sources(sources_file, db_path, pattern_file), + lambda: glean_sources(sources_file, db_path, pattern_file), ) duration = (datetime.now(tz=timezone.utc) - started).total_seconds() _state.last_run_at = started.isoformat() @@ -141,14 +141,14 @@ async def run_once( _state.last_stats = stats _state.last_error = None _state.run_count += 1 - logger.info("Batch ingest complete in %.1fs — %s", duration, stats) + logger.info("Batch glean complete in %.1fs — %s", duration, stats) except Exception as exc: duration = (datetime.now(tz=timezone.utc) - started).total_seconds() _state.last_run_at = started.isoformat() _state.last_duration_s = round(duration, 2) _state.last_error = str(exc) _state.run_count += 1 - logger.error("Batch ingest failed: %s", exc) + logger.error("Batch glean failed: %s", exc) _state.running = False return {"ok": False, "error": str(exc)} finally: @@ -168,7 +168,7 @@ async def scheduler_loop( submit_endpoint: str | None = None, source_host: str = "unknown", ) -> None: - """Run ingest + optional submission every interval_s seconds until cancelled.""" + """Run glean + optional submission every interval_s seconds until cancelled.""" logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file) if submit_endpoint: logger.info("Submission enabled — endpoint: %s", submit_endpoint) diff --git a/app/watch/watcher.py b/app/watch/watcher.py index e12038b..a9490d1 100644 --- a/app/watch/watcher.py +++ b/app/watch/watcher.py @@ -1,4 +1,4 @@ -"""Live watch: tail active log sources and ingest entries in near-real-time. +"""Live watch: tail active log sources and glean entries in near-real-time. Each WatchSource runs a subprocess (journalctl -f, podman/docker logs -f) in a daemon thread and pipes lines through the existing ingestors into SQLite. @@ -18,12 +18,12 @@ from typing import Iterator import yaml -from app.ingest import journald as journald_parser, syslog as syslog_parser -from app.ingest import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser -from app.ingest import qbittorrent as qbit_parser, caddy as caddy_parser -from app.ingest.pipeline import _detect_format -from app.ingest.base import _compile, load_patterns, now_iso -from app.ingest.pipeline import _write_batch, _SCHEMA +from app.glean import journald as journald_parser, syslog as syslog_parser +from app.glean import plaintext as plaintext_parser, servarr as servarr_parser, plex as plex_parser +from app.glean import qbittorrent as qbit_parser, caddy as caddy_parser +from app.glean.pipeline import _detect_format +from app.glean.base import _compile, load_patterns, now_iso +from app.glean.pipeline import _write_batch, _SCHEMA from app.services.search import build_fts_index from app.services.models import RetrievedEntry @@ -85,7 +85,7 @@ class WatchSource: "source_id": self.config.source_id, "type": self.config.source_type, "running": self._thread is not None and self._thread.is_alive(), - "entries_ingested": self._entry_count, + "entries_gleaned": self._entry_count, "last_event": self._last_event, "error": self._error, } diff --git a/docs/tautulli-setup.md b/docs/tautulli-setup.md index 5d719a8..0b61180 100644 --- a/docs/tautulli-setup.md +++ b/docs/tautulli-setup.md @@ -39,7 +39,7 @@ notification agent: ## Webhook URL ``` -http://:8534/turnstone/api/ingest/tautulli +http://:8534/turnstone/api/glean/tautulli ``` Replace `` with the hostname or IP of the machine running diff --git a/harvester/harvester.py b/harvester/harvester.py index 4f8370a..9b18867 100644 --- a/harvester/harvester.py +++ b/harvester/harvester.py @@ -2,7 +2,7 @@ """Turnstone Harvester — collect logs and ship them to a Turnstone instance. Subcommands: - push Read sources.yaml, POST each log file to Turnstone /api/ingest/upload + push Read sources.yaml, POST each log file to Turnstone /api/glean/upload incident Tag an incident on the remote Turnstone instance Usage: @@ -97,8 +97,8 @@ def cmd_push(args: argparse.Namespace) -> int: logger.warning("No sources defined in %s", sources_path) return 0 - upload_url = args.url.rstrip("/") + "/turnstone/api/ingest/upload" - total_ingested = 0 + upload_url = args.url.rstrip("/") + "/turnstone/api/glean/upload" + total_gleaned = 0 errors = 0 for src in sources: @@ -110,9 +110,9 @@ def cmd_push(args: argparse.Namespace) -> int: logger.info("Pushing %s (%s) ...", src_id, src_path) try: result = _post_file(upload_url, src_path, src_id) - count = result.get("ingested", 0) - total_ingested += count - logger.info(" %s: %d entries ingested", src_id, count) + count = result.get("gleaned", 0) + total_gleaned += count + logger.info(" %s: %d entries gleaned", src_id, count) except urllib.error.HTTPError as exc: logger.error(" %s: HTTP %d — %s", src_id, exc.code, exc.read().decode(errors="replace")) errors += 1 @@ -120,7 +120,7 @@ def cmd_push(args: argparse.Namespace) -> int: logger.error(" %s: %s", src_id, exc) errors += 1 - logger.info("Done. Total ingested: %d entries, errors: %d", total_ingested, errors) + logger.info("Done. Total gleaned: %d entries, errors: %d", total_gleaned, errors) return 1 if errors else 0 diff --git a/harvester/sources.example.yaml b/harvester/sources.example.yaml index 508e257..c521854 100644 --- a/harvester/sources.example.yaml +++ b/harvester/sources.example.yaml @@ -46,6 +46,6 @@ sources: # Wazuh SIEM — alerts.json on the Wazuh manager # Turnstone auto-detects this format; source_id is qualified per agent automatically. # For push-based ingestion from Wazuh custom integrations, use: - # POST /api/ingest/wazuh/alert (single alert JSON body) + # POST /api/glean/wazuh/alert (single alert JSON body) # - id: wazuh # path: /var/ossec/logs/alerts/alerts.json diff --git a/manage.sh b/manage.sh index 7e72b01..19a5b96 100755 --- a/manage.sh +++ b/manage.sh @@ -120,9 +120,9 @@ usage() { echo -e " ${GREEN}dev${NC} uvicorn --reload (:${API_PORT}) + Vite HMR (:${VITE_PORT})" echo "" echo " Data:" - echo -e " ${GREEN}ingest PATH [DB]${NC} Ingest a log file or corpus directory" - echo -e " ${GREEN}ingest-plex [HOST]${NC} Pull Plex log from Cass (or HOST) and ingest" - echo -e " ${GREEN}ingest-qbit [HOST]${NC} Pull qBittorrent log locally or from HOST via SSH" + echo -e " ${GREEN}glean PATH [DB]${NC} Glean a log file or corpus directory" + echo -e " ${GREEN}glean-plex [HOST]${NC} Pull Plex log from Cass (or HOST) and glean" + echo -e " ${GREEN}glean-qbit [HOST]${NC} Pull qBittorrent log locally or from HOST via SSH" echo -e " ${GREEN}build-fts${NC} Rebuild the FTS search index" echo "" echo " Tests:" @@ -134,8 +134,8 @@ usage() { echo " Examples:" echo " ./manage.sh start" echo " ./manage.sh dev" - echo " ./manage.sh ingest corpus/raw/" - echo " ./manage.sh ingest corpus/raw/ data/custom.db" + echo " ./manage.sh glean corpus/raw/" + echo " ./manage.sh glean corpus/raw/ data/custom.db" echo "" } @@ -231,15 +231,15 @@ case "$CMD" in (cd web && npm run dev -- --port "$VITE_PORT") ;; - ingest) + glean) if [[ $# -lt 1 ]]; then - error "Usage: ./manage.sh ingest [DB_PATH]" + error "Usage: ./manage.sh glean [DB_PATH]" fi - info "Ingesting $1 → ${2:-$DB}…" - "$PYTHON" scripts/ingest_corpus.py "$1" "${2:-$DB}" + info "Gleaning $1 → ${2:-$DB}…" + "$PYTHON" scripts/glean_corpus.py "$1" "${2:-$DB}" ;; - ingest-plex) + glean-plex) PLEX_HOST="${1:-cass}" PLEX_LOG_DIR="/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs" TMP_DIR="/tmp/turnstone-plex-$$" @@ -264,16 +264,16 @@ case "$CMD" in ssh "$PLEX_HOST" "cat '${remote_path}'" > "$local_path" done - info "Ingesting ${#REMOTE_LOGS[@]} log file(s) into ${DB}…" + info "Gleaning ${#REMOTE_LOGS[@]} log file(s) into ${DB}…" for f in "$TMP_DIR"/*.log; do - "$PYTHON" scripts/ingest_corpus.py "$f" "$DB" + "$PYTHON" scripts/glean_corpus.py "$f" "$DB" done rm -rf "$TMP_DIR" info "Done. Restarting server…" exec bash "$0" restart ;; - ingest-qbit) + glean-qbit) QBIT_HOST="${1:-}" # Default log locations in priority order QBIT_LOG_PATHS=( @@ -316,8 +316,8 @@ case "$CMD" in info " ← ${LOCAL_LOG}" fi - info "Ingesting into ${DB}…" - "$PYTHON" scripts/ingest_corpus.py "${TMP_DIR}"/*.log "$DB" + info "Gleaning into ${DB}…" + "$PYTHON" scripts/glean_corpus.py "${TMP_DIR}"/*.log "$DB" rm -rf "$TMP_DIR" info "Done. Restarting server…" exec bash "$0" restart diff --git a/patterns/default.yaml b/patterns/default.yaml index c125aaa..6fd3450 100644 --- a/patterns/default.yaml +++ b/patterns/default.yaml @@ -1,4 +1,4 @@ -# Turnstone pattern library — named regex patterns for log tagging at ingest time. +# Turnstone pattern library — named regex patterns for log tagging at glean time. # Each matched pattern name is stored on RetrievedEntry.matched_patterns and # used to boost retrieval relevance for diagnostic queries. # @@ -128,6 +128,21 @@ patterns: severity: ERROR description: NFS mount or RPC timeout + - name: service_crash_loop + pattern: "(restart counter is at [0-9]|start request repeated too quickly|Restart limit hit)" + severity: WARN + description: systemd service crash-looping — restart counter incrementing or rate-limit hit; check for DNS resolution failures, missing dependencies, or bad config + + - name: pkg_daemon_restart + pattern: "(invoke-rc\\.d|Unit process.*(apt-get|dpkg|preinst).*remains running after unit stopped|Stopped.*service.*openssh|Restarting.*OpenBSD Secure Shell)" + severity: WARN + description: Package manager restarted a system daemon — active SSH or service sessions may have been interrupted + + - name: ssh_forward_conflict + pattern: "(channel_setup_fwd_listener_tcpip: cannot listen to port|error: bind.*Address already in use)" + severity: WARN + description: SSH port-forward conflict — previous session port still bound; stale sessions accumulating or rapid reconnects + # Add device/service-specific patterns below this line: - name: qbit_tracker_error diff --git a/patterns/sources-cluster.yaml b/patterns/sources-cluster.yaml index 1d742f4..0dafe22 100644 --- a/patterns/sources-cluster.yaml +++ b/patterns/sources-cluster.yaml @@ -1,15 +1,15 @@ -# Turnstone log sources — Heimdall cluster ingest. +# Turnstone log sources — Heimdall cluster glean. # Covers: Heimdall (local), Navi, Sif, Cass, Strahl (SSH-collected), # Docker services on Heimdall, and network device syslog. # -# Collected by scripts/collect_cluster_logs.sh before each ingest run. +# Collected by scripts/collect_cluster_logs.sh before each glean run. # All paths are container-side (/data/ = bind-mount of /devl/turnstone-cluster/data/). # -# Cron (collect + ingest, every 15 min): +# Cron (collect + glean, every 15 min): # */15 * * * * bash /Library/Development/CircuitForge/turnstone/scripts/collect_cluster_logs.sh && \ -# docker exec turnstone-cluster python scripts/ingest_corpus.py \ +# docker exec turnstone-cluster python scripts/glean_corpus.py \ # --sources /patterns/sources-cluster.yaml --db /data/turnstone.db \ -# >> /var/log/turnstone-cluster-ingest.log 2>&1 +# >> /var/log/turnstone-cluster-glean.log 2>&1 sources: # ── Heimdall (local) ───────────────────────────────────────────────────────── diff --git a/patterns/sources.yaml b/patterns/sources.yaml index 49b89d7..c299f53 100644 --- a/patterns/sources.yaml +++ b/patterns/sources.yaml @@ -1,8 +1,8 @@ # Turnstone log sources — edit this file to add or remove services. # NOTE: the system-journal entry requires export_journal.sh to run on the HOST -# before the container ingest step. See crontab setup instructions in the README. -# Run ingest manually: -# sudo podman exec turnstone python scripts/ingest_corpus.py \ +# before the container glean step. See crontab setup instructions in the README. +# Run glean manually: +# sudo podman exec turnstone python scripts/glean_corpus.py \ # --sources /patterns/sources.yaml --db /data/turnstone.db # # Paths here are container-side paths under the /opt bind mount. @@ -12,7 +12,7 @@ sources: # ── System (exported by export_journal.sh on the host) ─────────────────── # journal-export.jsonl and dmesg-export.txt are written to /opt/turnstone/data/ - # by the export script before each ingest run. + # by the export script before each glean run. - id: system-journal path: /data/journal-export.jsonl @@ -73,7 +73,7 @@ sources: # ── MQTT / IoT (live — subscribe mode, no path needed) ─────────────────── # Requires: pip install circuitforge-core[mqtt] - # These sources are handled by the live MQTT subscriber task (not batch ingest). + # These sources are handled by the live MQTT subscriber task (not batch glean). # Uncomment and configure to enable. # # Meshtastic MQTT bridge (node must have MQTT uplink enabled): diff --git a/podman-standalone.sh b/podman-standalone.sh index f937890..68c77cb 100755 --- a/podman-standalone.sh +++ b/podman-standalone.sh @@ -2,7 +2,7 @@ # podman-standalone.sh — Turnstone rootful Podman setup (no Compose) # # For hosts running system Podman (non-rootless) with systemd. -# Turnstone is a diagnostic log intelligence layer — ingest service logs, +# Turnstone is a diagnostic log intelligence layer — glean service logs, # search by symptom, and view incidents in a lightweight web UI. # # ── Prerequisites ──────────────────────────────────────────────────────────── @@ -28,18 +28,18 @@ # sudo systemctl daemon-reload # sudo systemctl enable --now turnstone # -# ── Ingesting logs ──────────────────────────────────────────────────────────── +# ── Gleaning logs ───────────────────────────────────────────────────────────── # All service logs under /opt are accessible inside the container. # Sources are configured in patterns/sources.yaml (bind-mounted at /patterns/). # -# To ingest all sources (run manually or via cron): +# To glean all sources (run manually or via cron): # -# sudo podman exec turnstone python scripts/ingest_corpus.py \ +# sudo podman exec turnstone python scripts/glean_corpus.py \ # --sources /patterns/sources.yaml --db /data/turnstone.db # # Example cron (every 15 minutes, add to root's crontab with: sudo crontab -e): -# */15 * * * * podman exec turnstone python scripts/ingest_corpus.py \ -# --sources /patterns/sources.yaml --db /data/turnstone.db >> /var/log/turnstone-ingest.log 2>&1 +# */15 * * * * podman exec turnstone python scripts/glean_corpus.py \ +# --sources /patterns/sources.yaml --db /data/turnstone.db >> /var/log/turnstone-glean.log 2>&1 # # To add a new log source: edit /opt/turnstone/patterns/sources.yaml — no restart needed. # @@ -73,7 +73,7 @@ TZ=America/Los_Angeles # # ── Orchard submission (opt-in telemetry) ──────────────────────────────────── # Set TURNSTONE_SUBMIT_ENDPOINT to push pattern-matched log entries to a CF -# receiving instance after each ingest run. Only matched entries are sent — +# receiving instance after each glean run. Only matched entries are sent — # no raw log content. Used to build Avocet training data. # # export TURNSTONE_SUBMIT_ENDPOINT=https://harvest.circuitforge.tech/xander @@ -142,8 +142,8 @@ echo "Check container health with:" echo " sudo podman ps" echo " sudo podman logs turnstone" echo "" -echo "To ingest all sources now:" -echo " sudo podman exec turnstone python scripts/ingest_corpus.py \\" +echo "To glean all sources now:" +echo " sudo podman exec turnstone python scripts/glean_corpus.py \\" echo " --sources /patterns/sources.yaml --db /data/turnstone.db" echo "" echo "To add a new source: edit /opt/turnstone/patterns/sources.yaml — no restart needed." diff --git a/requirements.txt b/requirements.txt index 66b35f2..b5abda4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ aiofiles>=23.0.0 python-multipart>=0.0.9 dateparser>=1.2.0 httpx>=0.27.0 +paramiko diff --git a/scripts/build_fts_index.py b/scripts/build_fts_index.py index d0d4677..37f731f 100644 --- a/scripts/build_fts_index.py +++ b/scripts/build_fts_index.py @@ -1,4 +1,4 @@ -"""CLI: build (or update) the FTS5 full-text search index after ingest.""" +"""CLI: build (or update) the FTS5 full-text search index after glean.""" from __future__ import annotations import sys @@ -13,7 +13,7 @@ if __name__ == "__main__": if not db_path.exists(): print(f"ERROR: database not found: {db_path}", file=sys.stderr) - print("Run ingest first: python scripts/ingest_corpus.py", file=sys.stderr) + print("Run glean first: python scripts/glean_corpus.py", file=sys.stderr) sys.exit(1) print(f"Building FTS index for {db_path} ...") diff --git a/scripts/collect_cluster_logs.sh b/scripts/collect_cluster_logs.sh index 2c8b0f6..9c900e0 100644 --- a/scripts/collect_cluster_logs.sh +++ b/scripts/collect_cluster_logs.sh @@ -20,7 +20,7 @@ SSH_OPTS="-o ConnectTimeout=5 -o BatchMode=yes -o StrictHostKeyChecking=no" PYTHON=/devl/miniconda3/envs/cf/bin/python INGEST="${PYTHON} /Library/Development/CircuitForge/turnstone/scripts/ingest_corpus.py" DB=/devl/turnstone-cluster/data/turnstone.db -LOG=/devl/turnstone-cluster/data/ingest.log +LOG=/devl/turnstone-cluster/data/glean.log mkdir -p "${DATA_DIR}" @@ -141,7 +141,7 @@ fi # Remote journals (explicit source IDs via YAML) ${INGEST} --sources /devl/turnstone-cluster/patterns/sources-cluster.yaml --db "${DB}" - # Docker and Plex logs (source IDs derived from filenames by directory ingest) + # Docker and Plex logs (source IDs derived from filenames by directory glean) for dir in "${HEIMDALL_DIR}" "${NAVI_DIR}" "${STRAHL_DIR}" "${PLEX_DIR}"; do [[ -d "${dir}" ]] && ls "${dir}"/*.jsonl "${dir}"/*.log 2>/dev/null | grep -q . && \ ${INGEST} "${dir}" "${DB}" || true diff --git a/scripts/export_journal.sh b/scripts/export_journal.sh index 941ab70..e94a594 100644 --- a/scripts/export_journal.sh +++ b/scripts/export_journal.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Export recent system messages to files the Turnstone container can ingest. +# Export recent system messages to files the Turnstone container can glean. # # Exports: # journal-export.jsonl — journald (if journalctl is available) @@ -11,11 +11,11 @@ # Usage (standalone): # sudo bash /opt/turnstone/scripts/export_journal.sh # -# Cron (combined with ingest): +# Cron (combined with glean): # */15 * * * * bash /opt/turnstone/scripts/export_journal.sh && \ # podman exec turnstone python scripts/ingest_corpus.py \ # --sources /patterns/sources.yaml --db /data/turnstone.db \ -# >> /var/log/turnstone-ingest.log 2>&1 +# >> /var/log/turnstone-glean.log 2>&1 set -euo pipefail diff --git a/scripts/ingest_corpus.py b/scripts/glean_corpus.py similarity index 64% rename from scripts/ingest_corpus.py rename to scripts/glean_corpus.py index ca12ae6..e3d14db 100644 --- a/scripts/ingest_corpus.py +++ b/scripts/glean_corpus.py @@ -1,11 +1,11 @@ -"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database. +"""CLI: glean a log file or corpus directory into the Turnstone SQLite database. Usage: # Single file or directory (legacy) - python scripts/ingest_corpus.py [db_path] + python scripts/glean_corpus.py [db_path] # Sources config (multi-service) - python scripts/ingest_corpus.py --sources [--db ] + python scripts/glean_corpus.py --sources [--db ] """ from __future__ import annotations @@ -17,7 +17,7 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") sys.path.insert(0, str(Path(__file__).parent.parent)) -from app.ingest.pipeline import ingest, ingest_file, ingest_sources +from app.glean.pipeline import glean_dir, glean_file, glean_sources def _print_stats(stats: dict[str, int]) -> None: @@ -33,33 +33,33 @@ if __name__ == "__main__": if not args: print( "Usage:\n" - " ingest_corpus.py [db_path]\n" - " ingest_corpus.py --sources [--db ]", + " glean_corpus.py [db_path]\n" + " glean_corpus.py --sources [--db ]", file=sys.stderr, ) sys.exit(1) if args[0] == "--sources": if len(args) < 2: - print("Usage: ingest_corpus.py --sources [--db ]", file=sys.stderr) + print("Usage: glean_corpus.py --sources [--db ]", file=sys.stderr) sys.exit(1) sources_file = Path(args[1]) db_path = Path("data/turnstone.db") if "--db" in args: db_path = Path(args[args.index("--db") + 1]) db_path.parent.mkdir(parents=True, exist_ok=True) - print(f"Ingesting sources from {sources_file} → {db_path}") - stats = ingest_sources(sources_file, db_path) + print(f"Gleaning sources from {sources_file} → {db_path}") + stats = glean_sources(sources_file, db_path) _print_stats(stats) else: target = Path(args[0]) db_path = Path(args[1]) if len(args) > 1 else Path("data/turnstone.db") db_path.parent.mkdir(parents=True, exist_ok=True) - print(f"Ingesting {target} → {db_path}") + print(f"Gleaning {target} → {db_path}") if target.is_file(): - stats = ingest_file(target, db_path) + stats = glean_file(target, db_path) elif target.is_dir(): - stats = ingest(target, db_path) + stats = glean_dir(target, db_path) else: print(f"Error: {target} is not a file or directory", file=sys.stderr) sys.exit(1) diff --git a/tests/context/test_doc_upload.py b/tests/context/test_doc_upload.py index 9986d62..162f6f5 100644 --- a/tests/context/test_doc_upload.py +++ b/tests/context/test_doc_upload.py @@ -3,7 +3,7 @@ import sqlite3 import pytest from pathlib import Path -from app.ingest.doc_upload import ingest_upload +from app.glean.doc_upload import glean_upload from app.context.store import list_facts, list_documents from app.context.chunker import UnsupportedDocType @@ -40,7 +40,7 @@ services: ports: - "32400:32400" """ - result = ingest_upload(db, "docker-compose.yml", yaml_bytes) + result = glean_upload(db, "docker-compose.yml", yaml_bytes) assert result["doc_type"] == "yaml" assert result["facts_written"] >= 1 assert result["chunks_written"] >= 1 @@ -53,7 +53,7 @@ services: def test_ingest_markdown_no_facts(db): md = b"# Runbook\n\nRestart plex with `systemctl restart plex`." - result = ingest_upload(db, "runbook.md", md) + result = glean_upload(db, "runbook.md", md) assert result["doc_type"] == "markdown" assert result["facts_written"] == 0 assert result["chunks_written"] >= 1 @@ -61,4 +61,4 @@ def test_ingest_markdown_no_facts(db): def test_ingest_raises_on_bad_type(db): with pytest.raises(UnsupportedDocType): - ingest_upload(db, "report.pdf", b"data") + glean_upload(db, "report.pdf", b"data") diff --git a/tests/context/test_schema.py b/tests/context/test_schema.py index 69b0327..ea71812 100644 --- a/tests/context/test_schema.py +++ b/tests/context/test_schema.py @@ -2,7 +2,7 @@ import sqlite3 from pathlib import Path import pytest -from app.ingest.pipeline import ensure_schema +from app.glean.pipeline import ensure_schema def test_context_tables_created(tmp_path): diff --git a/tests/test_blocklist_endpoints.py b/tests/test_blocklist_endpoints.py index 1c4289a..938042f 100644 --- a/tests/test_blocklist_endpoints.py +++ b/tests/test_blocklist_endpoints.py @@ -9,7 +9,7 @@ from unittest.mock import MagicMock, patch @pytest.fixture def client(tmp_path): from fastapi.testclient import TestClient - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import app.rest as rest_module db = tmp_path / "test.db" @@ -25,7 +25,7 @@ def client(tmp_path): @pytest.fixture def client_with_candidate(tmp_path): from fastapi.testclient import TestClient - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import app.rest as rest_module import sqlite3, uuid diff --git a/tests/test_ingest_dmesg.py b/tests/test_glean_dmesg.py similarity index 97% rename from tests/test_ingest_dmesg.py rename to tests/test_glean_dmesg.py index 9e64c09..ff4fdbe 100644 --- a/tests/test_ingest_dmesg.py +++ b/tests/test_glean_dmesg.py @@ -1,7 +1,7 @@ -"""Tests for the dmesg log ingestor.""" +"""Tests for the dmesg log gleaner.""" from __future__ import annotations -from app.ingest.dmesg_log import is_dmesg_log, parse +from app.glean.dmesg_log import is_dmesg_log, parse RELATIVE_SAMPLE = """\ [ 0.000000] Linux version 6.8.0-65-generic diff --git a/tests/test_ingest_qbittorrent.py b/tests/test_glean_qbittorrent.py similarity index 98% rename from tests/test_ingest_qbittorrent.py rename to tests/test_glean_qbittorrent.py index 4b3c874..5c5d5bf 100644 --- a/tests/test_ingest_qbittorrent.py +++ b/tests/test_glean_qbittorrent.py @@ -1,9 +1,9 @@ -"""Tests for the qBittorrent log ingestor.""" +"""Tests for the qBittorrent log gleaner.""" from __future__ import annotations import pytest -from app.ingest.qbittorrent import is_qbit_log, parse +from app.glean.qbittorrent import is_qbit_log, parse # --------------------------------------------------------------------------- # Classic format sample (pre-5.x GUI builds) diff --git a/tests/test_ingest_syslog.py b/tests/test_glean_syslog.py similarity index 96% rename from tests/test_ingest_syslog.py rename to tests/test_glean_syslog.py index cde2d43..b7e0846 100644 --- a/tests/test_ingest_syslog.py +++ b/tests/test_glean_syslog.py @@ -1,7 +1,7 @@ -"""Tests for the syslog (RFC 3164) ingestor.""" +"""Tests for the syslog (RFC 3164) gleaner.""" from __future__ import annotations -from app.ingest.syslog import is_syslog, parse +from app.glean.syslog import is_syslog, parse SYSLOG_SAMPLE = """\ May 11 14:23:01 xanderland sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2 diff --git a/tests/test_ingest_tautulli.py b/tests/test_glean_tautulli.py similarity index 96% rename from tests/test_ingest_tautulli.py rename to tests/test_glean_tautulli.py index a3820f8..4b12b08 100644 --- a/tests/test_ingest_tautulli.py +++ b/tests/test_glean_tautulli.py @@ -1,10 +1,10 @@ -"""Tests for the Tautulli webhook ingestor.""" +"""Tests for the Tautulli webhook gleaner.""" from __future__ import annotations import pytest from unittest.mock import patch -from app.ingest.tautulli import is_tautulli_payload, parse_webhook +from app.glean.tautulli import is_tautulli_payload, parse_webhook # --------------------------------------------------------------------------- @@ -253,7 +253,7 @@ class TestEndpoint: @pytest.fixture def client(self, tmp_path): from fastapi.testclient import TestClient - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import app.rest as rest_module db = tmp_path / "test.db" @@ -267,14 +267,14 @@ class TestEndpoint: def test_missing_action_returns_400(self, client): resp = client.post( - "/turnstone/api/ingest/tautulli", + "/turnstone/api/glean/tautulli", json={"session_key": "x"}, ) assert resp.status_code == 400 def test_wrong_token_returns_403(self, tmp_path): from fastapi.testclient import TestClient - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import app.rest as rest_module db = tmp_path / "test.db" @@ -288,7 +288,7 @@ class TestEndpoint: patch.object(rest_module, "_compiled_patterns", []): with TestClient(rest_module.app, raise_server_exceptions=True) as c: resp = c.post( - "/turnstone/api/ingest/tautulli", + "/turnstone/api/glean/tautulli", json=_ERROR_PAYLOAD, headers={"X-Tautulli-Token": "wrong"}, ) @@ -296,7 +296,7 @@ class TestEndpoint: def test_valid_payload_returns_200(self, client): resp = client.post( - "/turnstone/api/ingest/tautulli", + "/turnstone/api/glean/tautulli", json=_ERROR_PAYLOAD, ) assert resp.status_code == 200 diff --git a/tests/test_ingest_wazuh.py b/tests/test_glean_wazuh.py similarity index 96% rename from tests/test_ingest_wazuh.py rename to tests/test_glean_wazuh.py index d218ebd..98c0458 100644 --- a/tests/test_ingest_wazuh.py +++ b/tests/test_glean_wazuh.py @@ -1,11 +1,11 @@ -"""Tests for the Wazuh alert ingestor.""" +"""Tests for the Wazuh alert gleaner.""" from __future__ import annotations import json from datetime import datetime -from app.ingest.wazuh import is_wazuh_alert, parse -from app.ingest.pipeline import _detect_format +from app.glean.wazuh import is_wazuh_alert, parse +from app.glean.pipeline import _detect_format _ALERT = { "timestamp": "2024-01-15T10:23:45.123+0000", diff --git a/tests/test_service_blocklist.py b/tests/test_service_blocklist.py index 893a076..f64d6e6 100644 --- a/tests/test_service_blocklist.py +++ b/tests/test_service_blocklist.py @@ -8,7 +8,7 @@ from pathlib import Path class TestSchema: def test_blocklist_candidates_table_exists(self, tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) @@ -16,7 +16,7 @@ class TestSchema: assert "blocklist_candidates" in tables def test_blocklist_candidates_columns(self, tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) conn = sqlite3.connect(str(db)) @@ -28,7 +28,7 @@ class TestSchema: } def test_status_default_is_pending(self, tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import uuid db = tmp_path / "test.db" ensure_schema(db) @@ -89,7 +89,7 @@ class TestTelemetry: class TestExtraction: @pytest.fixture def db(self, tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema p = tmp_path / "test.db" ensure_schema(p) return p @@ -195,7 +195,7 @@ class TestExtraction: class TestCandidateManagement: @pytest.fixture def db_with_candidate(self, tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema import sqlite3, uuid db = tmp_path / "test.db" ensure_schema(db) diff --git a/tests/test_services_diagnose.py b/tests/test_services_diagnose.py index 2cb0e01..e50debb 100644 --- a/tests/test_services_diagnose.py +++ b/tests/test_services_diagnose.py @@ -54,7 +54,7 @@ def test_keywords_cleaned_of_extra_spaces(): def test_diagnose_with_explicit_window_sets_time_detected(tmp_path): - from app.ingest.pipeline import ensure_schema + from app.glean.pipeline import ensure_schema db = tmp_path / "test.db" ensure_schema(db) result = diagnose(db, query="plex", since="2026-05-11T14:00:00+00:00", until="2026-05-11T15:00:00+00:00") diff --git a/web/src/components/QuickCapture.vue b/web/src/components/QuickCapture.vue index 1d71b14..0d40398 100644 --- a/web/src/components/QuickCapture.vue +++ b/web/src/components/QuickCapture.vue @@ -104,7 +104,7 @@

No {{ severityFilter }} entries in this result set.

diff --git a/web/src/views/DashboardView.vue b/web/src/views/DashboardView.vue index 8ca5720..98a9c4f 100644 --- a/web/src/views/DashboardView.vue +++ b/web/src/views/DashboardView.vue @@ -10,7 +10,7 @@ class="w-2 h-2 rounded-full flex-shrink-0" > - {{ watchActive ? `Live — ${watchSources.length} source${watchSources.length !== 1 ? 's' : ''} watched` : 'Manual ingest mode' }} + {{ watchActive ? `Live — ${watchSources.length} source${watchSources.length !== 1 ? 's' : ''} watched` : 'Manual glean mode' }} @@ -20,8 +20,8 @@ class="flex items-center gap-2 rounded border border-surface-border bg-surface-raised px-4 py-2.5 text-xs text-text-dim" > - Live watch active — last event: {{ shortTs(stats.last_ingested) }}. Waiting for new entries to arrive. - Last ingested: {{ shortTs(stats.last_ingested) }} — 24h counts reflect this window, not today. + Live watch active — last event: {{ shortTs(stats.last_gleaned) }}. Waiting for new entries to arrive. + Last gleaned: {{ shortTs(stats.last_gleaned) }} — 24h counts reflect this window, not today. @@ -171,7 +171,7 @@ interface StatsResponse { criticals_24h: number errors_24h: number suppressed_criticals: number - last_ingested: string | null + last_gleaned: string | null source_health: SourceHealth[] recent_criticals: Array<{ entry_id: string @@ -186,7 +186,7 @@ interface WatchSourceStatus { source_id: string type: string running: boolean - entries_ingested: number + entries_gleaned: number last_event: string | null error: string | null } @@ -211,8 +211,8 @@ const watchActive = computed(() => ) const isStale = computed(() => { - if (!stats.value?.last_ingested) return false - const age = Date.now() - new Date(stats.value.last_ingested).getTime() + if (!stats.value?.last_gleaned) return false + const age = Date.now() - new Date(stats.value.last_gleaned).getTime() return age > 25 * 60 * 60 * 1000 // older than 25h }) diff --git a/web/src/views/LogSearchView.vue b/web/src/views/LogSearchView.vue index 1f03371..f7ea3b2 100644 --- a/web/src/views/LogSearchView.vue +++ b/web/src/views/LogSearchView.vue @@ -106,7 +106,7 @@

No results for "{{ store.query }}"

-

Try broader terms or check the Sources tab to confirm data is ingested.

+

Try broader terms or check the Sources tab to confirm data is gleaned.

diff --git a/web/src/views/SourcesView.vue b/web/src/views/SourcesView.vue index 81029ef..7290956 100644 --- a/web/src/views/SourcesView.vue +++ b/web/src/views/SourcesView.vue @@ -3,7 +3,7 @@

Log Sources

-

All hosts and services in the ingested corpus.

+

All hosts and services in the gleaned corpus.