Merge feat/60-incidents-db: split incidents tables to dedicated DB (#60)
This commit is contained in:
commit
7e6321045a
6 changed files with 140 additions and 34 deletions
|
|
@ -46,6 +46,8 @@ CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_
|
|||
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
||||
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
||||
|
||||
-- incidents tables moved to ensure_incidents_schema() / INCIDENTS_DB_PATH
|
||||
-- kept here as no-ops so legacy single-file deployments still work
|
||||
CREATE TABLE IF NOT EXISTS incidents (
|
||||
id TEXT PRIMARY KEY,
|
||||
label TEXT NOT NULL,
|
||||
|
|
@ -56,8 +58,6 @@ CREATE TABLE IF NOT EXISTS incidents (
|
|||
created_at TEXT NOT NULL,
|
||||
severity TEXT NOT NULL DEFAULT 'medium'
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS received_bundles (
|
||||
id TEXT PRIMARY KEY,
|
||||
source_host TEXT NOT NULL,
|
||||
|
|
@ -69,9 +69,6 @@ CREATE TABLE IF NOT EXISTS received_bundles (
|
|||
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||
bundle_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS sent_bundles (
|
||||
id TEXT PRIMARY KEY,
|
||||
incident_id TEXT NOT NULL,
|
||||
|
|
@ -80,8 +77,6 @@ CREATE TABLE IF NOT EXISTS sent_bundles (
|
|||
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||
bundle_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
|
||||
|
||||
-- context tables moved to ensure_context_schema() / CONTEXT_DB_PATH
|
||||
-- kept here as no-ops so legacy single-file deployments still work
|
||||
|
|
@ -206,6 +201,97 @@ def ensure_context_schema(db_path: Path) -> None:
|
|||
conn.close()
|
||||
|
||||
|
||||
_INCIDENTS_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS incidents (
|
||||
id TEXT PRIMARY KEY,
|
||||
label TEXT NOT NULL,
|
||||
issue_type TEXT NOT NULL DEFAULT '',
|
||||
started_at TEXT,
|
||||
ended_at TEXT,
|
||||
notes TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
severity TEXT NOT NULL DEFAULT 'medium'
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS received_bundles (
|
||||
id TEXT PRIMARY KEY,
|
||||
source_host TEXT NOT NULL,
|
||||
issue_type TEXT NOT NULL DEFAULT '',
|
||||
label TEXT NOT NULL,
|
||||
severity TEXT NOT NULL DEFAULT 'medium',
|
||||
started_at TEXT,
|
||||
bundled_at TEXT NOT NULL,
|
||||
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||
bundle_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS sent_bundles (
|
||||
id TEXT PRIMARY KEY,
|
||||
incident_id TEXT NOT NULL,
|
||||
exported_at TEXT NOT NULL,
|
||||
sanitized INTEGER NOT NULL DEFAULT 0,
|
||||
entry_count INTEGER NOT NULL DEFAULT 0,
|
||||
bundle_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_incident ON sent_bundles(incident_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sent_bundles_time ON sent_bundles(exported_at);
|
||||
"""
|
||||
|
||||
|
||||
def ensure_incidents_schema(db_path: Path) -> None:
|
||||
"""Create incidents tables in a dedicated database file.
|
||||
|
||||
Using a separate file from the main log DB means incident writes never
|
||||
contend with the FTS5 bulk-insert write lock held by the glean scheduler.
|
||||
Mirrors the context_facts split (CONTEXT_DB_PATH / turnstone-context.db).
|
||||
"""
|
||||
conn = sqlite3.connect(str(db_path), timeout=30.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.executescript(_INCIDENTS_SCHEMA)
|
||||
for stmt in [
|
||||
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
|
||||
]:
|
||||
try:
|
||||
conn.execute(stmt)
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def migrate_incidents_to_dedicated_db(main_db: Path, incidents_db: Path) -> int:
|
||||
"""One-shot migration: copy incidents/bundles rows from main DB to incidents DB.
|
||||
|
||||
Safe to call on every startup — rows already present in incidents_db are
|
||||
skipped via INSERT OR IGNORE. Returns the count of rows migrated.
|
||||
"""
|
||||
src = sqlite3.connect(str(main_db), timeout=30.0)
|
||||
src.row_factory = sqlite3.Row
|
||||
dst = sqlite3.connect(str(incidents_db), timeout=30.0)
|
||||
migrated = 0
|
||||
for table in ("incidents", "received_bundles", "sent_bundles"):
|
||||
try:
|
||||
rows = src.execute(f"SELECT * FROM {table}").fetchall() # noqa: S608
|
||||
except sqlite3.OperationalError:
|
||||
continue
|
||||
if not rows:
|
||||
continue
|
||||
cols = ", ".join(rows[0].keys())
|
||||
placeholders = ", ".join("?" * len(rows[0].keys()))
|
||||
dst.executemany(
|
||||
f"INSERT OR IGNORE INTO {table} ({cols}) VALUES ({placeholders})", # noqa: S608
|
||||
[tuple(r) for r in rows],
|
||||
)
|
||||
migrated += len(rows)
|
||||
dst.commit()
|
||||
src.close()
|
||||
dst.close()
|
||||
return migrated
|
||||
|
||||
|
||||
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||
st = path.stat()
|
||||
|
|
|
|||
44
app/rest.py
44
app/rest.py
|
|
@ -35,7 +35,7 @@ from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.glean.pipeline import ensure_schema, ensure_context_schema, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||
from app.glean.pipeline import ensure_schema, ensure_context_schema, ensure_incidents_schema, migrate_incidents_to_dedicated_db, glean_file as _glean_file, glean_ssh_source as _glean_ssh_source
|
||||
from app.glean.base import load_compiled_patterns, now_iso
|
||||
from app.glean.tautulli import parse_webhook as _parse_tautulli
|
||||
from app.glean.wazuh import is_wazuh_alert as _is_wazuh_alert, parse as _parse_wazuh
|
||||
|
|
@ -95,6 +95,11 @@ DB_PATH = Path(os.environ.get("TURNSTONE_DB", Path(__file__).parent.parent / "da
|
|||
CONTEXT_DB_PATH = Path(
|
||||
os.environ.get("TURNSTONE_CONTEXT_DB", DB_PATH.parent / "turnstone-context.db")
|
||||
)
|
||||
# Incidents get their own file so incident writes never block behind the FTS5
|
||||
# bulk-insert write lock held by the glean scheduler during log bursts.
|
||||
INCIDENTS_DB_PATH = Path(
|
||||
os.environ.get("TURNSTONE_INCIDENTS_DB", DB_PATH.parent / "turnstone-incidents.db")
|
||||
)
|
||||
PREFS_PATH = DB_PATH.parent / "preferences.json"
|
||||
DIST_DIR = Path(__file__).parent.parent / "web" / "dist"
|
||||
SOURCE_HOST = os.environ.get("TURNSTONE_SOURCE_HOST", "unknown")
|
||||
|
|
@ -135,6 +140,12 @@ async def _lifespan(app: FastAPI):
|
|||
_audit_log.addHandler(h)
|
||||
ensure_schema(DB_PATH)
|
||||
ensure_context_schema(CONTEXT_DB_PATH)
|
||||
ensure_incidents_schema(INCIDENTS_DB_PATH)
|
||||
migrated = migrate_incidents_to_dedicated_db(DB_PATH, INCIDENTS_DB_PATH)
|
||||
if migrated:
|
||||
logging.getLogger(__name__).info(
|
||||
"Migrated %d incident/bundle rows from main DB to incidents DB", migrated
|
||||
)
|
||||
_compiled_patterns = load_compiled_patterns(PATTERN_FILE)
|
||||
watch_cfg_path = PATTERN_DIR / "watch.yaml"
|
||||
configs = load_watch_config(watch_cfg_path)
|
||||
|
|
@ -451,6 +462,7 @@ async def diagnose_post_stream(body: DiagnoseRequest) -> StreamingResponse:
|
|||
llm_model=prefs.get("llm_model") or None,
|
||||
llm_api_key=prefs.get("llm_api_key") or None,
|
||||
context_db_path=CONTEXT_DB_PATH,
|
||||
incidents_db_path=INCIDENTS_DB_PATH,
|
||||
tech_level=prefs.get("tech_level", "sysadmin"),
|
||||
):
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
|
|
@ -924,7 +936,7 @@ def get_stats(
|
|||
@router.post("/api/incidents")
|
||||
def create_incident_endpoint(body: IncidentCreate) -> dict:
|
||||
incident = create_incident(
|
||||
DB_PATH,
|
||||
INCIDENTS_DB_PATH,
|
||||
label=body.label,
|
||||
issue_type=body.issue_type,
|
||||
started_at=body.started_at,
|
||||
|
|
@ -937,15 +949,15 @@ def create_incident_endpoint(body: IncidentCreate) -> dict:
|
|||
|
||||
@router.get("/api/incidents")
|
||||
def list_incidents_endpoint() -> dict:
|
||||
return {"incidents": [dataclasses.asdict(i) for i in list_incidents(DB_PATH)]}
|
||||
return {"incidents": [dataclasses.asdict(i) for i in list_incidents(INCIDENTS_DB_PATH)]}
|
||||
|
||||
|
||||
@router.get("/api/incidents/{incident_id}")
|
||||
def get_incident_endpoint(incident_id: str) -> dict:
|
||||
incident = get_incident(DB_PATH, incident_id)
|
||||
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||
if not incident:
|
||||
raise HTTPException(status_code=404, detail="Incident not found")
|
||||
entries = get_incident_entries(DB_PATH, incident)
|
||||
entries = get_incident_entries(INCIDENTS_DB_PATH, incident)
|
||||
return {
|
||||
**dataclasses.asdict(incident),
|
||||
"entries": [dataclasses.asdict(e) for e in entries],
|
||||
|
|
@ -954,24 +966,24 @@ def get_incident_endpoint(incident_id: str) -> dict:
|
|||
|
||||
@router.delete("/api/incidents/{incident_id}")
|
||||
def delete_incident_endpoint(incident_id: str) -> dict:
|
||||
if not delete_incident(DB_PATH, incident_id):
|
||||
if not delete_incident(INCIDENTS_DB_PATH, incident_id):
|
||||
raise HTTPException(status_code=404, detail="Incident not found")
|
||||
return {"deleted": incident_id}
|
||||
|
||||
|
||||
@router.get("/api/incidents/{incident_id}/bundle")
|
||||
def get_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
||||
incident = get_incident(DB_PATH, incident_id)
|
||||
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||
if not incident:
|
||||
raise HTTPException(status_code=404, detail="Incident not found")
|
||||
bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||
record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||
bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||
record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||
return bundle
|
||||
|
||||
|
||||
@router.get("/api/sent-bundles")
|
||||
def list_sent_bundles_endpoint() -> dict:
|
||||
bundles = list_sent_bundles(DB_PATH)
|
||||
bundles = list_sent_bundles(INCIDENTS_DB_PATH)
|
||||
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
||||
|
||||
|
||||
|
|
@ -979,11 +991,11 @@ def list_sent_bundles_endpoint() -> dict:
|
|||
def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
||||
if not BUNDLE_ENDPOINT:
|
||||
raise HTTPException(status_code=503, detail="TURNSTONE_BUNDLE_ENDPOINT not configured")
|
||||
incident = get_incident(DB_PATH, incident_id)
|
||||
incident = get_incident(INCIDENTS_DB_PATH, incident_id)
|
||||
if not incident:
|
||||
raise HTTPException(status_code=404, detail="Incident not found")
|
||||
bundle = build_bundle(DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||
record_sent_bundle(DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||
bundle = build_bundle(INCIDENTS_DB_PATH, incident, source_host=SOURCE_HOST, sanitize=sanitize)
|
||||
record_sent_bundle(INCIDENTS_DB_PATH, incident_id, bundle, sanitized=sanitize)
|
||||
payload = json.dumps(bundle).encode()
|
||||
req = urllib.request.Request(
|
||||
BUNDLE_ENDPOINT,
|
||||
|
|
@ -1002,19 +1014,19 @@ def send_incident_bundle(incident_id: str, sanitize: bool = False) -> dict:
|
|||
|
||||
@router.post("/api/bundles")
|
||||
def receive_bundle(bundle: dict) -> dict:
|
||||
record = store_bundle(DB_PATH, bundle)
|
||||
record = store_bundle(INCIDENTS_DB_PATH, bundle)
|
||||
return {"id": record.id, "entry_count": record.entry_count}
|
||||
|
||||
|
||||
@router.get("/api/bundles")
|
||||
def list_bundles_endpoint() -> dict:
|
||||
bundles = list_bundles(DB_PATH)
|
||||
bundles = list_bundles(INCIDENTS_DB_PATH)
|
||||
return {"bundles": [dataclasses.asdict(b) for b in bundles]}
|
||||
|
||||
|
||||
@router.get("/api/bundles/{bundle_id}")
|
||||
def get_bundle_endpoint(bundle_id: str) -> dict:
|
||||
bundle = get_bundle(DB_PATH, bundle_id)
|
||||
bundle = get_bundle(INCIDENTS_DB_PATH, bundle_id)
|
||||
if not bundle:
|
||||
raise HTTPException(status_code=404, detail="Bundle not found")
|
||||
return dataclasses.asdict(bundle)
|
||||
|
|
|
|||
|
|
@ -196,6 +196,7 @@ async def diagnose_stream(
|
|||
llm_model: str | None = None,
|
||||
llm_api_key: str | None = None,
|
||||
context_db_path: Path | None = None,
|
||||
incidents_db_path: Path | None = None,
|
||||
tech_level: str = "sysadmin",
|
||||
) -> AsyncGenerator[dict[str, Any], None]:
|
||||
"""Async generator yielding SSE event dicts for the diagnose pipeline.
|
||||
|
|
@ -318,6 +319,7 @@ async def diagnose_stream(
|
|||
llm_model=llm_model,
|
||||
llm_api_key=llm_api_key,
|
||||
tech_level=tech_level,
|
||||
incidents_db_path=incidents_db_path,
|
||||
):
|
||||
yield event
|
||||
return # pipeline emits its own "done" event
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ async def run_pipeline(
|
|||
llm_model: str | None,
|
||||
llm_api_key: str | None,
|
||||
tech_level: str = "sysadmin",
|
||||
incidents_db_path: Path | None = None,
|
||||
) -> AsyncGenerator[dict[str, Any], None]:
|
||||
"""Async generator that runs all 5 pipeline stages and yields SSE event dicts.
|
||||
|
||||
|
|
@ -124,9 +125,10 @@ async def run_pipeline(
|
|||
}
|
||||
|
||||
# Stage 4: False-positive suppression
|
||||
_incidents_db = incidents_db_path or db_path
|
||||
try:
|
||||
ranked = await asyncio.to_thread(
|
||||
FalsePositiveSuppressor().suppress, hypotheses, db_path
|
||||
FalsePositiveSuppressor().suppress, hypotheses, _incidents_db
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Stage 4 (suppressor) failed: %s", exc)
|
||||
|
|
|
|||
|
|
@ -65,14 +65,14 @@ except ImportError: # pragma: no cover
|
|||
# DB helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _fetch_resolved_incidents(db_path: Path) -> list[str]:
|
||||
"""Fetch resolved incident texts from SQLite.
|
||||
def _fetch_resolved_incidents(incidents_db_path: Path) -> list[str]:
|
||||
"""Fetch resolved incident texts from the incidents database.
|
||||
|
||||
Returns a list of non-empty combined strings for each resolved incident.
|
||||
Returns an empty list on any error (missing table, connection failure, etc.).
|
||||
"""
|
||||
try:
|
||||
with sqlite3.connect(str(db_path), timeout=30.0) as conn:
|
||||
with sqlite3.connect(str(incidents_db_path), timeout=30.0) as conn:
|
||||
cursor = conn.execute(
|
||||
"SELECT label, notes FROM incidents WHERE ended_at IS NOT NULL LIMIT 200"
|
||||
)
|
||||
|
|
@ -125,13 +125,13 @@ class FalsePositiveSuppressor:
|
|||
def suppress(
|
||||
self,
|
||||
hypotheses: list[Hypothesis],
|
||||
db_path: Path,
|
||||
incidents_db_path: Path,
|
||||
) -> list[RankedHypothesis]:
|
||||
"""Rank hypotheses by novelty, suppressing those matching resolved incidents.
|
||||
|
||||
Args:
|
||||
hypotheses: Candidate hypotheses from Stage 3.
|
||||
db_path: Path to the Turnstone SQLite database containing incidents.
|
||||
incidents_db_path: Path to the dedicated incidents SQLite database.
|
||||
|
||||
Returns:
|
||||
List of RankedHypothesis sorted by (novelty_score * confidence) descending.
|
||||
|
|
@ -153,14 +153,14 @@ class FalsePositiveSuppressor:
|
|||
)
|
||||
return self._passthrough(hypotheses)
|
||||
|
||||
# Fetch corpus texts from DB; fall back to passthrough if corpus is empty.
|
||||
corpus_texts = _fetch_resolved_incidents(db_path)
|
||||
# Fetch corpus texts from incidents DB; fall back to passthrough if empty.
|
||||
corpus_texts = _fetch_resolved_incidents(incidents_db_path)
|
||||
if not corpus_texts:
|
||||
logger.debug("No resolved incidents found — all hypotheses treated as novel")
|
||||
return self._passthrough(hypotheses)
|
||||
|
||||
# Embed corpus (with caching).
|
||||
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, db_path)
|
||||
corpus_embeddings = self._get_corpus_embeddings(embedder, corpus_texts, incidents_db_path)
|
||||
|
||||
# Score each hypothesis and sort by novelty * confidence descending.
|
||||
ranked = [
|
||||
|
|
@ -230,10 +230,10 @@ class FalsePositiveSuppressor:
|
|||
self,
|
||||
embedder: Any,
|
||||
corpus_texts: list[str],
|
||||
db_path: Path,
|
||||
incidents_db_path: Path,
|
||||
) -> list[list[float]]:
|
||||
"""Return cached corpus embeddings, re-embedding if the corpus has changed."""
|
||||
cache_key = str(db_path)
|
||||
cache_key = str(incidents_db_path)
|
||||
cached = _corpus_cache.get(cache_key)
|
||||
|
||||
if cached is not None:
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ def client(tmp_path):
|
|||
ensure_schema(db)
|
||||
|
||||
with patch.object(rest_module, "DB_PATH", db), \
|
||||
patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \
|
||||
patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \
|
||||
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
||||
patch.object(rest_module, "_compiled_patterns", []):
|
||||
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
||||
|
|
@ -41,6 +43,8 @@ def client_with_candidate(tmp_path):
|
|||
conn.close()
|
||||
|
||||
with patch.object(rest_module, "DB_PATH", db), \
|
||||
patch.object(rest_module, "CONTEXT_DB_PATH", tmp_path / "context.db"), \
|
||||
patch.object(rest_module, "INCIDENTS_DB_PATH", tmp_path / "incidents.db"), \
|
||||
patch.object(rest_module, "PREFS_PATH", tmp_path / "prefs.json"), \
|
||||
patch.object(rest_module, "_compiled_patterns", []):
|
||||
with TestClient(rest_module.app, raise_server_exceptions=True) as c:
|
||||
|
|
|
|||
Loading…
Reference in a new issue