From 82977f365b63c7e74787c8c4e2234d47c5bfb83d Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 20 May 2026 08:57:25 -0700 Subject: [PATCH] feat: periodic ingest scheduler + Orchard submission pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds asyncio-native background scheduler (TURNSTONE_INGEST_INTERVAL, default 900s) that runs batch ingest then pushes pattern-matched entries to a remote CF harvest endpoint (TURNSTONE_SUBMIT_ENDPOINT). - app/tasks/ingest_scheduler.py: IngestState, scheduler_loop, run_once, submit_matched, _query_matched_since — asyncio.Lock prevents concurrent runs - app/rest.py: POST /api/ingest/batch (pre-parsed entry receiver), GET /api/tasks/ingest/status, POST /api/tasks/ingest (manual trigger), TURNSTONE_INGEST_INTERVAL + TURNSTONE_SUBMIT_ENDPOINT env wiring in lifespan - docker-compose.submissions.yml: segregated daniel (8536) + xander (8537) receiving instances on Heimdall, isolated DBs under /devl/docker/turnstone-submissions// - podman-standalone.sh: pass-through for TURNSTONE_SUBMIT_ENDPOINT + TURNSTONE_SOURCE_HOST - app/ingest/mqtt_subscriber.py: MQTT log source adapter - app/ingest/wazuh.py: Wazuh alert JSON adapter - tests/test_ingest_wazuh.py: Wazuh adapter test suite --- .env.example | 4 + README.md | 1 + app/ingest/mqtt_subscriber.py | 166 +++++++++++++++++++++++++++++ app/ingest/pipeline.py | 6 +- app/ingest/wazuh.py | 161 +++++++++++++++++++++++++++++ app/rest.py | 177 ++++++++++++++++++++++++++++++- app/tasks/__init__.py | 0 app/tasks/ingest_scheduler.py | 184 +++++++++++++++++++++++++++++++++ docker-compose.submissions.yml | 74 +++++++++++++ harvester/sources.example.yaml | 7 ++ patterns/sources.yaml | 24 +++++ podman-standalone.sh | 9 ++ tests/test_ingest_wazuh.py | 118 +++++++++++++++++++++ 13 files changed, 929 insertions(+), 2 deletions(-) create mode 100644 app/ingest/mqtt_subscriber.py create mode 100644 app/ingest/wazuh.py create mode 100644 app/tasks/__init__.py create mode 100644 app/tasks/ingest_scheduler.py create mode 100644 docker-compose.submissions.yml create mode 100644 tests/test_ingest_wazuh.py diff --git a/.env.example b/.env.example index 5f9f889..483790b 100644 --- a/.env.example +++ b/.env.example @@ -22,3 +22,7 @@ # --- Bundle endpoint (optional) --- # Remote endpoint to push diagnostic bundles for escalation. # TURNSTONE_BUNDLE_ENDPOINT=https://example.com/api/bundles + +# --- Periodic batch ingest --- +# Seconds between automatic ingest runs from sources.yaml. Set to 0 to disable. +# TURNSTONE_INGEST_INTERVAL=900 diff --git a/README.md b/README.md index db7ef2c..a36ae60 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Copy `.env.example` to `.env` (or pass as `-e` flags to Docker/Podman). All vari | `TURNSTONE_PATTERNS` | `./patterns` | Pattern directory (default.yaml, sources.yaml, watch.yaml). | | `TURNSTONE_SOURCE_HOST` | `unknown` | Host identifier stamped on ingested entries. | | `TURNSTONE_BUNDLE_ENDPOINT` | — | Remote URL to push diagnostic bundles for escalation. | +| `TURNSTONE_INGEST_INTERVAL` | `900` | Seconds between automatic batch ingest runs. Set to `0` to disable. | --- diff --git a/app/ingest/mqtt_subscriber.py b/app/ingest/mqtt_subscriber.py new file mode 100644 index 0000000..1f00da4 --- /dev/null +++ b/app/ingest/mqtt_subscriber.py @@ -0,0 +1,166 @@ +"""Live MQTT ingest subscriber for Turnstone. + +Reads ``type: mqtt`` entries from sources.yaml and subscribes to each broker +in the background. Incoming messages are normalized to RetrievedEntry and +written to the Turnstone SQLite database as they arrive. + +This runs as an asyncio task alongside the batch ingest scheduler. It is +started from the FastAPI lifespan in rest.py. + +MQTT source config format in sources.yaml:: + + sources: + - id: meshtastic-home + type: mqtt + broker_host: 10.1.10.5 + broker_port: 1883 # optional, default 1883 + broker_username: ~ # optional + broker_password: ~ # optional + topics: + - msh/# # one or more topic patterns + severity: INFO # optional default severity for all messages + + - id: iot-sensors + type: mqtt + broker_host: localhost + topics: + - home/+/temperature + - home/+/humidity +""" +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path + +import yaml + +from app.services.models import RetrievedEntry + +logger = logging.getLogger(__name__) + + +def _load_mqtt_sources(sources_file: Path) -> list[dict]: + """Return only the ``type: mqtt`` entries from sources.yaml.""" + if not sources_file.exists(): + return [] + with sources_file.open() as f: + data = yaml.safe_load(f) or {} + return [s for s in data.get("sources", []) if s.get("type") == "mqtt"] + + +def _make_entry_id(source_id: str, seq: int, text: str) -> str: + h = hashlib.sha1(f"{source_id}:{seq}:{text}".encode()).hexdigest()[:16] + return f"{source_id}:{seq}:{h}" + + +def _write_entry(db_path: Path, entry: RetrievedEntry) -> None: + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + INSERT OR IGNORE INTO log_entries + (id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + entry.entry_id, + entry.source_id, + entry.sequence, + entry.timestamp_raw, + entry.timestamp_iso, + entry.ingest_time, + entry.severity, + entry.repeat_count, + 1 if entry.out_of_order else 0, + json.dumps(entry.matched_patterns), + entry.text, + ), + ) + + +async def _run_source_subscriber(source: dict, db_path: Path) -> None: + """Maintain a subscription to one MQTT source, reconnecting on error.""" + try: + from circuitforge_core.mqtt import MQTTClient, MQTTConfig + except ImportError: + logger.error( + "circuitforge-core[mqtt] is not installed — MQTT source %r skipped. " + "Run: pip install circuitforge-core[mqtt]", + source.get("id"), + ) + return + + source_id: str = source["id"] + host: str = source["broker_host"] + port: int = int(source.get("broker_port", 1883)) + username: str | None = source.get("broker_username") or source.get("username") + password: str | None = source.get("broker_password") or source.get("password") + topics: list[str] = source.get("topics", ["#"]) + default_severity: str = source.get("severity", "INFO") + + cfg = MQTTConfig( + host=host, + port=port, + username=username, + password=password, + client_id=f"turnstone-{source_id}", + ) + client = MQTTClient(cfg) + seq = 0 + + for topic in topics: + @client.on(topic) + async def _handle(msg, _src=source_id, _sev=default_severity): + nonlocal seq + seq += 1 + now = datetime.now(tz=timezone.utc).isoformat() + text = msg.text() + entry = RetrievedEntry( + entry_id=_make_entry_id(_src, seq, text), + source_id=_src, + sequence=seq, + timestamp_raw=now, + timestamp_iso=now, + ingest_time=now, + severity=_sev, + repeat_count=1, + out_of_order=False, + matched_patterns=[], + text=f"[{msg.topic}] {text}", + ) + _write_entry(db_path, entry) + logger.debug("MQTT[%s] %s: %s", _src, msg.topic, text[:120]) + + logger.info("MQTT subscriber starting: %s → %s:%d topics=%s", source_id, host, port, topics) + await client.run() + + +async def run_mqtt_subscribers(sources_file: Path, db_path: Path) -> None: + """Start one subscriber task per MQTT source. Runs until cancelled.""" + sources = _load_mqtt_sources(sources_file) + if not sources: + logger.debug("No MQTT sources configured in %s", sources_file) + return + + logger.info("Starting %d MQTT subscriber(s)", len(sources)) + tasks = [ + asyncio.create_task( + _run_source_subscriber(src, db_path), + name=f"mqtt-{src.get('id', i)}", + ) + for i, src in enumerate(sources) + ] + + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + for t in tasks: + t.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + raise diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py index 93def7b..f912be9 100644 --- a/app/ingest/pipeline.py +++ b/app/ingest/pipeline.py @@ -10,7 +10,7 @@ from typing import Iterator import yaml -from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog +from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh from app.ingest.base import _compile, load_patterns, now_iso from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index @@ -137,6 +137,8 @@ def _detect_format(first_line: str) -> str: return "journald" if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): return "docker" + if wazuh.is_wazuh_alert(obj): + return "wazuh" if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): return "caddy" except (json.JSONDecodeError, AttributeError): @@ -178,6 +180,8 @@ def _parse_file( if fmt == "journald": yield from journald.parse(all_lines(), source_id, compiled, ingest_time) + elif fmt == "wazuh": + yield from wazuh.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "docker": yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) elif fmt == "caddy": diff --git a/app/ingest/wazuh.py b/app/ingest/wazuh.py new file mode 100644 index 0000000..49e808d --- /dev/null +++ b/app/ingest/wazuh.py @@ -0,0 +1,161 @@ +"""Wazuh SIEM alert parser. + +Handles Wazuh's alerts.json format (JSON Lines — one alert object per line): + + /var/ossec/logs/alerts/alerts.json (on the Wazuh manager) + +Each line is a complete JSON object. Key fields used: + timestamp — ISO 8601 with timezone offset ("2024-01-15T10:23:45.123+0000") + rule.level — 1-15 (maps to Turnstone severity) + rule.id — Wazuh rule ID + rule.description — human-readable rule description (primary message text) + rule.groups — list of category tags + agent.name — hostname that generated the original event + agent.ip — agent IP address + full_log — original raw log line that triggered the alert + location — log file or input that was monitored + data — dict of decoded fields (srcip, dstip, url, etc.) +""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Iterator + +from app.ingest.base import ( + SourceState, apply_patterns, make_entry_id, now_iso, +) +from app.services.models import LogPattern, RetrievedEntry + +# Wazuh rule levels 1-15 → Turnstone severity labels. +# Levels < 4 are normally informational, 7+ begin to matter operationally, +# 10+ correspond to SIEM-worthy events, 13+ are critical. +_LEVEL_SEVERITY: dict[int, str] = { + 1: "DEBUG", 2: "DEBUG", 3: "DEBUG", + 4: "INFO", 5: "INFO", 6: "NOTICE", + 7: "WARN", 8: "WARN", 9: "WARN", + 10: "ERROR", 11: "ERROR", 12: "ERROR", + 13: "CRITICAL", 14: "CRITICAL", 15: "CRITICAL", +} + + +def is_wazuh_alert(obj: dict) -> bool: + """Return True if a parsed JSON object looks like a Wazuh alert.""" + return ( + isinstance(obj.get("rule"), dict) + and isinstance(obj.get("agent"), dict) + and ("timestamp" in obj or "manager" in obj) + ) + + +def _parse_timestamp(raw: str) -> str: + """Convert Wazuh's ISO 8601 timestamp to UTC ISO 8601.""" + if not raw: + return "" + for fmt in ( + "%Y-%m-%dT%H:%M:%S.%f%z", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + ): + try: + dt = datetime.strptime(raw, fmt) + return dt.astimezone(timezone.utc).isoformat() + except ValueError: + continue + return raw + + +def _build_text(alert: dict) -> str: + """Compose a readable, searchable text representation of the alert.""" + rule = alert.get("rule", {}) + agent = alert.get("agent", {}) + + agent_name = agent.get("name", "unknown") + agent_ip = agent.get("ip", "") + rule_id = rule.get("id", "") + rule_desc = rule.get("description", "(no description)") + groups = rule.get("groups", []) + location = alert.get("location", "") + full_log = alert.get("full_log", "") + + parts: list[str] = [] + + # Header line: agent + rule context + agent_tag = f"{agent_name}/{agent_ip}" if agent_ip else agent_name + group_tag = ",".join(groups) if groups else "" + header = f"[wazuh][agent:{agent_tag}][rule:{rule_id}]" + if group_tag: + header += f"[{group_tag}]" + parts.append(f"{header} {rule_desc}") + + if location: + parts.append(f"location: {location}") + + # Extra decoded fields (srcip, dstip, url, user, etc.) + data = alert.get("data", {}) + if isinstance(data, dict) and data: + kv = " | ".join(f"{k}={v}" for k, v in sorted(data.items()) if v) + if kv: + parts.append(kv) + + if full_log and full_log.strip() != rule_desc.strip(): + parts.append(f"raw: {full_log.strip()}") + + return "\n".join(parts) + + +def parse( + lines: Iterator[str], + source_id: str, + compiled_patterns: list[tuple[LogPattern, object]], + ingest_time: str | None = None, +) -> Iterator[RetrievedEntry]: + ingest_time = ingest_time or now_iso() + state = SourceState() + + for raw_line in lines: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + alert = json.loads(raw_line) + except json.JSONDecodeError: + continue + + if not isinstance(alert, dict): + continue + + rule = alert.get("rule", {}) + agent = alert.get("agent", {}) + + ts_raw = alert.get("timestamp", "") + ts_iso = _parse_timestamp(ts_raw) + + level = int(rule.get("level", 0)) + severity = _LEVEL_SEVERITY.get(level, "INFO") + + # Qualify source_id by agent so logs from different hosts stay separate. + agent_name = agent.get("name", "") + src = f"{source_id}:{agent_name}" if agent_name else source_id + + text = _build_text(alert) + if not text: + continue + + repeat, out_of_order = state.observe(text, ts_iso) + matched = apply_patterns(text, compiled_patterns) + + yield RetrievedEntry( + entry_id=make_entry_id(src, state.sequence, text), + source_id=src, + sequence=state.sequence, + timestamp_raw=ts_raw, + timestamp_iso=ts_iso, + ingest_time=ingest_time, + severity=severity, + repeat_count=repeat, + out_of_order=out_of_order, + matched_patterns=matched, + text=text, + ) diff --git a/app/rest.py b/app/rest.py index a60ae95..81601f2 100644 --- a/app/rest.py +++ b/app/rest.py @@ -28,8 +28,9 @@ from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from app.ingest.pipeline import ensure_schema, ingest_file as _ingest_file -from app.ingest.base import load_compiled_patterns +from app.ingest.base import load_compiled_patterns, now_iso from app.ingest.tautulli import parse_webhook as _parse_tautulli +from app.ingest.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh from app.services.blocklist import ( BlocklistCandidate, get_candidate, @@ -73,6 +74,8 @@ from app.context.retriever import retrieve_context as _retrieve_context, format_ from app.ingest.doc_upload import ingest_upload as _ingest_upload from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session from app.context.chunker import UnsupportedDocType, FileTooLarge +from app.tasks.ingest_scheduler import get_state as _ingest_state, run_once as _run_ingest, scheduler_loop as _scheduler_loop, submit_matched as _submit_matched +from app.ingest.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) PREFS_PATH = DB_PATH.parent / "preferences.json" @@ -81,6 +84,8 @@ SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) PATTERN_FILE = PATTERN_DIR / "default.yaml" +INGEST_INTERVAL = int(os.environ.get("TURNSTONE_INGEST_INTERVAL", "900")) +SUBMIT_ENDPOINT = os.environ.get("TURNSTONE_SUBMIT_ENDPOINT", "").rstrip("/") # GPU inference server URL. # Priority: GPU_SERVER_URL → CF_ORCH_URL (backward compat) → orch.circuitforge.tech (Paid+). @@ -111,8 +116,36 @@ async def _lifespan(app: FastAPI): if configs: _watcher.configure(configs) _watcher.start() + + sources_file = PATTERN_DIR / "sources.yaml" + _scheduler_task: asyncio.Task | None = None + if INGEST_INTERVAL > 0 and sources_file.exists(): + _scheduler_task = asyncio.create_task( + _scheduler_loop( + sources_file, DB_PATH, PATTERN_FILE, INGEST_INTERVAL, + submit_endpoint=SUBMIT_ENDPOINT or None, + source_host=SOURCE_HOST, + ), + name="ingest-scheduler", + ) + + _mqtt_task: asyncio.Task | None = None + if sources_file.exists(): + _mqtt_task = asyncio.create_task( + _run_mqtt_subscribers(sources_file, DB_PATH), + name="mqtt-subscribers", + ) + yield + _watcher.stop() + for task in (_scheduler_task, _mqtt_task): + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass app = FastAPI(title="Turnstone API", version="0.5.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) @@ -459,6 +492,148 @@ async def ingest_upload( return {"source_id": sid, "ingested": total, "stats": stats} +class BatchEntry(BaseModel): + id: str + source_id: str + sequence: int + timestamp_raw: str | None = None + timestamp_iso: str | None = None + ingest_time: str + severity: str | None = None + repeat_count: int = 1 + out_of_order: int = 0 + matched_patterns: list[str] = [] + text: str + + +class BatchIngestRequest(BaseModel): + source_host: str = "unknown" + entries: list[BatchEntry] + + +@router.post("/api/ingest/batch") +def ingest_batch(payload: BatchIngestRequest, background_tasks: BackgroundTasks) -> dict: + """Accept pre-parsed log entries from a remote Turnstone instance (submission protocol). + + Used by nodes with TURNSTONE_SUBMIT_ENDPOINT configured to push their + pattern-matched entries to a central receiving instance. + """ + if not payload.entries: + return {"ingested": 0} + conn = sqlite3.connect(str(DB_PATH)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executemany( + """ + INSERT OR IGNORE INTO log_entries + (id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text) + VALUES (?,?,?,?,?,?,?,?,?,?,?) + """, + [ + ( + e.id, + f"{payload.source_host}/{e.source_id}", + e.sequence, + e.timestamp_raw, + e.timestamp_iso, + e.ingest_time, + e.severity, + e.repeat_count, + e.out_of_order, + json.dumps(e.matched_patterns), + e.text, + ) + for e in payload.entries + ], + ) + conn.commit() + conn.close() + background_tasks.add_task(build_fts_index, DB_PATH) + return {"ingested": len(payload.entries), "source_host": payload.source_host} + + +@router.get("/api/tasks/ingest/status") +def ingest_task_status() -> dict: + """Return the current state of the periodic batch ingest scheduler.""" + s = _ingest_state() + return { + "running": s.running, + "run_count": s.run_count, + "last_run_at": s.last_run_at, + "last_duration_s": s.last_duration_s, + "last_stats": s.last_stats, + "last_error": s.last_error, + "next_run_at": s.next_run_at, + "interval_s": INGEST_INTERVAL, + "scheduler_active": INGEST_INTERVAL > 0 and (PATTERN_DIR / "sources.yaml").exists(), + "submit_endpoint": SUBMIT_ENDPOINT or None, + "last_submitted_at": s.last_submitted_at, + "last_submit_count": s.last_submit_count, + "last_submit_error": s.last_submit_error, + } + + +@router.post("/api/tasks/ingest") +async def trigger_ingest() -> dict: + """Manually trigger a batch ingest of all configured sources. No-ops if already running.""" + sources_file = PATTERN_DIR / "sources.yaml" + if not sources_file.exists(): + raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first") + return await _run_ingest( + sources_file, DB_PATH, PATTERN_FILE, + submit_endpoint=SUBMIT_ENDPOINT or None, + source_host=SOURCE_HOST, + ) + + +@router.post("/api/ingest/wazuh/alert") +async def ingest_wazuh_alert( + alert: dict, + source_id: Annotated[str | None, Query(description="Source label (defaults to 'wazuh')")] = None, + background_tasks: BackgroundTasks = None, +) -> dict: + """Accept a single Wazuh alert JSON object pushed by a Wazuh custom integration. + + Configure in Wazuh: ossec.conf → custom-turnstone + pointing to a script that POSTs the alert JSON to this endpoint. + """ + if not _is_wazuh_alert(alert): + from fastapi import HTTPException + raise HTTPException(status_code=422, detail="Not a valid Wazuh alert object") + + sid = source_id or "wazuh" + ingest_time = now_iso() + compiled = load_compiled_patterns(PATTERN_FILE) + entries = list(_parse_wazuh(iter([json.dumps(alert)]), sid, compiled, ingest_time)) + if entries: + conn = sqlite3.connect(str(DB_PATH)) + conn.execute("PRAGMA journal_mode=WAL") + conn.executemany( + """ + INSERT OR IGNORE INTO log_entries + (id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text) + VALUES (?,?,?,?,?,?,?,?,?,?,?) + """, + [ + ( + e.entry_id, e.source_id, e.sequence, + e.timestamp_raw, e.timestamp_iso, e.ingest_time, + e.severity, e.repeat_count, int(e.out_of_order), + json.dumps(list(e.matched_patterns)), e.text, + ) + for e in entries + ], + ) + conn.commit() + conn.close() + if background_tasks is not None: + background_tasks.add_task(build_fts_index, DB_PATH) + return {"ingested": len(entries), "source_id": sid} + + @router.get("/api/watch/status") def watch_status() -> dict: return {"active": _watcher.is_active(), "sources": _watcher.status} diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tasks/ingest_scheduler.py b/app/tasks/ingest_scheduler.py new file mode 100644 index 0000000..b55b152 --- /dev/null +++ b/app/tasks/ingest_scheduler.py @@ -0,0 +1,184 @@ +"""Periodic batch ingest scheduler with optional CF submission. + +Runs ingest_sources on a configurable interval (TURNSTONE_INGEST_INTERVAL env var, +default 900s / 15 min). Set to 0 to disable. + +When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote +Turnstone instance (the CF receiving store) after each ingest run. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import sqlite3 +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +import httpx + +from app.ingest.pipeline import ingest_sources + +logger = logging.getLogger(__name__) + +_lock = asyncio.Lock() + + +@dataclass +class IngestState: + last_run_at: str | None = None + last_duration_s: float | None = None + last_stats: dict[str, int] = field(default_factory=dict) + last_error: str | None = None + run_count: int = 0 + next_run_at: str | None = None + running: bool = False + last_submitted_at: str | None = None + last_submit_count: int = 0 + last_submit_error: str | None = None + + +_state = IngestState() + + +def get_state() -> IngestState: + return _state + + +def _query_matched_since(db_path: Path, since: str | None) -> list[dict]: + """Return entries with non-empty matched_patterns, optionally filtered by ingest_time.""" + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + if since: + rows = conn.execute( + """ + SELECT id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text + FROM log_entries + WHERE matched_patterns != '[]' AND ingest_time > ? + ORDER BY ingest_time + LIMIT 5000 + """, + (since,), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT id, source_id, sequence, timestamp_raw, timestamp_iso, + ingest_time, severity, repeat_count, out_of_order, + matched_patterns, text + FROM log_entries + WHERE matched_patterns != '[]' + ORDER BY ingest_time DESC + LIMIT 5000 + """, + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +async def submit_matched( + db_path: Path, + submit_endpoint: str, + source_host: str, + since: str | None = None, +) -> dict[str, Any]: + """Push pattern-matched entries to the remote CF receiving instance.""" + loop = asyncio.get_running_loop() + entries = await loop.run_in_executor( + None, lambda: _query_matched_since(db_path, since) + ) + if not entries: + return {"ok": True, "submitted": 0, "skipped": True} + + url = f"{submit_endpoint.rstrip('/')}/turnstone/api/ingest/batch" + payload = {"source_host": source_host, "entries": entries} + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(url, json=payload) + resp.raise_for_status() + result = resp.json() + submitted = result.get("ingested", len(entries)) + _state.last_submitted_at = datetime.now(tz=timezone.utc).isoformat() + _state.last_submit_count = submitted + _state.last_submit_error = None + logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint) + return {"ok": True, "submitted": submitted} + except Exception as exc: + _state.last_submit_error = str(exc) + logger.warning("Submission to %s failed: %s", submit_endpoint, exc) + return {"ok": False, "error": str(exc)} + + +async def run_once( + sources_file: Path, + db_path: Path, + pattern_file: Path | None = None, + submit_endpoint: str | None = None, + source_host: str = "unknown", +) -> dict[str, Any]: + """Ingest all sources once, then submit matched entries if configured.""" + if _lock.locked(): + return {"ok": False, "error": "ingest already running", "skipped": True} + + async with _lock: + _state.running = True + started = datetime.now(tz=timezone.utc) + try: + loop = asyncio.get_running_loop() + stats: dict[str, int] = await loop.run_in_executor( + None, + lambda: ingest_sources(sources_file, db_path, pattern_file), + ) + duration = (datetime.now(tz=timezone.utc) - started).total_seconds() + _state.last_run_at = started.isoformat() + _state.last_duration_s = round(duration, 2) + _state.last_stats = stats + _state.last_error = None + _state.run_count += 1 + logger.info("Batch ingest complete in %.1fs — %s", duration, stats) + except Exception as exc: + duration = (datetime.now(tz=timezone.utc) - started).total_seconds() + _state.last_run_at = started.isoformat() + _state.last_duration_s = round(duration, 2) + _state.last_error = str(exc) + _state.run_count += 1 + logger.error("Batch ingest failed: %s", exc) + _state.running = False + return {"ok": False, "error": str(exc)} + finally: + _state.running = False + + if submit_endpoint: + await submit_matched(db_path, submit_endpoint, source_host, since=_state.last_submitted_at) + + return {"ok": True, "stats": _state.last_stats, "duration_s": _state.last_duration_s} + + +async def scheduler_loop( + sources_file: Path, + db_path: Path, + pattern_file: Path | None, + interval_s: int, + submit_endpoint: str | None = None, + source_host: str = "unknown", +) -> None: + """Run ingest + optional submission every interval_s seconds until cancelled.""" + logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file) + if submit_endpoint: + logger.info("Submission enabled — endpoint: %s", submit_endpoint) + while True: + await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host) + next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s) + _state.next_run_at = next_run.isoformat() + try: + await asyncio.sleep(interval_s) + except asyncio.CancelledError: + logger.info("Ingest scheduler cancelled") + _state.next_run_at = None + raise diff --git a/docker-compose.submissions.yml b/docker-compose.submissions.yml new file mode 100644 index 0000000..2c5702b --- /dev/null +++ b/docker-compose.submissions.yml @@ -0,0 +1,74 @@ +# Turnstone — CF receiving instances for external node submissions. +# +# These are SEPARATE instances from the main Turnstone deployment. Each node +# that has TURNSTONE_SUBMIT_ENDPOINT configured pushes pattern-matched entries +# here. Each instance has its own isolated database. Avocet reads these +# databases for training data. +# +# Ports: +# 8536 → submissions-daniel (harvest.circuitforge.tech/daniel/*) +# 8537 → submissions-xander (harvest.circuitforge.tech/xander/*) +# +# Deploy on Heimdall: +# docker compose -f docker-compose.submissions.yml up -d +# +# Database locations: +# /devl/docker/turnstone-submissions/daniel/turnstone.db +# /devl/docker/turnstone-submissions/xander/turnstone.db +# +# These instances have TURNSTONE_INGEST_INTERVAL=0 — they only receive POSTs, +# they do not run their own scheduled ingest. + +services: + submissions-daniel: + image: turnstone:latest + container_name: turnstone-submissions-daniel + restart: unless-stopped + ports: + - "8536:8534" + volumes: + - /devl/docker/turnstone-submissions/daniel:/data:z + - /devl/docker/turnstone-submissions/daniel/patterns:/patterns:ro + environment: + TURNSTONE_DB: /data/turnstone.db + TURNSTONE_PATTERNS: /patterns + TURNSTONE_SOURCE_HOST: submissions-daniel + TURNSTONE_INGEST_INTERVAL: "0" + PYTHONUNBUFFERED: "1" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8534/turnstone/health"] + interval: 30s + timeout: 10s + start_period: 20s + retries: 3 + networks: + - caddy-internal + + submissions-xander: + image: turnstone:latest + container_name: turnstone-submissions-xander + restart: unless-stopped + ports: + - "8537:8534" + volumes: + - /devl/docker/turnstone-submissions/xander:/data:z + - /devl/docker/turnstone-submissions/xander/patterns:/patterns:ro + environment: + TURNSTONE_DB: /data/turnstone.db + TURNSTONE_PATTERNS: /patterns + TURNSTONE_SOURCE_HOST: submissions-xander + TURNSTONE_INGEST_INTERVAL: "0" + PYTHONUNBUFFERED: "1" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8534/turnstone/health"] + interval: 30s + timeout: 10s + start_period: 20s + retries: 3 + networks: + - caddy-internal + +networks: + caddy-internal: + name: caddy-proxy_caddy-internal + external: true diff --git a/harvester/sources.example.yaml b/harvester/sources.example.yaml index 4780e3b..508e257 100644 --- a/harvester/sources.example.yaml +++ b/harvester/sources.example.yaml @@ -42,3 +42,10 @@ sources: # Jellyfin # - id: jellyfin # path: /opt/jellyfin/log/jellyfin.log + + # Wazuh SIEM — alerts.json on the Wazuh manager + # Turnstone auto-detects this format; source_id is qualified per agent automatically. + # For push-based ingestion from Wazuh custom integrations, use: + # POST /api/ingest/wazuh/alert (single alert JSON body) + # - id: wazuh + # path: /var/ossec/logs/alerts/alerts.json diff --git a/patterns/sources.yaml b/patterns/sources.yaml index 2676a38..49b89d7 100644 --- a/patterns/sources.yaml +++ b/patterns/sources.yaml @@ -70,3 +70,27 @@ sources: - id: jellyseerr path: /opt/jellyseerr/config/logs/jellyseerr.log + + # ── MQTT / IoT (live — subscribe mode, no path needed) ─────────────────── + # Requires: pip install circuitforge-core[mqtt] + # These sources are handled by the live MQTT subscriber task (not batch ingest). + # Uncomment and configure to enable. + # + # Meshtastic MQTT bridge (node must have MQTT uplink enabled): + # - id: meshtastic-home + # type: mqtt + # broker_host: 10.1.10.5 # IP of your local MQTT broker (e.g. Mosquitto on Huginn) + # broker_port: 1883 + # topics: + # - msh/# # all Meshtastic regions; use msh/us-east/# to narrow + # + # Generic IoT sensors: + # - id: iot-home + # type: mqtt + # broker_host: localhost + # broker_port: 1883 + # topics: + # - home/+/temperature + # - home/+/humidity + # - home/+/motion + # severity: INFO diff --git a/podman-standalone.sh b/podman-standalone.sh index 620f0c1..f937890 100755 --- a/podman-standalone.sh +++ b/podman-standalone.sh @@ -71,6 +71,14 @@ TZ=America/Los_Angeles # export TURNSTONE_BUNDLE_ENDPOINT=https://turnstone.circuitforge.tech/turnstone/api/bundles # bash /opt/turnstone/podman-standalone.sh # +# ── Orchard submission (opt-in telemetry) ──────────────────────────────────── +# Set TURNSTONE_SUBMIT_ENDPOINT to push pattern-matched log entries to a CF +# receiving instance after each ingest run. Only matched entries are sent — +# no raw log content. Used to build Avocet training data. +# +# export TURNSTONE_SUBMIT_ENDPOINT=https://harvest.circuitforge.tech/xander +# bash /opt/turnstone/podman-standalone.sh +# # TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed. # ── Turnstone container ─────────────────────────────────────────────────────── @@ -102,6 +110,7 @@ podman run -d \ -e TURNSTONE_DB=/data/turnstone.db \ -e TURNSTONE_SOURCE_HOST="$(hostname)" \ -e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \ + -e TURNSTONE_SUBMIT_ENDPOINT="${TURNSTONE_SUBMIT_ENDPOINT:-}" \ -e PYTHONUNBUFFERED=1 \ -e TZ="${TZ}" \ --health-cmd="curl -f http://localhost:8534/turnstone/health || exit 1" \ diff --git a/tests/test_ingest_wazuh.py b/tests/test_ingest_wazuh.py new file mode 100644 index 0000000..d218ebd --- /dev/null +++ b/tests/test_ingest_wazuh.py @@ -0,0 +1,118 @@ +"""Tests for the Wazuh alert ingestor.""" +from __future__ import annotations + +import json +from datetime import datetime + +from app.ingest.wazuh import is_wazuh_alert, parse +from app.ingest.pipeline import _detect_format + +_ALERT = { + "timestamp": "2024-01-15T10:23:45.123+0000", + "rule": { + "level": 7, + "description": "SSH authentication failure.", + "id": "5710", + "firedtimes": 1, + "groups": ["syslog", "sshd", "authentication_failed"], + }, + "agent": {"id": "001", "name": "web-server-01", "ip": "192.168.1.100"}, + "manager": {"name": "wazuh-mgr"}, + "id": "1705312125.123456", + "full_log": "Jan 15 10:23:45 web-server-01 sshd[1234]: Failed password for admin from 10.0.0.5", + "location": "/var/log/auth.log", + "data": {"srcip": "10.0.0.5", "srcuser": "admin"}, +} + +_CRITICAL_ALERT = { + "timestamp": "2024-01-15T10:30:00.000+0000", + "rule": {"level": 13, "description": "Rootkit detected.", "id": "510", "groups": ["rootcheck"]}, + "agent": {"id": "002", "name": "db-host", "ip": "192.168.1.200"}, + "manager": {"name": "wazuh-mgr"}, + "full_log": "rootkit patterns found", + "location": "/var/ossec/logs/active-responses.log", +} + + +class TestDetector: + def test_detects_valid_alert(self): + assert is_wazuh_alert(_ALERT) + + def test_detects_minimal_alert(self): + assert is_wazuh_alert({ + "timestamp": "2024-01-15T10:23:45+0000", + "rule": {"level": 5, "description": "test"}, + "agent": {"name": "host"}, + }) + + def test_rejects_journald(self): + assert not is_wazuh_alert({"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}) + + def test_rejects_caddy(self): + assert not is_wazuh_alert({"ts": 1234, "msg": "served", "request": {}}) + + def test_rejects_no_agent(self): + assert not is_wazuh_alert({"rule": {"level": 5}, "timestamp": "2024-01-01T00:00:00Z"}) + + def test_pipeline_routes_to_wazuh(self): + assert _detect_format(json.dumps(_ALERT)) == "wazuh" + + +class TestParser: + def _parse(self, *alerts) -> list: + lines = [json.dumps(a) for a in alerts] + return list(parse(iter(lines), "wazuh", [])) + + def test_single_entry_parsed(self): + entries = self._parse(_ALERT) + assert len(entries) == 1 + + def test_severity_from_level(self): + entries = self._parse(_ALERT) + assert entries[0].severity == "WARN" # level 7 + + def test_critical_severity(self): + entries = self._parse(_CRITICAL_ALERT) + assert entries[0].severity == "CRITICAL" # level 13 + + def test_source_id_includes_agent(self): + entries = self._parse(_ALERT) + assert entries[0].source_id == "wazuh:web-server-01" + + def test_text_contains_rule_description(self): + entries = self._parse(_ALERT) + assert "SSH authentication failure" in entries[0].text + + def test_text_contains_agent_name(self): + entries = self._parse(_ALERT) + assert "web-server-01" in entries[0].text + + def test_text_contains_decoded_data(self): + entries = self._parse(_ALERT) + assert "10.0.0.5" in entries[0].text + + def test_text_contains_full_log(self): + entries = self._parse(_ALERT) + assert "Failed password" in entries[0].text + + def test_timestamp_parsed_to_utc(self): + entries = self._parse(_ALERT) + dt = datetime.fromisoformat(entries[0].timestamp_iso) + assert dt.utcoffset() is not None + assert dt.hour == 10 and dt.minute == 23 and dt.second == 45 + + def test_skips_malformed_json(self): + lines = iter(["not json\n", json.dumps(_ALERT)]) + entries = list(parse(lines, "wazuh", [])) + assert len(entries) == 1 + + def test_skips_empty_lines(self): + lines = iter(["\n", " \n", json.dumps(_ALERT)]) + entries = list(parse(lines, "wazuh", [])) + assert len(entries) == 1 + + def test_multi_alert_sequence(self): + entries = self._parse(_ALERT, _CRITICAL_ALERT) + assert len(entries) == 2 + seqs = [e.sequence for e in entries] + assert seqs == sorted(seqs)