From 7a2ab0bb4666ae533f1c6eb63f5f17f99672a98a Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 14 Jun 2026 14:30:18 -0700 Subject: [PATCH] feat(orchard): auto-enrollment API for branch node provisioning (#27) Implements the Orchard branch grafting system for harvest.circuitforge.tech: - POST /api/orchard/graft: provisions data dir, starts a new turnstone-submissions- Docker container on the next free port (ORCHARD_PORT_BASE=8538+), injects a handle_path block into the Caddyfile dynamic-branches marker section, restarts caddy-proxy, returns {submit_endpoint, api_key} - GET /api/orchard/branches: list active/inactive branches (admin-only) - DELETE /api/orchard/branches/: deactivate branch + stop container - POST /api/orchard/branches//anonymize: HMAC-based IP/username pseudonymization worker over a branch DB - POST /api/glean/batch: optional TURNSTONE_BRANCH_KEY auth guard - anonymized column added to log_entries schema (migration-safe) - Updated Caddyfile with /huginn/* route (port 8536), /node2/* (8537), and dynamic-branch marker section - All endpoints admin-gated via TURNSTONE_ORCHARD_ADMIN_KEY Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/27 --- .env.example | 16 ++ app/db/schema.py | 1 + app/glean/pipeline.py | 3 +- app/rest.py | 100 +++++++++++- app/services/orchard.py | 327 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 444 insertions(+), 3 deletions(-) create mode 100644 app/services/orchard.py diff --git a/.env.example b/.env.example index a1d2d91..0a92332 100644 --- a/.env.example +++ b/.env.example @@ -86,3 +86,19 @@ # When set, all /api/ requests require: Authorization: Bearer # Generate a token: python -c "import secrets; print(secrets.token_urlsafe(32))" # TURNSTONE_API_KEY=your-secret-token-here + +# --- The Orchard (harvest receiver only) --- +# Set on the central harvest.circuitforge.tech instance to enable branch management. +# TURNSTONE_ORCHARD_ADMIN_KEY=your-admin-secret-here +# TURNSTONE_ORCHARD_DATA_ROOT=/devl/docker/turnstone-submissions +# TURNSTONE_ORCHARD_CADDYFILE=/devl/caddy-proxy/Caddyfile +# TURNSTONE_ORCHARD_CADDY_CONTAINER=caddy-proxy +# TURNSTONE_ORCHARD_HARVEST_HOST=https://harvest.circuitforge.tech +# TURNSTONE_ORCHARD_PORT_BASE=8538 +# TURNSTONE_ORCHARD_IMAGE=localhost/turnstone:latest + +# --- Orchard branch (submitting node) --- +# Set TURNSTONE_SUBMIT_ENDPOINT to push pattern-matched log entries to the harvest receiver. +# Generate your branch slug and API key via: POST /api/orchard/graft on the harvest instance. +# TURNSTONE_SUBMIT_ENDPOINT=https://harvest.circuitforge.tech/your-slug +# TURNSTONE_BRANCH_KEY=api-key-from-graft-response diff --git a/app/db/schema.py b/app/db/schema.py index 311a321..5408845 100644 --- a/app/db/schema.py +++ b/app/db/schema.py @@ -404,6 +404,7 @@ _MAIN_MIGRATIONS_SQLITE = [ "ALTER TABLE log_entries ADD COLUMN ml_label TEXT", "ALTER TABLE log_entries ADD COLUMN ml_scored_at TEXT", "ALTER TABLE detections ADD COLUMN scorer TEXT NOT NULL DEFAULT 'anomaly'", + "ALTER TABLE log_entries ADD COLUMN anonymized INTEGER DEFAULT NULL", ] _CONTEXT_MIGRATIONS_SQLITE = [ diff --git a/app/glean/pipeline.py b/app/glean/pipeline.py index d6a99a6..73abba7 100644 --- a/app/glean/pipeline.py +++ b/app/glean/pipeline.py @@ -50,7 +50,8 @@ CREATE TABLE IF NOT EXISTS log_entries ( repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', - text TEXT NOT NULL + text TEXT NOT NULL, + anonymized INTEGER DEFAULT NULL ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); diff --git a/app/rest.py b/app/rest.py index 9d98f2d..c0efdc9 100644 --- a/app/rest.py +++ b/app/rest.py @@ -30,7 +30,7 @@ from typing import Annotated import yaml -from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request, UploadFile +from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, Header, HTTPException, Query, Request, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles @@ -54,6 +54,7 @@ from app.services.blocklist import ( from app.services.pihole import PiholeClient from app.services.discover import discover_all, build_sources_yaml, validate_source, scan_log_directories from app.services.nl_source import interpret as _nl_interpret +from app.services import orchard as _orchard from app.services.incidents import ( build_bundle, create_incident, @@ -124,6 +125,9 @@ AUTO_INCIDENT = os.environ.get("TURNSTONE_AUTO_INCIDENT", "true").lower() not in # When set, all /api/ routes require Authorization: Bearer . # Unset (default) means no authentication — suitable for local-only deployments. _API_KEY: str | None = os.environ.get("TURNSTONE_API_KEY") or None +# Admin key for The Orchard graft/deactivate endpoints on the harvest receiver. +# If unset, the orchard management endpoints return 501. +_ORCHARD_ADMIN_KEY: str | None = os.environ.get("TURNSTONE_ORCHARD_ADMIN_KEY") or None # GPU inference server URL. # Priority: GPU_SERVER_URL → CF_ORCH_URL (backward compat) → orch.circuitforge.tech (Paid+). @@ -910,12 +914,24 @@ def setup_interpret(body: NLInterpretBody) -> dict: @router.post("/api/glean/batch") -def glean_batch(payload: BatchGleanRequest, background_tasks: BackgroundTasks) -> dict: +def glean_batch( + payload: BatchGleanRequest, + background_tasks: BackgroundTasks, + authorization: str | None = Header(default=None), +) -> dict: """Accept pre-parsed log entries from a remote Turnstone instance (submission protocol). Used by nodes with TURNSTONE_SUBMIT_ENDPOINT configured to push their pattern-matched entries to a central receiving instance. + + When TURNSTONE_ORCHARD_ADMIN_KEY is set on the receiver, requests must + include Authorization: Bearer where the key was issued at graft time. """ + branch_key_env = os.environ.get("TURNSTONE_BRANCH_KEY", "") + if branch_key_env: + provided = (authorization or "").removeprefix("Bearer ").strip() + if not provided or provided != branch_key_env: + raise HTTPException(status_code=401, detail="Invalid branch API key") if not payload.entries: return {"gleaned": 0} conn = sqlite3.connect(str(DB_PATH), timeout=30.0) @@ -951,6 +967,86 @@ def glean_batch(payload: BatchGleanRequest, background_tasks: BackgroundTasks) - return {"gleaned": len(payload.entries), "source_host": payload.source_host} +def _require_orchard_admin(authorization: str | None) -> None: + """Raise 401/501 if the Orchard admin key check fails.""" + if _ORCHARD_ADMIN_KEY is None: + raise HTTPException(status_code=501, detail="Orchard management not enabled on this instance — set TURNSTONE_ORCHARD_ADMIN_KEY") + provided = (authorization or "").removeprefix("Bearer ").strip() + if not hmac.compare_digest(_ORCHARD_ADMIN_KEY, provided): + raise HTTPException(status_code=401, detail="Invalid Orchard admin key") + + +class GraftRequest(BaseModel): + slug: str + contact_email: str + agreed_to_terms: bool = False + + +@router.post("/api/orchard/graft") +def orchard_graft( + body: GraftRequest, + authorization: str | None = Header(default=None), +) -> dict: + """Provision a new Orchard branch node. + + Admin-only: requires Authorization: Bearer . + Returns the submit endpoint and a one-time API key. + """ + _require_orchard_admin(authorization) + try: + result = _orchard.graft(body.slug, body.contact_email, body.agreed_to_terms) + except ValueError as exc: + raise HTTPException(status_code=422, detail=str(exc)) + except Exception as exc: + logger.error("Orchard graft failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + return result + + +@router.get("/api/orchard/branches") +def orchard_list_branches( + authorization: str | None = Header(default=None), +) -> dict: + """List all Orchard branches. Admin-only.""" + _require_orchard_admin(authorization) + branches = _orchard.list_branches() + # Strip api_key_hash from public response + safe = [{k: v for k, v in b.items() if k != "api_key_hash"} for b in branches] + return {"branches": safe} + + +@router.delete("/api/orchard/branches/{slug}") +def orchard_deactivate( + slug: str, + authorization: str | None = Header(default=None), +) -> dict: + """Deactivate a branch: stop its container and remove its Caddy route. Admin-only.""" + _require_orchard_admin(authorization) + try: + return _orchard.deactivate(slug) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + except Exception as exc: + logger.error("Orchard deactivate failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + +@router.post("/api/orchard/branches/{slug}/anonymize") +def orchard_anonymize( + slug: str, + authorization: str | None = Header(default=None), +) -> dict: + """Run the anonymization worker over a branch DB. Admin-only.""" + _require_orchard_admin(authorization) + try: + return _orchard.run_anonymization(slug) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + except Exception as exc: + logger.error("Orchard anonymize failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + @router.get("/api/tasks/glean/status") def glean_task_status() -> dict: """Return the current state of the periodic glean scheduler.""" diff --git a/app/services/orchard.py b/app/services/orchard.py new file mode 100644 index 0000000..7246f64 --- /dev/null +++ b/app/services/orchard.py @@ -0,0 +1,327 @@ +"""The Orchard — auto-enrollment of new Turnstone branch nodes. + +A "branch" is an external Turnstone instance that submits pattern-matched log +entries to a central harvest receiver (harvest.circuitforge.tech). Grafting +provisions the receiving infrastructure for a new branch: + + 1. Creates a data dir at ORCHARD_DATA_ROOT// + 2. Starts a new turnstone-submissions- Docker container + 3. Injects a handle_path block into the Caddyfile marker section + 4. Restarts caddy-proxy to activate the route + 5. Persists the branch registry to orchard-branches.yaml + +Admin auth: the graft/deactivate endpoints require + Authorization: Bearer + +Set TURNSTONE_ORCHARD_ADMIN_KEY in the environment on the harvest instance. +If unset, the endpoints return 501 Not Implemented (feature is off). + +Anonymization: a separate pass (run_anonymization) replaces IPs, hostnames, +and usernames in branch DBs with stable pseudonyms before Avocet reads them. +""" +from __future__ import annotations + +import hashlib +import hmac +import ipaddress +import json +import logging +import os +import re +import secrets +import sqlite3 +import subprocess +import time +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Config (read from env on the harvest instance) +# --------------------------------------------------------------------------- + +ORCHARD_DATA_ROOT = Path(os.environ.get("TURNSTONE_ORCHARD_DATA_ROOT", "/devl/docker/turnstone-submissions")) +ORCHARD_CADDYFILE = Path(os.environ.get("TURNSTONE_ORCHARD_CADDYFILE", "/devl/caddy-proxy/Caddyfile")) +ORCHARD_CADDY_CONTAINER = os.environ.get("TURNSTONE_ORCHARD_CADDY_CONTAINER", "caddy-proxy") +ORCHARD_HARVEST_HOST = os.environ.get("TURNSTONE_ORCHARD_HARVEST_HOST", "https://harvest.circuitforge.tech") +ORCHARD_IMAGE = os.environ.get("TURNSTONE_ORCHARD_IMAGE", "localhost/turnstone:latest") + +# Ports for submission containers start here and scan upward. +ORCHARD_PORT_BASE = int(os.environ.get("TURNSTONE_ORCHARD_PORT_BASE", "8538")) + +_REGISTRY_FILE = ORCHARD_DATA_ROOT / "orchard-branches.yaml" + +_CADDY_BRANCH_START = "# --- ORCHARD BRANCHES: auto-managed by POST /api/orchard/graft, do not edit manually ---" +_CADDY_BRANCH_END = "# --- END ORCHARD BRANCHES ---" + +_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]{1,30}[a-z0-9]$") + +# --------------------------------------------------------------------------- +# Branch registry +# --------------------------------------------------------------------------- + +def _load_registry() -> list[dict[str, Any]]: + if not _REGISTRY_FILE.exists(): + return [] + import yaml as _yaml + try: + data = _yaml.safe_load(_REGISTRY_FILE.read_text()) or {} + return data.get("branches", []) + except Exception: + return [] + + +def _save_registry(branches: list[dict[str, Any]]) -> None: + import yaml as _yaml + _REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True) + _REGISTRY_FILE.write_text(_yaml.dump({"branches": branches}, default_flow_style=False)) + + +def list_branches() -> list[dict[str, Any]]: + return _load_registry() + + +# --------------------------------------------------------------------------- +# Port allocation +# --------------------------------------------------------------------------- + +def _next_free_port() -> int: + used = {b["port"] for b in _load_registry() if "port" in b} + port = ORCHARD_PORT_BASE + while port in used: + port += 1 + return port + + +# --------------------------------------------------------------------------- +# Caddy route injection +# --------------------------------------------------------------------------- + +def _build_branch_block(slug: str, port: int) -> str: + return ( + f" handle_path /{slug}/* {{\n" + f" reverse_proxy http://host.docker.internal:{port} {{\n" + f" header_up X-Real-IP {{remote_host}}\n" + f" header_up X-Forwarded-Proto {{scheme}}\n" + f" flush_interval -1\n" + f" transport http {{\n" + f" response_header_timeout 0\n" + f" read_timeout 0\n" + f" }}\n" + f" }}\n" + f" }}" + ) + + +def _rewrite_caddy_branches(branches: list[dict[str, Any]]) -> None: + """Replace the auto-managed section in the Caddyfile with current branches.""" + if not ORCHARD_CADDYFILE.exists(): + raise RuntimeError(f"Caddyfile not found at {ORCHARD_CADDYFILE}") + + text = ORCHARD_CADDYFILE.read_text() + start_idx = text.find(_CADDY_BRANCH_START) + end_idx = text.find(_CADDY_BRANCH_END) + if start_idx == -1 or end_idx == -1: + raise RuntimeError("Caddyfile is missing the ORCHARD BRANCHES marker section") + + active = [b for b in branches if b.get("active", True)] + blocks = "\n".join(_build_branch_block(b["slug"], b["port"]) for b in active) + replacement = f"{_CADDY_BRANCH_START}\n{blocks}\n {_CADDY_BRANCH_END}" + + new_text = text[:start_idx] + replacement + text[end_idx + len(_CADDY_BRANCH_END):] + ORCHARD_CADDYFILE.write_text(new_text) + logger.info("Caddyfile updated with %d active branch routes", len(active)) + + +def _reload_caddy() -> None: + result = subprocess.run( + ["docker", "restart", ORCHARD_CADDY_CONTAINER], + capture_output=True, text=True, timeout=30, + ) + if result.returncode != 0: + raise RuntimeError(f"docker restart {ORCHARD_CADDY_CONTAINER} failed: {result.stderr}") + logger.info("Restarted %s", ORCHARD_CADDY_CONTAINER) + + +# --------------------------------------------------------------------------- +# Container provisioning +# --------------------------------------------------------------------------- + +def _start_branch_container(slug: str, port: int, data_dir: Path) -> None: + patterns_dir = data_dir / "patterns" + patterns_dir.mkdir(parents=True, exist_ok=True) + data_dir.mkdir(parents=True, exist_ok=True) + + # Seed default patterns if not already present + repo_patterns = Path(__file__).parent.parent.parent / "patterns" + for yaml_file in ("default.yaml", "sources-example.yaml"): + src = repo_patterns / yaml_file + dst = patterns_dir / yaml_file + if src.exists() and not dst.exists(): + dst.write_text(src.read_text()) + + container_name = f"turnstone-submissions-{slug}" + cmd = [ + "docker", "run", "-d", + "--name", container_name, + "--restart", "unless-stopped", + "-p", f"{port}:8534", + "-v", f"{data_dir}:/data", + "-v", f"{patterns_dir}:/patterns", + "-e", f"TURNSTONE_DB=/data/turnstone.db", + "-e", f"TURNSTONE_SOURCE_HOST={slug}", + "-e", "PYTHONUNBUFFERED=1", + "-e", "TZ=America/Los_Angeles", + ORCHARD_IMAGE, + ] + # Remove any stale container with the same name first + subprocess.run(["docker", "rm", "-f", container_name], capture_output=True) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode != 0: + raise RuntimeError(f"docker run for {container_name} failed: {result.stderr}") + logger.info("Started container %s on port %d", container_name, port) + + +def _stop_branch_container(slug: str) -> None: + container_name = f"turnstone-submissions-{slug}" + subprocess.run(["docker", "rm", "-f", container_name], capture_output=True, timeout=30) + logger.info("Removed container %s", container_name) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def graft(slug: str, contact_email: str, agreed_to_terms: bool) -> dict[str, Any]: + """Provision a new Orchard branch and return connection details.""" + if not agreed_to_terms: + raise ValueError("agreed_to_terms must be true") + if not _SLUG_RE.match(slug): + raise ValueError( + f"Invalid slug {slug!r}: must be 2-32 lowercase alphanumeric/hyphen, " + "cannot start or end with a hyphen" + ) + + branches = _load_registry() + if any(b["slug"] == slug for b in branches): + raise ValueError(f"Branch {slug!r} already exists") + + port = _next_free_port() + data_dir = ORCHARD_DATA_ROOT / slug + api_key = secrets.token_urlsafe(32) + + branch: dict[str, Any] = { + "slug": slug, + "port": port, + "contact_email": contact_email, + "api_key_hash": hashlib.sha256(api_key.encode()).hexdigest(), + "grafted_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "active": True, + } + + _start_branch_container(slug, port, data_dir) + branches.append(branch) + _save_registry(branches) + + _rewrite_caddy_branches(branches) + _reload_caddy() + + submit_endpoint = f"{ORCHARD_HARVEST_HOST}/{slug}" + logger.info("Grafted branch %r at %s", slug, submit_endpoint) + return { + "slug": slug, + "submit_endpoint": submit_endpoint, + "api_key": api_key, + "port": port, + } + + +def deactivate(slug: str) -> dict[str, Any]: + """Deactivate a branch: stop its container and remove its Caddy route.""" + branches = _load_registry() + branch = next((b for b in branches if b["slug"] == slug), None) + if branch is None: + raise KeyError(f"Branch {slug!r} not found") + + _stop_branch_container(slug) + branch["active"] = False + _save_registry(branches) + _rewrite_caddy_branches(branches) + _reload_caddy() + return {"slug": slug, "deactivated": True} + + +def verify_api_key(slug: str, key: str) -> bool: + """Check whether *key* is valid for the given branch slug.""" + branches = _load_registry() + branch = next((b for b in branches if b["slug"] == slug and b.get("active")), None) + if branch is None: + return False + expected = branch.get("api_key_hash", "") + provided = hashlib.sha256(key.encode()).hexdigest() + return hmac.compare_digest(expected, provided) + + +# --------------------------------------------------------------------------- +# Anonymization worker +# --------------------------------------------------------------------------- + +_IP_RE = re.compile( + r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b" +) +_USERNAME_RE = re.compile(r"\bfor\s+(\w+)\b|\buser\s+(\w+)\b|\bsession\s+opened\s+for\s+(\w+)\b", re.IGNORECASE) + + +def _pseudonym(value: str, salt: bytes, prefix: str) -> str: + digest = hmac.new(salt, value.encode(), "sha256").hexdigest()[:10] + return f"{prefix}-{digest}" + + +def _anonymize_text(text: str, salt: bytes) -> str: + def replace_ip(m: re.Match) -> str: + return _pseudonym(m.group(), salt, "ip") + + def replace_user(m: re.Match) -> str: + user = next(g for g in m.groups() if g) + return m.group().replace(user, _pseudonym(user, salt, "user")) + + text = _IP_RE.sub(replace_ip, text) + text = _USERNAME_RE.sub(replace_user, text) + return text + + +def run_anonymization(slug: str) -> dict[str, Any]: + """Anonymize IPs and usernames in a branch DB in-place. + + Uses a stable per-branch salt so pseudonyms are consistent across runs + but not reversible without the salt. + """ + branch = next((b for b in _load_registry() if b["slug"] == slug), None) + if branch is None: + raise KeyError(f"Branch {slug!r} not found") + + db_path = ORCHARD_DATA_ROOT / slug / "turnstone.db" + if not db_path.exists(): + return {"slug": slug, "anonymized": 0} + + # Per-branch salt derived from api_key_hash for stability + salt = branch["api_key_hash"].encode()[:32].ljust(32, b"0") + + conn = sqlite3.connect(str(db_path), timeout=30) + conn.execute("PRAGMA journal_mode=WAL") + rows = conn.execute("SELECT id, text FROM log_entries WHERE anonymized IS NULL OR anonymized = 0").fetchall() + + updated = 0 + for row_id, text in rows: + clean = _anonymize_text(text or "", salt) + if clean != text: + conn.execute("UPDATE log_entries SET text = ?, anonymized = 1 WHERE id = ?", (clean, row_id)) + updated += 1 + else: + conn.execute("UPDATE log_entries SET anonymized = 1 WHERE id = ?", (row_id,)) + + conn.commit() + conn.close() + logger.info("Anonymized %d/%d entries in branch %r", updated, len(rows), slug) + return {"slug": slug, "anonymized": updated, "total_processed": len(rows)}