diff --git a/.env.example b/.env.example index 97b21e6..2c1da08 100644 --- a/.env.example +++ b/.env.example @@ -42,6 +42,15 @@ # TURNSTONE_EMBED_MODEL=BAAI/bge-small-en-v1.5 # TURNSTONE_EMBED_DEVICE=cpu +# --- Cybersec scoring pipeline (zero-shot, second-pass on flagged entries) --- +# Runs a zero-shot classifier on entries already flagged by the anomaly scorer +# or that have pattern matches — a focused second opinion using cybersec vocabulary. +# The DeBERTa-v3-base-mnli model (required by the diagnose pipeline) is the recommended +# zero-shot classifier — it produces human-readable cybersec labels with no fine-tuning. +# TURNSTONE_CYBERSEC_MODEL=MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli +# TURNSTONE_CYBERSEC_DEVICE=cpu +# TURNSTONE_CYBERSEC_THRESHOLD=0.60 # lower than anomaly threshold (zero-shot is calibrated differently) + # --- Anomaly scoring pipeline (IDS / watchdog) --- # Batch-scores every ingested log entry after each glean cycle. # Any HuggingFace text-classification model works; the byviz classifier (already diff --git a/app/db/schema.py b/app/db/schema.py index 0e9ad2f..311a321 100644 --- a/app/db/schema.py +++ b/app/db/schema.py @@ -38,6 +38,9 @@ CREATE TABLE IF NOT EXISTS log_entries ( anomaly_score REAL, anomaly_label TEXT, anomaly_scored_at TEXT, + ml_score REAL, + ml_label TEXT, + ml_scored_at TEXT, PRIMARY KEY (tenant_id, id) ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); @@ -47,6 +50,7 @@ CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_ CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score); +CREATE INDEX IF NOT EXISTS idx_ml_scored ON log_entries(tenant_id, ml_scored_at); CREATE TABLE IF NOT EXISTS detections ( id TEXT PRIMARY KEY, @@ -61,12 +65,14 @@ CREATE TABLE IF NOT EXISTS detections ( detected_at TEXT NOT NULL, 
acknowledged INTEGER NOT NULL DEFAULT 0, acknowledged_at TEXT, - notes TEXT NOT NULL DEFAULT '' + notes TEXT NOT NULL DEFAULT '', + scorer TEXT NOT NULL DEFAULT 'anomaly' ); CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at); CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged); CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label); CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id); +CREATE INDEX IF NOT EXISTS idx_detections_scorer ON detections(scorer); CREATE TABLE IF NOT EXISTS glean_fingerprints ( tenant_id TEXT NOT NULL DEFAULT '', @@ -201,6 +207,9 @@ _MAIN_SCHEMA_PG_STMTS = [ anomaly_score DOUBLE PRECISION, anomaly_label TEXT, anomaly_scored_at TEXT, + ml_score DOUBLE PRECISION, + ml_label TEXT, + ml_scored_at TEXT, PRIMARY KEY (tenant_id, id) ) """, @@ -210,6 +219,7 @@ _MAIN_SCHEMA_PG_STMTS = [ "CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns)", "CREATE INDEX IF NOT EXISTS idx_fts_gin ON log_entries USING GIN(text_tsv)", "CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score)", + "CREATE INDEX IF NOT EXISTS idx_ml_scored ON log_entries(tenant_id, ml_scored_at)", """ CREATE TABLE IF NOT EXISTS detections ( id TEXT PRIMARY KEY, @@ -224,13 +234,15 @@ _MAIN_SCHEMA_PG_STMTS = [ detected_at TEXT NOT NULL, acknowledged INTEGER NOT NULL DEFAULT 0, acknowledged_at TEXT, - notes TEXT NOT NULL DEFAULT '' + notes TEXT NOT NULL DEFAULT '', + scorer TEXT NOT NULL DEFAULT 'anomaly' ) """, "CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at)", "CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged)", "CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label)", "CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id)", + "CREATE INDEX IF NOT EXISTS idx_detections_scorer ON detections(scorer)", """ CREATE OR REPLACE FUNCTION 
_ts_update_text_tsv() RETURNS trigger AS $$ BEGIN @@ -388,6 +400,10 @@ _MAIN_MIGRATIONS_SQLITE = [ "ALTER TABLE log_entries ADD COLUMN anomaly_score REAL", "ALTER TABLE log_entries ADD COLUMN anomaly_label TEXT", "ALTER TABLE log_entries ADD COLUMN anomaly_scored_at TEXT", + "ALTER TABLE log_entries ADD COLUMN ml_score REAL", + "ALTER TABLE log_entries ADD COLUMN ml_label TEXT", + "ALTER TABLE log_entries ADD COLUMN ml_scored_at TEXT", + "ALTER TABLE detections ADD COLUMN scorer TEXT NOT NULL DEFAULT 'anomaly'", ] _CONTEXT_MIGRATIONS_SQLITE = [ diff --git a/app/rest.py b/app/rest.py index d187979..a59ede9 100644 --- a/app/rest.py +++ b/app/rest.py @@ -89,7 +89,9 @@ from app.context.wizard import get_schema as _wizard_schema, advance_step, is_co from app.context.chunker import UnsupportedDocType, FileTooLarge from app.tasks.glean_scheduler import get_state as _glean_state, run_once as _run_glean, scheduler_loop as _scheduler_loop, submit_matched as _submit_matched from app.tasks.anomaly_scorer import get_state as _scorer_state, run_once as _run_scorer +from app.tasks.cybersec_scorer import get_state as _cybersec_state, run_once as _run_cybersec from app.services.anomaly import list_detections as _list_detections, acknowledge_detection as _ack_detection +from app.services.cybersec import list_cybersec_detections as _list_cybersec, CYBERSEC_LABELS from app.glean.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) @@ -114,6 +116,9 @@ SUBMIT_ENDPOINT = os.environ.get("TURNSTONE_SUBMIT_ENDPOINT", "").rstrip("/") ANOMALY_MODEL = os.environ.get("TURNSTONE_ANOMALY_MODEL", "") ANOMALY_DEVICE = os.environ.get("TURNSTONE_ANOMALY_DEVICE", "cpu") ANOMALY_THRESHOLD = float(os.environ.get("TURNSTONE_ANOMALY_THRESHOLD", "0.75")) +CYBERSEC_MODEL = os.environ.get("TURNSTONE_CYBERSEC_MODEL", "") +CYBERSEC_DEVICE = os.environ.get("TURNSTONE_CYBERSEC_DEVICE", "cpu") 
# ---------------------------------------------------------------------------
# Cybersec scoring endpoints
# ---------------------------------------------------------------------------

_cybersec_router = APIRouter(prefix="/turnstone/api/cybersec", dependencies=[Depends(_check_api_key)])


@_cybersec_router.get("/status")
async def cybersec_status():
    """Return the cybersec scorer configuration merged with its runtime state.

    The static part (model, threshold, device, enabled flag, candidate labels)
    comes from module configuration; the remaining keys are whatever the task
    wrapper's ``get_state()`` reports.
    """
    return {
        "model": CYBERSEC_MODEL or None,
        "threshold": CYBERSEC_THRESHOLD,
        "device": CYBERSEC_DEVICE,
        "enabled": bool(CYBERSEC_MODEL),
        "candidate_labels": CYBERSEC_LABELS,
        **_cybersec_state(),
    }


@_cybersec_router.post("/run")
async def cybersec_run(background_tasks: BackgroundTasks):
    """Trigger a manual cybersec scoring pass (runs in background).

    Raises:
        HTTPException: 400 when no cybersec model is configured.
    """
    if not CYBERSEC_MODEL:
        raise HTTPException(status_code=400, detail="TURNSTONE_CYBERSEC_MODEL not configured")

    # The task wrapper silently returns when a pass is in flight, so say so
    # instead of claiming that a new pass was started.
    if _cybersec_state()["running"]:
        return {"ok": True, "message": "cybersec scorer already running"}

    # Threshold goes by keyword (as the scheduler does) so the scorer's own
    # default batch size applies instead of a second hard-coded copy of it.
    background_tasks.add_task(
        _run_cybersec,
        DB_PATH,
        CYBERSEC_MODEL,
        CYBERSEC_DEVICE,
        threshold=CYBERSEC_THRESHOLD,
    )
    return {"ok": True, "message": "cybersec scorer triggered"}


@_cybersec_router.get("/detections")
async def cybersec_detections(
    limit: int = Query(100, ge=1, le=1000),
    unacked_only: bool = Query(False),
    label: str | None = Query(None),
):
    """List cybersec detections ordered by detected_at DESC.

    ``total`` is the number of rows in this response (bounded by ``limit``),
    not a count of every matching detection.
    """
    loop = asyncio.get_running_loop()
    # The query is blocking, so it is run on the default executor.
    rows = await loop.run_in_executor(
        None,
        lambda: _list_cybersec(DB_PATH, limit=limit, unacked_only=unacked_only, label=label),
    )
    return {"detections": rows, "total": len(rows)}


app.include_router(_cybersec_router)
+ +Runs a second-pass analysis on entries that were already flagged by the +anomaly scorer or that have pattern matches. Uses a zero-shot classification +model (DeBERTa-v3-base-mnli is cached locally) so no fine-tuning is needed. + +The scorer writes ml_score / ml_label / ml_scored_at to log_entries and +inserts high-confidence non-normal hits into the detections table tagged +with scorer='cybersec'. + +Env vars +-------- +TURNSTONE_CYBERSEC_MODEL — HF model id for zero-shot classification. + Recommended: MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli + (already cached from the diagnose pipeline). + Set to empty string to disable (safe default). +TURNSTONE_CYBERSEC_DEVICE — 'cpu' (default) or 'cuda' +TURNSTONE_CYBERSEC_THRESHOLD — float confidence floor for detection insertion (default 0.60) +""" +from __future__ import annotations + +import logging +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from app.db import get_conn, resolve_tenant_id +from app.db.dialect import q + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Candidate labels — cybersec vocabulary for zero-shot inference +# --------------------------------------------------------------------------- + +CYBERSEC_LABELS: list[str] = [ + "authentication failure or brute force attack", + "privilege escalation or unauthorized access", + "network intrusion or port scan", + "malware or suspicious process activity", + "data exfiltration or unusual outbound traffic", + "normal system operation", +] + +_NORMAL_LABEL = "normal system operation" + +_LABEL_SEVERITY: dict[str, str] = { + "authentication failure or brute force attack": "ERROR", + "privilege escalation or unauthorized access": "CRITICAL", + "network intrusion or port scan": "ERROR", + "malware or suspicious process activity": "CRITICAL", + "data exfiltration or unusual outbound 
traffic":"CRITICAL", + "normal system operation": "INFO", +} + +# --------------------------------------------------------------------------- +# Pipeline singleton +# --------------------------------------------------------------------------- + +_pipeline: Any = None + + +def _get_pipeline(model_id: str, device: str) -> Any: + global _pipeline # noqa: PLW0603 + if _pipeline is None: + from transformers import pipeline # type: ignore[import-untyped] + logger.info("loading cybersec zero-shot pipeline: %s on %s", model_id, device) + _pipeline = pipeline( + "zero-shot-classification", + model=model_id, + device=0 if device == "cuda" else -1, + ) + logger.info("cybersec pipeline ready") + return _pipeline + + +def reset_pipeline() -> None: + """Clear the cached pipeline — for testing only.""" + global _pipeline # noqa: PLW0603 + _pipeline = None + + +# --------------------------------------------------------------------------- +# Result type +# --------------------------------------------------------------------------- + +@dataclass +class CybersecResult: + scored: int = 0 + detections: int = 0 + skipped: bool = False + error: str | None = None + + +# --------------------------------------------------------------------------- +# Core scoring function +# --------------------------------------------------------------------------- + +def score_security_entries( + db_path: Path, + model_id: str, + device: str = "cpu", + batch_size: int = 32, + threshold: float = 0.60, +) -> CybersecResult: + """Score entries that were anomaly-flagged or pattern-matched. + + Only entries with ml_scored_at IS NULL are processed (idempotent). + Writes ml_score / ml_label / ml_scored_at and inserts high-confidence + hits into detections with scorer='cybersec'. 
+ """ + if not model_id: + return CybersecResult(skipped=True) + + tenant_id = resolve_tenant_id() + try: + pipe = _get_pipeline(model_id, device) + except Exception as exc: + logger.error("failed to load cybersec pipeline: %s", exc) + return CybersecResult(error=str(exc)) + + total_scored = 0 + total_detections = 0 + + try: + with get_conn(db_path) as conn: + # Only score entries that are worth a second look: + # anomaly-flagged (non-normal) OR have at least one pattern match. + rows = conn.execute( + q(""" + SELECT id, source_id, text, timestamp_iso + FROM log_entries + WHERE (tenant_id = ? OR tenant_id = '') + AND ml_scored_at IS NULL + AND ( + (anomaly_label IS NOT NULL AND anomaly_label != 'NORMAL') + OR (matched_patterns IS NOT NULL AND matched_patterns != '[]' AND matched_patterns != '') + ) + LIMIT ? + """), + (tenant_id, batch_size * 10), + ).fetchall() + + if not rows: + return CybersecResult(skipped=True) + + # Process in chunks to avoid OOM on large backlogs + for i in range(0, len(rows), batch_size): + chunk = rows[i : i + batch_size] + texts = [r["text"] for r in chunk] + + try: + results = pipe(texts, candidate_labels=CYBERSEC_LABELS, multi_label=False) + except Exception as exc: + logger.warning("zero-shot inference error on chunk %d: %s", i, exc) + continue + + now = datetime.now(tz=timezone.utc).isoformat() + + with get_conn(db_path) as conn: + for row, result in zip(chunk, results): + top_label: str = result["labels"][0] + top_score: float = result["scores"][0] + + conn.execute( + q(""" + UPDATE log_entries + SET ml_score = ?, ml_label = ?, ml_scored_at = ? + WHERE id = ? AND (tenant_id = ? 
OR tenant_id = '') + """), + (top_score, top_label, now, row["id"], tenant_id), + ) + total_scored += 1 + + if top_score >= threshold and top_label != _NORMAL_LABEL: + severity = _LABEL_SEVERITY.get(top_label, "WARN") + try: + conn.execute( + q(""" + INSERT INTO detections + (id, tenant_id, entry_id, source_id, anomaly_label, + anomaly_score, severity, text, timestamp_iso, + detected_at, scorer) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'cybersec') + """), + ( + str(uuid.uuid4()), + tenant_id, + row["id"], + row["source_id"], + top_label, + top_score, + severity, + row["text"], + row["timestamp_iso"], + now, + ), + ) + total_detections += 1 + except Exception: + pass # entry may already have a detection — skip + + conn.commit() + + except Exception as exc: + logger.error("cybersec scoring failed: %s", exc) + return CybersecResult(scored=total_scored, detections=total_detections, error=str(exc)) + + return CybersecResult(scored=total_scored, detections=total_detections) + + +# --------------------------------------------------------------------------- +# Query helpers (used by REST layer) +# --------------------------------------------------------------------------- + +def list_cybersec_detections( + db_path: Path, + limit: int = 100, + unacked_only: bool = False, + label: str | None = None, +) -> list[dict]: + """Return cybersec detections ordered by detected_at DESC.""" + tenant_id = resolve_tenant_id() + conditions = ["(tenant_id = ? 
OR tenant_id = '')", "scorer = 'cybersec'"] + params: list[Any] = [tenant_id] + + if unacked_only: + conditions.append("acknowledged = 0") + if label: + conditions.append(q("anomaly_label = ?")) + params.append(label) + + where = " AND ".join(conditions) + with get_conn(db_path) as conn: + rows = conn.execute( + q(f"SELECT * FROM detections WHERE {where} ORDER BY detected_at DESC LIMIT ?"), # noqa: S608 + (*params, limit), + ).fetchall() + return [dict(r) for r in rows] diff --git a/app/tasks/cybersec_scorer.py b/app/tasks/cybersec_scorer.py new file mode 100644 index 0000000..6b3ca4c --- /dev/null +++ b/app/tasks/cybersec_scorer.py @@ -0,0 +1,84 @@ +"""Background task wrapper for the cybersec zero-shot scoring pipeline.""" +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path + +from app.services.cybersec import score_security_entries + +logger = logging.getLogger(__name__) + +_lock = asyncio.Lock() + + +@dataclass +class CybersecState: + last_run_at: str | None = None + last_duration_s: float | None = None + last_scored: int = 0 + last_detections: int = 0 + last_error: str | None = None + run_count: int = 0 + running: bool = False + total_scored: int = 0 + total_detections: int = 0 + + +_state = CybersecState() + + +def get_state() -> dict: + return { + "last_run_at": _state.last_run_at, + "last_duration_s":_state.last_duration_s, + "last_scored": _state.last_scored, + "last_detections":_state.last_detections, + "last_error": _state.last_error, + "run_count": _state.run_count, + "running": _state.running, + "total_scored": _state.total_scored, + "total_detections": _state.total_detections, + } + + +async def run_once( + db_path: Path, + model_id: str, + device: str = "cpu", + batch_size: int = 32, + threshold: float = 0.60, +) -> None: + """Single cybersec scoring pass — no-op if already running or no model set.""" + if not model_id or 
_lock.locked(): + return + + async with _lock: + _state.running = True + started = datetime.now(tz=timezone.utc) + try: + loop = asyncio.get_running_loop() + result = await loop.run_in_executor( + None, + lambda: score_security_entries(db_path, model_id, device, batch_size, threshold), + ) + elapsed = (datetime.now(tz=timezone.utc) - started).total_seconds() + _state.last_run_at = started.isoformat() + _state.last_duration_s = elapsed + _state.last_scored = result.scored + _state.last_detections = result.detections + _state.last_error = result.error + _state.run_count += 1 + _state.total_scored += result.scored + _state.total_detections += result.detections + if result.error: + logger.error("cybersec scorer error: %s", result.error) + elif not result.skipped: + logger.info( + "cybersec scorer: scored=%d detections=%d in %.1fs", + result.scored, result.detections, elapsed, + ) + finally: + _state.running = False diff --git a/app/tasks/glean_scheduler.py b/app/tasks/glean_scheduler.py index 7322158..fa05040 100644 --- a/app/tasks/glean_scheduler.py +++ b/app/tasks/glean_scheduler.py @@ -21,6 +21,7 @@ import httpx from app.glean.pipeline import glean_sources from app.tasks.anomaly_scorer import run_once as _run_scorer +from app.tasks.cybersec_scorer import run_once as _run_cybersec logger = logging.getLogger(__name__) @@ -127,6 +128,9 @@ async def run_once( anomaly_model: str = "", anomaly_device: str = "cpu", anomaly_threshold: float = 0.75, + cybersec_model: str = "", + cybersec_device: str = "cpu", + cybersec_threshold: float = 0.60, ) -> dict[str, Any]: """Ingest all sources once, then submit matched entries if configured. 
@@ -170,6 +174,9 @@ async def run_once( if anomaly_model: await _run_scorer(db_path, anomaly_model, anomaly_device, threshold=anomaly_threshold) + if cybersec_model: + await _run_cybersec(db_path, cybersec_model, cybersec_device, threshold=cybersec_threshold) + return {"ok": True, "stats": _state.last_stats, "duration_s": _state.last_duration_s} @@ -183,19 +190,27 @@ async def scheduler_loop( anomaly_model: str = "", anomaly_device: str = "cpu", anomaly_threshold: float = 0.75, + cybersec_model: str = "", + cybersec_device: str = "cpu", + cybersec_threshold: float = 0.60, ) -> None: - """Run glean + optional submission + optional anomaly scoring every interval_s seconds.""" + """Run glean + optional submission + optional anomaly/cybersec scoring every interval_s seconds.""" logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file) if submit_endpoint: logger.info("Submission enabled — endpoint: %s", submit_endpoint) if anomaly_model: logger.info("Anomaly scoring enabled — model: %s", anomaly_model) + if cybersec_model: + logger.info("Cybersec scoring enabled — model: %s", cybersec_model) while True: await run_once( sources_file, db_path, pattern_file, submit_endpoint, source_host, anomaly_model=anomaly_model, anomaly_device=anomaly_device, anomaly_threshold=anomaly_threshold, + cybersec_model=cybersec_model, + cybersec_device=cybersec_device, + cybersec_threshold=cybersec_threshold, ) next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s) _state.next_run_at = next_run.isoformat() diff --git a/docker-compose.yml b/docker-compose.yml index d197bc1..2e064a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,6 +47,10 @@ services: TURNSTONE_EMBED_BACKEND: ${TURNSTONE_EMBED_BACKEND:-} TURNSTONE_EMBED_MODEL: ${TURNSTONE_EMBED_MODEL:-} TURNSTONE_EMBED_DEVICE: ${TURNSTONE_EMBED_DEVICE:-cpu} + # --- Cybersec scoring pipeline --- + TURNSTONE_CYBERSEC_MODEL: ${TURNSTONE_CYBERSEC_MODEL:-} + 
@pytest.fixture(autouse=True)
def _reset():
    """Start and finish every test without a cached pipeline."""
    reset_pipeline()
    yield
    reset_pipeline()


