diff --git a/app/context/store.py b/app/context/store.py index 07bca49..1ffa08a 100644 --- a/app/context/store.py +++ b/app/context/store.py @@ -29,7 +29,11 @@ class ContextDocument: def _connect(db_path: Path) -> sqlite3.Connection: - conn = sqlite3.connect(str(db_path)) + # timeout=30: retry for up to 30 s when another writer (e.g. the glean + # collector) holds a WAL write lock. PRAGMA busy_timeout is a SQLite-level + # hint that operates after the connection is open; the Python sqlite3 module's + # own retry loop is controlled solely by this timeout= argument. + conn = sqlite3.connect(str(db_path), timeout=30.0) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") conn.row_factory = sqlite3.Row diff --git a/app/glean/pipeline.py b/app/glean/pipeline.py index 42f5463..a4b749b 100644 --- a/app/glean/pipeline.py +++ b/app/glean/pipeline.py @@ -72,6 +72,8 @@ CREATE TABLE IF NOT EXISTS received_bundles ( CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at); CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type); +-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH +-- kept here as no-ops so legacy single-file deployments still work CREATE TABLE IF NOT EXISTS context_facts ( id TEXT PRIMARY KEY, category TEXT NOT NULL, @@ -129,6 +131,38 @@ CREATE TABLE IF NOT EXISTS glean_fingerprints ( """ +_CONTEXT_SCHEMA = """ +CREATE TABLE IF NOT EXISTS context_facts ( + id TEXT PRIMARY KEY, + category TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + source TEXT, + created_at TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category); +CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key); + +CREATE TABLE IF NOT EXISTS context_documents ( + id TEXT PRIMARY KEY, + filename TEXT NOT NULL, + doc_type TEXT NOT NULL, + full_text TEXT NOT NULL, + file_size INTEGER, + uploaded_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS context_chunks ( + id TEXT PRIMARY KEY, + document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE, + chunk_index INTEGER NOT NULL, + text TEXT NOT NULL, + embedding BLOB +); +CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id); +""" + + def ensure_schema(db_path: Path) -> None: """Create all tables and apply additive migrations. Safe to call on every startup.""" conn = sqlite3.connect(str(db_path)) @@ -146,6 +180,21 @@ def ensure_schema(db_path: Path) -> None: conn.close() +def ensure_context_schema(db_path: Path) -> None: + """Create context KB tables in a dedicated database file. + + Using a separate file from the main log DB means context fact writes never + contend with the high-throughput glean scheduler, which can hold the main + DB write lock for seconds at a time when flushing large journal batches. + """ + conn = sqlite3.connect(str(db_path), timeout=30.0) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.executescript(_CONTEXT_SCHEMA) + conn.commit() + conn.close() + + def _fingerprint(path: Path) -> tuple[float, int]: """Return (mtime, size) for a file — cheap identity check, no content read needed.""" st = path.stat() diff --git a/app/rest.py b/app/rest.py index c3ef4d9..391638d 100644 --- a/app/rest.py +++ b/app/rest.py @@ -27,7 +27,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel -from app.glean.pipeline import ensure_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source +from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source from app.glean.base import load_compiled_patterns, now_iso from app.glean.tautulli import parse_webhook as _parse_tautulli from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh @@ -78,6 +78,11 @@ from app.tasks.glean_scheduler import get_state as _glean_state, run_once as _ru from app.glean.mqtt_subscriber import run_mqtt_subscribers as _run_mqtt_subscribers DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "data" / "turnstone.db")) +# Context KB gets its own file so context fact writes never contend with the +# high-throughput glean scheduler. Defaults to a sibling file next to the main DB. +CONTEXT_DB_PATH = Path( + os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db") +) PREFS_PATH = DB_PATH.parent / "preferences.json" DIST_DIR = Path(__file__).parent.parent / "web" / "dist" SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown") @@ -110,6 +115,7 @@ _compiled_patterns: list = [] async def _lifespan(app: FastAPI): global _compiled_patterns ensure_schema(DB_PATH) + ensure_context_schema(CONTEXT_DB_PATH) _compiled_patterns = load_compiled_patterns(PATTERN_FILE) watch_cfg_path = PATTERN_DIR / "watch.yaml" configs = load_watch_config(watch_cfg_path) @@ -382,6 +388,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse: llm_url=prefs.get("llm_url") or None, llm_model=prefs.get("llm_model") or None, llm_api_key=prefs.get("llm_api_key") or None, + context_db_path=CONTEXT_DB_PATH, ): yield f"data: {json.dumps(event)}\n\n" @@ -1008,7 +1015,7 @@ async def upload_doc(file: UploadFile): content = await file.read() try: result = await asyncio.to_thread( - lambda: _ingest_upload(DB_PATH, file.filename or "upload", content) + lambda: _ingest_upload(CONTEXT_DB_PATH, file.filename or "upload", content) ) except UnsupportedDocType as e: raise HTTPException(status_code=415, detail=str(e)) @@ -1019,7 +1026,7 @@ async def upload_doc(file: UploadFile): @_ctx.get("/docs") async def list_docs(): - docs = await asyncio.to_thread(lambda: _list_documents(DB_PATH)) + docs = await asyncio.to_thread(lambda: _list_documents(CONTEXT_DB_PATH)) return [ { "id": d.id, @@ -1034,7 +1041,7 @@ async def list_docs(): @_ctx.delete("/docs/{doc_id}") async def delete_doc(doc_id: str): - deleted = await asyncio.to_thread(lambda: _delete_document(DB_PATH, doc_id)) + deleted = await asyncio.to_thread(lambda: _delete_document(CONTEXT_DB_PATH, doc_id)) if not deleted: raise HTTPException(status_code=404, detail="Document not found") return {"deleted": doc_id} @@ -1043,7 +1050,7 @@ async def delete_doc(doc_id: str): @_ctx.post("/facts") async def create_fact(body: FactBody): fact = await asyncio.to_thread( - lambda: _add_fact(DB_PATH, body.category, body.key, body.value, body.source) + lambda: _add_fact(CONTEXT_DB_PATH, body.category, body.key, body.value, body.source) ) return {"id": fact.id, "category": fact.category, "key": fact.key, "value": fact.value, "source": fact.source, "created_at": fact.created_at} @@ -1051,7 +1058,7 @@ async def create_fact(body: FactBody): @_ctx.get("/facts") async def list_facts_endpoint(category: str | None = None): - facts = await asyncio.to_thread(lambda: _list_facts(DB_PATH, category)) + facts = await asyncio.to_thread(lambda: _list_facts(CONTEXT_DB_PATH, category)) return [ {"id": f.id, "category": f.category, "key": f.key, "value": f.value, "source": f.source, "created_at": f.created_at} @@ -1061,7 +1068,7 @@ async def list_facts_endpoint(category: str | None = None): @_ctx.delete("/facts/{fact_id}") async def delete_fact_endpoint(fact_id: str): - deleted = await asyncio.to_thread(lambda: _delete_fact(DB_PATH, fact_id)) + deleted = await asyncio.to_thread(lambda: _delete_fact(CONTEXT_DB_PATH, fact_id)) if not deleted: raise HTTPException(status_code=404, detail="Fact not found") return {"deleted": fact_id} @@ -1082,13 +1089,13 @@ async def wizard_step(body: WizardStepBody): async def wizard_apply(body: WizardApplyBody): if not is_complete(body.session): raise HTTPException(status_code=400, detail="Wizard session is not complete") - result = await asyncio.to_thread(lambda: apply_session(DB_PATH, body.session)) + result = await asyncio.to_thread(lambda: apply_session(CONTEXT_DB_PATH, body.session)) return result @_ctx.get("/debug/search") async def debug_search(q: str): - ctx = await asyncio.to_thread(lambda: _retrieve_context(DB_PATH, q)) + ctx = await asyncio.to_thread(lambda: _retrieve_context(CONTEXT_DB_PATH, q)) return {"facts": ctx.facts, "chunks": ctx.chunks, "block": format_context_block(ctx)} diff --git a/app/services/diagnose/__init__.py b/app/services/diagnose/__init__.py index 80d8a0f..1f1bb8e 100644 --- a/app/services/diagnose/__init__.py +++ b/app/services/diagnose/__init__.py @@ -195,6 +195,7 @@ async def diagnose_stream( llm_url: str | None = None, llm_model: str | None = None, llm_api_key: str | None = None, + context_db_path: Path | None = None, ) -> AsyncGenerator[dict[str, Any], None]: """Async generator yielding SSE event dicts for the diagnose pipeline. @@ -226,7 +227,8 @@ async def diagnose_stream( time_detected = keywords != query yield {"type": "status", "message": "Loading environment context…"} - ctx = await asyncio.to_thread(lambda: retrieve_context(db_path, query)) + _ctx_db = context_db_path or db_path + ctx = await asyncio.to_thread(lambda: retrieve_context(_ctx_db, query)) yield { "type": "context", "facts": ctx.facts,