turnstone/app/tasks/ingest_scheduler.py
pyr0ball 82977f365b feat: periodic ingest scheduler + Orchard submission pipeline
Adds asyncio-native background scheduler (TURNSTONE_INGEST_INTERVAL,
default 900s) that runs batch ingest then pushes pattern-matched entries
to a remote CF harvest endpoint (TURNSTONE_SUBMIT_ENDPOINT).

- app/tasks/ingest_scheduler.py: IngestState, scheduler_loop, run_once,
  submit_matched, _query_matched_since — asyncio.Lock prevents concurrent runs
- app/rest.py: POST /api/ingest/batch (pre-parsed entry receiver),
  GET /api/tasks/ingest/status, POST /api/tasks/ingest (manual trigger),
  TURNSTONE_INGEST_INTERVAL + TURNSTONE_SUBMIT_ENDPOINT env wiring in lifespan
- docker-compose.submissions.yml: segregated daniel (8536) + xander (8537)
  receiving instances on Heimdall, isolated DBs under
  /devl/docker/turnstone-submissions/<node>/
- podman-standalone.sh: pass-through for TURNSTONE_SUBMIT_ENDPOINT +
  TURNSTONE_SOURCE_HOST
- app/ingest/mqtt_subscriber.py: MQTT log source adapter
- app/ingest/wazuh.py: Wazuh alert JSON adapter
- tests/test_ingest_wazuh.py: Wazuh adapter test suite
2026-05-20 08:57:25 -07:00

184 lines
6.4 KiB
Python

"""Periodic batch ingest scheduler with optional CF submission.
Runs ingest_sources on a configurable interval (TURNSTONE_INGEST_INTERVAL env var,
default 900s / 15 min). Set to 0 to disable.
When TURNSTONE_SUBMIT_ENDPOINT is set, pushes pattern-matched entries to a remote
Turnstone instance (the CF receiving store) after each ingest run.
"""
from __future__ import annotations
import asyncio
import json
import logging
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import httpx
from app.ingest.pipeline import ingest_sources
logger = logging.getLogger(__name__)
# Serialises ingest runs: run_once() returns early with "ingest already
# running" instead of queueing while this lock is held.
# NOTE(review): created at import time; before Python 3.10 asyncio.Lock binds
# to the event loop current at construction — confirm the minimum supported
# Python is 3.10+ or that this module is imported on the serving loop.
_lock = asyncio.Lock()
@dataclass
class IngestState:
    """Mutable status snapshot of the ingest scheduler and submission pipeline.

    Timestamp fields hold ISO-8601 strings produced from timezone-aware UTC
    datetimes. One module-level instance (``_state``) is shared by the
    functions in this module.
    """

    # Start time of the most recent ingest run (successful or failed).
    last_run_at: str | None = None
    # Wall-clock duration of that run in seconds, rounded to 2 decimals.
    last_duration_s: float | None = None
    # Stats dict returned by ingest_sources on the last successful run
    # (presumably per-category counts — confirm against app.ingest.pipeline).
    last_stats: dict[str, int] = field(default_factory=dict)
    # str() of the exception from the last failed ingest; None after a success.
    last_error: str | None = None
    # Number of ingest attempts, counting both successes and failures.
    run_count: int = 0
    # When scheduler_loop will next run; None until first scheduled and after
    # the scheduler is cancelled.
    next_run_at: str | None = None
    # True while run_once is executing an ingest run.
    running: bool = False
    # Timestamp recorded by the last successful submission; run_once passes it
    # to submit_matched as the `since` watermark for the next batch.
    last_submitted_at: str | None = None
    # Entry count from the last successful submission (the remote's "ingested"
    # value when present, otherwise the number of entries sent).
    last_submit_count: int = 0
    # str() of the last submission failure; None after a successful submission.
    last_submit_error: str | None = None


# Module-level singleton returned by get_state() and mutated by run_once,
# submit_matched and scheduler_loop.
_state = IngestState()
def get_state() -> IngestState:
    """Return the shared module-level IngestState.

    The live object is returned, not a copy: callers observe later updates,
    and anything they mutate is seen by the scheduler as well.
    """
    return _state
