fix: separate context KB into own SQLite file to eliminate write-lock contention
context_facts, context_documents, and context_chunks now live in turnstone-context.db (sibling of turnstone.db). The glean scheduler held write locks on the main DB long enough to cause 5-second timeout failures on context fact inserts; separate files have independent WAL write locks so they never contend. Changes: - pipeline.py: extract _CONTEXT_SCHEMA + ensure_context_schema() - rest.py: CONTEXT_DB_PATH (TURNSTONE_CONTEXT_DB env var, defaults to sibling file); init via ensure_context_schema(); all context routes pass CONTEXT_DB_PATH; diagnose_stream receives context_db_path kwarg - diagnose/__init__.py: diagnose_stream() accepts context_db_path (falls back to db_path for backward compat); retrieve_context uses it - store.py: sqlite3.connect() timeout=30.0 — Python driver retry loop is independent of PRAGMA busy_timeout; needed for any remaining contention during test or single-file deployments Closes: #42
This commit is contained in:
parent
e851099e5c
commit
3cfd587d16
4 changed files with 73 additions and 11 deletions
|
|
@ -29,7 +29,11 @@ class ContextDocument:
|
|||
|
||||
|
||||
def _connect(db_path: Path) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
# timeout=30: retry for up to 30 s when another writer (e.g. the glean
|
||||
# collector) holds a WAL write lock. PRAGMA busy_timeout is a SQLite-level
|
||||
# hint that operates after the connection is open; the Python sqlite3 module's
|
||||
# own retry loop is controlled solely by this timeout= argument.
|
||||
conn = sqlite3.connect(str(db_path), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
|
|
|||
|
|
@ -72,6 +72,8 @@ CREATE TABLE IF NOT EXISTS received_bundles (
|
|||
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
||||
|
||||
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
|
||||
-- kept here as no-ops so legacy single-file deployments still work
|
||||
CREATE TABLE IF NOT EXISTS context_facts (
|
||||
id TEXT PRIMARY KEY,
|
||||
category TEXT NOT NULL,
|
||||
|
|
@ -129,6 +131,38 @@ CREATE TABLE IF NOT EXISTS glean_fingerprints (
|
|||
"""
|
||||
|
||||
|
||||
_CONTEXT_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS context_facts (
|
||||
id TEXT PRIMARY KEY,
|
||||
category TEXT NOT NULL,
|
||||
key TEXT NOT NULL,
|
||||
value TEXT NOT NULL,
|
||||
source TEXT,
|
||||
created_at TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS context_documents (
|
||||
id TEXT PRIMARY KEY,
|
||||
filename TEXT NOT NULL,
|
||||
doc_type TEXT NOT NULL,
|
||||
full_text TEXT NOT NULL,
|
||||
file_size INTEGER,
|
||||
uploaded_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS context_chunks (
|
||||
id TEXT PRIMARY KEY,
|
||||
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
|
||||
chunk_index INTEGER NOT NULL,
|
||||
text TEXT NOT NULL,
|
||||
embedding BLOB
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
||||
"""
|
||||
|
||||
|
||||
def ensure_schema(db_path: Path) -> None:
|
||||
"""Create all tables and apply additive migrations. Safe to call on every startup."""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
|
|
@ -146,6 +180,21 @@ def ensure_schema(db_path: Path) -> None:
|
|||
conn.close()
|
||||
|
||||
|
||||
def ensure_context_schema(db_path: Path) -> None:
|
||||
"""Create context KB tables in a dedicated database file.
|
||||
|
||||
Using a separate file from the main log DB means context fact writes never
|
||||
contend with the high-throughput glean scheduler, which can hold the main
|
||||
DB write lock for seconds at a time when flushing large journal batches.
|
||||
"""
|
||||
conn = sqlite3.connect(str(db_path), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.executescript(_CONTEXT_SCHEMA)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||
st = path.stat()
|
||||
|
|
|
|||
25
app/rest.py
25
app/rest.py
|
|
@ -27,7 +27,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.glean.pipeline import ensure_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||
from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||
from app.glean.base import load_compiled_patterns, now_iso
|
||||
from app.glean.tautulli import parse_webhook as _parse_tautulli
|
||||
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
|
||||
|
|
@ -78,6 +78,11 @@ from app.tasks.glean_scheduler import get_state as _glean_state, run_once as _ru
|
|||
from app.glean.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers
|
||||
|
||||
DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db"))
|
||||
# Context KB gets its own file so context fact writes never contend with the
|
||||
# high-throughput glean scheduler. Defaults to a sibling file next to the main DB.
|
||||
CONTEXT_DB_PATH = Path(
|
||||
os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
|
||||
)
|
||||
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
||||
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
||||
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
||||
|
|
@ -110,6 +115,7 @@ _compiled_patterns: list = []
|
|||
async def _lifespan(app: FastAPI):
|
||||
global _compiled_patterns
|
||||
ensure_schema(DB_PATH)
|
||||
ensure_context_schema(CONTEXT_DB_PATH)
|
||||
_compiled_patterns = load_compiled_patterns(PATTERN_FILE)
|
||||
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
||||
configs = load_watch_config(watch_cfg_path)
|
||||
|
|
@ -382,6 +388,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
|
|||
llm_url=prefs.get("llm_url") or None,
|
||||
llm_model=prefs.get("llm_model") or None,
|
||||
llm_api_key=prefs.get("llm_api_key") or None,
|
||||
context_db_path=CONTEXT_DB_PATH,
|
||||
):
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
|
||||
|
|
@ -1008,7 +1015,7 @@ async def upload_doc(file: UploadFile):
|
|||
content = await file.read()
|
||||
try:
|
||||
result = await asyncio.to_thread(
|
||||
lambda: _ingest_upload(DB_PATH, file.filename or "upload", content)
|
||||
lambda: _ingest_upload(CONTEXT_DB_PATH, file.filename or "upload", content)
|
||||
)
|
||||
except UnsupportedDocType as e:
|
||||
raise HTTPException(status_code=415, detail=str(e))
|
||||
|
|
@ -1019,7 +1026,7 @@ async def upload_doc(file: UploadFile):
|
|||
|
||||
@_ctx.get("/docs")
|
||||
async def list_docs():
|
||||
docs = await asyncio.to_thread(lambda: _list_documents(DB_PATH))
|
||||
docs = await asyncio.to_thread(lambda: _list_documents(CONTEXT_DB_PATH))
|
||||
return [
|
||||
{
|
||||
"id": d.id,
|
||||
|
|
@ -1034,7 +1041,7 @@ async def list_docs():
|
|||
|
||||
@_ctx.delete("/docs/{doc_id}")
|
||||
async def delete_doc(doc_id: str):
|
||||
deleted = await asyncio.to_thread(lambda: _delete_document(DB_PATH, doc_id))
|
||||
deleted = await asyncio.to_thread(lambda: _delete_document(CONTEXT_DB_PATH, doc_id))
|
||||
if not deleted:
|
||||
raise HTTPException(status_code=404, detail="Document not found")
|
||||
return {"deleted": doc_id}
|
||||
|
|
@ -1043,7 +1050,7 @@ async def delete_doc(doc_id: str):
|
|||
@_ctx.post("/facts")
|
||||
async def create_fact(body: FactBody):
|
||||
fact = await asyncio.to_thread(
|
||||
lambda: _add_fact(DB_PATH, body.category, body.key, body.value, body.source)
|
||||
lambda: _add_fact(CONTEXT_DB_PATH, body.category, body.key, body.value, body.source)
|
||||
)
|
||||
return {"id": fact.id, "category": fact.category, "key": fact.key,
|
||||
"value": fact.value, "source": fact.source, "created_at": fact.created_at}
|
||||
|
|
@ -1051,7 +1058,7 @@ async def create_fact(body: FactBody):
|
|||
|
||||
@_ctx.get("/facts")
|
||||
async def list_facts_endpoint(category: str | None = None):
|
||||
facts = await asyncio.to_thread(lambda: _list_facts(DB_PATH, category))
|
||||
facts = await asyncio.to_thread(lambda: _list_facts(CONTEXT_DB_PATH, category))
|
||||
return [
|
||||
{"id": f.id, "category": f.category, "key": f.key,
|
||||
"value": f.value, "source": f.source, "created_at": f.created_at}
|
||||
|
|
@ -1061,7 +1068,7 @@ async def list_facts_endpoint(category: str | None = None):
|
|||
|
||||
@_ctx.delete("/facts/{fact_id}")
|
||||
async def delete_fact_endpoint(fact_id: str):
|
||||
deleted = await asyncio.to_thread(lambda: _delete_fact(DB_PATH, fact_id))
|
||||
deleted = await asyncio.to_thread(lambda: _delete_fact(CONTEXT_DB_PATH, fact_id))
|
||||
if not deleted:
|
||||
raise HTTPException(status_code=404, detail="Fact not found")
|
||||
return {"deleted": fact_id}
|
||||
|
|
@ -1082,13 +1089,13 @@ async def wizard_step(body: WizardStepBody):
|
|||
async def wizard_apply(body: WizardApplyBody):
|
||||
if not is_complete(body.session):
|
||||
raise HTTPException(status_code=400, detail="Wizard session is not complete")
|
||||
result = await asyncio.to_thread(lambda: apply_session(DB_PATH, body.session))
|
||||
result = await asyncio.to_thread(lambda: apply_session(CONTEXT_DB_PATH, body.session))
|
||||
return result
|
||||
|
||||
|
||||
@_ctx.get("/debug/search")
|
||||
async def debug_search(q: str):
|
||||
ctx = await asyncio.to_thread(lambda: _retrieve_context(DB_PATH, q))
|
||||
ctx = await asyncio.to_thread(lambda: _retrieve_context(CONTEXT_DB_PATH, q))
|
||||
return {"facts": ctx.facts, "chunks": ctx.chunks, "block": format_context_block(ctx)}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -195,6 +195,7 @@ async def diagnose_stream(
|
|||
llm_url: str | None = None,
|
||||
llm_model: str | None = None,
|
||||
llm_api_key: str | None = None,
|
||||
context_db_path: Path | None = None,
|
||||
) -> AsyncGenerator[dict[str, Any], None]:
|
||||
"""Async generator yielding SSE event dicts for the diagnose pipeline.
|
||||
|
||||
|
|
@ -226,7 +227,8 @@ async def diagnose_stream(
|
|||
time_detected = keywords != query
|
||||
|
||||
yield {"type": "status", "message": "Loading environment context…"}
|
||||
ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query))
|
||||
_ctx_db = context_db_path or db_path
|
||||
ctx = await asyncio.to_thread(lambda: retrieve_context(_ctx_db, query))
|
||||
yield {
|
||||
"type": "context",
|
||||
"facts": ctx.facts,
|
||||
|
|
|
|||
Loading…
Reference in a new issue