@pytest.fixture
def db(tmp_path) -> Path:
    """Fresh SQLite database with the application schema applied."""
    path = tmp_path / "test.db"
    ensure_schema(path)
    return path


def _insert_entry(db: Path, entry_id: str, text: str,
                  anomaly_label: str | None = None,
                  matched_patterns: str = "[]") -> None:
    """Insert one log entry with the given anomaly label / pattern matches."""
    # sqlite3's own context manager only commits; close explicitly so no
    # connection is left open on the temporary database.
    conn = sqlite3.connect(db)
    try:
        conn.execute(
            """INSERT OR IGNORE INTO log_entries
               (id, tenant_id, source_id, sequence, ingest_time, text,
                anomaly_label, matched_patterns)
               VALUES (?, '', 'test-src', 1, '2026-01-01T00:00:00Z', ?, ?, ?)""",
            (entry_id, text, anomaly_label, matched_patterns),
        )
        conn.commit()
    finally:
        conn.close()


def _query(db: Path, sql: str) -> list:
    """Run a read-only query and return the rows as ``sqlite3.Row`` objects."""
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


def _install_pipeline(monkeypatch, labels: list[str], scores: list[float]) -> MagicMock:
    """Replace the cached pipeline with a mock that returns one fixed result."""
    pipe = MagicMock(return_value=[{"labels": labels, "scores": scores}])
    monkeypatch.setattr(cybersec_mod, "_pipeline", pipe)
    return pipe


# ---------------------------------------------------------------------------
# No model configured → skipped
# ---------------------------------------------------------------------------

def test_no_model_returns_skipped(db):
    result = score_security_entries(db, model_id="")
    assert result.skipped is True
    assert result.scored == 0


# ---------------------------------------------------------------------------
# No eligible entries → skipped
# ---------------------------------------------------------------------------

def test_no_eligible_entries_skipped(db, monkeypatch):
    _insert_entry(db, "e1", "Started nginx.service", anomaly_label=None, matched_patterns="[]")
    # The fixture undoes the patch even when an assertion below fails.
    pipe = _install_pipeline(monkeypatch, [_NORMAL_LABEL], [0.99])

    result = score_security_entries(db, model_id="fake-model")

    assert result.skipped is True
    pipe.assert_not_called()


# ---------------------------------------------------------------------------
# Security entry gets scored
# ---------------------------------------------------------------------------

def test_security_entry_scored(db, monkeypatch):
    _insert_entry(db, "e1",
                  "Failed password for root from 192.168.1.1 port 22 ssh2",
                  anomaly_label="SECURITY_ANOMALY")
    _install_pipeline(
        monkeypatch,
        ["authentication failure or brute force attack", _NORMAL_LABEL],
        [0.85, 0.15],
    )

    result = score_security_entries(db, model_id="fake-model", threshold=0.70)
    assert result.scored == 1
    assert result.detections == 1
    assert result.error is None

    row = _query(db, "SELECT ml_score, ml_label, ml_scored_at FROM log_entries WHERE id='e1'")[0]
    assert row["ml_score"] == pytest.approx(0.85)
    assert row["ml_label"] == "authentication failure or brute force attack"
    assert row["ml_scored_at"] is not None


