From da4db5d6b303b3ac76a754ca9a900fe137a94952 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Mon, 1 Jun 2026 15:54:23 -0700 Subject: [PATCH] fix: split incidents tables to dedicated turnstone-incidents.db (#60) FTS5 bulk-insert write locks starved the incident API and bundle endpoints during log bursts (sonarr/radarr, high-volume docker sources). Fix mirrors the context_facts split (context -> turnstone-context.db): - Add INCIDENTS_DB_PATH / TURNSTONE_INCIDENTS_DB env var in rest.py - Add _INCIDENTS_SCHEMA, ensure_incidents_schema(), and migrate_incidents_to_dedicated_db() in glean/pipeline.py - Stub out incidents/received_bundles/sent_bundles in _SCHEMA (no-op CREATE IF NOT EXISTS) so legacy single-file deployments still open - Thread incidents_db_path through diagnose_stream -> run_pipeline -> FalsePositiveSuppressor.suppress -> _fetch_resolved_incidents - One-shot migration on startup: copy existing rows from main DB to incidents DB via INSERT OR IGNORE (idempotent, safe to re-run) - Fix test_blocklist_endpoints fixtures to patch CONTEXT_DB_PATH and INCIDENTS_DB_PATH alongside DB_PATH (worktree has no data/ dir) 372 tests passing. 
Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/60 --- app/glean/pipeline.py | 100 ++++++++++++++++++++++++++-- app/rest.py | 44 +++++++----- app/services/diagnose/__init__.py | 2 + app/services/diagnose/pipeline.py | 4 +- app/services/diagnose/suppressor.py | 20 +++--- tests/test_blocklist_endpoints.py | 4 ++ 6 files changed, 140 insertions(+), 34 deletions(-) diff --git a/app/glean/pipeline.py b/app/glean/pipeline.py index 684219f..38bd0f1 100644 --- a/app/glean/pipeline.py +++ b/app/glean/pipeline.py @@ -46,6 +46,8 @@ CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_ CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity); CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns); +-- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH +-- kept here as no-ops so legacy single-file deployments still work CREATE TABLE IF NOT EXISTS incidents ( id TEXT PRIMARY KEY, label TEXT NOT NULL, @@ -56,8 +58,6 @@ CREATE TABLE IF NOT EXISTS incidents ( created_at TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'medium' ); -CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at); - CREATE TABLE IF NOT EXISTS received_bundles ( id TEXT PRIMARY KEY, source_host TEXT NOT NULL, @@ -69,9 +69,6 @@ CREATE TABLE IF NOT EXISTS received_bundles ( entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at); -CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type); - CREATE TABLE IF NOT EXISTS sent_bundles ( id TEXT PRIMARY KEY, incident_id TEXT NOT NULL, @@ -80,8 +77,6 @@ CREATE TABLE IF NOT EXISTS sent_bundles ( entry_count INTEGER NOT NULL DEFAULT 0, bundle_json TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id); -CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at); -- context tables 
moved to ensure_context_schema() / CONTEXT_DB_PATH
 -- kept here as no-ops so legacy single-file deployments still work
@@ -206,6 +201,121 @@ def ensure_context_schema(db_path: Path) -> None:
     conn.close()
 
 