def _query_matched_since(
    db_path: Path, since: str | None, limit: int = 5000
) -> list[dict]:
    """Return log entries that matched at least one pattern.

    Args:
        db_path: SQLite database containing the ``log_entries`` table.
        since: When truthy, only rows whose ``ingest_time`` compares greater
            than this string are returned, oldest first. When None or empty,
            the newest rows are returned, newest first.
        limit: Maximum number of rows to return (default 5000).

    Returns:
        One plain dict per row, keyed by column name.
    """
    # One column list shared by both query shapes so they cannot drift apart.
    columns = (
        "id, source_id, sequence, timestamp_raw, timestamp_iso, "
        "ingest_time, severity, repeat_count, out_of_order, "
        "matched_patterns, text"
    )
    # '[]' is an empty match list. NULL rows are excluded too, because
    # NULL != '[]' is not true in SQL.
    where = "matched_patterns != '[]'"
    # Only the constant fragments above are interpolated; values are bound
    # as parameters.
    if since:
        # NOTE(review): plain string comparison — assumes ingest_time is stored
        # in the same sortable ISO-8601 format as `since`; confirm in schema.
        sql = (
            f"SELECT {columns} FROM log_entries "
            f"WHERE {where} AND ingest_time > ? "
            "ORDER BY ingest_time LIMIT ?"
        )
        params = (since, limit)
    else:
        sql = (
            f"SELECT {columns} FROM log_entries "
            f"WHERE {where} "
            "ORDER BY ingest_time DESC LIMIT ?"
        )
        params = (limit,)

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
    finally:
        # sqlite3's connection context manager does not close; do it explicitly.
        conn.close()
async def submit_matched(
    db_path: Path,
    submit_endpoint: str,
    source_host: str,
    since: str | None = None,
) -> dict[str, Any]:
    """Push pattern-matched entries to the remote CF receiving instance.

    Entries are read with ``_query_matched_since`` and POSTed as JSON
    (``{"source_host": ..., "entries": [...]}``) to
    ``<submit_endpoint>/turnstone/api/ingest/batch``.

    With a ``since`` watermark, entries newer than it are sent oldest first,
    one query page per POST, until the backlog is drained. A backlog larger
    than one page (e.g. after the remote was unreachable) is therefore not
    silently dropped when the watermark advances. Without a watermark, only
    the newest page is sent (no full-history backfill).

    Args:
        db_path: SQLite database holding the ``log_entries`` table.
        submit_endpoint: Base URL of the receiving Turnstone instance.
        source_host: Identifier sent with every batch.
        since: Only submit entries whose ``ingest_time`` sorts after this.

    Returns:
        ``{"ok": True, "submitted": 0, "skipped": True}`` when nothing matched,
        ``{"ok": True, "submitted": n}`` on success, or
        ``{"ok": False, "error": msg}`` if the query or any POST fails.

    On success, updates ``_state.last_submitted_at``, ``last_submit_count``
    and clears ``last_submit_error``. On failure, only ``last_submit_error``
    is set. The watermark is left untouched so the next run retries; pages
    already accepted before the failure may then be sent again.
    """
    # Must mirror the LIMIT used by _query_matched_since: a page this full may
    # have more rows queued behind it.
    page_limit = 5000
    url = f"{submit_endpoint.rstrip('/')}/turnstone/api/ingest/batch"
    # Taken before reading so anything ingested while this submission is in
    # flight sorts after the new watermark and is sent next time, not lost.
    started_at = datetime.now(tz=timezone.utc).isoformat()
    loop = asyncio.get_running_loop()
    submitted = 0
    cursor = since
    try:
        # sqlite3 is blocking; keep it off the event loop.
        entries = await loop.run_in_executor(
            None, _query_matched_since, db_path, cursor
        )
        if not entries:
            return {"ok": True, "submitted": 0, "skipped": True}
        async with httpx.AsyncClient(timeout=30.0) as client:
            while entries:
                payload = {"source_host": source_host, "entries": entries}
                resp = await client.post(url, json=payload)
                resp.raise_for_status()
                submitted += resp.json().get("ingested", len(entries))
                # Bootstrap (no watermark) sends just the newest page; a short
                # page means the backlog is drained.
                if not cursor or len(entries) < page_limit:
                    break
                # Watermark pages are ordered by ingest_time ascending, so the
                # last row is the furthest point already sent.
                # NOTE: rows sharing that exact ingest_time beyond the page
                # boundary are skipped by the strict '>' in the query.
                next_cursor = entries[-1].get("ingest_time")
                if not next_cursor or next_cursor == cursor:
                    break
                cursor = next_cursor
                entries = await loop.run_in_executor(
                    None, _query_matched_since, db_path, cursor
                )
    except Exception as exc:
        _state.last_submit_error = str(exc)
        logger.warning("Submission to %s failed: %s", submit_endpoint, exc)
        return {"ok": False, "error": str(exc)}

    _state.last_submitted_at = started_at
    _state.last_submit_count = submitted
    _state.last_submit_error = None
    logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint)
    return {"ok": True, "submitted": submitted}
