"""Schema creation and idempotent migrations for all Turnstone databases. Three logical databases (main, context, incidents) map to: - SQLite: three separate .db files (avoids write-lock contention) - Postgres: three table-groups in one physical DB (row-level locking makes separation unnecessary) All ensure_* functions are idempotent: safe to call on every startup. """ from __future__ import annotations import logging import sqlite3 from pathlib import Path from app.db.backend import BACKEND, Backend from app.db.conn import get_conn logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # SQLite DDL — kept as executescript strings (SQLite only) # --------------------------------------------------------------------------- _MAIN_SCHEMA_SQLITE = """ CREATE TABLE IF NOT EXISTS log_entries ( id TEXT NOT NULL, tenant_id TEXT NOT NULL DEFAULT '', source_id TEXT NOT NULL, sequence INTEGER NOT NULL, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL, anomaly_score REAL, anomaly_label TEXT, anomaly_scored_at TEXT, ml_score REAL, ml_label TEXT, ml_scored_at TEXT, PRIMARY KEY (tenant_id, id) ); CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id); CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id); CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso); CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count); CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score); CREATE INDEX IF NOT EXISTS idx_ml_scored ON log_entries(tenant_id, ml_scored_at); CREATE TABLE IF NOT EXISTS detections ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', entry_id TEXT NOT NULL, source_id TEXT NOT NULL, anomaly_label TEXT NOT NULL, anomaly_score REAL NOT NULL, severity TEXT NOT NULL, text TEXT NOT NULL, timestamp_iso TEXT, detected_at TEXT NOT NULL, acknowledged INTEGER NOT NULL DEFAULT 0, acknowledged_at TEXT, notes TEXT NOT NULL DEFAULT '', scorer TEXT NOT NULL DEFAULT 'anomaly' ); CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at); CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged); CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label); CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id); CREATE INDEX IF NOT EXISTS idx_detections_scorer ON detections(scorer); CREATE TABLE IF NOT EXISTS glean_fingerprints ( tenant_id TEXT NOT NULL DEFAULT '', path TEXT NOT NULL, mtime REAL NOT NULL, size INTEGER NOT NULL, gleaned_at TEXT NOT NULL, PRIMARY KEY (tenant_id, path) ); CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ); CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at); CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id); CREATE TABLE IF NOT EXISTS received_bundles ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', source_host TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium', started_at TEXT, bundled_at TEXT NOT NULL, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at); CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type); CREATE TABLE IF NOT EXISTS sent_bundles ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', incident_id TEXT NOT NULL, exported_at TEXT NOT NULL, sanitized INTEGER NOT NULL DEFAULT 0, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id); CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at); CREATE TABLE IF NOT EXISTS blocklist_candidates ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', domain_or_ip TEXT NOT NULL, source_device_ip TEXT, source_device_name TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, hit_count INTEGER DEFAULT 1, status TEXT DEFAULT 'pending', pushed_at TEXT, log_evidence TEXT DEFAULT '[]', matched_rule TEXT, llm_score REAL, llm_reason TEXT ); CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status); CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id); """ _CONTEXT_SCHEMA_SQLITE = """ CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category); CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key); CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id); CREATE TABLE IF NOT EXISTS context_documents ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', filename TEXT NOT NULL, doc_type TEXT NOT NULL, full_text TEXT NOT NULL, file_size INTEGER, uploaded_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id); CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB ); CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id); """ # --------------------------------------------------------------------------- # Postgres DDL — executed statement-by-statement # --------------------------------------------------------------------------- _MAIN_SCHEMA_PG_STMTS = [ """ CREATE TABLE IF NOT EXISTS log_entries ( id TEXT NOT NULL, tenant_id TEXT NOT NULL DEFAULT '', source_id TEXT NOT NULL, sequence INTEGER NOT NULL, timestamp_raw TEXT, timestamp_iso TEXT, ingest_time TEXT NOT NULL, severity TEXT, repeat_count INTEGER DEFAULT 1, out_of_order INTEGER DEFAULT 0, matched_patterns TEXT DEFAULT '[]', text TEXT NOT NULL, text_tsv tsvector, anomaly_score DOUBLE PRECISION, anomaly_label TEXT, anomaly_scored_at TEXT, ml_score DOUBLE PRECISION, ml_label TEXT, ml_scored_at TEXT, PRIMARY KEY (tenant_id, id) ) """, "CREATE INDEX IF NOT EXISTS idx_tenant_src ON log_entries(tenant_id, source_id)", "CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso)", "CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(tenant_id, severity)", "CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns)", "CREATE INDEX IF NOT EXISTS idx_fts_gin ON log_entries USING GIN(text_tsv)", "CREATE INDEX IF NOT EXISTS idx_anomaly ON log_entries(tenant_id, anomaly_score)", "CREATE INDEX IF NOT EXISTS idx_ml_scored ON log_entries(tenant_id, ml_scored_at)", """ CREATE TABLE IF NOT EXISTS detections ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', entry_id TEXT NOT NULL, source_id TEXT NOT NULL, anomaly_label TEXT NOT NULL, anomaly_score DOUBLE PRECISION NOT NULL, severity TEXT NOT NULL, text TEXT NOT NULL, timestamp_iso TEXT, detected_at TEXT NOT NULL, acknowledged INTEGER NOT NULL DEFAULT 0, acknowledged_at TEXT, notes TEXT NOT NULL DEFAULT '', scorer TEXT NOT NULL DEFAULT 'anomaly' ) """, "CREATE INDEX IF NOT EXISTS idx_detections_tenant ON detections(tenant_id, detected_at)", "CREATE INDEX IF NOT EXISTS idx_detections_ack ON detections(acknowledged)", "CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(anomaly_label)", "CREATE INDEX IF NOT EXISTS idx_detections_entry ON detections(entry_id)", "CREATE INDEX IF NOT EXISTS idx_detections_scorer ON detections(scorer)", """ CREATE OR REPLACE FUNCTION _ts_update_text_tsv() RETURNS trigger AS $$ BEGIN NEW.text_tsv := to_tsvector('english', COALESCE(NEW.text, '')); RETURN NEW; END; $$ LANGUAGE plpgsql """, """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM pg_trigger WHERE tgname = 'trig_log_entries_tsv' ) THEN CREATE TRIGGER trig_log_entries_tsv BEFORE INSERT OR UPDATE OF text ON log_entries FOR EACH ROW EXECUTE FUNCTION _ts_update_text_tsv(); END IF; END $$ """, """ CREATE TABLE IF NOT EXISTS glean_fingerprints ( tenant_id TEXT NOT NULL DEFAULT '', path TEXT NOT NULL, mtime DOUBLE PRECISION NOT NULL, size BIGINT NOT NULL, gleaned_at TEXT NOT NULL, PRIMARY KEY (tenant_id, path) ) """, """ CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', started_at TEXT, ended_at TEXT, notes TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ) """, "CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at)", "CREATE INDEX IF NOT EXISTS idx_incidents_tenant ON incidents(tenant_id)", """ CREATE TABLE IF NOT EXISTS received_bundles ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', source_host TEXT NOT NULL, issue_type TEXT NOT NULL DEFAULT '', label TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium', started_at TEXT, bundled_at TEXT NOT NULL, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ) """, "CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at)", "CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type)", """ CREATE TABLE IF NOT EXISTS sent_bundles ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', incident_id TEXT NOT NULL, exported_at TEXT NOT NULL, sanitized INTEGER NOT NULL DEFAULT 0, entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ) """, "CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id)", "CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at)", """ CREATE TABLE IF NOT EXISTS blocklist_candidates ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', domain_or_ip TEXT NOT NULL, source_device_ip TEXT, source_device_name TEXT, first_seen TEXT NOT NULL, last_seen TEXT NOT NULL, hit_count INTEGER DEFAULT 1, status TEXT DEFAULT 'pending', pushed_at TEXT, log_evidence TEXT DEFAULT '[]', matched_rule TEXT, llm_score DOUBLE PRECISION, llm_reason TEXT ) """, "CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip)", "CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status)", "CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip)", "CREATE INDEX IF NOT EXISTS idx_blocklist_tenant ON blocklist_candidates(tenant_id)", ] _CONTEXT_SCHEMA_PG_STMTS = [ """ CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', category TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, source TEXT, created_at TEXT NOT NULL ) """, "CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category)", "CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key)", "CREATE INDEX IF NOT EXISTS idx_facts_tenant ON context_facts(tenant_id)", """ CREATE TABLE IF NOT EXISTS context_documents ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', filename TEXT NOT NULL, doc_type TEXT NOT NULL, full_text TEXT NOT NULL, file_size BIGINT, uploaded_at TEXT NOT NULL ) """, "CREATE INDEX IF NOT EXISTS idx_docs_tenant ON context_documents(tenant_id)", """ CREATE TABLE IF NOT EXISTS context_chunks ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BYTEA ) """, "CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id)", "CREATE INDEX IF NOT EXISTS idx_chunks_tenant ON context_chunks(tenant_id)", ] # --------------------------------------------------------------------------- # SQLite additive column migrations — applied after CREATE TABLE on every boot # --------------------------------------------------------------------------- _MAIN_MIGRATIONS_SQLITE = [ "ALTER TABLE log_entries ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''", "ALTER TABLE incidents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE received_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE sent_bundles ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE blocklist_candidates ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE glean_fingerprints ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE glean_fingerprints ADD COLUMN mtime REAL", "ALTER TABLE glean_fingerprints ADD COLUMN size INTEGER", "ALTER TABLE glean_fingerprints ADD COLUMN gleaned_at TEXT", "ALTER TABLE log_entries ADD COLUMN anomaly_score REAL", "ALTER TABLE log_entries ADD COLUMN anomaly_label TEXT", "ALTER TABLE log_entries ADD COLUMN anomaly_scored_at TEXT", "ALTER TABLE log_entries ADD COLUMN ml_score REAL", "ALTER TABLE log_entries ADD COLUMN ml_label TEXT", "ALTER TABLE log_entries ADD COLUMN ml_scored_at TEXT", "ALTER TABLE detections ADD COLUMN scorer TEXT NOT NULL DEFAULT 'anomaly'", ] _CONTEXT_MIGRATIONS_SQLITE = [ "ALTER TABLE context_facts ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE context_documents ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", "ALTER TABLE context_chunks ADD COLUMN tenant_id TEXT NOT NULL DEFAULT ''", ] def _run_sqlite_migrations(conn: sqlite3.Connection, stmts: list[str]) -> None: for stmt in stmts: try: conn.execute(stmt) except sqlite3.OperationalError: pass # column already exists or table not present yet — both are fine def _run_pg_stmts(stmts: list[str]) -> None: """Execute Postgres DDL statements — each in its own transaction for IF NOT EXISTS safety.""" from psycopg import connect as pg_connect # type: ignore[import] import os url = os.environ["DATABASE_URL"] with pg_connect(url, autocommit=True) as conn: for stmt in stmts: stripped = stmt.strip() if stripped: conn.execute(stripped) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def ensure_schema(db_path: Path) -> None: """Ensure main log/incidents/blocklist tables exist. Idempotent.""" if BACKEND == Backend.POSTGRES: _run_pg_stmts(_MAIN_SCHEMA_PG_STMTS) logger.debug("Postgres main schema verified") return conn = sqlite3.connect(str(db_path), timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") # Migrations first: add tenant_id to existing tables BEFORE index creation touches it _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE) conn.commit() conn.executescript(_MAIN_SCHEMA_SQLITE) conn.close() logger.debug("SQLite main schema verified at %s", db_path) def ensure_context_schema(db_path: Path) -> None: """Ensure context KB tables exist. Idempotent.""" if BACKEND == Backend.POSTGRES: _run_pg_stmts(_CONTEXT_SCHEMA_PG_STMTS) logger.debug("Postgres context schema verified") return conn = sqlite3.connect(str(db_path), timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") _run_sqlite_migrations(conn, _CONTEXT_MIGRATIONS_SQLITE) conn.commit() conn.executescript(_CONTEXT_SCHEMA_SQLITE) conn.close() logger.debug("SQLite context schema verified at %s", db_path) def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int: """One-shot migration: copy incidents/bundles rows from main DB to incidents DB. Safe to call on every startup — rows already in incidents_db are skipped. No-op for Postgres (single DB, no migration needed). """ if BACKEND == Backend.POSTGRES: return 0 src = sqlite3.connect(str(main_db), timeout=30.0) src.row_factory = sqlite3.Row dst = sqlite3.connect(str(incidents_db), timeout=30.0) migrated = 0 for table in ("incidents", "received_bundles", "sent_bundles"): try: rows = src.execute(f"SELECT * FROM {table}").fetchall() # noqa: S608 except sqlite3.OperationalError: continue if not rows: continue cols = ", ".join(rows[0].keys()) placeholders = ", ".join("?" * len(rows[0].keys())) dst.executemany( f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})", # noqa: S608 [tuple(r) for r in rows], ) migrated += len(rows) dst.commit() src.close() dst.close() return migrated def ensure_incidents_schema(db_path: Path) -> None: """Ensure incidents/bundles tables exist. Idempotent. For Postgres, incidents live in the same DB as log_entries (already created by ensure_schema), so this is a no-op — the tables were created above. """ if BACKEND == Backend.POSTGRES: return conn = sqlite3.connect(str(db_path), timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") _run_sqlite_migrations(conn, _MAIN_MIGRATIONS_SQLITE) conn.commit() conn.executescript(_MAIN_SCHEMA_SQLITE) conn.close() logger.debug("SQLite incidents schema verified at %s", db_path)