+_INCIDENTS_SCHEMA = """
+CREATE TABLE IF NOT EXISTS incidents (
+    id TEXT PRIMARY KEY,
+    label TEXT NOT NULL,
+    issue_type TEXT NOT NULL DEFAULT '',
+    started_at TEXT,
+    ended_at TEXT,
+    notes TEXT NOT NULL DEFAULT '',
+    created_at TEXT NOT NULL,
+    severity TEXT NOT NULL DEFAULT 'medium'
+);
+CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
+
+CREATE TABLE IF NOT EXISTS received_bundles (
+    id TEXT PRIMARY KEY,
+    source_host TEXT NOT NULL,
+    issue_type TEXT NOT NULL DEFAULT '',
+    label TEXT NOT NULL,
+    severity TEXT NOT NULL DEFAULT 'medium',
+    started_at TEXT,
+    bundled_at TEXT NOT NULL,
+    entry_count INTEGER NOT NULL DEFAULT 0,
+    bundle_json TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
+CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
+
+CREATE TABLE IF NOT EXISTS sent_bundles (
+    id TEXT PRIMARY KEY,
+    incident_id TEXT NOT NULL,
+    exported_at TEXT NOT NULL,
+    sanitized INTEGER NOT NULL DEFAULT 0,
+    entry_count INTEGER NOT NULL DEFAULT 0,
+    bundle_json TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
+CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
+"""
+
+
+def ensure_incidents_schema(db_path: Path) -> None:
+    """Create incidents tables in a dedicated database file.
+
+    Using a separate file from the main log DB means incident writes never
+    contend with the FTS5 bulk-insert write lock held by the glean scheduler.
+    Mirrors the context_facts split (CONTEXT_DB_PATH / turnstone-context.db).
+
+    Args:
+        db_path: Location of the incidents database; created if missing.
+    """
+    conn = sqlite3.connect(str(db_path), timeout=30.0)
+    try:
+        conn.execute("PRAGMA journal_mode=WAL")
+        conn.executescript(_INCIDENTS_SCHEMA)
+        # Column upgrades for incidents tables created before the column
+        # existed; SQLite raises OperationalError when it is already there.
+        for stmt in [
+            "ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
+        ]:
+            try:
+                conn.execute(stmt)
+            except sqlite3.OperationalError:
+                pass
+        conn.commit()
+    finally:
+        # Close even when schema creation fails so the handle is not leaked.
+        conn.close()
+
+
+def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
+    """One-shot migration: copy incidents/bundles rows from main DB to incidents DB.
+
+    Safe to call on every startup — rows already present in incidents_db are
+    skipped via INSERT OR IGNORE.
+
+    Args:
+        main_db: Legacy single-file database that may still hold the tables.
+        incidents_db: Dedicated incidents database; its tables must already
+            exist (see ensure_incidents_schema).
+
+    Returns:
+        The number of rows actually inserted into incidents_db. Rows skipped
+        as duplicates are not counted, so a repeat run returns 0.
+    """
+    src = sqlite3.connect(str(main_db), timeout=30.0)
+    try:
+        src.row_factory = sqlite3.Row
+        dst = sqlite3.connect(str(incidents_db), timeout=30.0)
+        try:
+            # total_changes only counts rows that were really written, so
+            # rows dropped by OR IGNORE do not inflate the result.
+            changes_before = dst.total_changes
+            for table in ("incidents", "received_bundles", "sent_bundles"):
+                try:
+                    rows = src.execute(f"SELECT * FROM {table}").fetchall()  # noqa: S608
+                except sqlite3.OperationalError:
+                    # e.g. the table is absent from main_db: nothing to copy.
+                    continue
+                if not rows:
+                    continue
+                names = rows[0].keys()
+                cols = ", ".join(names)
+                placeholders = ", ".join("?" * len(names))
+                dst.executemany(
+                    f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})",  # noqa: S608
+                    [tuple(r) for r in rows],
+                )
+            dst.commit()
+            return dst.total_changes - changes_before
+        finally:
+            dst.close()
+    finally:
+        src.close()
+
+
 def _fingerprint(path: Path) -> tuple[float, int]:
     """Return (mtime, size) for a file — cheap identity check, no content read needed."""
     st = path.stat()
diff --git a/app/rest.py b/app/rest.py
index b72c466..44ecd7b 100644
--- a/app/rest.py
+++ b/app/rest.py
@@ -35,7 +35,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
 from fastapi.staticfiles import StaticFiles
 from pydantic import BaseModel
 
-from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
+from app.glean.pipeline import ensure_schema, ensure_context_schema, ensure_incidents_schema, migrate_incidents_to_dedicated_db, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
 from app.glean.base import load_compiled_patterns, now_iso
 from app.glean.tautulli import parse_webhook as _parse_tautulli
 from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
@@ -95,6 +95,11 @@ DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "da
 CONTEXT_DB_PATH = Path(
     os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
 )
+# Incidents get their own file so incident writes never block behind the FTS5
+# bulk-insert write lock held by the glean scheduler during log bursts.
+INCIDENTS_DB_PATH = Path( + os.environ.get("TURNSTONE_INCIDENTS_DB", DB_PATH.parent / "turnstone-incidents.db") +) PREFS_PATH = DB_PATH.parent / "preferences.json" DIST_DIR = Path(__file__).parent.parent / "web" / "dist" SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") @@ -135,6 +140,12 @@ async def _lifespan(app: FastAPI): _audit_log.addHandler(h) ensure_schema(DB_PATH) ensure_context_schema(CONTEXT_DB_PATH) + ensure_incidents_schema(INCIDENTS_DB_PATH) + migrated = migrate_incidents_to_dedicated_db(DB_PATH, INCIDENTS_DB_PATH) + if migrated: + logging.getLogger(__name__).info( + "Migrated %d incident/bundle rows from main DB to incidents DB", migrated + ) _compiled_patterns = load_compiled_patterns(PATTERN_FILE) watch_cfg_path = PATTERN_DIR / "watch.yaml" configs = load_watch_config(watch_cfg_path) @@ -451,6 +462,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse: llm_model=prefs.get("llm_model") or None, llm_api_key=prefs.get("llm_api_key") or None, context_db_path=CONTEXT_DB_PATH, + incidents_db_path=INCIDENTS_DB_PATH, tech_level=prefs.get("tech_level", "sysadmin"), ): yield f"data: {json.dumps(event)}\n\n" @@ -924,7 +936,7 @@ def get_stats( @router.post("/api/incidents") def create_incident_endpoint(body: IncidentCreate) -> dict: incident = create_incident( - DB_PATH, + INCIDENTS_DB_PATH, label=body.label, issue_type=body.issue_type, started_at=body.started_at, @@ -937,15 +949,15 @@ def create_incident_endpoint(body: IncidentCreate) -> dict: @router.get("/api/incidents") def list_incidents_endpoint() -> dict: - return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]} + return {"incidents": [dataclasses.asdict(i) for i in list_incidents(INCIDENTS_DB_PATH)]} @router.get("/api/incidents/{incident_id}") def get_incident_endpoint(incident_id: str) -> dict: - incident = get_incident(DB_PATH, incident_id) + incident = get_incident(INCIDENTS_DB_PATH, incident_id) if not incident: raise 
HTTPException(status_code=404, detail="Incident not found") - entries = get_incident_entries(DB_PATH, incident) + entries = get_incident_entries(INCIDENTS_DB_PATH, incident) return { **dataclasses.asdict(incident), "entries": [dataclasses.asdict(e) for e in entries], @@ -954,24 +966,24 @@ def get_incident_endpoint(incident_id: str) -> dict: @router.delete("/api/incidents/{incident_id}") def delete_incident_endpoint(incident_id: str) -> dict: - if not delete_incident(DB_PATH, incident_id): + if not delete_incident(INCIDENTS_DB_PATH, incident_id): raise HTTPException(status_code=404, detail="Incident not found") return {"deleted": incident_id} @router.get("/api/incidents/{incident_id}/bundle") def get_incident_bundle(incident_id: str, sanitize: bool = False) -> dict: - incident = get_incident(DB_PATH, incident_id) + incident = get_incident(INCIDENTS_DB_PATH, incident_id) if not incident: raise HTTPException(status_code=404, detail="Incident not found") - bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize) - record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize) + bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize) + record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize) return bundle @router.get("/api/sent-bundles") def list_sent_bundles_endpoint() -> dict: - bundles = list_sent_bundles(DB_PATH) + bundles = list_sent_bundles(INCIDENTS_DB_PATH) return {"bundles": [dataclasses.asdict(b) for b in bundles]} @@ -979,11 +991,11 @@ def list_sent_bundles_endpoint() -> dict: def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict: if not BUNDLE_ENDPOINT: raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured") - incident = get_incident(DB_PATH, incident_id) + incident = get_incident(INCIDENTS_DB_PATH, incident_id) if not incident: raise HTTPException(status_code=404, detail="Incident not found") - bundle = 
build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize) - record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize) + bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize) + record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize) payload = json.dumps(bundle).encode() req = urllib.request.Request( BUNDLE_ENDPOINT, @@ -1002,19 +1014,19 @@ def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict: @router.post("/api/bundles") def receive_bundle(bundle: dict) -> dict: - record = store_bundle(DB_PATH, bundle) + record = store_bundle(INCIDENTS_DB_PATH, bundle) return {"id": record.id, "entry_count": record.entry_count} @router.get("/api/bundles") def list_bundles_endpoint() -> dict: - bundles = list_bundles(DB_PATH) + bundles = list_bundles(INCIDENTS_DB_PATH) return {"bundles": [dataclasses.asdict(b) for b in bundles]} @router.get("/api/bundles/{bundle_id}") def get_bundle_endpoint(bundle_id: str) -> dict: - bundle = get_bundle(DB_PATH, bundle_id) + bundle = get_bundle(INCIDENTS_DB_PATH, bundle_id) if not bundle: raise HTTPException(status_code=404, detail="Bundle not found") return dataclasses.asdict(bundle) diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py index 195df24..78cf7a0 100644 --- a/app/services/diagnose/__init__.py +++ b/app/services/diagnose/__init__.py @@ -196,6 +196,7 @@ async def diagnose_stream( llm_model: str | None = None, llm_api_key: str | None = None, context_db_path: Path | None = None, + incidents_db_path: Path | None = None, tech_level: str = "sysadmin", ) -> AsyncGenerator[dict[str, Any], None]: """Async generator yielding SSE event dicts for the diagnose pipeline. 
@@ -318,6 +319,7 @@ async def diagnose_stream( llm_model=llm_model, llm_api_key=llm_api_key, tech_level=tech_level, + incidents_db_path=incidents_db_path, ): yield event return # pipeline emits its own "done" event diff --git a/app/services/diagnose/pipeline.py b/app/services/diagnose/pipeline.py index 63235ef..dbd7edd 100644 --- a/app/services/diagnose/pipeline.py +++ b/app/services/diagnose/pipeline.py @@ -38,6 +38,7 @@ async def run_pipeline( llm_model: str | None, llm_api_key: str | None, tech_level: str = "sysadmin", + incidents_db_path: Path | None = None, ) -> AsyncGenerator[dict[str, Any], None]: """Async generator that runs all 5 pipeline stages and yields SSE event dicts. @@ -124,9 +125,10 @@ async def run_pipeline( } # Stage 4: False-positive suppression + _incidents_db = incidents_db_path or db_path try: ranked = await asyncio.to_thread( - FalsePositiveSuppressor().suppress, hypotheses, db_path + FalsePositiveSuppressor().suppress, hypotheses, _incidents_db ) except Exception as exc: logger.exception("Stage 4 (suppressor) failed: %s", exc) diff --git a/app/services/diagnose/suppressor.py b/app/services/diagnose/suppressor.py index 9f1b016..37ac68e 100644 --- a/app/services/diagnose/suppressor.py +++ b/app/services/diagnose/suppressor.py @@ -65,14 +65,14 @@ except ImportError: # pragma: no cover # DB helpers # --------------------------------------------------------------------------- -def _fetch_resolved_incidents(db_path: Path) -> list[str]: - """Fetch resolved incident texts from SQLite. +def _fetch_resolved_incidents(incidents_db_path: Path) -> list[str]: + """Fetch resolved incident texts from the incidents database. Returns a list of non-empty combined strings for each resolved incident. Returns an empty list on any error (missing table, connection failure, etc.). 
""" try: - with sqlite3.connect(str(db_path), timeout=30.0) as conn: + with sqlite3.connect(str(incidents_db_path), timeout=30.0) as conn: cursor = conn.execute( "SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200" ) @@ -125,13 +125,13 @@ class FalsePositiveSuppressor: def suppress( self, hypotheses: list[Hypothesis], - db_path: Path, + incidents_db_path: Path, ) -> list[RankedHypothesis]: """Rank hypotheses by novelty, suppressing those matching resolved incidents. Args: hypotheses: Candidate hypotheses from Stage 3. - db_path: Path to the Turnstone SQLite database containing incidents. + incidents_db_path: Path to the dedicated incidents SQLite database. Returns: List of RankedHypothesis sorted by (novelty_score * confidence) descending. @@ -153,14 +153,14 @@ class FalsePositiveSuppressor: ) return self._passthrough(hypotheses) - # Fetch corpus texts from DB; fall back to passthrough if corpus is empty. - corpus_texts = _fetch_resolved_incidents(db_path) + # Fetch corpus texts from incidents DB; fall back to passthrough if empty. + corpus_texts = _fetch_resolved_incidents(incidents_db_path) if not corpus_texts: logger.debug("No resolved incidents found — all hypotheses treated as novel") return self._passthrough(hypotheses) # Embed corpus (with caching). - corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path) + corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, incidents_db_path) # Score each hypothesis and sort by novelty * confidence descending. 
ranked = [ @@ -230,10 +230,10 @@ class FalsePositiveSuppressor: self, embedder: Any, corpus_texts: list[str], - db_path: Path, + incidents_db_path: Path, ) -> list[list[float]]: """Return cached corpus embeddings, re-embedding if the corpus has changed.""" - cache_key = str(db_path) + cache_key = str(incidents_db_path) cached = _corpus_cache.get(cache_key) if cached is not None: diff --git a/tests/test_blocklist_endpoints.py b/tests/test_blocklist_endpoints.py index 938042f..0d89cae 100644 --- a/tests/test_blocklist_endpoints.py +++ b/tests/test_blocklist_endpoints.py @@ -16,6 +16,8 @@ def client(tmp_path): ensure_schema(db) with patch.object(rest_module, "DB_PATH", db), \ + patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \ + patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \ patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \ patch.object(rest_module, "_compiled_patterns", []): with TestClient(rest_module.app, raise_server_exceptions=True) as c: @@ -41,6 +43,8 @@ def client_with_candidate(tmp_path): conn.close() with patch.object(rest_module, "DB_PATH", db), \ + patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \ + patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \ patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \ patch.object(rest_module, "_compiled_patterns", []): with TestClient(rest_module.app, raise_server_exceptions=True) as c: