feat: periodic ingest scheduler + Orchard submission pipeline

Adds asyncio-native background scheduler (TURNSTONE_INGEST_INTERVAL,
default 900s) that runs batch ingest then pushes pattern-matched entries
to a remote CF harvest endpoint (TURNSTONE_SUBMIT_ENDPOINT).

- app/tasks/ingest_scheduler.py: IngestState, scheduler_loop, run_once,
  submit_matched, _query_matched_since — asyncio.Lock prevents concurrent runs
- app/rest.py: POST /api/ingest/batch (pre-parsed entry receiver),
  GET /api/tasks/ingest/status, POST /api/tasks/ingest (manual trigger),
  TURNSTONE_INGEST_INTERVAL + TURNSTONE_SUBMIT_ENDPOINT env wiring in lifespan
- docker-compose.submissions.yml: segregated daniel (8536) + xander (8537)
  receiving instances on Heimdall, isolated DBs under
  /devl/docker/turnstone-submissions/<node>/
- podman-standalone.sh: pass-through for TURNSTONE_SUBMIT_ENDPOINT +
  TURNSTONE_SOURCE_HOST
- app/ingest/mqtt_subscriber.py: MQTT log source adapter
- app/ingest/wazuh.py: Wazuh alert JSON adapter
- tests/test_ingest_wazuh.py: Wazuh adapter test suite
This commit is contained in:
pyr0ball 2026-05-20 08:57:25 -07:00
parent 7c7bc86d00
commit 0f86d35062
13 changed files with 929 additions and 2 deletions

View file

@ -22,3 +22,7 @@
# --- Bundle endpoint (optional) --- # --- Bundle endpoint (optional) ---
# Remote endpoint to push diagnostic bundles for escalation. # Remote endpoint to push diagnostic bundles for escalation.
# TURNSTONE_BUNDLE_ENDPOINT=https://example.com/api/bundles # TURNSTONE_BUNDLE_ENDPOINT=https://example.com/api/bundles
# --- Periodic batch ingest ---
# Seconds between automatic ingest runs from sources.yaml. Set to 0 to disable.
# TURNSTONE_INGEST_INTERVAL=900

View file

@ -157,6 +157,7 @@ Copy `.env.example` to `.env` (or pass as `-e` flags to Docker/Podman). All vari
| `TURNSTONE_PATTERNS` | `./patterns` | Pattern directory (default.yaml, sources.yaml, watch.yaml). | | `TURNSTONE_PATTERNS` | `./patterns` | Pattern directory (default.yaml, sources.yaml, watch.yaml). |
| `TURNSTONE_SOURCE_HOST` | `unknown` | Host identifier stamped on ingested entries. | | `TURNSTONE_SOURCE_HOST` | `unknown` | Host identifier stamped on ingested entries. |
| `TURNSTONE_BUNDLE_ENDPOINT` | — | Remote URL to push diagnostic bundles for escalation. | | `TURNSTONE_BUNDLE_ENDPOINT` | — | Remote URL to push diagnostic bundles for escalation. |
| `TURNSTONE_INGEST_INTERVAL` | `900` | Seconds between automatic batch ingest runs. Set to `0` to disable. |
--- ---

View file

@ -0,0 +1,166 @@
"""Live MQTT ingest subscriber for Turnstone.
Reads ``type: mqtt`` entries from sources.yaml and subscribes to each broker
in the background. Incoming messages are normalized to RetrievedEntry and
written to the Turnstone SQLite database as they arrive.
This runs as an asyncio task alongside the batch ingest scheduler. It is
started from the FastAPI lifespan in rest.py.
MQTT source config format in sources.yaml::
sources:
- id: meshtastic-home
type: mqtt
broker_host: 10.1.10.5
broker_port: 1883 # optional, default 1883
broker_username: ~ # optional
broker_password: ~ # optional
topics:
- msh/# # one or more topic patterns
severity: INFO # optional default severity for all messages
- id: iot-sensors
type: mqtt
broker_host: localhost
topics:
- home/+/temperature
- home/+/humidity
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
import yaml
from app.services.models import RetrievedEntry
logger = logging.getLogger(__name__)
def _load_mqtt_sources(sources_file: Path) -> list[dict]:
"""Return only the ``type: mqtt`` entries from sources.yaml."""
if not sources_file.exists():
return []
with sources_file.open() as f:
data = yaml.safe_load(f) or {}
return [s for s in data.get("sources", []) if s.get("type") == "mqtt"]
def _make_entry_id(source_id: str, seq: int, text: str) -> str:
h = hashlib.sha1(f"{source_id}:{seq}:{text}".encode()).hexdigest()[:16]
return f"{source_id}:{seq}:{h}"
def _write_entry(db_path: Path, entry: RetrievedEntry) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute(
"""
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
entry.entry_id,
entry.source_id,
entry.sequence,
entry.timestamp_raw,
entry.timestamp_iso,
entry.ingest_time,
entry.severity,
entry.repeat_count,
1 if entry.out_of_order else 0,
json.dumps(entry.matched_patterns),
entry.text,
),
)
def _last_sequence(db_path: Path, source_id: str) -> int:
    """Return the highest stored sequence for *source_id*, or 0 if unavailable."""
    try:
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute(
                "SELECT MAX(sequence) FROM log_entries WHERE source_id = ?",
                (source_id,),
            ).fetchone()
        finally:
            conn.close()
    except sqlite3.Error:
        # Missing table or locked database: fall back to a fresh counter.
        return 0
    return int(row[0]) if row and row[0] is not None else 0


