fix(watcher): remove per-flush FTS sync to eliminate SQLite write lock contention
Each WatchSource was calling build_fts_index() every 3 flushes (~30s). With 70+ active sources, this produced a near-continuous stream of FTS INSERT operations, each holding the SQLite write lock for several seconds while scanning the 5.4GB log_entries table. As a result, every other writer (other watcher flushes, the cybersec scorer) timed out with 'database is locked'. The FTS index is now updated only by the glean scheduler (every 900s) and the manual `build-fts` command — both already call build_fts_index() through glean_dir(). Watcher-ingested entries previously became searchable in FTS within ~30s; they can now take up to ~15min, which is acceptable. This commit also stops wrapping the whole watch loop in a single `with get_conn(...)` block: _flush() now opens its own connection via get_conn() for each batch write. The per-flush FTS sync was the root cause of the persistent 'database is locked' errors blocking the cybersec scorer (issue #9). Closes: #9
This commit is contained in:
parent
c17c6c42ea
commit
971a859c0d
1 changed file with 22 additions and 26 deletions
|
|
@@ -24,14 +24,12 @@ from app.db import get_conn
|
||||||
from app.db.schema import ensure_schema
|
from app.db.schema import ensure_schema
|
||||||
from app.glean.pipeline import _detect_format, _write_batch
|
from app.glean.pipeline import _detect_format, _write_batch
|
||||||
from app.glean.base import _compile, load_patterns, now_iso
|
from app.glean.base import _compile, load_patterns, now_iso
|
||||||
from app.services.search import build_fts_index
|
|
||||||
from app.services.models import RetrievedEntry
|
from app.services.models import RetrievedEntry
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
FLUSH_INTERVAL_SEC = 10
|
FLUSH_INTERVAL_SEC = 10
|
||||||
FLUSH_BATCH_SIZE = 100
|
FLUSH_BATCH_SIZE = 100
|
||||||
FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load
|
|
||||||
|
|
||||||
|
|
||||||
# ── Config ────────────────────────────────────────────────────────────────────
|
# ── Config ────────────────────────────────────────────────────────────────────
|
||||||
|
|
@@ -113,22 +111,21 @@ class WatchSource:
|
||||||
|
|
||||||
ensure_schema(self.db_path)
|
ensure_schema(self.db_path)
|
||||||
|
|
||||||
with get_conn(self.db_path) as conn:
|
try:
|
||||||
try:
|
cmd = self._build_command()
|
||||||
cmd = self._build_command()
|
if not cmd:
|
||||||
if not cmd:
|
return
|
||||||
return
|
self._proc = subprocess.Popen(
|
||||||
self._proc = subprocess.Popen(
|
cmd,
|
||||||
cmd,
|
stdout=subprocess.PIPE,
|
||||||
stdout=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
text=True,
|
||||||
text=True,
|
bufsize=1,
|
||||||
bufsize=1,
|
)
|
||||||
)
|
self._drain(compiled)
|
||||||
self._drain(conn, compiled)
|
except Exception as exc:
|
||||||
except Exception as exc:
|
self._error = str(exc)
|
||||||
self._error = str(exc)
|
logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
|
||||||
logger.error("Watch source %r crashed: %s", self.config.source_id, exc)
|
|
||||||
|
|
||||||
def _build_command(self) -> list[str] | None:
|
def _build_command(self) -> list[str] | None:
|
||||||
t = self.config.source_type
|
t = self.config.source_type
|
||||||
|
|
@@ -189,7 +186,7 @@ class WatchSource:
|
||||||
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def _drain(self, conn, compiled) -> None:
|
def _drain(self, compiled) -> None:
|
||||||
"""Read lines from the subprocess and flush to DB periodically."""
|
"""Read lines from the subprocess and flush to DB periodically."""
|
||||||
assert self._proc is not None
|
assert self._proc is not None
|
||||||
buffer: list[str] = []
|
buffer: list[str] = []
|
||||||
|
|
@@ -217,29 +214,28 @@ class WatchSource:
|
||||||
should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC
|
should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC
|
||||||
|
|
||||||
if buffer and should_flush:
|
if buffer and should_flush:
|
||||||
flush_count = self._flush(conn, buffer, compiled, flush_count)
|
flush_count = self._flush(buffer, compiled, flush_count)
|
||||||
buffer.clear()
|
buffer.clear()
|
||||||
last_flush = datetime.now(tz=timezone.utc)
|
last_flush = datetime.now(tz=timezone.utc)
|
||||||
|
|
||||||
# Flush remainder
|
# Flush remainder
|
||||||
if buffer:
|
if buffer:
|
||||||
self._flush(conn, buffer, compiled, flush_count)
|
self._flush(buffer, compiled, flush_count)
|
||||||
|
|
||||||
def _flush(self, conn, lines: list[str], compiled, flush_count: int) -> int:
|
def _flush(self, lines: list[str], compiled, flush_count: int) -> int:
|
||||||
ingest_time = now_iso()
|
ingest_time = now_iso()
|
||||||
try:
|
try:
|
||||||
entries = self._parse_lines(lines, ingest_time, compiled)
|
entries = self._parse_lines(lines, ingest_time, compiled)
|
||||||
if entries:
|
if entries:
|
||||||
_write_batch(conn, entries)
|
with get_conn(self.db_path) as conn:
|
||||||
conn.commit()
|
_write_batch(conn, entries)
|
||||||
|
conn.commit()
|
||||||
self._entry_count += len(entries)
|
self._entry_count += len(entries)
|
||||||
self._last_event = now_iso()
|
self._last_event = now_iso()
|
||||||
if entries:
|
if entries:
|
||||||
self._last_event = entries[-1].timestamp_iso or self._last_event
|
self._last_event = entries[-1].timestamp_iso or self._last_event
|
||||||
|
|
||||||
flush_count += 1
|
flush_count += 1
|
||||||
if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0:
|
|
||||||
build_fts_index(self.db_path)
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("Flush error for %r: %s", self.config.source_id, exc)
|
logger.warning("Flush error for %r: %s", self.config.source_id, exc)
|
||||||
return flush_count
|
return flush_count
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue