diff --git a/app/watch/watcher.py b/app/watch/watcher.py index dda8ad2..c397ae9 100644 --- a/app/watch/watcher.py +++ b/app/watch/watcher.py @@ -24,14 +24,12 @@ from app.db import get_conn from app.db.schema import ensure_schema from app.glean.pipeline import _detect_format, _write_batch from app.glean.base import _compile, load_patterns, now_iso -from app.services.search import build_fts_index from app.services.models import RetrievedEntry logger = logging.getLogger(__name__) FLUSH_INTERVAL_SEC = 10 FLUSH_BATCH_SIZE = 100 -FTS_SYNC_EVERY_N_FLUSHES = 3 # sync FTS every ~30s under normal load # ── Config ──────────────────────────────────────────────────────────────────── @@ -113,22 +111,21 @@ class WatchSource: ensure_schema(self.db_path) - with get_conn(self.db_path) as conn: - try: - cmd = self._build_command() - if not cmd: - return - self._proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, - ) - self._drain(conn, compiled) - except Exception as exc: - self._error = str(exc) - logger.error("Watch source %r crashed: %s", self.config.source_id, exc) + try: + cmd = self._build_command() + if not cmd: + return + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + self._drain(compiled) + except Exception as exc: + self._error = str(exc) + logger.error("Watch source %r crashed: %s", self.config.source_id, exc) def _build_command(self) -> list[str] | None: t = self.config.source_type @@ -189,7 +186,7 @@ class WatchSource: return [] - def _drain(self, conn, compiled) -> None: + def _drain(self, compiled) -> None: """Read lines from the subprocess and flush to DB periodically.""" assert self._proc is not None buffer: list[str] = [] @@ -217,29 +214,28 @@ class WatchSource: should_flush = len(buffer) >= FLUSH_BATCH_SIZE or elapsed >= FLUSH_INTERVAL_SEC if buffer and should_flush: - flush_count = self._flush(conn, buffer, compiled, flush_count) + flush_count = self._flush(buffer, compiled, flush_count) buffer.clear() last_flush = datetime.now(tz=timezone.utc) # Flush remainder if buffer: - self._flush(conn, buffer, compiled, flush_count) + self._flush(buffer, compiled, flush_count) - def _flush(self, conn, lines: list[str], compiled, flush_count: int) -> int: + def _flush(self, lines: list[str], compiled, flush_count: int) -> int: ingest_time = now_iso() try: entries = self._parse_lines(lines, ingest_time, compiled) if entries: - _write_batch(conn, entries) - conn.commit() + with get_conn(self.db_path) as conn: + _write_batch(conn, entries) + conn.commit() self._entry_count += len(entries) self._last_event = now_iso() if entries: self._last_event = entries[-1].timestamp_iso or self._last_event flush_count += 1 - if flush_count % FTS_SYNC_EVERY_N_FLUSHES == 0: - build_fts_index(self.db_path) except Exception as exc: logger.warning("Flush error for %r: %s", self.config.source_id, exc) return flush_count