async def _run_source_subscriber(source: dict, db_path: Path) -> None:
    """Subscribe to one MQTT source and store every message it delivers.

    Args:
        source: One ``type: mqtt`` mapping from sources.yaml. ``id`` and
            ``broker_host`` are required. ``broker_port`` (default 1883),
            ``broker_username``/``username``, ``broker_password``/``password``,
            ``topics`` (default ``["#"]``) and ``severity`` (default ``INFO``)
            are optional.
        db_path: SQLite database that receives the entries.

    Raises:
        KeyError: If ``id`` or ``broker_host`` is missing from *source*.

    NOTE(review): reconnect-on-error is not implemented here; it is presumably
    handled inside ``MQTTClient.run()`` — confirm against circuitforge-core.
    """
    try:
        from circuitforge_core.mqtt import MQTTClient, MQTTConfig
    except ImportError:
        logger.error(
            "circuitforge-core[mqtt] is not installed — MQTT source %r skipped. "
            "Run: pip install circuitforge-core[mqtt]",
            source.get("id"),
        )
        return

    source_id: str = source["id"]
    host: str = source["broker_host"]
    port: int = int(source.get("broker_port", 1883))
    username: str | None = source.get("broker_username") or source.get("username")
    password: str | None = source.get("broker_password") or source.get("password")
    # ``topics: msh/#`` (a scalar) would otherwise be iterated character by
    # character, and ``topics:`` left empty would be None.
    raw_topics = source.get("topics") or ["#"]
    topics: list[str] = [raw_topics] if isinstance(raw_topics, str) else list(raw_topics)
    default_severity: str = source.get("severity", "INFO")

    cfg = MQTTConfig(
        host=host,
        port=port,
        username=username,
        password=password,
        client_id=f"turnstone-{source_id}",
    )
    client = MQTTClient(cfg)

    # Resume numbering after the last stored entry. Restarting from zero would
    # recreate ids already in the table, and INSERT OR IGNORE would silently
    # drop those messages.
    seq = await asyncio.to_thread(_last_sequence, db_path, source_id)

    for topic in topics:
        @client.on(topic)
        async def _handle(msg, _src=source_id, _sev=default_severity):
            nonlocal seq
            seq += 1
            # Take a private copy: another handler may bump ``seq`` while this
            # one is awaiting the database write.
            this_seq = seq
            now = datetime.now(tz=timezone.utc).isoformat()
            text = msg.text()
            entry = RetrievedEntry(
                entry_id=_make_entry_id(_src, this_seq, text),
                source_id=_src,
                sequence=this_seq,
                timestamp_raw=now,
                timestamp_iso=now,
                ingest_time=now,
                severity=_sev,
                repeat_count=1,
                out_of_order=False,
                matched_patterns=[],
                text=f"[{msg.topic}] {text}",
            )
            # SQLite I/O is blocking; keep it off the event loop.
            await asyncio.to_thread(_write_entry, db_path, entry)
            logger.debug("MQTT[%s] %s: %s", _src, msg.topic, text[:120])

    logger.info(
        "MQTT subscriber starting: %s @ %s:%d topics=%s", source_id, host, port, topics
    )
    await client.run()
def _log_subscriber_exit(task: asyncio.Task) -> None:
    """Log the exception of a subscriber task that ended with an error."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(
            "MQTT subscriber %s stopped with an error: %s",
            task.get_name(), exc, exc_info=exc,
        )


async def run_mqtt_subscribers(sources_file: Path, db_path: Path) -> None:
    """Start one subscriber task per MQTT source. Runs until cancelled.

    Args:
        sources_file: sources.yaml to read the ``type: mqtt`` entries from.
        db_path: SQLite database handed to every subscriber.

    Returns immediately when no MQTT source is configured. A subscriber that
    fails is logged as soon as it exits and does not stop the others.

    Raises:
        asyncio.CancelledError: Re-raised after every child task has been
            cancelled and awaited.
    """
    sources = _load_mqtt_sources(sources_file)
    if not sources:
        logger.debug("No MQTT sources configured in %s", sources_file)
        return

    logger.info("Starting %d MQTT subscriber(s)", len(sources))
    tasks = [
        asyncio.create_task(
            _run_source_subscriber(src, db_path),
            name=f"mqtt-{src.get('id', i)}",
        )
        for i, src in enumerate(sources)
    ]
    for task in tasks:
        # Report failures when they happen, not when the last sibling ends.
        task.add_done_callback(_log_subscriber_exit)

    try:
        # return_exceptions=True keeps one broken source from raising out of
        # this task while the remaining subscribers are still running.
        await asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.CancelledError:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

View file

@ -10,7 +10,7 @@ from typing import Iterator
import yaml import yaml
from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog, wazuh
from app.ingest.base import _compile, load_patterns, now_iso from app.ingest.base import _compile, load_patterns, now_iso
from app.services.models import LogPattern, RetrievedEntry from app.services.models import LogPattern, RetrievedEntry
from app.services.search import build_fts_index from app.services.search import build_fts_index
@ -137,6 +137,8 @@ def _detect_format(first_line: str) -> str:
return "journald" return "journald"
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"): if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
return "docker" return "docker"
if wazuh.is_wazuh_alert(obj):
return "wazuh"
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj): if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
return "caddy" return "caddy"
except (json.JSONDecodeError, AttributeError): except (json.JSONDecodeError, AttributeError):
@ -178,6 +180,8 @@ def _parse_file(
if fmt == "journald": if fmt == "journald":
yield from journald.parse(all_lines(), source_id, compiled, ingest_time) yield from journald.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "wazuh":
yield from wazuh.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "docker": elif fmt == "docker":
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time) yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
elif fmt == "caddy": elif fmt == "caddy":

161
app/ingest/wazuh.py Normal file
View file

@ -0,0 +1,161 @@
"""Wazuh SIEM alert parser.
Handles Wazuh's alerts.json format (JSON Lines — one alert object per line):
/var/ossec/logs/alerts/alerts.json (on the Wazuh manager)
Each line is a complete JSON object. Key fields used:
timestamp ISO 8601 with timezone offset ("2024-01-15T10:23:45.123+0000")
rule.level 1-15 (maps to Turnstone severity)
rule.id Wazuh rule ID
rule.description human-readable rule description (primary message text)
rule.groups list of category tags
agent.name hostname that generated the original event
agent.ip agent IP address
full_log original raw log line that triggered the alert
location log file or input that was monitored
data dict of decoded fields (srcip, dstip, url, etc.)
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Iterator
from app.ingest.base import (
SourceState, apply_patterns, make_entry_id, now_iso,
)
from app.services.models import LogPattern, RetrievedEntry
# Wazuh rule levels 1-15 → Turnstone severity labels.
# Levels < 4 are normally informational, 7+ begin to matter operationally,
# 10+ correspond to SIEM-worthy events, 13+ are critical.
_LEVEL_SEVERITY: dict[int, str] = {
1: "DEBUG", 2: "DEBUG", 3: "DEBUG",
4: "INFO", 5: "INFO", 6: "NOTICE",
7: "WARN", 8: "WARN", 9: "WARN",
10: "ERROR", 11: "ERROR", 12: "ERROR",
13: "CRITICAL", 14: "CRITICAL", 15: "CRITICAL",
}
def is_wazuh_alert(obj: dict) -> bool:
    """Return True if a parsed JSON object looks like a Wazuh alert.

    An alert is a mapping whose ``rule`` and ``agent`` values are both
    mappings and which carries a ``timestamp`` or a ``manager`` key. Any
    other JSON value (list, string, number, null) is reported as False
    instead of raising.
    """
    if not isinstance(obj, dict):
        return False
    return (
        isinstance(obj.get("rule"), dict)
        and isinstance(obj.get("agent"), dict)
        and ("timestamp" in obj or "manager" in obj)
    )
