From ccc9a9eecf6f291b3c453f17f8a3bec0485aa61d Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 10 Jun 2026 12:42:24 -0700 Subject: [PATCH] fix(watcher): remove per-flush FTS sync to eliminate SQLite write lock contention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each WatchSource was calling build_fts_index() every 3 flushes (~30s). With 70+ active sources, this produced a near-continuous stream of FTS INSERT operations, each holding the SQLite write lock for several seconds while scanning the 5.4GB log_entries table. Every other writer (other watcher flushes, cybersec scorer) timed out with 'database is locked'. FTS index is now only updated by the glean scheduler (every 900s) and the manual `build-fts` command — both already call build_fts_index() through glean_dir(). Real-time freshness of watcher-ingested entries in FTS was ~30s before; it's now up to ~15min, which is acceptable. This is the root cause of the persistent 'database is locked' errors blocking the cybersec scorer (issue #9). Closes: https://git.opensourcesolarpunk.com/Circuit-Forge/turnstone/issues/9 --- app/watch/watcher.py | 48 ++++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/app/watch/watcher.py b/app/watch/watcher.py index dda8ad2..c397ae9 100644 --- a/app/watch/watcher.py +++ b/app/watch/watcher.py @@ -24,14 +24,12 @@ from app.db import get_conn from app.db.schema import ensure_schema from app.glean.pipeline import _detect_format, _write_batch from app.glean.base import _compile, load_patterns, now_iso -from app.services.search import build_fts_index from app.services.models import RetrievedEntry logger = logging.getLogger(__name__) FLUSH_INTERVAL_SEC = 10 FLUSH_BATCH_SIZE = 100 -FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load # ── Config ──────────────────────────────────────────────────────────────────── @@ -113,22 +111,21 @@ class WatchSource: ensure_schema(self.db_path) - with get_conn(self.db_path) as conn: - try: - cmd = self._build_command() - if not cmd: - return - self._proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, - ) - self._drain(conn, compiled) - except Exception as exc: - self._error = str(exc) - logger.error("Watch source %r crashed: %s", self.config.source_id, exc) + try: + cmd = self._build_command() + if not cmd: + return + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + self._drain(compiled) + except Exception as exc: + self._error = str(exc) + logger.error("Watch source %r crashed: %s", self.config.source_id, exc) def _build_command(self) -> list[str] | None: t = self.config.source_type @@ -189,7 +186,7 @@ class WatchSource: return [] - def _drain(self, conn, compiled) -> None: + def _drain(self, compiled) -> None: """Read lines from the subprocess and flush to DB periodically.""" assert self._proc is not None buffer: list[str] = [] @@ -217,29 +214,28 @@ class WatchSource: should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC if buffer and should_flush: - flush_count = self._flush(conn, buffer, compiled, flush_count) + flush_count = self._flush(buffer, compiled, flush_count) buffer.clear() last_flush = datetime.now(tz=timezone.utc) # Flush remainder if buffer: - self._flush(conn, buffer, compiled, flush_count) + self._flush(buffer, compiled, flush_count) - def _flush(self, conn, lines: list[str], compiled, flush_count: int) -> int: + def _flush(self, lines: list[str], compiled, flush_count: int) -> int: ingest_time = now_iso() try: entries = self._parse_lines(lines, ingest_time, compiled) if entries: - _write_batch(conn, entries) - conn.commit() + with get_conn(self.db_path) as conn: + _write_batch(conn, entries) + conn.commit() self._entry_count += len(entries) self._last_event = now_iso() if entries: self._last_event = entries[-1].timestamp_iso or self._last_event flush_count += 1 - if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0: - build_fts_index(self.db_path) except Exception as exc: logger.warning("Flush error for %r: %s", self.config.source_id, exc) return flush_count