turnstone/app/db/schema.py
pyr0ball 01f0e45222 feat: anomaly scoring pipeline (#10)
- Add app/services/anomaly.py: batch scorer using HF text-classification
  pipeline; rewrites anomaly_score/anomaly_label/anomaly_scored_at on
  log_entries; inserts high-confidence hits into detections table
- Add app/tasks/anomaly_scorer.py: background task (same shape as
  glean_scheduler); triggered after each glean cycle when
  TURNSTONE_ANOMALY_MODEL is set
- DB schema: add anomaly_score/anomaly_label/anomaly_scored_at columns to
  log_entries (idempotent ALTER TABLE migration); add detections table
- Wire scorer into scheduler_loop and glean_scheduler.run_once; no-op when
  model env var is empty (safe to leave unconfigured)
- REST endpoints: GET/POST /api/anomaly/status, /api/anomaly/run,
  GET /api/anomaly/detections, POST /api/anomaly/detections/{id}/acknowledge
- Reuses Hybrid-BERT label map from diagnose/classifier.py; works with any
  HF text-classification model
- 12 new tests; 406/406 passing

Closes: #10
2026-06-09 11:15:13 -07:00

506 lines
20 KiB
Python

"""Schema creation and idempotent migrations for all Turnstone databases.
Three logical databases (main, context, incidents) map to:
- SQLite: three separate .db files (avoids write-lock contention)
- Postgres: three table-groups in one physical DB (row-level locking makes separation unnecessary)
All ensure_* functions are idempotent: safe to call on every startup.
"""
from __future__ import annotations
import logging
import sqlite3
from pathlib import Path
from app.db.backend import BACKEND, Backend
from app.db.conn import get_conn
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SQLite DDL — kept as executescript strings (SQLite only)
# ---------------------------------------------------------------------------
# Main-database DDL. Run via executescript() by ensure_schema() and ALSO by
# ensure_incidents_schema(), so the dedicated incidents .db file receives every
# table below (log_entries, detections, ... included), not only incidents/bundles.
# Every statement uses IF NOT EXISTS, so re-running it on each boot is harmless.
# CREATE TABLE IF NOT EXISTS never alters a table that already exists on disk:
# a new column must also be added to _MAIN_MIGRATIONS_SQLITE, which runs before
# this script so that indexes here (e.g. idx_anomaly) can reference it.
# NOTE(review): keep in step with _MAIN_SCHEMA_PG_STMTS (columns and indexes).
_MAIN_SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS log_entries (
id TEXT NOT NULL,
tenant_id TEXT NOT NULL DEFAULT '',
source_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
timestamp_raw TEXT,
timestamp_iso TEXT,
ingest_time TEXT NOT NULL,
severity TEXT,
repeat_count INTEGER DEFAULT 1,
out_of_order INTEGER DEFAULT 0,
matched_patterns TEXT DEFAULT '[]',
text TEXT NOT NULL,
anomaly_score REAL,
anomaly_label TEXT,
anomaly_scored_at TEXT,
PRIMARY KEY (tenant_id, id)
);
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity);
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score);
CREATE TABLE IF NOT EXISTS detections (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
entry_id TEXT NOT NULL,
source_id TEXT NOT NULL,
anomaly_label TEXT NOT NULL,
anomaly_score REAL NOT NULL,
severity TEXT NOT NULL,
text TEXT NOT NULL,
timestamp_iso TEXT,
detected_at TEXT NOT NULL,
acknowledged INTEGER NOT NULL DEFAULT 0,
acknowledged_at TEXT,
notes TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at);
CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged);
CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label);
CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
tenant_id TEXT NOT NULL DEFAULT '',
path TEXT NOT NULL,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL,
PRIMARY KEY (tenant_id, path)
);
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
started_at TEXT,
ended_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium'
);
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id);
CREATE TABLE IF NOT EXISTS received_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
source_host TEXT NOT NULL,
issue_type TEXT NOT NULL DEFAULT '',
label TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'medium',
started_at TEXT,
bundled_at TEXT NOT NULL,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
CREATE TABLE IF NOT EXISTS sent_bundles (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
incident_id TEXT NOT NULL,
exported_at TEXT NOT NULL,
sanitized INTEGER NOT NULL DEFAULT 0,
entry_count INTEGER NOT NULL DEFAULT 0,
bundle_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
CREATE TABLE IF NOT EXISTS blocklist_candidates (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
domain_or_ip TEXT NOT NULL,
source_device_ip TEXT,
source_device_name TEXT,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
hit_count INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
pushed_at TEXT,
log_evidence TEXT DEFAULT '[]',
matched_rule TEXT,
llm_score REAL,
llm_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id);
"""
# Context knowledge-base DDL, run via executescript() by ensure_context_schema().
# CREATE TABLE IF NOT EXISTS does not alter existing tables; columns introduced
# later for these tables belong in _CONTEXT_MIGRATIONS_SQLITE.
# context_chunks.document_id is declared ON DELETE CASCADE, but SQLite only
# enforces foreign keys on connections that have run PRAGMA foreign_keys=ON.
_CONTEXT_SCHEMA_SQLITE = """
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id);
"""
# ---------------------------------------------------------------------------
# Postgres DDL — executed statement-by-statement
# ---------------------------------------------------------------------------
# Main-database DDL for Postgres, executed in list order by _run_pg_stmts()
# (autocommit, one statement at a time) from ensure_schema(). Because of that
# ordering, an ALTER must follow its CREATE TABLE and precede any index on the
# column it adds.
#
# CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so a column
# added after a table first shipped needs an explicit ADD COLUMN IF NOT EXISTS
# (Postgres 9.6+) — the Postgres counterpart of _MAIN_MIGRATIONS_SQLITE. Without
# it, a database created before anomaly scoring existed fails at idx_anomaly
# ("column anomaly_score does not exist") and startup aborts.
#
# NOTE(review): idx_source and idx_ts_repeat exist in the SQLite schema but not
# here — confirm whether that is intentional.
_MAIN_SCHEMA_PG_STMTS = [
    """
    CREATE TABLE IF NOT EXISTS log_entries (
        id TEXT NOT NULL,
        tenant_id TEXT NOT NULL DEFAULT '',
        source_id TEXT NOT NULL,
        sequence INTEGER NOT NULL,
        timestamp_raw TEXT,
        timestamp_iso TEXT,
        ingest_time TEXT NOT NULL,
        severity TEXT,
        repeat_count INTEGER DEFAULT 1,
        out_of_order INTEGER DEFAULT 0,
        matched_patterns TEXT DEFAULT '[]',
        text TEXT NOT NULL,
        text_tsv tsvector,
        anomaly_score DOUBLE PRECISION,
        anomaly_label TEXT,
        anomaly_scored_at TEXT,
        PRIMARY KEY (tenant_id, id)
    )
    """,
    # Upgrade path for log_entries tables created before anomaly scoring. These
    # are no-ops (Postgres emits a NOTICE) when the columns already exist.
    "ALTER TABLE log_entries ADD COLUMN IF NOT EXISTS anomaly_score DOUBLE PRECISION",
    "ALTER TABLE log_entries ADD COLUMN IF NOT EXISTS anomaly_label TEXT",
    "ALTER TABLE log_entries ADD COLUMN IF NOT EXISTS anomaly_scored_at TEXT",
    "CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id)",
    "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso)",
    "CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity)",
    "CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns)",
    "CREATE INDEX IF NOT EXISTS idx_fts_gin ON log_entries USING GIN(text_tsv)",
    "CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score)",
    """
    CREATE TABLE IF NOT EXISTS detections (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL DEFAULT '',
        entry_id TEXT NOT NULL,
        source_id TEXT NOT NULL,
        anomaly_label TEXT NOT NULL,
        anomaly_score DOUBLE PRECISION NOT NULL,
        severity TEXT NOT NULL,
        text TEXT NOT NULL,
        timestamp_iso TEXT,
        detected_at TEXT NOT NULL,
        acknowledged INTEGER NOT NULL DEFAULT 0,
        acknowledged_at TEXT,
        notes TEXT NOT NULL DEFAULT ''
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at)",
    "CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged)",
    "CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label)",
    "CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id)",
    # Keeps log_entries.text_tsv in sync with log_entries.text for full-text search.
    """
    CREATE OR REPLACE FUNCTION _ts_update_text_tsv() RETURNS trigger AS $$
    BEGIN
        NEW.text_tsv := to_tsvector('english', COALESCE(NEW.text, ''));
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql
    """,
    # CREATE TRIGGER has no IF NOT EXISTS form, so guard on the trigger name.
    """
    DO $$ BEGIN
        IF NOT EXISTS (
            SELECT 1 FROM pg_trigger WHERE tgname = 'trig_log_entries_tsv'
        ) THEN
            CREATE TRIGGER trig_log_entries_tsv
            BEFORE INSERT OR UPDATE OF text ON log_entries
            FOR EACH ROW EXECUTE FUNCTION _ts_update_text_tsv();
        END IF;
    END $$
    """,
    """
    CREATE TABLE IF NOT EXISTS glean_fingerprints (
        tenant_id TEXT NOT NULL DEFAULT '',
        path TEXT NOT NULL,
        mtime DOUBLE PRECISION NOT NULL,
        size BIGINT NOT NULL,
        gleaned_at TEXT NOT NULL,
        PRIMARY KEY (tenant_id, path)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS incidents (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL DEFAULT '',
        label TEXT NOT NULL,
        issue_type TEXT NOT NULL DEFAULT '',
        started_at TEXT,
        ended_at TEXT,
        notes TEXT NOT NULL DEFAULT '',
        created_at TEXT NOT NULL,
        severity TEXT NOT NULL DEFAULT 'medium'
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at)",
    "CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id)",
    """
    CREATE TABLE IF NOT EXISTS received_bundles (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL DEFAULT '',
        source_host TEXT NOT NULL,
        issue_type TEXT NOT NULL DEFAULT '',
        label TEXT NOT NULL,
        severity TEXT NOT NULL DEFAULT 'medium',
        started_at TEXT,
        bundled_at TEXT NOT NULL,
        entry_count INTEGER NOT NULL DEFAULT 0,
        bundle_json TEXT NOT NULL
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at)",
    "CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type)",
    """
    CREATE TABLE IF NOT EXISTS sent_bundles (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL DEFAULT '',
        incident_id TEXT NOT NULL,
        exported_at TEXT NOT NULL,
        sanitized INTEGER NOT NULL DEFAULT 0,
        entry_count INTEGER NOT NULL DEFAULT 0,
        bundle_json TEXT NOT NULL
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id)",
    "CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at)",
    """
    CREATE TABLE IF NOT EXISTS blocklist_candidates (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL DEFAULT '',
        domain_or_ip TEXT NOT NULL,
        source_device_ip TEXT,
        source_device_name TEXT,
        first_seen TEXT NOT NULL,
        last_seen TEXT NOT NULL,
        hit_count INTEGER DEFAULT 1,
        status TEXT DEFAULT 'pending',
        pushed_at TEXT,
        log_evidence TEXT DEFAULT '[]',
        matched_rule TEXT,
        llm_score DOUBLE PRECISION,
        llm_reason TEXT
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip)",
    "CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status)",
    "CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip)",
    "CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id)",
]
# Postgres counterpart of _CONTEXT_SCHEMA_SQLITE, executed in list order by
# _run_pg_stmts() from ensure_context_schema(). Column types are mapped to
# Postgres equivalents: BIGINT for file_size (SQLite INTEGER) and BYTEA for
# embedding (SQLite BLOB).
# CREATE TABLE IF NOT EXISTS does not add columns to a pre-existing table; a
# column introduced later needs its own "ALTER TABLE ... ADD COLUMN IF NOT
# EXISTS" entry placed after the CREATE TABLE and before any index using it.
_CONTEXT_SCHEMA_PG_STMTS = [
"""
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category)",
"CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key)",
"CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id)",
"""
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size BIGINT,
uploaded_at TEXT NOT NULL
)
""",
"CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id)",
# Chunks are deleted along with their parent document (ON DELETE CASCADE).
"""
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL DEFAULT '',
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BYTEA
)
""",
"CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id)",
"CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id)",
]
# ---------------------------------------------------------------------------
# SQLite additive column migrations — applied on every boot, BEFORE the
# CREATE TABLE / CREATE INDEX script (see ensure_schema, ensure_context_schema)
# ---------------------------------------------------------------------------
# SQLite's ALTER TABLE has no "ADD COLUMN IF NOT EXISTS", so each statement is
# simply attempted, and _run_sqlite_migrations() tolerates the OperationalError
# raised when the column already exists or the table does not exist yet.
# Running them first lets the script's CREATE INDEX statements reference columns
# (tenant_id, anomaly_score, ...) that older on-disk tables lack. On a fresh
# database every ALTER fails harmlessly and the script then creates full tables.
# _MAIN_MIGRATIONS_SQLITE is applied to both the main DB and the incidents DB
# (ensure_incidents_schema).
_MAIN_MIGRATIONS_SQLITE = [
"ALTER TABLE log_entries ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
"ALTER TABLE incidents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE received_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE sent_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE blocklist_candidates ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE glean_fingerprints ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
# Added as nullable even though CREATE TABLE declares these NOT NULL: SQLite
# rejects ADD COLUMN ... NOT NULL unless a non-NULL default is supplied.
"ALTER TABLE glean_fingerprints ADD COLUMN mtime REAL",
"ALTER TABLE glean_fingerprints ADD COLUMN size INTEGER",
"ALTER TABLE glean_fingerprints ADD COLUMN gleaned_at TEXT",
# Anomaly-scoring columns; existing rows get NULL (i.e. not yet scored).
"ALTER TABLE log_entries ADD COLUMN anomaly_score REAL",
"ALTER TABLE log_entries ADD COLUMN anomaly_label TEXT",
"ALTER TABLE log_entries ADD COLUMN anomaly_scored_at TEXT",
]
_CONTEXT_MIGRATIONS_SQLITE = [
"ALTER TABLE context_facts ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE context_documents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
"ALTER TABLE context_chunks ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''",
]
def _run_sqlite_migrations(conn: sqlite3.Connection, stmts: list[str]) -> None:
    """Apply additive ``ALTER TABLE`` statements best-effort, one at a time.

    SQLite has no ``ADD COLUMN IF NOT EXISTS``, so an ``OperationalError`` is
    expected when the column already exists ("duplicate column name") or the
    table has not been created yet ("no such table"); those are skipped and
    logged at DEBUG. Any other ``OperationalError`` (e.g. "database is locked")
    is also skipped so startup stays best-effort, but it is logged as a WARNING
    instead of being silently discarded.

    Args:
        conn: Open SQLite connection to migrate.
        stmts: DDL statements to attempt, in order.

    This function does not commit; callers commit afterwards.
    """
    for stmt in stmts:
        try:
            conn.execute(stmt)
        except sqlite3.OperationalError as exc:
            reason = str(exc)
            if reason.startswith(("duplicate column name", "no such table")):
                logger.debug("Skipping migration %r: %s", stmt, reason)
            else:
                logger.warning("Migration %r failed and was skipped: %s", stmt, reason)
def _run_pg_stmts(stmts: list[str]) -> None:
    """Run Postgres DDL statements in order on one autocommit connection.

    With autocommit each statement is committed as soon as it runs, so a failing
    statement stops the loop (the exception propagates) without rolling back the
    statements already applied. The DSN comes from the ``DATABASE_URL``
    environment variable (``KeyError`` if unset). Entries that are empty after
    stripping whitespace are skipped.
    """
    from psycopg import connect as pg_connect  # type: ignore[import]
    import os

    dsn = os.environ["DATABASE_URL"]
    with pg_connect(dsn, autocommit=True) as pg:
        for sql in filter(None, (raw.strip() for raw in stmts)):
            pg.execute(sql)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def ensure_schema(db_path: Path) -> None:
    """Ensure main log/incidents/blocklist tables exist. Idempotent.

    Postgres: runs ``_MAIN_SCHEMA_PG_STMTS`` via ``_run_pg_stmts``; *db_path*
    is unused.
    SQLite: opens *db_path* in WAL mode, applies ``_MAIN_MIGRATIONS_SQLITE``,
    commits, then runs the ``_MAIN_SCHEMA_SQLITE`` script. The connection is
    closed even if a statement raises, so a failed startup does not leave a
    handle open on the database file.
    """
    if BACKEND == Backend.POSTGRES:
        _run_pg_stmts(_MAIN_SCHEMA_PG_STMTS)
        logger.debug("Postgres main schema verified")
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Migrations first: add tenant_id / anomaly_* to existing tables BEFORE
        # the script's CREATE INDEX statements reference those columns.
        _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_MAIN_SCHEMA_SQLITE)
    finally:
        conn.close()
    logger.debug("SQLite main schema verified at %s", db_path)
def ensure_context_schema(db_path: Path) -> None:
    """Ensure context KB tables (facts, documents, chunks) exist. Idempotent.

    Postgres: runs ``_CONTEXT_SCHEMA_PG_STMTS`` via ``_run_pg_stmts``; *db_path*
    is unused.
    SQLite: opens *db_path* in WAL mode, applies ``_CONTEXT_MIGRATIONS_SQLITE``,
    commits, then runs the ``_CONTEXT_SCHEMA_SQLITE`` script. The connection is
    closed even if a statement raises.
    """
    if BACKEND == Backend.POSTGRES:
        _run_pg_stmts(_CONTEXT_SCHEMA_PG_STMTS)
        logger.debug("Postgres context schema verified")
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # foreign_keys is a per-connection setting: it enables FK enforcement
        # for this connection only, not for connections opened elsewhere.
        conn.execute("PRAGMA foreign_keys=ON")
        _run_sqlite_migrations(conn, _CONTEXT_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_CONTEXT_SCHEMA_SQLITE)
    finally:
        conn.close()
    logger.debug("SQLite context schema verified at %s", db_path)
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
    """One-shot migration: copy incidents/bundles rows from main DB to incidents DB.

    Safe to call on every startup: rows whose key already exists in
    *incidents_db* are skipped via ``INSERT OR IGNORE``. Rows are copied, not
    removed, from *main_db*; a table absent from *main_db* is skipped. Column
    names are taken from the source rows, so the destination table must have
    the same columns (ensure_incidents_schema applies the same DDL).
    No-op for Postgres (single DB, no migration needed).

    Returns:
        Number of rows actually inserted into *incidents_db* — skipped
        duplicates are not counted — or 0 for Postgres.
    """
    from contextlib import closing

    if BACKEND == Backend.POSTGRES:
        return 0
    migrated = 0
    # closing() guarantees both handles are released even if a query raises.
    with closing(sqlite3.connect(str(main_db), timeout=30.0)) as src, closing(
        sqlite3.connect(str(incidents_db), timeout=30.0)
    ) as dst:
        src.row_factory = sqlite3.Row
        for table in ("incidents", "received_bundles", "sent_bundles"):
            try:
                rows = src.execute(f"SELECT * FROM {table}").fetchall()  # noqa: S608
            except sqlite3.OperationalError:
                continue  # table not present in the main DB: nothing to copy
            if not rows:
                continue
            columns = rows[0].keys()
            col_list = ", ".join(columns)
            placeholders = ", ".join("?" * len(columns))
            cur = dst.executemany(
                f"INSERT OR IGNORE INTO {table} ({col_list}) VALUES ({placeholders})",  # noqa: S608
                [tuple(r) for r in rows],
            )
            # executemany sums per-row changes into rowcount; rows ignored as
            # duplicates contribute 0, so this counts only real inserts.
            migrated += cur.rowcount
        dst.commit()
    return migrated
def ensure_incidents_schema(db_path: Path) -> None:
    """Ensure incidents/bundles tables exist in the incidents DB. Idempotent.

    Postgres: no-op. Incidents live in the same database as log_entries, and
    their tables are part of ``_MAIN_SCHEMA_PG_STMTS``, which ensure_schema()
    applies.
    SQLite: applies the same ``_MAIN_MIGRATIONS_SQLITE`` + ``_MAIN_SCHEMA_SQLITE``
    DDL as ensure_schema() to *db_path*, so the incidents file also receives the
    other main tables (log_entries, detections, ...). The connection is closed
    even if a statement raises.
    """
    if BACKEND == Backend.POSTGRES:
        return
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        # Migrations first so existing tables gain new columns before the
        # script's CREATE INDEX statements reference them.
        _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE)
        conn.commit()
        conn.executescript(_MAIN_SCHEMA_SQLITE)
    finally:
        conn.close()
    logger.debug("SQLite incidents schema verified at %s", db_path)