Watcher, REST endpoints, services (search, incidents, blocklist),
MCP server, context retriever, embedder, glean_scheduler, and
doc_upload all used the Python sqlite3 module's default 5-second
busy timeout. During the write phases of a collect glean, watcher
flush threads hit 'database is locked' errors whenever the glean
held the write lock for longer than 5 seconds.
All connections now use timeout=30.0, matching the pipeline fix
from commit 5a9281a. No logic changes.
189 lines
6.6 KiB
Python
189 lines
6.6 KiB
Python
"""Periodic batch glean scheduler with optional CF submission.
|
|
|
|
Runs glean_sources on a configurable interval (TURNSTONE_GLEAN_INTERVAL env var,
default 900s / 15 min). Set the interval to 0 to disable scheduling.

When TURNSTONE_SUBMIT_ENDPOINT is set, pattern-matched entries are pushed to a
remote Turnstone instance (the CF receiving store) after each glean run.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from app.glean.pipeline import glean_sources
|
|
|
|
# Module logger, named after this module so handlers can filter on it.
logger: logging.Logger = logging.getLogger(__name__)

# Guards run_once(): a caller that finds it already held returns a
# "glean already running" result instead of queueing behind the active run.
_lock: asyncio.Lock = asyncio.Lock()
|
|
|
|
|
|
@dataclass
class IngestState:
    """Mutable status record for the batch glean scheduler.

    A single module-level instance is shared and returned by ``get_state``;
    the scheduler functions in this module update it in place. Timestamp
    fields hold ISO-8601 strings (or ``None`` until first set).
    """

    # --- glean run bookkeeping ---------------------------------------------
    last_run_at: str | None = None  # start time of the most recent run
    last_duration_s: float | None = None  # wall time of that run, seconds
    last_stats: dict[str, int] = field(default_factory=dict)  # stats from glean_sources
    last_error: str | None = None  # str() of the last glean exception, if any
    run_count: int = 0  # runs attempted, successful or not
    next_run_at: str | None = None  # when the scheduler loop will run next
    running: bool = False  # True while run_once is executing a glean

    # --- remote submission bookkeeping -------------------------------------
    last_submitted_at: str | None = None  # used as the next submission's `since`
    last_submit_count: int = 0  # count reported back by the receiving instance
    last_submit_error: str | None = None  # str() of the last submission failure


# Process-wide scheduler status; mutated in place, never replaced.
_state = IngestState()


def get_state() -> IngestState:
    """Return the shared scheduler status object itself (not a copy)."""
    return _state
|
|
|
|
|
|
def _query_matched_since(
    db_path: Path,
    since: str | None,
    limit: int = 5000,
) -> list[dict]:
    """Return entries with non-empty matched_patterns, optionally filtered by ingest_time.

    Args:
        db_path: SQLite database containing the ``log_entries`` table.
        since: When truthy, only rows with ``ingest_time > since`` are
            returned, oldest first. When ``None`` or empty, the newest rows
            are returned, newest first.
        limit: Maximum number of rows to return (default 5000, the
            previously hard-coded cap).

    Returns:
        One dict per row, keyed by column name.
    """
    # Shared column list; "[]" is the stored value for "no pattern matched".
    select = (
        "SELECT id, source_id, sequence, timestamp_raw, timestamp_iso,"
        " ingest_time, severity, repeat_count, out_of_order,"
        " matched_patterns, text"
        " FROM log_entries"
        " WHERE matched_patterns != '[]'"
    )
    params: tuple[object, ...]
    if since:
        # Oldest first, so a LIMIT-truncated batch is a contiguous run of rows
        # immediately after the watermark.
        sql = select + " AND ingest_time > ? ORDER BY ingest_time LIMIT ?"
        params = (since, limit)
    else:
        sql = select + " ORDER BY ingest_time DESC LIMIT ?"
        params = (limit,)

    # 30 s busy timeout: writers (glean, watcher flushes) may hold the lock.
    conn = sqlite3.connect(str(db_path), timeout=30.0)
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]
    finally:
        conn.close()
|
|
|
|
|
|
async def submit_matched(
    db_path: Path,
    submit_endpoint: str,
    source_host: str,
    since: str | None = None,
) -> dict[str, Any]:
    """Push pattern-matched entries to the remote CF receiving instance.

    Args:
        db_path: SQLite database to read matched ``log_entries`` rows from.
        submit_endpoint: Base URL of the receiving Turnstone instance; the
            batch is POSTed to ``<endpoint>/turnstone/api/glean/batch``.
        source_host: Sent as ``source_host`` in the JSON payload.
        since: Only rows with ``ingest_time`` greater than this value are
            sent; ``None`` sends the most recent matches.

    Returns:
        ``{"ok": True, "submitted": n}`` on success,
        ``{"ok": True, "submitted": 0, "skipped": True}`` when nothing matched,
        or ``{"ok": False, "error": msg}`` when the local query or the HTTP
        submission fails. Failures are also stored in
        ``_state.last_submit_error`` instead of being raised, so a locked
        database cannot escape into the scheduler loop and stop it.
    """
    # Capture the watermark *before* querying. Rows written by other
    # connections while the POST is in flight would otherwise receive an
    # ingest_time earlier than a post-submit timestamp and never be sent
    # (assumes ingest_time is stamped at write time — TODO confirm in pipeline).
    cutoff = datetime.now(tz=timezone.utc).isoformat()

    loop = asyncio.get_running_loop()
    try:
        entries = await loop.run_in_executor(
            None, lambda: _query_matched_since(db_path, since)
        )
    except Exception as exc:
        # e.g. sqlite3.OperationalError("database is locked") after the busy timeout.
        _state.last_submit_error = str(exc)
        logger.warning("Querying matched entries for submission failed: %s", exc)
        return {"ok": False, "error": str(exc)}

    if not entries:
        return {"ok": True, "submitted": 0, "skipped": True}

    url = f"{submit_endpoint.rstrip('/')}/turnstone/api/glean/batch"
    payload = {"source_host": source_host, "entries": entries}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(url, json=payload)
            resp.raise_for_status()
            result = resp.json()
        # Prefer the receiver's own count; fall back to what we sent.
        submitted = result.get("gleaned", len(entries))
        _state.last_submitted_at = cutoff
        _state.last_submit_count = submitted
        _state.last_submit_error = None
        logger.info("Submitted %d matched entries to %s", submitted, submit_endpoint)
        return {"ok": True, "submitted": submitted}
    except Exception as exc:
        _state.last_submit_error = str(exc)
        logger.warning("Submission to %s failed: %s", submit_endpoint, exc)
        return {"ok": False, "error": str(exc)}
|
|
|
|
|
|
async def run_once(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
    force: bool = False,
) -> dict[str, Any]:
    """Ingest all sources once, then submit matched entries if configured.

    Pass ``force=True`` to bypass fingerprint checks and re-glean all local
    file sources regardless of whether they appear unchanged.

    Only one run may be active at a time. If ``_lock`` is already held, this
    returns ``{"ok": False, "error": "glean already running", "skipped": True}``
    immediately rather than waiting. The lock (and ``_state.running``) covers
    both the glean and the optional submission, so overlapping runs cannot
    submit the same ``since`` window twice.

    Returns:
        ``{"ok": True, "stats": ..., "duration_s": ...}`` for a successful
        glean (the submission outcome is reported via ``_state``, not here),
        or ``{"ok": False, "error": msg}`` if ``glean_sources`` raised.
    """
    # No await between this check and the acquire below, so on a single event
    # loop another caller cannot slip in between them.
    if _lock.locked():
        return {"ok": False, "error": "glean already running", "skipped": True}

    async with _lock:
        _state.running = True
        started = datetime.now(tz=timezone.utc)
        try:
            loop = asyncio.get_running_loop()
            try:
                # glean_sources is synchronous; keep it off the event loop.
                stats: dict[str, int] = await loop.run_in_executor(
                    None,
                    lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
                )
            except Exception as exc:
                duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
                _state.last_run_at = started.isoformat()
                _state.last_duration_s = round(duration, 2)
                _state.last_error = str(exc)
                _state.run_count += 1
                logger.error("Batch glean failed: %s", exc)
                return {"ok": False, "error": str(exc)}

            duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
            _state.last_run_at = started.isoformat()
            _state.last_duration_s = round(duration, 2)
            _state.last_stats = stats
            _state.last_error = None
            _state.run_count += 1
            logger.info("Batch glean complete in %.1fs — %s", duration, stats)

            if submit_endpoint:
                # Still under the lock: the `since` watermark read here cannot
                # be raced by a second run's submission.
                await submit_matched(
                    db_path, submit_endpoint, source_host, since=_state.last_submitted_at
                )
        finally:
            _state.running = False

    # Report this run's own values rather than re-reading shared state.
    return {"ok": True, "stats": stats, "duration_s": round(duration, 2)}
|
|
|
|
|
|
async def scheduler_loop(
    sources_file: Path,
    db_path: Path,
    pattern_file: Path | None,
    interval_s: int,
    submit_endpoint: str | None = None,
    source_host: str = "unknown",
) -> None:
    """Run glean + optional submission every interval_s seconds until cancelled.

    A run that raises unexpectedly is logged and retried on the next interval
    instead of ending the loop. On cancellation (whether sleeping or mid-run),
    ``_state.next_run_at`` is cleared and ``CancelledError`` is re-raised.
    An ``interval_s`` of 0 or less disables the scheduler: it logs and returns
    immediately rather than re-running back-to-back with no delay.
    """
    if interval_s <= 0:
        logger.info("Ingest scheduler disabled (interval %ds)", interval_s)
        return

    logger.info("Ingest scheduler started — interval %ds, sources: %s", interval_s, sources_file)
    if submit_endpoint:
        logger.info("Submission enabled — endpoint: %s", submit_endpoint)
    try:
        while True:
            try:
                await run_once(sources_file, db_path, pattern_file, submit_endpoint, source_host)
            except asyncio.CancelledError:
                raise
            except Exception:
                # Keep the background task alive; the next interval retries.
                logger.exception("Scheduled glean run raised unexpectedly; retrying next interval")
            next_run = datetime.now(tz=timezone.utc) + timedelta(seconds=interval_s)
            _state.next_run_at = next_run.isoformat()
            await asyncio.sleep(interval_s)
    except asyncio.CancelledError:
        logger.info("Ingest scheduler cancelled")
        _state.next_run_at = None
        raise
|