# ---------------------------------------------------------------------------
# Detection created above threshold
# ---------------------------------------------------------------------------

def test_detection_inserted_above_threshold(db, monkeypatch):
    _insert_entry(db, "e1", "sudo: authentication failure", anomaly_label="ERROR")
    _install_pipeline(
        monkeypatch,
        ["privilege escalation or unauthorized access", _NORMAL_LABEL],
        [0.75, 0.25],
    )

    score_security_entries(db, model_id="fake-model", threshold=0.60)

    dets = _query(db, "SELECT * FROM detections WHERE scorer='cybersec'")
    assert len(dets) == 1
    assert dets[0]["anomaly_label"] == "privilege escalation or unauthorized access"
    assert dets[0]["severity"] == "CRITICAL"


# ---------------------------------------------------------------------------
# Normal label → no detection even above score threshold
# ---------------------------------------------------------------------------

def test_normal_label_no_detection(db, monkeypatch):
    _insert_entry(db, "e1", "Started nginx.service", anomaly_label="INFO",
                  matched_patterns='["service_start"]')
    _install_pipeline(
        monkeypatch,
        [_NORMAL_LABEL, "network intrusion or port scan"],
        [0.95, 0.05],
    )

    result = score_security_entries(db, model_id="fake-model", threshold=0.60)
    assert result.detections == 0


