fix: separate context KB into own SQLite file to eliminate write-lock contention

context_facts, context_documents, and context_chunks now live in
turnstone-context.db (sibling of turnstone.db).  The glean scheduler
held write locks on the main DB long enough to cause 5-second timeout
failures on context fact inserts; separate files have independent WAL
write locks so they never contend.

Changes:
- pipeline.py: extract _CONTEXT_SCHEMA + ensure_context_schema()
- rest.py: CONTEXT_DB_PATH (TURNSTONE_CONTEXT_DB env var, defaults to
  sibling file); init via ensure_context_schema(); all context routes
  pass CONTEXT_DB_PATH; diagnose_stream receives context_db_path kwarg
- diagnose/__init__.py: diagnose_stream() accepts context_db_path
  (falls back to db_path for backward compat); retrieve_context uses it
- store.py: sqlite3.connect() timeout=30.0 — Python driver retry loop
  is independent of PRAGMA busy_timeout; needed for any remaining
  contention during test or single-file deployments

Closes: #42
This commit is contained in:
pyr0ball 2026-05-25 21:19:32 -07:00
parent 65d0584f4a
commit 64804b1378
4 changed files with 73 additions and 11 deletions

View file

@ -29,7 +29,11 @@ class ContextDocument:
def _connect(db_path: Path) -> sqlite3.Connection:
conn = sqlite3.connect(str(db_path))
# timeout=30: retry for up to 30 s when another writer (e.g. the glean
# collector) holds a WAL write lock. PRAGMA busy_timeout is a SQLite-level
# hint that operates after the connection is open; the Python sqlite3 module's
# own retry loop is controlled solely by this timeout= argument.
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row

View file