def _parse_timestamp(raw: str) -> str:
"""Convert Wazuh's ISO 8601 timestamp to UTC ISO 8601."""
if not raw:
return ""
for fmt in (
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
):
try:
dt = datetime.strptime(raw, fmt)
return dt.astimezone(timezone.utc).isoformat()
except ValueError:
continue
return raw
def _build_text(alert: dict) -> str:
"""Compose a readable, searchable text representation of the alert."""
rule = alert.get("rule", {})
agent = alert.get("agent", {})
agent_name = agent.get("name", "unknown")
agent_ip = agent.get("ip", "")
rule_id = rule.get("id", "")
rule_desc = rule.get("description", "(no description)")
groups = rule.get("groups", [])
location = alert.get("location", "")
full_log = alert.get("full_log", "")
parts: list[str] = []
# Header line: agent + rule context
agent_tag = f"{agent_name}/{agent_ip}" if agent_ip else agent_name
group_tag = ",".join(groups) if groups else ""
header = f"[wazuh][agent:{agent_tag}][rule:{rule_id}]"
if group_tag:
header += f"[{group_tag}]"
parts.append(f"{header} {rule_desc}")
if location:
parts.append(f"location: {location}")
# Extra decoded fields (srcip, dstip, url, user, etc.)
data = alert.get("data", {})
if isinstance(data, dict) and data:
kv = " | ".join(f"{k}={v}" for k, v in sorted(data.items()) if v)
if kv:
parts.append(kv)
if full_log and full_log.strip() != rule_desc.strip():
parts.append(f"raw: {full_log.strip()}")
return "\n".join(parts)
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one entry per Wazuh alert found in *lines* (JSON Lines).

    Args:
        lines: Raw lines, one JSON alert object each.
        source_id: Base source label; the agent name is appended as
            ``<source_id>:<agent>`` when the alert carries one.
        compiled_patterns: Patterns passed to ``apply_patterns`` for each text.
        ingest_time: Timestamp stamped on every entry; defaults to ``now_iso()``.

    Blank lines, invalid JSON and non-object JSON values are skipped. Alerts
    with missing or malformed ``rule``/``agent``/``level`` fields are still
    emitted with defaults, so one bad line cannot abort the remaining ones.
    """
    ingest_time = ingest_time or now_iso()
    state = SourceState()

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            alert = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(alert, dict):
            continue

        # Only the first line of a file is checked by format detection, so
        # later lines may carry non-dict ``rule``/``agent`` values.
        rule = alert.get("rule")
        if not isinstance(rule, dict):
            rule = {}
        agent = alert.get("agent")
        if not isinstance(agent, dict):
            agent = {}

        ts_value = alert.get("timestamp")
        ts_raw = "" if ts_value is None else str(ts_value)
        ts_iso = _parse_timestamp(ts_raw)

        try:
            level = int(rule.get("level", 0))
        except (TypeError, ValueError):
            # Null or non-numeric level: falls through to the default severity.
            level = 0
        severity = _LEVEL_SEVERITY.get(level, "INFO")

        # Qualify source_id by agent so logs from different hosts stay separate.
        agent_name = agent.get("name", "")
        src = f"{source_id}:{agent_name}" if agent_name else source_id

        # Hand the sanitised rule/agent to the formatter as well.
        text = _build_text({**alert, "rule": rule, "agent": agent})
        if not text:
            continue

        repeat, out_of_order = state.observe(text, ts_iso)
        matched = apply_patterns(text, compiled_patterns)

        yield RetrievedEntry(
            entry_id=make_entry_id(src, state.sequence, text),
            source_id=src,
            sequence=state.sequence,
            timestamp_raw=ts_raw,
            timestamp_iso=ts_iso,
            ingest_time=ingest_time,
            severity=severity,
            repeat_count=repeat,
            out_of_order=out_of_order,
            matched_patterns=matched,
            text=text,
        )

View file

@ -28,8 +28,9 @@ from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
from app.ingest.pipeline import ensure_schema, ingest_file as _ingest_file from app.ingest.pipeline import ensure_schema, ingest_file as _ingest_file
from app.ingest.base import load_compiled_patterns from app.ingest.base import load_compiled_patterns, now_iso
from app.ingest.tautulli import parse_webhook as _parse_tautulli from app.ingest.tautulli import parse_webhook as _parse_tautulli
from app.ingest.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
from app.services.blocklist import ( from app.services.blocklist import (
BlocklistCandidate, BlocklistCandidate,
get_candidate, get_candidate,
@ -73,6 +74,8 @@ from app.context.retriever import retrieve_context as _retrieve_context, format_
from app.ingest.doc_upload import ingest_upload as _ingest_upload from app.ingest.doc_upload import ingest_upload as _ingest_upload
from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session from app.context.wizard import get_schema as _wizard_schema, advance_step, is_complete, apply_session
from app.context.chunker import UnsupportedDocType, FileTooLarge from app.context.chunker import UnsupportedDocType, FileTooLarge
from app.tasks.ingest_scheduler import get_state as _ingest_state, run_once as _run_ingest, scheduler_loop as _scheduler_loop, submit_matched as _submit_matched
from app.ingest.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
PREFS_PATH = DB_PATH.parent / "preferences.json" PREFS_PATH = DB_PATH.parent / "preferences.json"
@ -81,6 +84,8 @@ SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "") BUNDLE_ENDPOINT = os.environ.get("TURNSTONE_BUNDLE_ENDPOINT", "")
PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns")) PATTERN_DIR = Path(os.environ.get("TURNSTONE_PATTERNS", Path(__file__).parent.parent / "patterns"))
PATTERN_FILE = PATTERN_DIR / "default.yaml" PATTERN_FILE = PATTERN_DIR / "default.yaml"
INGEST_INTERVAL = int(os.environ.get("TURNSTONE_INGEST_INTERVAL", "900"))
SUBMIT_ENDPOINT = os.environ.get("TURNSTONE_SUBMIT_ENDPOINT", "").rstrip("/")
# GPU inference server URL. # GPU inference server URL.
# Priority: GPU_SERVER_URL → CF_ORCH_URL (backward compat) → orch.circuitforge.tech (Paid+). # Priority: GPU_SERVER_URL → CF_ORCH_URL (backward compat) → orch.circuitforge.tech (Paid+).
@ -111,8 +116,36 @@ async def _lifespan(app: FastAPI):
if configs: if configs:
_watcher.configure(configs) _watcher.configure(configs)
_watcher.start() _watcher.start()
sources_file = PATTERN_DIR / "sources.yaml"
_scheduler_task: asyncio.Task | None = None
if INGEST_INTERVAL > 0 and sources_file.exists():
_scheduler_task = asyncio.create_task(
_scheduler_loop(
sources_file, DB_PATH, PATTERN_FILE, INGEST_INTERVAL,
submit_endpoint=SUBMIT_ENDPOINT or None,
source_host=SOURCE_HOST,
),
name="ingest-scheduler",
)
_mqtt_task: asyncio.Task | None = None
if sources_file.exists():
_mqtt_task = asyncio.create_task(
_run_mqtt_subscribers(sources_file, DB_PATH),
name="mqtt-subscribers",
)
yield yield
_watcher.stop() _watcher.stop()
for task in (_scheduler_task, _mqtt_task):
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(title="Turnstone API", version="0.5.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan) app = FastAPI(title="Turnstone API", version="0.5.0", docs_url="/turnstone/docs", redoc_url=None, lifespan=_lifespan)
@ -459,6 +492,148 @@ async def ingest_upload(
return {"source_id": sid, "ingested": total, "stats": stats} return {"source_id": sid, "ingested": total, "stats": stats}
class BatchEntry(BaseModel):
    """One pre-parsed log entry as sent by a submitting node.

    The fields correspond to the ``log_entries`` columns written by the batch
    endpoint.
    """

    # Identity
    id: str
    source_id: str
    sequence: int

    # Timestamps: raw and ISO forms are optional, ingest time is required
    timestamp_raw: str | None = None
    timestamp_iso: str | None = None
    ingest_time: str

    # Classification
    severity: str | None = None
    repeat_count: int = 1
    out_of_order: int = 0
    matched_patterns: list[str] = []

    # Payload
    text: str
class BatchIngestRequest(BaseModel):
    """Request body for the batch ingest endpoint: a sender label plus entries."""

    # Label of the submitting node; the receiver prefixes it to each source_id
    source_host: str = "unknown"
    entries: list[BatchEntry]
@router.post("/api/ingest/batch")
def ingest_batch(payload: BatchIngestRequest, background_tasks: BackgroundTasks) -> dict:
    """Accept pre-parsed log entries from a remote Turnstone instance (submission protocol).

    Used by nodes with TURNSTONE_SUBMIT_ENDPOINT configured to push their
    pattern-matched entries to a central receiving instance.

    Each entry is stored with ``source_id`` rewritten to
    ``<source_host>/<source_id>``. Rows whose id already exists are ignored,
    so re-submitting the same batch is harmless. An FTS rebuild is queued
    after a non-empty batch.

    Returns:
        ``{"ingested": 0}`` for an empty batch; otherwise the number of
        entries received (not the number newly inserted) and the sender's
        ``source_host``.
    """
    if not payload.entries:
        return {"ingested": 0}

    rows = [
        (
            e.id,
            f"{payload.source_host}/{e.source_id}",
            e.sequence,
            e.timestamp_raw,
            e.timestamp_iso,
            e.ingest_time,
            e.severity,
            e.repeat_count,
            e.out_of_order,
            json.dumps(e.matched_patterns),
            e.text,
        )
        for e in payload.entries
    ]

    conn = sqlite3.connect(str(DB_PATH))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executemany(
            """
            INSERT OR IGNORE INTO log_entries
            (id, source_id, sequence, timestamp_raw, timestamp_iso,
            ingest_time, severity, repeat_count, out_of_order,
            matched_patterns, text)
            VALUES (?,?,?,?,?,?,?,?,?,?,?)
            """,
            rows,
        )
        conn.commit()
    finally:
        # Close even when the insert raises, so a failing request cannot
        # leave the connection (and its lock) open.
        conn.close()

    background_tasks.add_task(build_fts_index, DB_PATH)
    return {"ingested": len(payload.entries), "source_host": payload.source_host}
@router.get("/api/tasks/ingest/status")
def ingest_task_status() -> dict:
    """Report a snapshot of the periodic ingest scheduler and its submission state."""
    state = _ingest_state()
    # True only when the interval is positive and sources.yaml exists.
    active = INGEST_INTERVAL > 0 and (PATTERN_DIR / "sources.yaml").exists()

    ingest_fields = {
        "running": state.running,
        "run_count": state.run_count,
        "last_run_at": state.last_run_at,
        "last_duration_s": state.last_duration_s,
        "last_stats": state.last_stats,
        "last_error": state.last_error,
        "next_run_at": state.next_run_at,
    }
    submit_fields = {
        "last_submitted_at": state.last_submitted_at,
        "last_submit_count": state.last_submit_count,
        "last_submit_error": state.last_submit_error,
    }
    return {
        **ingest_fields,
        "interval_s": INGEST_INTERVAL,
        "scheduler_active": active,
        "submit_endpoint": SUBMIT_ENDPOINT or None,
        **submit_fields,
    }
@router.post("/api/tasks/ingest")
async def trigger_ingest() -> dict:
    """Run one batch ingest of all configured sources on demand.

    Responds with 404 when sources.yaml is missing. Otherwise returns whatever
    the ingest runner reports, including its "already running" result.
    """
    sources_yaml = PATTERN_DIR / "sources.yaml"
    if sources_yaml.exists():
        outcome = await _run_ingest(
            sources_yaml,
            DB_PATH,
            PATTERN_FILE,
            submit_endpoint=SUBMIT_ENDPOINT or None,
            source_host=SOURCE_HOST,
        )
        return outcome
    raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
@router.post("/api/ingest/wazuh/alert")
async def ingest_wazuh_alert(
    alert: dict,
    source_id: Annotated[str | None, Query(description="Source label (defaults to 'wazuh')")] = None,
    background_tasks: BackgroundTasks = None,
) -> dict:
    """Accept a single Wazuh alert JSON object pushed by a Wazuh custom integration.

    Configure in Wazuh: ossec.conf <integration><name>custom-turnstone</name>
    pointing to a script that POSTs the alert JSON to this endpoint.

    Args:
        alert: The alert object from the request body.
        source_id: Optional source label; ``"wazuh"`` when omitted.
        background_tasks: Used to queue an FTS rebuild after a write.

    Returns:
        ``{"ingested": <entries parsed>, "source_id": <label used>}``.

    Raises:
        HTTPException: 422 when the body does not look like a Wazuh alert.

    NOTE(review): every pushed alert is parsed on its own, so each gets the
    parser's first sequence number. Two alerts that render to the same text
    presumably share an id and the second is ignored — confirm whether
    ``make_entry_id`` makes that so and whether it is intended.
    """
    if not _is_wazuh_alert(alert):
        from fastapi import HTTPException
        raise HTTPException(status_code=422, detail="Not a valid Wazuh alert object")

    sid = source_id or "wazuh"
    ingest_time = now_iso()
    compiled = load_compiled_patterns(PATTERN_FILE)
    entries = list(_parse_wazuh(iter([json.dumps(alert)]), sid, compiled, ingest_time))

    if entries:
        rows = [
            (
                e.entry_id, e.source_id, e.sequence,
                e.timestamp_raw, e.timestamp_iso, e.ingest_time,
                e.severity, e.repeat_count, int(e.out_of_order),
                json.dumps(list(e.matched_patterns)), e.text,
            )
            for e in entries
        ]

        def _store() -> None:
            conn = sqlite3.connect(str(DB_PATH))
            try:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.executemany(
                    """
                    INSERT OR IGNORE INTO log_entries
                    (id, source_id, sequence, timestamp_raw, timestamp_iso,
                    ingest_time, severity, repeat_count, out_of_order,
                    matched_patterns, text)
                    VALUES (?,?,?,?,?,?,?,?,?,?,?)
                    """,
                    rows,
                )
                conn.commit()
            finally:
                # Close even when the insert raises.
                conn.close()

        # This is a coroutine endpoint: run the blocking SQLite write in a
        # worker thread so it does not stall the event loop.
        await asyncio.to_thread(_store)
        if background_tasks is not None:
            background_tasks.add_task(build_fts_index, DB_PATH)

    return {"ingested": len(entries), "source_id": sid}
@router.get("/api/watch/status") @router.get("/api/watch/status")
def watch_status() -> dict: def watch_status() -> dict:
return {"active": _watcher.is_active(), "sources": _watcher.status} return {"active": _watcher.is_active(), "sources": _watcher.status}

0
app/tasks/__init__.py Normal file
View file

View file

@ -0,0 +1,184 @@
"""Periodic batch ingest scheduler with optional CF submission.
Runs ingest_sources on a configurable interval (TURNSTONE_INGEST_INTERVAL env var,
default 900s / 15 min). Set to 0 to disable.
When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote
Turnstone instance (the CF receiving store) after each ingest run.
"""
from __future__ import annotations
import asyncio
import json
import logging
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import httpx
from app.ingest.pipeline import ingest_sources
logger = logging.getLogger(__name__)
_lock = asyncio.Lock()
@dataclass
class IngestState:
    """Mutable, in-memory record of the scheduler's most recent activity."""

    # Most recent ingest run
    last_run_at: str | None = None
    last_duration_s: float | None = None
    last_stats: dict[str, int] = field(default_factory=dict)
    last_error: str | None = None

    # Scheduler bookkeeping
    run_count: int = 0
    next_run_at: str | None = None
    running: bool = False

    # Most recent submission to the remote endpoint
    last_submitted_at: str | None = None
    last_submit_count: int = 0
    last_submit_error: str | None = None