# ---------------------------------------------------------------------------
# Below threshold → scored but no detection
# ---------------------------------------------------------------------------

def test_below_threshold_no_detection(db, monkeypatch):
    _insert_entry(db, "e1", "Some suspicious text", anomaly_label="WARN")
    _install_pipeline(
        monkeypatch,
        ["network intrusion or port scan", _NORMAL_LABEL],
        [0.45, 0.55],
    )

    result = score_security_entries(db, model_id="fake-model", threshold=0.60)
    assert result.scored == 1
    assert result.detections == 0


# ---------------------------------------------------------------------------
# Pattern-matched entry (not anomaly-flagged) still gets scored
# ---------------------------------------------------------------------------

def test_pattern_matched_entry_scored(db, monkeypatch):
    _insert_entry(db, "e1", "SSH port forwarding conflict detected",
                  anomaly_label=None,
                  matched_patterns='["ssh_forward_conflict"]')
    _install_pipeline(
        monkeypatch,
        ["network intrusion or port scan", _NORMAL_LABEL],
        [0.70, 0.30],
    )

    result = score_security_entries(db, model_id="fake-model", threshold=0.60)
    assert result.scored == 1
    assert result.detections == 1


# ---------------------------------------------------------------------------
# Idempotency — re-run finds nothing unscored
# ---------------------------------------------------------------------------