@ -72,6 +72,8 @@ CREATE TABLE IF NOT EXISTS received_bundles (
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
-- kept here as no-ops so legacy single-file deployments still work
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
@ -129,6 +131,38 @@ CREATE TABLE IF NOT EXISTS glean_fingerprints (
"""
_CONTEXT_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_facts (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
source TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
CREATE TABLE IF NOT EXISTS context_documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
doc_type TEXT NOT NULL,
full_text TEXT NOT NULL,
file_size INTEGER,
uploaded_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
text TEXT NOT NULL,
embedding BLOB
);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
"""
def ensure_schema(db_path: Path) -> None:
"""Create all tables and apply additive migrations. Safe to call on every startup."""
conn = sqlite3.connect(str(db_path))
@ -146,6 +180,21 @@ def ensure_schema(db_path: Path) -> None:
conn.close()
def ensure_context_schema(db_path: Path) -> None:
"""Create context KB tables in a dedicated database file.
Using a separate file from the main log DB means context fact writes never
contend with the high-throughput glean scheduler, which can hold the main
DB write lock for seconds at a time when flushing large journal batches.
"""
conn = sqlite3.connect(str(db_path), timeout=30.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.executescript(_CONTEXT_SCHEMA)
conn.commit()
conn.close()
def _fingerprint(path: Path) -> tuple[float, int]:
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
st = path.stat()

View file

@ -27,7 +27,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from app.glean.pipeline import ensure_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
from app.glean.base import load_compiled_patterns, now_iso
from app.glean.tautulli import parse_webhook as _parse_tautulli
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
@ -78,6 +78,11 @@ from app.tasks.glean_scheduler import get_state as _glean_state, run_once as _ru
from app.glean.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
# Context KB gets its own file so context fact writes never contend with the
# high-throughput glean scheduler. Defaults to a sibling file next to the main DB.
CONTEXT_DB_PATH = Path(
os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
)
PREFS_PATH = DB_PATH.parent / "preferences.json"
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
@ -110,6 +115,7 @@ _compiled_patterns: list = []
async def _lifespan(app: FastAPI):
global _compiled_patterns
ensure_schema(DB_PATH)
ensure_context_schema(CONTEXT_DB_PATH)
_compiled_patterns = load_compiled_patterns(PATTERN_FILE)
watch_cfg_path = PATTERN_DIR / "watch.yaml"
configs = load_watch_config(watch_cfg_path)
@ -382,6 +388,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
llm_url=prefs.get("llm_url") or None,
llm_model=prefs.get("llm_model") or None,
llm_api_key=prefs.get("llm_api_key") or None,
context_db_path=CONTEXT_DB_PATH,
):
yield f"data: {json.dumps(event)}\n\n"
@ -1008,7 +1015,7 @@ async def upload_doc(file: UploadFile):
content = await file.read()
try:
result = await asyncio.to_thread(
lambda: _ingest_upload(DB_PATH, file.filename or "upload", content)
lambda: _ingest_upload(CONTEXT_DB_PATH, file.filename or "upload", content)
)
except UnsupportedDocType as e:
raise HTTPException(status_code=415, detail=str(e))
@ -1019,7 +1026,7 @@ async def upload_doc(file: UploadFile):
@_ctx.get("/docs")
async def list_docs():
docs = await asyncio.to_thread(lambda: _list_documents(DB_PATH))
docs = await asyncio.to_thread(lambda: _list_documents(CONTEXT_DB_PATH))
return [
{
"id": d.id,
@ -1034,7 +1041,7 @@ async def list_docs():
@_ctx.delete("/docs/{doc_id}")
async def delete_doc(doc_id: str):
deleted = await asyncio.to_thread(lambda: _delete_document(DB_PATH, doc_id))
deleted = await asyncio.to_thread(lambda: _delete_document(CONTEXT_DB_PATH, doc_id))
if not deleted:
raise HTTPException(status_code=404, detail="Document not found")
return {"deleted": doc_id}
@ -1043,7 +1050,7 @@ async def delete_doc(doc_id: str):
@_ctx.post("/facts")
async def create_fact(body: FactBody):
fact = await asyncio.to_thread(
lambda: _add_fact(DB_PATH, body.category, body.key, body.value, body.source)
lambda: _add_fact(CONTEXT_DB_PATH, body.category, body.key, body.value, body.source)
)
return {"id": fact.id, "category": fact.category, "key": fact.key,
"value": fact.value, "source": fact.source, "created_at": fact.created_at}
@ -1051,7 +1058,7 @@ async def create_fact(body: FactBody):
@_ctx.get("/facts")
async def list_facts_endpoint(category: str | None = None):
facts = await asyncio.to_thread(lambda: _list_facts(DB_PATH, category))
facts = await asyncio.to_thread(lambda: _list_facts(CONTEXT_DB_PATH, category))
return [
{"id": f.id, "category": f.category, "key": f.key,
"value": f.value, "source": f.source, "created_at": f.created_at}
@ -1061,7 +1068,7 @@ async def list_facts_endpoint(category: str | None = None):
@_ctx.delete("/facts/{fact_id}")
async def delete_fact_endpoint(fact_id: str):
deleted = await asyncio.to_thread(lambda: _delete_fact(DB_PATH, fact_id))
deleted = await asyncio.to_thread(lambda: _delete_fact(CONTEXT_DB_PATH, fact_id))
if not deleted:
raise HTTPException(status_code=404, detail="Fact not found")
return {"deleted": fact_id}
@ -1082,13 +1089,13 @@ async def wizard_step(body: WizardStepBody):
async def wizard_apply(body: WizardApplyBody):
if not is_complete(body.session):
raise HTTPException(status_code=400, detail="Wizard session is not complete")
result = await asyncio.to_thread(lambda: apply_session(DB_PATH, body.session))
result = await asyncio.to_thread(lambda: apply_session(CONTEXT_DB_PATH, body.session))
return result
@_ctx.get("/debug/search")
async def debug_search(q: str):
ctx = await asyncio.to_thread(lambda: _retrieve_context(DB_PATH, q))
ctx = await asyncio.to_thread(lambda: _retrieve_context(CONTEXT_DB_PATH, q))
return {"facts": ctx.facts, "chunks": ctx.chunks, "block": format_context_block(ctx)}

View file

@ -195,6 +195,7 @@ async def diagnose_stream(
llm_url: str | None = None,
llm_model: str | None = None,
llm_api_key: str | None = None,
context_db_path: Path | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""Async generator yielding SSE event dicts for the diagnose pipeline.
@ -226,7 +227,8 @@ async def diagnose_stream(
time_detected = keywords != query
yield {"type": "status", "message": "Loading environment context…"}
ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query))
_ctx_db = context_db_path or db_path
ctx = await asyncio.to_thread(lambda: retrieve_context(_ctx_db, query))
yield {
"type": "context",
"facts": ctx.facts,