async def run_once(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
) -> dict[str, Any]:
    """Ingest all sources once, then submit matched entries if configured.

    Only one run may be in flight; if another run holds the module lock this
    returns immediately instead of waiting.

    Args:
        sources_file: Source definitions passed to ``ingest_sources``.
        db_path: SQLite database that ingest writes and submission reads.
        pattern_file: Optional pattern file passed to ``ingest_sources``.
        submit_endpoint: Base URL of the receiving instance; submission is
            skipped when falsy.
        source_host: Host identifier forwarded to ``submit_matched``.

    Returns:
        - ``{"ok": False, "error": "ingest already running", "skipped": True}``
          when another run is active.
        - ``{"ok": False, "error": msg}`` when ingest raised; submission is
          not attempted in that case.
        - Otherwise ``{"ok": True, "stats": ..., "duration_s": ...}``, plus a
          ``"submit"`` key with ``submit_matched``'s result when submission
          ran.
    """
    # No await separates this check from the acquire below, so on one event
    # loop check-then-acquire cannot interleave with another run.
    if _lock.locked():
        return {"ok": False, "error": "ingest already running", "skipped": True}
    async with _lock:
        # `running` spans the whole locked section (ingest AND submission) so
        # the status flag agrees with the "already running" guard above.
        _state.running = True
        try:
            started = datetime.now(tz=timezone.utc)
            loop = asyncio.get_running_loop()
            try:
                # ingest_sources is synchronous; run it off the event loop.
                stats: dict[str, int] = await loop.run_in_executor(
                    None,
                    lambda: ingest_sources(sources_file, db_path, pattern_file),
                )
            except Exception as exc:
                duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
                _state.last_run_at = started.isoformat()
                _state.last_duration_s = round(duration, 2)
                _state.last_error = str(exc)
                _state.run_count += 1
                # logger.exception keeps the traceback for diagnosis.
                logger.exception("Batch ingest failed: %s", exc)
                return {"ok": False, "error": str(exc)}

            duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
            _state.last_run_at = started.isoformat()
            _state.last_duration_s = round(duration, 2)
            _state.last_stats = stats
            _state.last_error = None
            _state.run_count += 1
            logger.info("Batch ingest complete in %.1fs — %s", duration, stats)

            result: dict[str, Any] = {
                "ok": True,
                "stats": _state.last_stats,
                "duration_s": _state.last_duration_s,
            }
            if submit_endpoint:
                result["submit"] = await submit_matched(
                    db_path,
                    submit_endpoint,
                    source_host,
                    since=_state.last_submitted_at,
                )
            return result
        finally:
            _state.running = False
async def scheduler_loop(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None,
    interval_s: int,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
) -> None:
    """Run ingest + optional submission every interval_s seconds until cancelled.

    A run that raises is logged and the loop keeps going, so one bad run
    (e.g. a locked database during submission) does not silently stop all
    future ingests. ``_state.next_run_at`` is updated after each run.

    An ``interval_s`` of 0 or less means "disabled" (per the module docs); the
    function then returns immediately instead of busy-looping with sleep(0).

    Raises:
        asyncio.CancelledError: re-raised after clearing ``next_run_at``.
    """
    if interval_s <= 0:
        logger.info("Ingest scheduler disabled (interval %ds)", interval_s)
        return
    logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file)
    if submit_endpoint:
        logger.info("Submission enabled — endpoint: %s", submit_endpoint)
    try:
        while True:
            try:
                await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host)
            except asyncio.CancelledError:
                # Explicit re-raise: CancelledError subclasses Exception on
                # Python < 3.8 and must not be swallowed below.
                raise
            except Exception:
                logger.exception("Scheduled ingest run raised; retrying next interval")
            next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s)
            _state.next_run_at = next_run.isoformat()
            await asyncio.sleep(interval_s)
    except asyncio.CancelledError:
        # Covers cancellation during a run as well as during the sleep.
        logger.info("Ingest scheduler cancelled")
        _state.next_run_at = None
        raise