def test_idempotent_rerun(db, monkeypatch):
    _insert_entry(db, "e1", "Failed login", anomaly_label="ERROR")
    pipe = _install_pipeline(
        monkeypatch, ["authentication failure or brute force attack"], [0.80]
    )

    score_security_entries(db, model_id="fake-model", threshold=0.60)
    result2 = score_security_entries(db, model_id="fake-model", threshold=0.60)

    assert result2.skipped is True
    # The second pass must not have reached the classifier at all.
    assert pipe.call_count == 1


# ---------------------------------------------------------------------------
# list_cybersec_detections filters to scorer='cybersec'
# ---------------------------------------------------------------------------

def test_list_cybersec_detections(db, monkeypatch):
    _insert_entry(db, "e1", "Failed login", anomaly_label="ERROR")
    _install_pipeline(
        monkeypatch, ["authentication failure or brute force attack"], [0.90]
    )
    score_security_entries(db, model_id="fake-model", threshold=0.60)

    rows = list_cybersec_detections(db)
    assert len(rows) == 1
    assert rows[0]["scorer"] == "cybersec"


# ---------------------------------------------------------------------------
# list_detections scorer filter (anomaly service)
# ---------------------------------------------------------------------------

def test_list_detections_scorer_filter(db, monkeypatch):
    from app.services.anomaly import list_detections

    _insert_entry(db, "e1", "Failed login", anomaly_label="ERROR")
    _install_pipeline(
        monkeypatch, ["authentication failure or brute force attack"], [0.90]
    )
    score_security_entries(db, model_id="fake-model", threshold=0.60)

    all_dets = list_detections(db)
    cybersec_dets = list_detections(db, scorer="cybersec")
    anomaly_dets = list_detections(db, scorer="anomaly")

    assert len(cybersec_dets) == 1
    assert len(anomaly_dets) == 0
    assert len(all_dets) >= 1
'scorer ready' : 'scorer off' }} + + + {{ cybersecStatus.enabled ? 'cybersec on' : 'cybersec off' }} + +