# Single process-wide state object, updated in place by the scheduler functions.
_state = IngestState()


def get_state() -> IngestState:
    """Return the shared scheduler state object (the live instance, not a copy)."""
    return _state
def _query_matched_since(db_path: Path, since: str | None) -> list[dict]:
"""Return entries with non-empty matched_patterns, optionally filtered by ingest_time."""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
if since:
rows = conn.execute(
"""
SELECT id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text
FROM log_entries
WHERE matched_patterns != '[]' AND ingest_time > ?
ORDER BY ingest_time
LIMIT 5000
""",
(since,),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text
FROM log_entries
WHERE matched_patterns != '[]'
ORDER BY ingest_time DESC
LIMIT 5000
""",
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
async def submit_matched(
    db_path: Path,
    submit_endpoint: str,
    source_host: str,
    since: str | None = None,
) -> dict[str, Any]:
    """Push pattern-matched entries to the remote CF receiving instance.

    Args:
        db_path: Local database to read matched entries from.
        submit_endpoint: Base URL; ``/turnstone/api/ingest/batch`` is appended.
        source_host: Label sent as ``source_host`` in the payload.
        since: Only entries with a later ``ingest_time`` are sent. ``None``
            sends the most recent matched entries.

    Returns:
        ``{"ok": True, "submitted": n}`` on success (with ``"skipped": True``
        when there was nothing to send), or ``{"ok": False, "error": msg}``.
        Failures are recorded in the shared state and never raised, so a
        broken endpoint or database cannot take the scheduler down.
    """
    # Take the watermark before reading. Stamping it after the POST would skip
    # entries written while the submission was in flight. Re-sent entries are
    # harmless: the batch endpoint in this project inserts with OR IGNORE.
    watermark = datetime.now(tz=timezone.utc).isoformat()
    try:
        loop = asyncio.get_running_loop()
        entries = await loop.run_in_executor(
            None, lambda: _query_matched_since(db_path, since)
        )
        if not entries:
            return {"ok": True, "submitted": 0, "skipped": True}

        url = f"{submit_endpoint.rstrip('/')}/turnstone/api/ingest/batch"
        payload = {"source_host": source_host, "entries": entries}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(url, json=payload)
            resp.raise_for_status()
            result = resp.json()
    except Exception as exc:
        # Best-effort boundary: record the failure and report it to the caller.
        _state.last_submit_error = str(exc)
        logger.warning("Submission to %s failed: %s", submit_endpoint, exc)
        return {"ok": False, "error": str(exc)}

    # Fall back to the sent count when the reply is not a JSON object.
    submitted = (
        result.get("ingested", len(entries)) if isinstance(result, dict) else len(entries)
    )
    _state.last_submitted_at = watermark
    _state.last_submit_count = submitted
    _state.last_submit_error = None
    logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint)
    return {"ok": True, "submitted": submitted}
async def run_once(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
) -> dict[str, Any]:
    """Ingest all sources once, then submit matched entries if configured.

    Args:
        sources_file: sources.yaml passed to ``ingest_sources``.
        db_path: Target SQLite database.
        pattern_file: Optional pattern file passed to ``ingest_sources``.
        submit_endpoint: When set, matched entries are pushed there after a
            successful ingest.
        source_host: Label sent with the submission.

    Returns:
        ``{"ok": False, "error": ..., "skipped": True}`` when a run is already
        in progress; ``{"ok": False, "error": ...}`` when ingest failed;
        otherwise ``{"ok": True, "stats": ..., "duration_s": ...}`` plus a
        ``"submission"`` entry when a submit endpoint is configured.
    """
    if _lock.locked():
        return {"ok": False, "error": "ingest already running", "skipped": True}

    async with _lock:
        _state.running = True
        started = datetime.now(tz=timezone.utc)
        error: str | None = None
        stats: dict[str, int] = {}
        try:
            loop = asyncio.get_running_loop()
            # ingest_sources is blocking; run it in the default executor.
            stats = await loop.run_in_executor(
                None,
                lambda: ingest_sources(sources_file, db_path, pattern_file),
            )
        except Exception as exc:
            error = str(exc)
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Batch ingest failed: %s", exc)
        finally:
            # Also runs on cancellation, so the flag never stays stuck.
            _state.running = False

        duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
        _state.last_run_at = started.isoformat()
        _state.last_duration_s = round(duration, 2)
        _state.last_error = error
        _state.run_count += 1

        if error is not None:
            # last_stats deliberately keeps the previous successful run's value.
            return {"ok": False, "error": error}

        _state.last_stats = stats
        logger.info("Batch ingest complete in %.1fs — %s", duration, stats)

        result: dict[str, Any] = {
            "ok": True,
            "stats": _state.last_stats,
            "duration_s": _state.last_duration_s,
        }
        if submit_endpoint:
            result["submission"] = await submit_matched(
                db_path, submit_endpoint, source_host, since=_state.last_submitted_at
            )
        return result
async def scheduler_loop(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None,
    interval_s: int,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
) -> None:
    """Run ingest + optional submission every interval_s seconds until cancelled.

    Args:
        sources_file: sources.yaml handed to each run.
        db_path: Target SQLite database.
        pattern_file: Optional pattern file handed to each run.
        interval_s: Seconds to sleep between runs. A non-positive value
            disables the loop and the function returns at once.
        submit_endpoint: Optional remote endpoint for matched entries.
        source_host: Label sent with submissions.

    Raises:
        asyncio.CancelledError: Re-raised when the task is cancelled.
    """
    if interval_s <= 0:
        # Without this guard sleep(0) would turn the loop into a busy spin.
        logger.info("Ingest scheduler disabled — interval %ds", interval_s)
        return

    logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file)
    if submit_endpoint:
        logger.info("Submission enabled — endpoint: %s", submit_endpoint)

    try:
        while True:
            try:
                await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host)
            except Exception:
                # One bad run must not silently end the schedule.
                logger.exception("Scheduled ingest run crashed; next attempt in %ds", interval_s)
            next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s)
            _state.next_run_at = next_run.isoformat()
            await asyncio.sleep(interval_s)
    except asyncio.CancelledError:
        logger.info("Ingest scheduler cancelled")
        raise
    finally:
        # Covers cancellation during a run as well as during the sleep.
        _state.next_run_at = None

View file

@ -0,0 +1,74 @@
# Turnstone — CF receiving instances for external node submissions.
#
# These are SEPARATE instances from the main Turnstone deployment. Each node
# that has TURNSTONE_SUBMIT_ENDPOINT configured pushes pattern-matched entries
# here. Each instance has its own isolated database. Avocet reads these
# databases for training data.
#
# Ports:
# 8536 → submissions-daniel (harvest.circuitforge.tech/daniel/*)
# 8537 → submissions-xander (harvest.circuitforge.tech/xander/*)
#
# Deploy on Heimdall:
# docker compose -f docker-compose.submissions.yml up -d
#
# Database locations:
# /devl/docker/turnstone-submissions/daniel/turnstone.db
# /devl/docker/turnstone-submissions/xander/turnstone.db
#
# These instances have TURNSTONE_INGEST_INTERVAL=0 — they only receive POSTs,
# they do not run their own scheduled ingest.
services:
submissions-daniel:
image: turnstone:latest
container_name: turnstone-submissions-daniel
restart: unless-stopped
ports:
- "8536:8534"
volumes:
- /devl/docker/turnstone-submissions/daniel:/data:z
- /devl/docker/turnstone-submissions/daniel/patterns:/patterns:ro
environment:
TURNSTONE_DB: /data/turnstone.db
TURNSTONE_PATTERNS: /patterns
TURNSTONE_SOURCE_HOST: submissions-daniel
TURNSTONE_INGEST_INTERVAL: "0"
PYTHONUNBUFFERED: "1"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8534/turnstone/health"]
interval: 30s
timeout: 10s
start_period: 20s
retries: 3
networks:
- caddy-internal
submissions-xander:
image: turnstone:latest
container_name: turnstone-submissions-xander
restart: unless-stopped
ports:
- "8537:8534"
volumes:
- /devl/docker/turnstone-submissions/xander:/data:z
- /devl/docker/turnstone-submissions/xander/patterns:/patterns:ro
environment:
TURNSTONE_DB: /data/turnstone.db
TURNSTONE_PATTERNS: /patterns
TURNSTONE_SOURCE_HOST: submissions-xander
TURNSTONE_INGEST_INTERVAL: "0"
PYTHONUNBUFFERED: "1"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8534/turnstone/health"]
interval: 30s
timeout: 10s
start_period: 20s
retries: 3
networks:
- caddy-internal
networks:
caddy-internal:
name: caddy-proxy_caddy-internal
external: true

View file

@ -42,3 +42,10 @@ sources:
# Jellyfin # Jellyfin
# - id: jellyfin # - id: jellyfin
# path: /opt/jellyfin/log/jellyfin.log # path: /opt/jellyfin/log/jellyfin.log
# Wazuh SIEM — alerts.json on the Wazuh manager
# Turnstone auto-detects this format; source_id is qualified per agent automatically.
# For push-based ingestion from Wazuh custom integrations, use:
# POST /api/ingest/wazuh/alert (single alert JSON body)
# - id: wazuh
# path: /var/ossec/logs/alerts/alerts.json

View file

@ -70,3 +70,27 @@ sources:
- id: jellyseerr - id: jellyseerr
path: /opt/jellyseerr/config/logs/jellyseerr.log path: /opt/jellyseerr/config/logs/jellyseerr.log
# ── MQTT / IoT (live — subscribe mode, no path needed) ───────────────────
# Requires: pip install circuitforge-core[mqtt]
# These sources are handled by the live MQTT subscriber task (not batch ingest).
# Uncomment and configure to enable.
#
# Meshtastic MQTT bridge (node must have MQTT uplink enabled):
# - id: meshtastic-home
# type: mqtt
# broker_host: 10.1.10.5 # IP of your local MQTT broker (e.g. Mosquitto on Huginn)
# broker_port: 1883
# topics:
# - msh/# # all Meshtastic regions; use msh/us-east/# to narrow
#
# Generic IoT sensors:
# - id: iot-home
# type: mqtt
# broker_host: localhost
# broker_port: 1883
# topics:
# - home/+/temperature
# - home/+/humidity
# - home/+/motion
# severity: INFO

View file

@ -71,6 +71,14 @@ TZ=America/Los_Angeles
# export TURNSTONE_BUNDLE_ENDPOINT=https://turnstone.circuitforge.tech/turnstone/api/bundles # export TURNSTONE_BUNDLE_ENDPOINT=https://turnstone.circuitforge.tech/turnstone/api/bundles
# bash /opt/turnstone/podman-standalone.sh # bash /opt/turnstone/podman-standalone.sh
# #
# ── Orchard submission (opt-in telemetry) ────────────────────────────────────
# Set TURNSTONE_SUBMIT_ENDPOINT to push pattern-matched log entries to a CF
# receiving instance after each ingest run. Only entries that matched a pattern
# are sent, and each one includes its log text; unmatched log lines never leave
# this host. Used to build Avocet training data.
#
# export TURNSTONE_SUBMIT_ENDPOINT=https://harvest.circuitforge.tech/xander
# bash /opt/turnstone/podman-standalone.sh
#
# TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed. # TURNSTONE_SOURCE_HOST is auto-detected from `hostname` — override if needed.
# ── Turnstone container ─────────────────────────────────────────────────────── # ── Turnstone container ───────────────────────────────────────────────────────
@ -102,6 +110,7 @@ podman run -d \
-e TURNSTONE_DB=/data/turnstone.db \ -e TURNSTONE_DB=/data/turnstone.db \
-e TURNSTONE_SOURCE_HOST="$(hostname)" \ -e TURNSTONE_SOURCE_HOST="$(hostname)" \
-e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \ -e TURNSTONE_BUNDLE_ENDPOINT="${TURNSTONE_BUNDLE_ENDPOINT:-}" \
-e TURNSTONE_SUBMIT_ENDPOINT="${TURNSTONE_SUBMIT_ENDPOINT:-}" \
-e PYTHONUNBUFFERED=1 \ -e PYTHONUNBUFFERED=1 \
-e TZ="${TZ}" \ -e TZ="${TZ}" \
--health-cmd="curl -f http://localhost:8534/turnstone/health || exit 1" \ --health-cmd="curl -f http://localhost:8534/turnstone/health || exit 1" \

118
tests/test_ingest_wazuh.py Normal file
View file

@ -0,0 +1,118 @@
"""Tests for the Wazuh alert ingestor."""
from __future__ import annotations
import json
from datetime import datetime
from app.ingest.wazuh import is_wazuh_alert, parse
from app.ingest.pipeline import _detect_format
_ALERT = {
"timestamp": "2024-01-15T10:23:45.123+0000",
"rule": {
"level": 7,
"description": "SSH authentication failure.",
"id": "5710",
"firedtimes": 1,
"groups": ["syslog", "sshd", "authentication_failed"],
},
"agent": {"id": "001", "name": "web-server-01", "ip": "192.168.1.100"},
"manager": {"name": "wazuh-mgr"},
"id": "1705312125.123456",
"full_log": "Jan 15 10:23:45 web-server-01 sshd[1234]: Failed password for admin from 10.0.0.5",
"location": "/var/log/auth.log",
"data": {"srcip": "10.0.0.5", "srcuser": "admin"},
}
_CRITICAL_ALERT = {
"timestamp": "2024-01-15T10:30:00.000+0000",
"rule": {"level": 13, "description": "Rootkit detected.", "id": "510", "groups": ["rootcheck"]},
"agent": {"id": "002", "name": "db-host", "ip": "192.168.1.200"},
"manager": {"name": "wazuh-mgr"},
"full_log": "rootkit patterns found",
"location": "/var/ossec/logs/active-responses.log",
}
class TestDetector:
    """Checks for ``is_wazuh_alert`` and for format routing in the pipeline."""

    def test_detects_valid_alert(self):
        # Full fixture is recognised.
        assert is_wazuh_alert(_ALERT)

    def test_detects_minimal_alert(self):
        # Only timestamp + rule + agent are needed.
        minimal = {
            "timestamp": "2024-01-15T10:23:45+0000",
            "rule": {"level": 5, "description": "test"},
            "agent": {"name": "host"},
        }
        assert is_wazuh_alert(minimal)

    def test_rejects_journald(self):
        journald_record = {"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}
        assert not is_wazuh_alert(journald_record)

    def test_rejects_caddy(self):
        caddy_record = {"ts": 1234, "msg": "served", "request": {}}
        assert not is_wazuh_alert(caddy_record)

    def test_rejects_no_agent(self):
        agentless = {"rule": {"level": 5}, "timestamp": "2024-01-01T00:00:00Z"}
        assert not is_wazuh_alert(agentless)

    def test_pipeline_routes_to_wazuh(self):
        first_line = json.dumps(_ALERT)
        assert _detect_format(first_line) == "wazuh"
class TestParser:
    """Checks for ``parse``: field mapping, text composition and line skipping."""

    def _parse(self, *alerts) -> list:
        """Serialise each alert to one JSON line and collect the parsed entries."""
        lines = []
        for alert in alerts:
            lines.append(json.dumps(alert))
        return list(parse(iter(lines), "wazuh", []))

    def test_single_entry_parsed(self):
        parsed = self._parse(_ALERT)
        assert len(parsed) == 1

    def test_severity_from_level(self):
        (entry,) = self._parse(_ALERT)
        assert entry.severity == "WARN"  # level 7

    def test_critical_severity(self):
        (entry,) = self._parse(_CRITICAL_ALERT)
        assert entry.severity == "CRITICAL"  # level 13

    def test_source_id_includes_agent(self):
        first = self._parse(_ALERT)[0]
        assert first.source_id == "wazuh:web-server-01"

    def test_text_contains_rule_description(self):
        first = self._parse(_ALERT)[0]
        assert "SSH authentication failure" in first.text

    def test_text_contains_agent_name(self):
        first = self._parse(_ALERT)[0]
        assert "web-server-01" in first.text

    def test_text_contains_decoded_data(self):
        first = self._parse(_ALERT)[0]
        assert "10.0.0.5" in first.text

    def test_text_contains_full_log(self):
        first = self._parse(_ALERT)[0]
        assert "Failed password" in first.text

    def test_timestamp_parsed_to_utc(self):
        first = self._parse(_ALERT)[0]
        dt = datetime.fromisoformat(first.timestamp_iso)
        assert dt.utcoffset() is not None
        assert (dt.hour, dt.minute, dt.second) == (10, 23, 45)

    def test_skips_malformed_json(self):
        raw = ["not json\n", json.dumps(_ALERT)]
        parsed = list(parse(iter(raw), "wazuh", []))
        assert len(parsed) == 1

    def test_skips_empty_lines(self):
        raw = ["\n", " \n", json.dumps(_ALERT)]
        parsed = list(parse(iter(raw), "wazuh", []))
        assert len(parsed) == 1

    def test_multi_alert_sequence(self):
        parsed = self._parse(_ALERT, _CRITICAL_ALERT)
        assert len(parsed) == 2
        sequence_numbers = [entry.sequence for entry in parsed]
        assert sequence_numbers == sorted(sequence_numbers)